Posted to user@storm.apache.org by Margusja <ma...@roo.ee> on 2014/06/02 12:36:52 UTC
Worker dies (bolt)
Hi
I am using apache-storm-0.9.1-incubating.
I have a simple topology: a Spout reads from a Kafka topic and a Bolt writes
the lines from the spout to HBase.
Recently we did a test - we sent 300 000 000 messages over kafka-rest ->
kafka-queue -> storm topology -> hbase.
I noticed that after around one hour and around 2500 messages the worker
died. The PID is there and the process is up, but the bolt's execute
method hangs.
The bolt's code is:
package storm;

import java.util.Map;
import java.util.UUID;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class HBaseWriterBolt extends BaseBasicBolt
{
    HTableInterface usersTable;
    HTablePool pool;
    int count = 0;

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.defaults.for.version", "0.96.0.2.0.6.0-76-hadoop2");
        conf.set("hbase.defaults.for.version.skip", "true");
        conf.set("hbase.zookeeper.quorum", "vm24,vm37,vm38");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        conf.set("hbase.rootdir", "hdfs://vm38:8020/user/hbase/data");
        //conf.set("zookeeper.znode.parent", "/hbase-unsecure");

        pool = new HTablePool(conf, 1);
        usersTable = pool.getTable("kafkademo1");
    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector)
    {
        String line = tuple.getString(0);

        Put p = new Put(Bytes.toBytes(UUID.randomUUID().toString()));
        p.add(Bytes.toBytes("info"), Bytes.toBytes("line"), Bytes.toBytes(line));

        try {
            usersTable.put(p);
            count++;
            System.out.println("Count: " + count);
        }
        catch (Exception e) {
            e.printStackTrace();
        }
        collector.emit(new Values(line));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer)
    {
        declarer.declare(new Fields("line"));
    }

    @Override
    public void cleanup()
    {
        try {
            usersTable.close();
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }
}
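For context, the topology is wired up roughly like this. The component names
"KafkaConsumerSpout" and "hbasewriter" are the real ones from my worker log
(quoted later in this thread); the builder code itself is a sketch from
memory, not copied from the project:

    // Sketch only - classes are backtype.storm.topology.TopologyBuilder,
    // backtype.storm.Config and backtype.storm.StormSubmitter.
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("KafkaConsumerSpout", new KafkaConsumerSpout(), 1);
        builder.setBolt("hbasewriter", new HBaseWriterBolt(), 1)
               .shuffleGrouping("KafkaConsumerSpout");

        Config conf = new Config();
        // topology name is illustrative
        StormSubmitter.submitTopology("kafkademo1-topology", conf, builder.createTopology());
    }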
The line System.out.println("Count: " + count); in the execute method was
added for debugging, so I can see in the log that the bolt is running.
In the Spout's nextTuple() method I added a debug line:
System.out.println("Message from the Topic ...");
After some time, around 50 minutes, I can see in the log file that the
Spout is still working but the Bolt has died.
Any ideas?
--
Best regards, Margus (Margusja) Roo
+372 51 48 780
http://margus.roo.ee
http://ee.linkedin.com/in/margusroo
skype: margusja
ldapsearch -x -h ldap.sk.ee -b c=EE "(serialNumber=37303140314)"
Re: Worker dies (bolt)
Posted by Margusja <ma...@roo.ee>.
OK, got more info.
It looks like the problem is related to the spout.
I changed the spout:
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector)
    {
        this.collector = collector;

        Properties props = new Properties();
        props.put("zookeeper.connect", "vm24:2181,vm37:2181,vm38:2181");
        props.put("group.id", "testgroup");
        props.put("zookeeper.session.timeout.ms", "500");
        props.put("zookeeper.sync.time.ms", "250");
        props.put("auto.commit.interval.ms", "1000");
        consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        this.topic = "kafkademo1";
    }
to
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector)
    {
        this.collector = collector;

        Properties props = new Properties();
        props.put("zookeeper.connect", "vm24:2181,vm37:2181,vm38:2181");
        props.put("group.id", "testgroup");
        //props.put("zookeeper.session.timeout.ms", "500");
        //props.put("zookeeper.sync.time.ms", "250");
        //props.put("auto.commit.interval.ms", "1000");
        consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        this.topic = "kafkademo1";
    }
and
    public void nextTuple()
    {
        Map<String, Integer> topicCount = new HashMap<String, Integer>();
        // Define single thread for topic
        topicCount.put(topic, new Integer(1));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount);
        List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);
        for (final KafkaStream stream : streams) {
            ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator();
            while (consumerIte.hasNext())
            {
                // System.out.println("Message from the Topic ...");
                String line = new String(consumerIte.next().message());
                this.collector.emit(new Values(line), line);
            }
        }
        if (consumer != null)
            consumer.shutdown();
    }
to
    public void nextTuple()
    {
        Map<String, Integer> topicCount = new HashMap<String, Integer>();
        // Define single thread for topic
        topicCount.put(topic, new Integer(1));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount);
        List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);
        for (final KafkaStream stream : streams) {
            ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator();
            while (consumerIte.hasNext())
            {
                // System.out.println("Message from the Topic ...");
                String line = new String(consumerIte.next().message());
                //this.collector.emit(new Values(line), line);
                this.collector.emit(new Values(line));
            }
        }
        if (consumer != null)
            consumer.shutdown();
    }
And now it is running.
Strange, because when the worker died I could still see log rows from the
spout. But I think it is somehow related to Storm's internals.
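One more thought for the archives: even the working version above blocks
inside nextTuple() while Kafka's iterator waits for data, and nextTuple()
is supposed to return quickly so Storm's event loop can keep running. A
cleaner shape would be to build the stream once in open() and drain at
most one message per nextTuple() call. Untested sketch; consumer.timeout.ms
is a standard Kafka high-level consumer setting that makes hasNext() throw
instead of blocking forever:

    private SpoutOutputCollector collector;
    private ConsumerConnector consumer;
    private ConsumerIterator<byte[], byte[]> messages;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        Properties props = new Properties();
        props.put("zookeeper.connect", "vm24:2181,vm37:2181,vm38:2181");
        props.put("group.id", "testgroup");
        props.put("consumer.timeout.ms", "100"); // hasNext() throws ConsumerTimeoutException instead of blocking
        consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        Map<String, Integer> topicCount = new HashMap<String, Integer>();
        topicCount.put("kafkademo1", 1); // single consumer thread for the topic
        messages = consumer.createMessageStreams(topicCount).get("kafkademo1").get(0).iterator();
    }

    @Override
    public void nextTuple() {
        try {
            if (messages.hasNext()) {
                String line = new String(messages.next().message());
                collector.emit(new Values(line)); // unanchored, like the working version above
            }
        } catch (ConsumerTimeoutException e) {
            // No message available right now; return control to Storm.
        }
    }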
Best regards, Margus (Margusja) Roo
+372 51 48 780
http://margus.roo.ee
http://ee.linkedin.com/in/margusroo
skype: margusja
ldapsearch -x -h ldap.sk.ee -b c=EE "(serialNumber=37303140314)"
Re: Worker dies (bolt)
Posted by Margusja <ma...@roo.ee>.
Some new information.
I set debug to true, and in the active worker's log I can see the following.
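Debug was turned on through the standard topology config, roughly like this
(conf.setDebug(true) is the real Storm API; the surrounding lines are a
sketch matching the wiring example earlier in the thread):

    Config conf = new Config();
    conf.setDebug(true); // workers then log every emit and every transferred message
    StormSubmitter.submitTopology("kafkademo1-topology", conf, builder.createTopology());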
While the worker is OK:
2014-06-03 11:04:55 b.s.d.task [INFO] Emitting: hbasewriter __ack_ack
[7197822474056634252 -608920652033678418]
2014-06-03 11:04:55 b.s.d.executor [INFO] Processing received message
source: hbasewriter:3, stream: __ack_ack, id: {}, [7197822474056634252
-608920652033678418]
2014-06-03 11:04:55 b.s.d.task [INFO] Emitting direct: 1; __acker
__ack_ack [7197822474056634252]
2014-06-03 11:04:55 b.s.d.executor [INFO] Processing received message
source: KafkaConsumerSpout:1, stream: default, id:
{4344988213623161794=-5214435544383558411}, my message...
and after the worker dies there are only rows about the spout, like:
2014-06-03 11:06:30 b.s.d.task [INFO] Emitting: KafkaConsumerSpout
__ack_init [3399515592775976300 5357635772515085965 1]
2014-06-03 11:06:30 b.s.d.task [INFO] Emitting: KafkaConsumerSpout default
Best regards, Margus (Margusja) Roo
+372 51 48 780
http://margus.roo.ee
http://ee.linkedin.com/in/margusroo
skype: margusja
ldapsearch -x -h ldap.sk.ee -b c=EE "(serialNumber=37303140314)"
Re: Worker dies (bolt)
Posted by Margusja <ma...@roo.ee>.
Hei
I have made a new test and discovered that in my environment even a very
simple bolt dies after around 2500 cycles.
Bolt's code:
package storm;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.util.Map;
import java.util.UUID;

public class DummyBolt extends BaseBasicBolt
{
    int count = 0;

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector)
    {
        String line = tuple.getString(0);

        count++;
        System.out.println("Dummy count: " + count);
        collector.emit(new Values(line));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer)
    {
        declarer.declare(new Fields("line"));
    }

    @Override
    public void cleanup()
    {
    }
}
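For this test DummyBolt simply replaces HBaseWriterBolt in the topology,
i.e. something like this (sketch; the component name "dummy" is illustrative):

    builder.setBolt("dummy", new DummyBolt(), 1).shuffleGrouping("KafkaConsumerSpout");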
After around 2500 cycles there is no more output from the execute method.
What I do after this:
[root@dlvm2 sysconfig]# ls /var/lib/storm/workers/*/pids
[root@dlvm2 sysconfig]# kill -9 4179
After that a new worker comes up and works again for around 2500 cycles,
then stops, and I have to kill the PID again.
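Next time it hangs I could take a thread dump first to see where the
executor is actually stuck (jstack is standard JDK tooling; 4179 is the
worker PID from above):
[root@dlvm2 sysconfig]# jstack 4179 > /tmp/worker-4179-threads.txt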
Any ideas?
Best regards, Margus (Margusja) Roo
+372 51 48 780
http://margus.roo.ee
http://ee.linkedin.com/in/margusroo
skype: margusja
ldapsearch -x -h ldap.sk.ee -b c=EE "(serialNumber=37303140314)"