Posted to user@storm.apache.org by Bilal Al Fartakh <al...@gmail.com> on 2014/05/06 12:38:37 UTC

duplicated result

I'm using a bolt that receives tuples from another bolt (the exclamation bolt)
and writes them to a file. The problem is that my results are duplicated
four times: when I emit a word, I find that word written four times in the
file. Where could the problem be?




public class PrinterBolty extends BaseBasicBolt {

  @Override
  public void execute(Tuple tuple, BasicOutputCollector collector) {
    try {
      BufferedWriter output;
      output = new BufferedWriter(new FileWriter("/root/src/storm-starter/hh.txt", true));
      output.newLine();
      output.append(tuple.getString(0));
      output.close();
    } catch (IOException e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
    }
  }

  @Override
  public void declareOutputFields(OutputFieldsDeclarer ofd) {
  }

}


-- 
*Al Fartakh Bilal*
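
The bolt above passes true as the second FileWriter argument, which opens the file in append mode. A minimal sketch of what that means, independent of Storm: if the same word is written by several separate runs, append mode keeps every copy, while non-append mode keeps only the last one. The class name, the temp file, and the helper names are hypothetical and only serve the illustration; the write order is also simplified compared to the bolt.

```java
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

public class AppendModeDemo {

  // Writes one line using the same FileWriter(file, append) constructor family as the bolt.
  public static void writeLine(Path file, String text, boolean append) {
    try (BufferedWriter output = new BufferedWriter(new FileWriter(file.toFile(), append))) {
      output.append(text);
      output.newLine();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // Simulates 'runs' separate runs that each write the same word once,
  // then returns how many lines the file holds afterwards.
  public static int linesAfterRuns(int runs, boolean append) {
    try {
      // Hypothetical temp file, not the path used in the post.
      Path file = Files.createTempFile("hh", ".txt");
      try {
        for (int i = 0; i < runs; i++) {
          writeLine(file, "word!!!", append);
        }
        return Files.readAllLines(file, StandardCharsets.UTF_8).size();
      } finally {
        Files.deleteIfExists(file);
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(linesAfterRuns(4, true));  // 4: every run's line is kept
    System.out.println(linesAfterRuns(4, false)); // 1: each run overwrites the file
  }
}
```

So with append mode, an output file left over from earlier runs is one possible source of repeated lines, which is worth ruling out before looking at the topology itself.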

Re: duplicated result

Posted by Bilal Al Fartakh <al...@gmail.com>.
Thank you :D
Specifying 1 spout was the solution.
Thank you again, Padma and Nathan, for your help!


2014-05-06 14:40 GMT+01:00 Nathan Leung <nc...@gmail.com>:

> Good point, but it shouldn't matter how many exclamation bolts there are.
> The number of spouts does because they are all reading the same file.

-- 
*Al Fartakh Bilal*
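
Setting the spout parallelism to 1 resolved the duplicates in this thread. If more than one spout task were needed later, one possible approach (not discussed in the thread, so treat it as an assumption) is to let each task handle only the files that hash to its own index. The sketch below shows just that ownership rule in plain Java; the class and method names are hypothetical. In a real spout the task index and task count would have to come from Storm's TopologyContext, and the scheme only helps if every task sees the same directory.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class FilePartitionDemo {

  // True if the task with this index is the one responsible for the file.
  // floorMod keeps the result non-negative even for negative hash codes.
  public static boolean ownsFile(String fileName, int taskIndex, int totalTasks) {
    return Math.floorMod(fileName.hashCode(), totalTasks) == taskIndex;
  }

  // For each file, counts how many of the tasks would read it.
  public static List<Integer> readersPerFile(List<String> fileNames, int totalTasks) {
    List<Integer> counts = new ArrayList<>();
    for (String name : fileNames) {
      int readers = 0;
      for (int task = 0; task < totalTasks; task++) {
        if (ownsFile(name, task, totalTasks)) {
          readers++;
        }
      }
      counts.add(readers);
    }
    return counts;
  }

  public static void main(String[] args) {
    // Ten tasks, three files: each file is claimed by exactly one task.
    System.out.println(readersPerFile(Arrays.asList("a.txt", "b.txt", "c.txt"), 10)); // [1, 1, 1]
  }
}
```

Because the hash of a file name maps to exactly one index in the range 0 to totalTasks - 1, every file is read once no matter how many tasks are running.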

Re: duplicated result

Posted by Nathan Leung <nc...@gmail.com>.
Good point, but it shouldn't matter how many exclamation bolts there are.
The number of spouts does because they are all reading the same file.
On May 6, 2014 9:37 AM, "padma priya chitturi" <pa...@gmail.com>
wrote:

> Hi,
>
> The issue lies with the number of tasks/executors specified for the spout.
> Try specifying 1 spout and see whether you still get duplicates. I expect
> there would be no duplicates with 1 spout and 1 exclamation bolt.

Re: duplicated result

Posted by padma priya chitturi <pa...@gmail.com>.
Hi,

The issue lies with the number of tasks/executors specified for the spout.
Try specifying 1 spout and see whether you still get duplicates. I expect
there would be no duplicates with 1 spout and 1 exclamation bolt.
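
The behaviour discussed in this thread can be reproduced without Storm. The sketch below is a simulation under stated assumptions, not Storm code: every spout task is assumed to read the whole input file, and shuffle grouping is modelled as handing each emitted tuple to exactly one randomly chosen bolt task. The class and method names are hypothetical. The exact duplication factor in a real cluster depends on how many spout tasks actually observe the file.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;

public class SpoutDuplicationDemo {

  // Returns everything the bolt tasks would write for the given parallelism settings.
  public static List<String> run(int spoutTasks, int boltTasks, List<String> inputLines) {
    Random shuffle = new Random(42);
    List<List<String>> boltOutputs = new ArrayList<>();
    for (int b = 0; b < boltTasks; b++) {
      boltOutputs.add(new ArrayList<>());
    }

    // Each spout task independently reads and emits every line of the same file.
    for (int s = 0; s < spoutTasks; s++) {
      for (String line : inputLines) {
        // Shuffle grouping: one tuple goes to exactly one bolt task.
        int target = shuffle.nextInt(boltTasks);
        boltOutputs.get(target).add(line + "!!!");
      }
    }

    List<String> written = new ArrayList<>();
    for (List<String> out : boltOutputs) {
      written.addAll(out);
    }
    return written;
  }

  public static void main(String[] args) {
    List<String> input = Arrays.asList("hello", "world");
    System.out.println(run(1, 3, input).size());  // 2: one spout task, no duplicates
    System.out.println(run(10, 3, input).size()); // 20: ten spout tasks, ten copies of each line
    System.out.println(run(10, 1, input).size()); // 20: bolt parallelism does not change the count
  }
}
```

The output size is always spoutTasks times the number of input lines, so only the spout parallelism multiplies the results; the number of bolt tasks just changes which task writes each copy.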



Re: duplicated result

Posted by Nathan Leung <nc...@gmail.com>.
Can you share the contents of your input and output files?

Re: duplicated result

Posted by Bilal Al Fartakh <al...@gmail.com>.
Hi Nathan, and thank you for responding, I appreciate it!
No, it wasn't run more than once; I have just run this topology for the first time.

public class Ex {

  public static class ExclamationBolt extends BaseRichBolt {
    OutputCollector _collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      _collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
      _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
      _collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word"));
    }

  }

  public static void main(String[] args) throws Exception {
    TopologyBuilder builder = new TopologyBuilder();
    TryRead T = new TryRead();
    PrinterBolty P = new PrinterBolty();
    builder.setSpout("word", T, 10);
    builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word");
    builder.setBolt("exclaim2", P, 2).shuffleGrouping("exclaim1");


-------------------------------------------------------------------------------------
My spout, TryRead:


public class TryRead extends BaseRichSpout {
    SpoutOutputCollector _collector;
    Random _rand;
    BufferedReader fileReader;
    FileSystem f;
    WatchService watcher;
    String S;
    Path dir;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;

        try {
            // Watch the directory for newly created files.
            f = FileSystems.getDefault();
            watcher = f.newWatchService();
            S = "/root/src/storm-starter/src/jvm/storm/starter";
            dir = f.getPath(S);
            dir.register(watcher, ENTRY_CREATE);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void nextTuple() {
        Utils.sleep(2000);
        for (;;) {
            // wait for key to be signaled
            WatchKey key;
            try {
                key = watcher.take();
            } catch (InterruptedException x) {
                return;
            }

            for (WatchEvent<?> event : key.pollEvents()) {
                WatchEvent.Kind kind = event.kind();
                if (kind == OVERFLOW) {
                    continue;
                }

                WatchEvent<Path> ev = (WatchEvent<Path>) event;
                Path filename = ev.context();

                System.out.format("Emailing file %s%n", filename);
                try {
                    fileReader = new BufferedReader(new FileReader(new File(S + "/" + filename)));
                    String line = null;
                    try {
                        // Emit every line of the new file as one tuple.
                        while ((line = fileReader.readLine()) != null) {
                            _collector.emit(new Values(line));
                        }
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                } catch (FileNotFoundException e) {
                    e.printStackTrace();
                }
            }

            // Re-arm the key; stop if the directory is no longer accessible.
            boolean valid = key.reset();
            if (!valid) {
                break;
            }
        }
    }

    @Override
    public void ack(Object id) {
    }

    @Override
    public void fail(Object id) {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
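
The replies in this thread trace the duplicates to the spout's parallelism: every TryRead task watches the same directory and emits every line of each new file, so each line comes out once per task. Running a single spout task avoids that. If more than one task is wanted, one possible approach (a sketch only, not something proposed in the thread) is to give each line exactly one owning task. The class and method names below are hypothetical, and the example is plain Java so that it runs without Storm. In a real spout the two integers would presumably come from TopologyContext.getThisTaskIndex() and from the size of context.getComponentTasks(context.getThisComponentId()). The task count of 4 is just an assumed value for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of Storm: decides which lines a task owns.
class PartitionedReaderSketch {

    // Returns the lines that the task with the given index would emit.
    // Line i belongs to task (i % numTasks), so every line has one owner.
    static List<String> linesForTask(List<String> lines, int taskIndex, int numTasks) {
        List<String> owned = new ArrayList<>();
        for (int i = 0; i < lines.size(); i++) {
            if (i % numTasks == taskIndex) {
                owned.add(lines.get(i));
            }
        }
        return owned;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("a", "b", "c", "d", "e");
        int numTasks = 4; // assumed spout parallelism, for illustration only

        // Collect what all tasks together would emit.
        List<String> emitted = new ArrayList<>();
        for (int task = 0; task < numTasks; task++) {
            emitted.addAll(linesForTask(lines, task, numTasks));
        }

        // Five input lines give five emitted tuples, not twenty.
        System.out.println(emitted.size()); // prints 5
    }
}
```

Without the modulo check, each of the four tasks would emit all five lines, which matches the kind of repetition described in the original question.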



2014-05-06 12:35 GMT+01:00 Nathan Leung <nc...@gmail.com>:

> You are creating your file writer with append set to true. Is it
> possible your topology was run more than once?


-- 
*Al Fartakh Bilal*

Re: duplicated result

Posted by Nathan Leung <nc...@gmail.com>.
You are creating your file writer with append set to true. Is it possible
your topology was run more than once?
On May 6, 2014 6:39 AM, "Bilal Al Fartakh" <al...@gmail.com> wrote:

> I'm using a bolt that receives tuples from another bolt (exclamation bolt
> ) and writes it on a file , the problem I got is that I have duplicated
> results four times , like when I emit a word , I found the word Written four
> times . where's the problem possibly could be ?
>
>
>
>
> public class PrinterBolty extends BaseBasicBolt {
>
>   @Override
>   public void execute(Tuple tuple, BasicOutputCollector collector) {
>     try {
>       BufferedWriter output;
>       output = new BufferedWriter(
>           new FileWriter("/root/src/storm-starter/hh.txt", true));
>       output.newLine();
>       output.append(tuple.getString(0));
>       output.close();
>     } catch (IOException e) {
>       // TODO Auto-generated catch block
>       e.printStackTrace();
>     }
>   }
>
>   @Override
>   public void declareOutputFields(OutputFieldsDeclarer ofd) {
>   }
>
> }
>
>
> --
> *Al Fartakh Bilal*
>
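
Nathan's point about append mode can be checked in isolation. The sketch below is plain Java with hypothetical class and method names and a temporary file instead of the real hh.txt path. It writes the same word once per simulated run and counts the lines left in the file: with append set to true the output of earlier runs stays in the file, while with append set to false each run starts from an empty file.

```java
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical demo class, not taken from the thread.
class AppendModeSketch {

    // Writes one word to the file, roughly what the bolt does per tuple.
    static void writeWord(Path file, String word, boolean append) throws IOException {
        try (BufferedWriter out = new BufferedWriter(new FileWriter(file.toFile(), append))) {
            out.write(word);
            out.newLine();
        }
    }

    // Simulates several separate runs that each write the same word once,
    // then returns how many lines the file holds afterwards.
    static int linesAfterRuns(int runs, boolean append) throws IOException {
        Path file = Files.createTempFile("hh", ".txt");
        try {
            for (int i = 0; i < runs; i++) {
                writeWord(file, "hello!!!", append);
            }
            return Files.readAllLines(file).size();
        } finally {
            Files.deleteIfExists(file);
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(linesAfterRuns(4, true));  // prints 4
        System.out.println(linesAfterRuns(4, false)); // prints 1
    }
}
```

So a file opened in append mode can show repeated words simply because old runs were never cleared, which is worth ruling out before looking at the topology itself.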