Posted to user@storm.apache.org by Margusja <ma...@roo.ee> on 2014/06/02 12:36:52 UTC

Worker dies (bolt)

Hi

I am using apache-storm-0.9.1-incubating.
I have a simple topology: a spout reads from a Kafka topic and a bolt
writes the lines from the spout to HBase.

Recently we ran a test: we sent 300 000 000 messages through kafka-rest ->
kafka-queue -> storm topology -> hbase.

I noticed that after about one hour and about 2500 messages the worker
died. The PID is still there and the process is up, but the bolt's execute
method hangs.
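
When a worker's PID is alive but a bolt stops making progress, a thread dump usually shows where each thread is waiting. A minimal sketch using only the JDK follows; the class name ThreadDumpSketch and the idea of calling it from inside the worker are assumptions for illustration, not something taken from this thread. Running the JDK's jstack tool against the worker PID gives similar information from outside the process.

```java
import java.util.Map;

// Hypothetical helper (not part of Storm): formats the stack of every live thread in this JVM.
class ThreadDumpSketch {

    // Returns one block of text per thread: name, state, then its stack frames.
    static String dumpAllThreads() {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<Thread, StackTraceElement[]> e : Thread.getAllStackTraces().entrySet()) {
            Thread t = e.getKey();
            sb.append('"').append(t.getName()).append("\" state=").append(t.getState()).append('\n');
            for (StackTraceElement frame : e.getValue()) {
                sb.append("    at ").append(frame).append('\n');
            }
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(dumpAllThreads());
    }
}
```

A thread that stays in the same frame across several dumps taken a few seconds apart is a good candidate for the place where processing is stuck.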

The bolt's code is:

package storm;

import java.util.Map;
import java.util.UUID;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class HBaseWriterBolt extends BaseBasicBolt
{
    HTableInterface usersTable;
    HTablePool pool;
    int count = 0;

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.defaults.for.version", "0.96.0.2.0.6.0-76-hadoop2");
        conf.set("hbase.defaults.for.version.skip", "true");
        conf.set("hbase.zookeeper.quorum", "vm24,vm37,vm38");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        conf.set("hbase.rootdir", "hdfs://vm38:8020/user/hbase/data");
        //conf.set("zookeeper.znode.parent", "/hbase-unsecure");

        pool = new HTablePool(conf, 1);
        usersTable = pool.getTable("kafkademo1");
    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector)
    {
        String line = tuple.getString(0);

        // Row key is a random UUID; the line is stored in column info:line.
        Put p = new Put(Bytes.toBytes(UUID.randomUUID().toString()));
        p.add(Bytes.toBytes("info"), Bytes.toBytes("line"), Bytes.toBytes(line));

        try {
            usersTable.put(p);
            count++;
            // Debug output to confirm that the bolt is still running.
            System.out.println("Count: " + count);
        }
        catch (Exception e) {
            e.printStackTrace();
        }
        collector.emit(new Values(line));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer)
    {
        declarer.declare(new Fields("line"));
    }

    @Override
    public void cleanup()
    {
        try {
            usersTable.close();
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }
}

The line System.out.println("Count: " + count); in the execute method was
added for debugging, so that I can see in the log that the bolt is running.

In the spout's nextTuple() method I added a similar debug line:
System.out.println("Message from the Topic ...");
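
Printing one line per tuple gets noisy at this message volume. As a sketch only, a small counter that reports every N-th call still shows that the bolt is alive. The class name ProgressCounter and the interval of 1000 are hypothetical and not part of the code in this thread.

```java
// Hypothetical helper: counts calls and signals when a progress line should be printed.
class ProgressCounter {
    private final long interval;
    private long count = 0;

    ProgressCounter(long interval) {
        if (interval <= 0) {
            throw new IllegalArgumentException("interval must be positive");
        }
        this.interval = interval;
    }

    // Increments the counter; returns true on every interval-th call.
    boolean tick() {
        count++;
        return count % interval == 0;
    }

    long count() {
        return count;
    }

    public static void main(String[] args) {
        ProgressCounter progress = new ProgressCounter(1000);
        for (int i = 0; i < 2500; i++) {
            if (progress.tick()) {
                System.out.println("Count: " + progress.count());
            }
        }
    }
}
```

Inside execute, the same idea would replace the unconditional println with a check on tick().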

After about 50 minutes I can see in the log file that the spout is still
working but the bolt has died.

Any ideas?

-- 
Best regards, Margus (Margusja) Roo
+372 51 48 780
http://margus.roo.ee
http://ee.linkedin.com/in/margusroo
skype: margusja
ldapsearch -x -h ldap.sk.ee -b c=EE "(serialNumber=37303140314)"


Re: Worker dies (bolt)

Posted by Margusja <ma...@roo.ee>.
OK, I have more information.
It looks like the problem is related to the spout.

I changed the spout from:

public void open(Map conf, TopologyContext context, SpoutOutputCollector collector)
{
    this.collector = collector;

    Properties props = new Properties();
    props.put("zookeeper.connect", "vm24:2181,vm37:2181,vm38:2181");
    props.put("group.id", "testgroup");
    props.put("zookeeper.session.timeout.ms", "500");
    props.put("zookeeper.sync.time.ms", "250");
    props.put("auto.commit.interval.ms", "1000");
    consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
    this.topic = "kafkademo1";
}

to

public void open(Map conf, TopologyContext context, SpoutOutputCollector collector)
{
    this.collector = collector;

    Properties props = new Properties();
    props.put("zookeeper.connect", "vm24:2181,vm37:2181,vm38:2181");
    props.put("group.id", "testgroup");
    //props.put("zookeeper.session.timeout.ms", "500");
    //props.put("zookeeper.sync.time.ms", "250");
    //props.put("auto.commit.interval.ms", "1000");
    consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
    this.topic = "kafkademo1";
}

and
public void nextTuple()
{
    Map<String, Integer> topicCount = new HashMap<String, Integer>();
    // Define single thread for topic
    topicCount.put(topic, new Integer(1));
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount);
    List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);
    for (final KafkaStream stream : streams) {
        ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator();
        while (consumerIte.hasNext())
        {
            // System.out.println("Message from the Topic ...");
            String line = new String(consumerIte.next().message());
            this.collector.emit(new Values(line), line);
        }
    }
    if (consumer != null)
        consumer.shutdown();
}

to

public void nextTuple()
{
    Map<String, Integer> topicCount = new HashMap<String, Integer>();
    // Define single thread for topic
    topicCount.put(topic, new Integer(1));
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount);
    List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);
    for (final KafkaStream stream : streams) {
        ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator();
        while (consumerIte.hasNext())
        {
            // System.out.println("Message from the Topic ...");
            String line = new String(consumerIte.next().message());
            //this.collector.emit(new Values(line), line);
            this.collector.emit(new Values(line));
        }
    }
    if (consumer != null)
        consumer.shutdown();
}


And now it is running.

This is strange, because when the worker died I could still see log rows
from the spout. But I think it is somehow related to Storm's internals.
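
The thread does not confirm a root cause. One thing worth noting about the nextTuple() shown above is that its while loop keeps running for as long as the Kafka iterator delivers messages, so the method may never return. Storm's ISpout documentation says that nextTuple, ack and fail are all called on the same thread and that nextTuple should therefore be non-blocking; acks for tuples emitted with a message ID cannot reach the spout while it sits inside that loop. Whether this is what stalled the topology here is an assumption. The sketch below shows the general pattern in plain Java: a background reader fills a bounded queue, and a nextTuple-style method hands out at most one item per call. BufferedSource, offer and pollOne are hypothetical names, and no Storm or Kafka classes are used.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical stand-in for a spout's internal buffer; not a Storm or Kafka class.
class BufferedSource {
    private final BlockingQueue<String> buffer;

    BufferedSource(int capacity) {
        this.buffer = new ArrayBlockingQueue<String>(capacity);
    }

    // Called by a background reader thread; returns false instead of blocking when the buffer is full.
    boolean offer(String message) {
        return buffer.offer(message);
    }

    // What a nextTuple()-style method would call: one message, or null right away if none is waiting.
    String pollOne() {
        return buffer.poll();
    }

    public static void main(String[] args) throws InterruptedException {
        final BufferedSource source = new BufferedSource(100);

        // Simulated reader thread standing in for the blocking Kafka iterator loop.
        Thread reader = new Thread(new Runnable() {
            public void run() {
                for (int i = 0; i < 5; i++) {
                    source.offer("message-" + i);
                }
            }
        });
        reader.start();
        reader.join();

        // Each call returns promptly, so the caller gets control back between messages.
        String line;
        while ((line = source.pollOne()) != null) {
            System.out.println(line);
        }
    }
}
```

In a real spout the reader thread would be started once in open(), and nextTuple() would emit the result of a single pollOne() call, or simply return when it is null.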

Best regards, Margus (Margusja) Roo

On 03/06/14 11:09, Margusja wrote:
> Some new information.


Re: Worker dies (bolt)

Posted by Margusja <ma...@roo.ee>.
Some new information.
I set debug to true, and in the active worker's log I can see the following.
While the worker is OK:
2014-06-03 11:04:55 b.s.d.task [INFO] Emitting: hbasewriter __ack_ack [7197822474056634252 -608920652033678418]
2014-06-03 11:04:55 b.s.d.executor [INFO] Processing received message source: hbasewriter:3, stream: __ack_ack, id: {}, [7197822474056634252 -608920652033678418]
2014-06-03 11:04:55 b.s.d.task [INFO] Emitting direct: 1; __acker __ack_ack [7197822474056634252]
2014-06-03 11:04:55 b.s.d.executor [INFO] Processing received message source: KafkaConsumerSpout:1, stream: default, id: {4344988213623161794=-5214435544383558411},  my message...

After the worker dies there are only rows from the spout, like:
2014-06-03 11:06:30 b.s.d.task [INFO] Emitting: KafkaConsumerSpout __ack_init [3399515592775976300 5357635772515085965 1]
2014-06-03 11:06:30 b.s.d.task [INFO] Emitting: KafkaConsumerSpout default
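
To see at a glance which component has gone silent, the "Emitting:" entries of a debug log like this one can be tallied per component. A rough sketch, assuming each log entry sits on a single line and that the format is exactly as shown in this thread; EmitTally is a hypothetical name.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical log helper: counts "Emitting: <component> <stream>" entries per component.
class EmitTally {
    // Matches e.g. "... b.s.d.task [INFO] Emitting: hbasewriter __ack_ack [...]".
    // "Emitting direct:" entries do not match this pattern and are skipped.
    private static final Pattern EMIT = Pattern.compile("\\[INFO\\] Emitting: (\\S+) (\\S+)");

    static Map<String, Integer> tally(Iterable<String> lines) {
        Map<String, Integer> counts = new LinkedHashMap<String, Integer>();
        for (String line : lines) {
            Matcher m = EMIT.matcher(line);
            if (m.find()) {
                String component = m.group(1);
                Integer old = counts.get(component);
                counts.put(component, old == null ? 1 : old + 1);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> sample = Arrays.asList(
            "2014-06-03 11:04:55 b.s.d.task [INFO] Emitting: hbasewriter __ack_ack [7197822474056634252 -608920652033678418]",
            "2014-06-03 11:06:30 b.s.d.task [INFO] Emitting: KafkaConsumerSpout __ack_init [3399515592775976300 5357635772515085965 1]",
            "2014-06-03 11:06:30 b.s.d.task [INFO] Emitting: KafkaConsumerSpout default");
        System.out.println(tally(sample));
    }
}
```

Running the tally over consecutive slices of the log would show the point at which the count for hbasewriter stops growing while the spout's count keeps rising.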

Best regards, Margus (Margusja) Roo

On 03/06/14 09:58, Margusja wrote:
> Hi
>
> I ran a new test and discovered that in my environment even a very
> simple bolt dies after around 2500 cycles.


Re: Worker dies (bolt)

Posted by Margusja <ma...@roo.ee>.
Hi

I ran a new test and discovered that in my environment even a very
simple bolt dies after around 2500 cycles.

The bolt's code:

package storm;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.util.Map;
import java.util.UUID;

public class DummyBolt extends BaseBasicBolt
{
    int count = 0;

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector)
    {
        String line = tuple.getString(0);

        count++;
        System.out.println("Dummy count: " + count);
        collector.emit(new Values(line));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer)
    {
        declarer.declare(new Fields("line"));
    }

    @Override
    public void cleanup()
    {
    }
}

After around 2500 cycles there is no more output from the execute method.
What I do after this:
[root@dlvm2 sysconfig]# ls /var/lib/storm/workers/*/pids
[root@dlvm2 sysconfig]# kill -9 4179
After that a new worker comes up, works again for around 2500 cycles and
then stops, and I have to kill the PID again.

Any ideas?

Best regards, Margus (Margusja) Roo
