You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@storm.apache.org by Bilal Al Fartakh <al...@gmail.com> on 2014/05/06 12:38:37 UTC
duplicated result
I'm using a bolt that receives tuples from another bolt (exclamation bolt )
and writes it on a file , the problem I got is that I have duplicated
results four times , like when I emit a word , I found the word Written four
times . where's the problem possibly could be ?
public class PrinterBolty extends BaseBasicBolt {
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
try {
BufferedWriter output;
output = new BufferedWriter(new
FileWriter("/root/src/storm-starter/hh.txt", true));
output.newLine();
output.append(tuple.getString(0));
output.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer ofd) {
}
}
--
*Al Fartakh Bilal*
Re: duplicated result
Posted by Bilal Al Fartakh <al...@gmail.com>.
Thank you :D
specifying 1 spout was the solution .
thank you Padma and Nathan again for your help !
2014-05-06 14:40 GMT+01:00 Nathan Leung <nc...@gmail.com>:
> Good point, but it shouldn't matter how many exclamation bolts there are.
> The number of spouts does because they are all reading the same file.
> On May 6, 2014 9:37 AM, "padma priya chitturi" <pa...@gmail.com>
> wrote:
>
>> Hi,
>>
>> The issue lies with the number of tasks/executors specified for spout.
>> Try specifying 1 spout and see if you could see duplicates. I suppose there
>> would be no duplicates in specifying 1 spout and 1 exclamtion bolt.
>>
>>
>> On Tue, May 6, 2014 at 5:23 PM, Bilal Al Fartakh <
>> alfartaj.bilal@gmail.com> wrote:
>>
>>> HI ,Nathan and thank you for responding ,I appreciate it !
>>> no I'm not , I just run this topology for the first time
>>>
>>>
>>>
>>>
>>>
>>>
>>> public class Ex {
>>>
>>> public static class ExclamationBolt extends BaseRichBolt {
>>> OutputCollector _collector;
>>>
>>> @Override
>>> public void prepare(Map conf, TopologyContext context,
>>> OutputCollector collector) {
>>> _collector = collector;
>>> }
>>>
>>> @Override
>>> public void execute(Tuple tuple) {
>>> _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
>>> _collector.ack(tuple);
>>> }
>>>
>>> @Override
>>> public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>> declarer.declare(new Fields("word"));
>>> }
>>>
>>>
>>> }
>>>
>>> public static void main(String[] args) throws Exception {
>>> TopologyBuilder builder = new TopologyBuilder();
>>> TryRead T = new TryRead();
>>> PrinterBolty P = new PrinterBolty();
>>> builder.setSpout("word", T, 10);
>>> builder.setBolt("exclaim1", new ExclamationBolt(),
>>> 3).shuffleGrouping("word");
>>> builder.setBolt("exclaim2", P , 2).shuffleGrouping("exclaim1");
>>>
>>>
>>>
>>> -------------------------------------------------------------------------------------
>>> my spout Tryread
>>>
>>>
>>> public class TryRead extends BaseRichSpout {
>>> SpoutOutputCollector _collector;
>>> Random _rand;
>>> BufferedReader fileReader;
>>> FileSystem f;
>>> WatchService watcher ;
>>> String S;
>>> Path dir;
>>>
>>> @Override
>>> public void open(Map conf, TopologyContext context,
>>> SpoutOutputCollector collector) {
>>> _collector = collector;
>>>
>>>
>>> try{
>>> f = FileSystems.getDefault();
>>> watcher = f.newWatchService();
>>>
>>> S="/root/src/storm-starter/src/jvm/storm/starter";
>>>
>>> dir = f.getPath(S);
>>>
>>> dir.register(watcher, ENTRY_CREATE);
>>>
>>> } catch (IOException e) {
>>>
>>> e.printStackTrace();
>>> }
>>>
>>> }
>>> }
>>>
>>> @Override
>>> public void nextTuple() {
>>>
>>>
>>> Utils.sleep(2000);
>>> for (;;) {
>>>
>>> // wait for key to be signaled
>>> WatchKey key;
>>> try {
>>> key = watcher.take();
>>> } catch (InterruptedException x) {
>>> return;
>>> }
>>>
>>> for (WatchEvent<?> event: key.pollEvents()) {
>>> WatchEvent.Kind kind = event.kind();
>>> if (kind == OVERFLOW) {
>>> continue;
>>> }
>>>
>>>
>>> WatchEvent<Path> ev = (WatchEvent<Path>)event;
>>> Path filename = ev.context();
>>>
>>> System.out.format("Emailing file %s%n",
>>> filename);
>>> try {
>>>
>>>
>>> fileReader = new
>>> BufferedReader(new FileReader(new File(S+"/"+filename)));
>>>
>>> RandomAccessFile access
>>> = null;
>>> String line = null;
>>> try
>>> {
>>> while ((line = fileReader.readLine())
>>> != null)
>>> {
>>> if (line !=null)
>>> {
>>>
>>> _collector.emit(new
>>> Values(line));
>>> }
>>> }
>>> } catch (IOException e) {
>>> // TODO Auto-generated
>>> catch block
>>> e.printStackTrace();
>>> }
>>> } catch (FileNotFoundException
>>> e) {
>>> // TODO Auto-generated
>>> catch block
>>> e.printStackTrace();
>>> }
>>> }
>>> }
>>>
>>> boolean valid = key.reset();
>>> if (!valid) {
>>> break;
>>> }
>>> }
>>>
>>>
>>> }
>>>
>>> @Override
>>> public void ack(Object id) {
>>> }
>>>
>>> @Override
>>> public void fail(Object id) {
>>> }
>>>
>>> @Override
>>> public void declareOutputFields(OutputFieldsDeclarer declarer)
>>> {
>>> declarer.declare(new Fields("word"));
>>> }
>>>
>>>
>>>
>>> 2014-05-06 12:35 GMT+01:00 Nathan Leung <nc...@gmail.com>:
>>>
>>> You are creating your file writer with append set to true. It's it
>>>> possible your topology was run more than once?
>>>> On May 6, 2014 6:39 AM, "Bilal Al Fartakh" <al...@gmail.com>
>>>> wrote:
>>>>
>>>>> I'm using a bolt that receives tuples from another bolt (exclamation
>>>>> bolt ) and writes it on a file , the problem I got is that I have
>>>>> duplicated results four times , like when I emit a word , I found the word
>>>>> Written four times . where's the problem possibly could be ?
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> public class PrinterBolty extends BaseBasicBolt {
>>>>>
>>>>> @Override
>>>>> public void execute(Tuple tuple, BasicOutputCollector collector) {
>>>>>
>>>>> try {
>>>>>
>>>>> BufferedWriter output;
>>>>> output = new BufferedWriter(new
>>>>> FileWriter("/root/src/storm-starter/hh.txt", true));
>>>>> output.newLine();
>>>>> output.append(tuple.getString(0));
>>>>> output.close();
>>>>>
>>>>> } catch (IOException e) {
>>>>> // TODO Auto-generated catch block
>>>>> e.printStackTrace();
>>>>> }
>>>>> }
>>>>>
>>>>> @Override
>>>>> public void declareOutputFields(OutputFieldsDeclarer ofd) {
>>>>> }
>>>>>
>>>>> }
>>>>>
>>>>>
>>>>> --
>>>>> *Al Fartakh Bilal*
>>>>>
>>>>
>>>
>>>
>>> --
>>> *Al Fartakh Bilal*
>>>
>>
>>
--
*Al Fartakh Bilal*
Re: duplicated result
Posted by Nathan Leung <nc...@gmail.com>.
Good point, but it shouldn't matter how many exclamation bolts there are.
The number of spouts does because they are all reading the same file.
On May 6, 2014 9:37 AM, "padma priya chitturi" <pa...@gmail.com>
wrote:
> Hi,
>
> The issue lies with the number of tasks/executors specified for spout. Try
> specifying 1 spout and see if you could see duplicates. I suppose there
> would be no duplicates in specifying 1 spout and 1 exclamtion bolt.
>
>
> On Tue, May 6, 2014 at 5:23 PM, Bilal Al Fartakh <alfartaj.bilal@gmail.com
> > wrote:
>
>> HI ,Nathan and thank you for responding ,I appreciate it !
>> no I'm not , I just run this topology for the first time
>>
>>
>>
>>
>>
>>
>> public class Ex {
>>
>> public static class ExclamationBolt extends BaseRichBolt {
>> OutputCollector _collector;
>>
>> @Override
>> public void prepare(Map conf, TopologyContext context,
>> OutputCollector collector) {
>> _collector = collector;
>> }
>>
>> @Override
>> public void execute(Tuple tuple) {
>> _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
>> _collector.ack(tuple);
>> }
>>
>> @Override
>> public void declareOutputFields(OutputFieldsDeclarer declarer) {
>> declarer.declare(new Fields("word"));
>> }
>>
>>
>> }
>>
>> public static void main(String[] args) throws Exception {
>> TopologyBuilder builder = new TopologyBuilder();
>> TryRead T = new TryRead();
>> PrinterBolty P = new PrinterBolty();
>> builder.setSpout("word", T, 10);
>> builder.setBolt("exclaim1", new ExclamationBolt(),
>> 3).shuffleGrouping("word");
>> builder.setBolt("exclaim2", P , 2).shuffleGrouping("exclaim1");
>>
>>
>>
>> -------------------------------------------------------------------------------------
>> my spout Tryread
>>
>>
>> public class TryRead extends BaseRichSpout {
>> SpoutOutputCollector _collector;
>> Random _rand;
>> BufferedReader fileReader;
>> FileSystem f;
>> WatchService watcher ;
>> String S;
>> Path dir;
>>
>> @Override
>> public void open(Map conf, TopologyContext context,
>> SpoutOutputCollector collector) {
>> _collector = collector;
>>
>>
>> try{
>> f = FileSystems.getDefault();
>> watcher = f.newWatchService();
>>
>> S="/root/src/storm-starter/src/jvm/storm/starter";
>>
>> dir = f.getPath(S);
>>
>> dir.register(watcher, ENTRY_CREATE);
>>
>> } catch (IOException e) {
>>
>> e.printStackTrace();
>> }
>>
>> }
>> }
>>
>> @Override
>> public void nextTuple() {
>>
>>
>> Utils.sleep(2000);
>> for (;;) {
>>
>> // wait for key to be signaled
>> WatchKey key;
>> try {
>> key = watcher.take();
>> } catch (InterruptedException x) {
>> return;
>> }
>>
>> for (WatchEvent<?> event: key.pollEvents()) {
>> WatchEvent.Kind kind = event.kind();
>> if (kind == OVERFLOW) {
>> continue;
>> }
>>
>>
>> WatchEvent<Path> ev = (WatchEvent<Path>)event;
>> Path filename = ev.context();
>>
>> System.out.format("Emailing file %s%n", filename);
>> try {
>>
>>
>> fileReader = new
>> BufferedReader(new FileReader(new File(S+"/"+filename)));
>>
>> RandomAccessFile access =
>> null;
>> String line = null;
>> try
>> {
>> while ((line = fileReader.readLine())
>> != null)
>> {
>> if (line !=null)
>> {
>>
>> _collector.emit(new
>> Values(line));
>> }
>> }
>> } catch (IOException e) {
>> // TODO Auto-generated
>> catch block
>> e.printStackTrace();
>> }
>> } catch (FileNotFoundException e)
>> {
>> // TODO Auto-generated
>> catch block
>> e.printStackTrace();
>> }
>> }
>> }
>>
>> boolean valid = key.reset();
>> if (!valid) {
>> break;
>> }
>> }
>>
>>
>> }
>>
>> @Override
>> public void ack(Object id) {
>> }
>>
>> @Override
>> public void fail(Object id) {
>> }
>>
>> @Override
>> public void declareOutputFields(OutputFieldsDeclarer declarer) {
>> declarer.declare(new Fields("word"));
>> }
>>
>>
>>
>> 2014-05-06 12:35 GMT+01:00 Nathan Leung <nc...@gmail.com>:
>>
>> You are creating your file writer with append set to true. It's it
>>> possible your topology was run more than once?
>>> On May 6, 2014 6:39 AM, "Bilal Al Fartakh" <al...@gmail.com>
>>> wrote:
>>>
>>>> I'm using a bolt that receives tuples from another bolt (exclamation
>>>> bolt ) and writes it on a file , the problem I got is that I have
>>>> duplicated results four times , like when I emit a word , I found the word
>>>> Written four times . where's the problem possibly could be ?
>>>>
>>>>
>>>>
>>>>
>>>> public class PrinterBolty extends BaseBasicBolt {
>>>>
>>>> @Override
>>>> public void execute(Tuple tuple, BasicOutputCollector collector) {
>>>>
>>>> try {
>>>>
>>>> BufferedWriter output;
>>>> output = new BufferedWriter(new
>>>> FileWriter("/root/src/storm-starter/hh.txt", true));
>>>> output.newLine();
>>>> output.append(tuple.getString(0));
>>>> output.close();
>>>>
>>>> } catch (IOException e) {
>>>> // TODO Auto-generated catch block
>>>> e.printStackTrace();
>>>> }
>>>> }
>>>>
>>>> @Override
>>>> public void declareOutputFields(OutputFieldsDeclarer ofd) {
>>>> }
>>>>
>>>> }
>>>>
>>>>
>>>> --
>>>> *Al Fartakh Bilal*
>>>>
>>>
>>
>>
>> --
>> *Al Fartakh Bilal*
>>
>
>
Re: duplicated result
Posted by padma priya chitturi <pa...@gmail.com>.
Hi,
The issue lies with the number of tasks/executors specified for spout. Try
specifying 1 spout and see if you could see duplicates. I suppose there
would be no duplicates in specifying 1 spout and 1 exclamtion bolt.
On Tue, May 6, 2014 at 5:23 PM, Bilal Al Fartakh
<al...@gmail.com>wrote:
> HI ,Nathan and thank you for responding ,I appreciate it !
> no I'm not , I just run this topology for the first time
>
>
>
>
>
>
> public class Ex {
>
> public static class ExclamationBolt extends BaseRichBolt {
> OutputCollector _collector;
>
> @Override
> public void prepare(Map conf, TopologyContext context, OutputCollector
> collector) {
> _collector = collector;
> }
>
> @Override
> public void execute(Tuple tuple) {
> _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
> _collector.ack(tuple);
> }
>
> @Override
> public void declareOutputFields(OutputFieldsDeclarer declarer) {
> declarer.declare(new Fields("word"));
> }
>
>
> }
>
> public static void main(String[] args) throws Exception {
> TopologyBuilder builder = new TopologyBuilder();
> TryRead T = new TryRead();
> PrinterBolty P = new PrinterBolty();
> builder.setSpout("word", T, 10);
> builder.setBolt("exclaim1", new ExclamationBolt(),
> 3).shuffleGrouping("word");
> builder.setBolt("exclaim2", P , 2).shuffleGrouping("exclaim1");
>
>
>
> -------------------------------------------------------------------------------------
> my spout Tryread
>
>
> public class TryRead extends BaseRichSpout {
> SpoutOutputCollector _collector;
> Random _rand;
> BufferedReader fileReader;
> FileSystem f;
> WatchService watcher ;
> String S;
> Path dir;
>
> @Override
> public void open(Map conf, TopologyContext context,
> SpoutOutputCollector collector) {
> _collector = collector;
>
>
> try{
> f = FileSystems.getDefault();
> watcher = f.newWatchService();
>
> S="/root/src/storm-starter/src/jvm/storm/starter";
>
> dir = f.getPath(S);
>
> dir.register(watcher, ENTRY_CREATE);
>
> } catch (IOException e) {
>
> e.printStackTrace();
> }
>
> }
> }
>
> @Override
> public void nextTuple() {
>
>
> Utils.sleep(2000);
> for (;;) {
>
> // wait for key to be signaled
> WatchKey key;
> try {
> key = watcher.take();
> } catch (InterruptedException x) {
> return;
> }
>
> for (WatchEvent<?> event: key.pollEvents()) {
> WatchEvent.Kind kind = event.kind();
> if (kind == OVERFLOW) {
> continue;
> }
>
>
> WatchEvent<Path> ev = (WatchEvent<Path>)event;
> Path filename = ev.context();
>
> System.out.format("Emailing file %s%n", filename);
> try {
>
>
> fileReader = new
> BufferedReader(new FileReader(new File(S+"/"+filename)));
>
> RandomAccessFile access =
> null;
> String line = null;
> try
> {
> while ((line = fileReader.readLine())
> != null)
> {
> if (line !=null)
> {
>
> _collector.emit(new
> Values(line));
> }
> }
> } catch (IOException e) {
> // TODO Auto-generated
> catch block
> e.printStackTrace();
> }
> } catch (FileNotFoundException e) {
> // TODO Auto-generated
> catch block
> e.printStackTrace();
> }
> }
> }
>
> boolean valid = key.reset();
> if (!valid) {
> break;
> }
> }
>
>
> }
>
> @Override
> public void ack(Object id) {
> }
>
> @Override
> public void fail(Object id) {
> }
>
> @Override
> public void declareOutputFields(OutputFieldsDeclarer declarer) {
> declarer.declare(new Fields("word"));
> }
>
>
>
> 2014-05-06 12:35 GMT+01:00 Nathan Leung <nc...@gmail.com>:
>
> You are creating your file writer with append set to true. It's it
>> possible your topology was run more than once?
>> On May 6, 2014 6:39 AM, "Bilal Al Fartakh" <al...@gmail.com>
>> wrote:
>>
>>> I'm using a bolt that receives tuples from another bolt (exclamation
>>> bolt ) and writes it on a file , the problem I got is that I have
>>> duplicated results four times , like when I emit a word , I found the word
>>> Written four times . where's the problem possibly could be ?
>>>
>>>
>>>
>>>
>>> public class PrinterBolty extends BaseBasicBolt {
>>>
>>> @Override
>>> public void execute(Tuple tuple, BasicOutputCollector collector) {
>>>
>>> try {
>>>
>>> BufferedWriter output;
>>> output = new BufferedWriter(new
>>> FileWriter("/root/src/storm-starter/hh.txt", true));
>>> output.newLine();
>>> output.append(tuple.getString(0));
>>> output.close();
>>>
>>> } catch (IOException e) {
>>> // TODO Auto-generated catch block
>>> e.printStackTrace();
>>> }
>>> }
>>>
>>> @Override
>>> public void declareOutputFields(OutputFieldsDeclarer ofd) {
>>> }
>>>
>>> }
>>>
>>>
>>> --
>>> *Al Fartakh Bilal*
>>>
>>
>
>
> --
> *Al Fartakh Bilal*
>
Re: duplicated result
Posted by Nathan Leung <nc...@gmail.com>.
Can you share the contents of your input and output files?
On May 6, 2014 7:54 AM, "Bilal Al Fartakh" <al...@gmail.com> wrote:
> HI ,Nathan and thank you for responding ,I appreciate it !
> no I'm not , I just run this topology for the first time
>
>
>
>
>
>
> public class Ex {
>
> public static class ExclamationBolt extends BaseRichBolt {
> OutputCollector _collector;
>
> @Override
> public void prepare(Map conf, TopologyContext context, OutputCollector
> collector) {
> _collector = collector;
> }
>
> @Override
> public void execute(Tuple tuple) {
> _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
> _collector.ack(tuple);
> }
>
> @Override
> public void declareOutputFields(OutputFieldsDeclarer declarer) {
> declarer.declare(new Fields("word"));
> }
>
>
> }
>
> public static void main(String[] args) throws Exception {
> TopologyBuilder builder = new TopologyBuilder();
> TryRead T = new TryRead();
> PrinterBolty P = new PrinterBolty();
> builder.setSpout("word", T, 10);
> builder.setBolt("exclaim1", new ExclamationBolt(),
> 3).shuffleGrouping("word");
> builder.setBolt("exclaim2", P , 2).shuffleGrouping("exclaim1");
>
>
>
> -------------------------------------------------------------------------------------
> my spout Tryread
>
>
> public class TryRead extends BaseRichSpout {
> SpoutOutputCollector _collector;
> Random _rand;
> BufferedReader fileReader;
> FileSystem f;
> WatchService watcher ;
> String S;
> Path dir;
>
> @Override
> public void open(Map conf, TopologyContext context,
> SpoutOutputCollector collector) {
> _collector = collector;
>
>
> try{
> f = FileSystems.getDefault();
> watcher = f.newWatchService();
>
> S="/root/src/storm-starter/src/jvm/storm/starter";
>
> dir = f.getPath(S);
>
> dir.register(watcher, ENTRY_CREATE);
>
> } catch (IOException e) {
>
> e.printStackTrace();
> }
>
> }
> }
>
> @Override
> public void nextTuple() {
>
>
> Utils.sleep(2000);
> for (;;) {
>
> // wait for key to be signaled
> WatchKey key;
> try {
> key = watcher.take();
> } catch (InterruptedException x) {
> return;
> }
>
> for (WatchEvent<?> event: key.pollEvents()) {
> WatchEvent.Kind kind = event.kind();
> if (kind == OVERFLOW) {
> continue;
> }
>
>
> WatchEvent<Path> ev = (WatchEvent<Path>)event;
> Path filename = ev.context();
>
> System.out.format("Emailing file %s%n", filename);
> try {
>
>
> fileReader = new
> BufferedReader(new FileReader(new File(S+"/"+filename)));
>
> RandomAccessFile access =
> null;
> String line = null;
> try
> {
> while ((line = fileReader.readLine())
> != null)
> {
> if (line !=null)
> {
>
> _collector.emit(new
> Values(line));
> }
> }
> } catch (IOException e) {
> // TODO Auto-generated
> catch block
> e.printStackTrace();
> }
> } catch (FileNotFoundException e) {
> // TODO Auto-generated
> catch block
> e.printStackTrace();
> }
> }
> }
>
> boolean valid = key.reset();
> if (!valid) {
> break;
> }
> }
>
>
> }
>
> @Override
> public void ack(Object id) {
> }
>
> @Override
> public void fail(Object id) {
> }
>
> @Override
> public void declareOutputFields(OutputFieldsDeclarer declarer) {
> declarer.declare(new Fields("word"));
> }
>
>
>
> 2014-05-06 12:35 GMT+01:00 Nathan Leung <nc...@gmail.com>:
>
>> You are creating your file writer with append set to true. It's it
>> possible your topology was run more than once?
>> On May 6, 2014 6:39 AM, "Bilal Al Fartakh" <al...@gmail.com>
>> wrote:
>>
>>> I'm using a bolt that receives tuples from another bolt (exclamation
>>> bolt ) and writes it on a file , the problem I got is that I have
>>> duplicated results four times , like when I emit a word , I found the word
>>> Written four times . where's the problem possibly could be ?
>>>
>>>
>>>
>>>
>>> public class PrinterBolty extends BaseBasicBolt {
>>>
>>> @Override
>>> public void execute(Tuple tuple, BasicOutputCollector collector) {
>>>
>>> try {
>>>
>>> BufferedWriter output;
>>> output = new BufferedWriter(new
>>> FileWriter("/root/src/storm-starter/hh.txt", true));
>>> output.newLine();
>>> output.append(tuple.getString(0));
>>> output.close();
>>>
>>> } catch (IOException e) {
>>> // TODO Auto-generated catch block
>>> e.printStackTrace();
>>> }
>>> }
>>>
>>> @Override
>>> public void declareOutputFields(OutputFieldsDeclarer ofd) {
>>> }
>>>
>>> }
>>>
>>>
>>> --
>>> *Al Fartakh Bilal*
>>>
>>
>
>
> --
> *Al Fartakh Bilal*
>
Re: duplicated result
Posted by Bilal Al Fartakh <al...@gmail.com>.
HI ,Nathan and thank you for responding ,I appreciate it !
no I'm not , I just run this topology for the first time
public class Ex {
public static class ExclamationBolt extends BaseRichBolt {
OutputCollector _collector;
@Override
public void prepare(Map conf, TopologyContext context, OutputCollector
collector) {
_collector = collector;
}
@Override
public void execute(Tuple tuple) {
_collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
_collector.ack(tuple);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
public static void main(String[] args) throws Exception {
TopologyBuilder builder = new TopologyBuilder();
TryRead T = new TryRead();
PrinterBolty P = new PrinterBolty();
builder.setSpout("word", T, 10);
builder.setBolt("exclaim1", new ExclamationBolt(),
3).shuffleGrouping("word");
builder.setBolt("exclaim2", P , 2).shuffleGrouping("exclaim1");
-------------------------------------------------------------------------------------
my spout Tryread
public class TryRead extends BaseRichSpout {
SpoutOutputCollector _collector;
Random _rand;
BufferedReader fileReader;
FileSystem f;
WatchService watcher ;
String S;
Path dir;
@Override
public void open(Map conf, TopologyContext context,
SpoutOutputCollector collector) {
_collector = collector;
try{
f = FileSystems.getDefault();
watcher = f.newWatchService();
S="/root/src/storm-starter/src/jvm/storm/starter";
dir = f.getPath(S);
dir.register(watcher, ENTRY_CREATE);
} catch (IOException e) {
e.printStackTrace();
}
}
}
@Override
public void nextTuple() {
Utils.sleep(2000);
for (;;) {
// wait for key to be signaled
WatchKey key;
try {
key = watcher.take();
} catch (InterruptedException x) {
return;
}
for (WatchEvent<?> event: key.pollEvents()) {
WatchEvent.Kind kind = event.kind();
if (kind == OVERFLOW) {
continue;
}
WatchEvent<Path> ev = (WatchEvent<Path>)event;
Path filename = ev.context();
System.out.format("Emailing file %s%n", filename);
try {
fileReader = new
BufferedReader(new FileReader(new File(S+"/"+filename)));
RandomAccessFile access =
null;
String line = null;
try
{
while ((line = fileReader.readLine()) !=
null)
{
if (line !=null)
{
_collector.emit(new
Values(line));
}
}
} catch (IOException e) {
// TODO Auto-generated
catch block
e.printStackTrace();
}
} catch (FileNotFoundException e) {
// TODO Auto-generated
catch block
e.printStackTrace();
}
}
}
boolean valid = key.reset();
if (!valid) {
break;
}
}
}
@Override
public void ack(Object id) {
}
@Override
public void fail(Object id) {
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
2014-05-06 12:35 GMT+01:00 Nathan Leung <nc...@gmail.com>:
> You are creating your file writer with append set to true. It's it
> possible your topology was run more than once?
> On May 6, 2014 6:39 AM, "Bilal Al Fartakh" <al...@gmail.com>
> wrote:
>
>> I'm using a bolt that receives tuples from another bolt (exclamation bolt
>> ) and writes it on a file , the problem I got is that I have duplicated
>> results four times , like when I emit a word , I found the word Written four
>> times . where's the problem possibly could be ?
>>
>>
>>
>>
>> public class PrinterBolty extends BaseBasicBolt {
>>
>> @Override
>> public void execute(Tuple tuple, BasicOutputCollector collector) {
>>
>> try {
>>
>> BufferedWriter output;
>> output = new BufferedWriter(new
>> FileWriter("/root/src/storm-starter/hh.txt", true));
>> output.newLine();
>> output.append(tuple.getString(0));
>> output.close();
>>
>> } catch (IOException e) {
>> // TODO Auto-generated catch block
>> e.printStackTrace();
>> }
>> }
>>
>> @Override
>> public void declareOutputFields(OutputFieldsDeclarer ofd) {
>> }
>>
>> }
>>
>>
>> --
>> *Al Fartakh Bilal*
>>
>
--
*Al Fartakh Bilal*
Re: duplicated result
Posted by Nathan Leung <nc...@gmail.com>.
You are creating your file writer with append set to true. It's it possible
your topology was run more than once?
On May 6, 2014 6:39 AM, "Bilal Al Fartakh" <al...@gmail.com> wrote:
> I'm using a bolt that receives tuples from another bolt (exclamation bolt
> ) and writes it on a file , the problem I got is that I have duplicated
> results four times , like when I emit a word , I found the word Written four
> times . where's the problem possibly could be ?
>
>
>
>
> public class PrinterBolty extends BaseBasicBolt {
>
> @Override
> public void execute(Tuple tuple, BasicOutputCollector collector) {
>
> try {
>
> BufferedWriter output;
> output = new BufferedWriter(new
> FileWriter("/root/src/storm-starter/hh.txt", true));
> output.newLine();
> output.append(tuple.getString(0));
> output.close();
>
> } catch (IOException e) {
> // TODO Auto-generated catch block
> e.printStackTrace();
> }
> }
>
> @Override
> public void declareOutputFields(OutputFieldsDeclarer ofd) {
> }
>
> }
>
>
> --
> *Al Fartakh Bilal*
>