Posted to user@storm.apache.org by "Yang R.X." <ye...@hotmail.com> on 2016/07/01 08:34:11 UTC
Re: Bolt keep shutting down
Hi Michael,
I missed the note you wrote below the log earlier. Increasing the heap with “-Xmx1024m” really helped!
Thanks a lot!
________________________________
From: Kazansky, Michael <mi...@jpmchase.com>
Sent: June 28, 2016 23:14
To: user@storm.apache.org
Subject: RE: Bolt keep shutting down
I would look at other parts of your code as well as analyze the heap dump. It looks like you have a memory leak:
2016-06-28 19:10:55.218 STDERR [INFO] java.lang.OutOfMemoryError: GC overhead limit exceeded
2016-06-28 19:10:55.221 STDERR [INFO] Dumping heap to artifacts/heapdump ...
2016-06-28 19:10:55.222 STDERR [INFO] Unable to create artifacts/heapdump: File exists
2016-06-28 19:10:56.893 o.a.s.s.o.a.c.f.s.ConnectionStateManager [INFO] State change: SUSPENDED
2016-06-28 19:10:58.505 o.a.s.s.o.a.z.ClientCnxn [INFO] Opening socket connection to server cherry02.db.ict.ac.cn/10.12.0.82:2181. Will not attempt to authenticate using SASL (unknown error)
2016-06-28 19:10:58.505 o.a.s.s.o.a.z.ClientCnxn [INFO] Socket connection established to cherry02.db.ict.ac.cn/10.12.0.82:2181, initiating session
2016-06-28 19:10:59.571 o.a.s.c.zookeeper-state-factory [WARN] Received event :disconnected::none: with disconnected Writer Zookeeper.
2016-06-28 19:11:00.514 o.a.s.s.o.a.z.ClientCnxn [INFO] Session establishment complete on server cherry02.db.ict.ac.cn/10.12.0.82:2181, sessionid = 0x2558b61a1be0057, negotiated timeout = 20000
2016-06-28 19:11:00.515 o.a.s.s.o.a.c.f.s.ConnectionStateManager [INFO] State change: RECONNECTED
2016-06-28 19:10:55.212 o.a.s.m.n.StormServerHandler [ERROR] server errors in handling the request
java.lang.OutOfMemoryError: GC overhead limit exceeded
java.lang.OutOfMemoryError: GC overhead limit exceeded
This error is thrown "if too much time is being spent in garbage collection: if more than 98% of the total time is spent in garbage collection and less than 2% of the heap is recovered, an OutOfMemoryError will be thrown."
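To make the quoted rule concrete, here is a simplified sketch. The class and method names are hypothetical; the 98% and 2% thresholds are the ones in the quote (HotSpot exposes them as the -XX:GCTimeLimit and -XX:GCHeapFreeLimit flags). The real check runs inside the JVM's collector, so this only illustrates the condition and cannot predict when the error will fire.

```java
public class GcOverheadRule {
    // Thresholds from the quoted rule, in percent (HotSpot flags: -XX:GCTimeLimit, -XX:GCHeapFreeLimit).
    static final double GC_TIME_LIMIT_PERCENT = 98.0;
    static final double GC_HEAP_FREE_LIMIT_PERCENT = 2.0;

    /**
     * Simplified form of the rule: true when more than 98% of the time goes to
     * garbage collection AND less than 2% of the heap is recovered.
     */
    static boolean exceedsGcOverheadLimit(double gcTimePercent, double heapRecoveredPercent) {
        return gcTimePercent > GC_TIME_LIMIT_PERCENT
                && heapRecoveredPercent < GC_HEAP_FREE_LIMIT_PERCENT;
    }

    public static void main(String[] args) {
        System.out.println(exceedsGcOverheadLimit(99.0, 1.0));  // prints "true": almost all time in GC, almost nothing freed
        System.out.println(exceedsGcOverheadLimit(99.0, 10.0)); // prints "false": GC is busy but still frees memory
        System.out.println(exceedsGcOverheadLimit(50.0, 1.0));  // prints "false": GC is not dominating run time
    }
}
```

Both conditions must hold at once, which is why a heap that is simply too small for the live data (rather than one that is leaking) can trigger it: every collection runs but frees almost nothing.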
Try increasing the heap size via "-Xmx1024m" (or more).
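In Storm, worker JVM flags such as -Xmx are usually set cluster-wide with worker.childopts in storm.yaml, or per topology with the topology.worker.childopts key (Config.TOPOLOGY_WORKER_CHILDOPTS), which is passed to the worker in addition to worker.childopts. The sketch below assumes a plain Map for the topology conf (Storm's Config is a HashMap, so the same string key applies) and uses a hypothetical helper class. Logging Runtime.maxMemory() from the bolt's prepare() is one way to confirm which heap limit the worker actually got.

```java
import java.util.HashMap;
import java.util.Map;

public class WorkerHeapConfig {
    // String value of Storm's Config.TOPOLOGY_WORKER_CHILDOPTS.
    static final String TOPOLOGY_WORKER_CHILDOPTS = "topology.worker.childopts";

    /** Sets an -Xmx flag for this topology's worker JVMs in a Storm-style conf map. */
    static Map<String, Object> withWorkerHeap(Map<String, Object> conf, int heapMb) {
        conf.put(TOPOLOGY_WORKER_CHILDOPTS, "-Xmx" + heapMb + "m");
        return conf;
    }

    /** Maximum heap the current JVM will try to use, in MB (e.g. log this in prepare()). */
    static long maxHeapMb() {
        return Runtime.getRuntime().maxMemory() / (1024 * 1024);
    }

    public static void main(String[] args) {
        Map<String, Object> conf = withWorkerHeap(new HashMap<>(), 1024);
        System.out.println(conf); // prints "{topology.worker.childopts=-Xmx1024m}"
        System.out.println("max heap (MB): " + maxHeapMb());
    }
}
```

The conf map would then be passed to StormSubmitter.submitTopology() as usual; if the logged max heap does not change, the cluster-wide worker.childopts is the place to look next.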
Thanks,
Michael Kazansky
From: Yang Ruoxue [mailto:yesimsure@hotmail.com]
Sent: Tuesday, June 28, 2016 10:51 AM
To: user@storm.apache.org
Subject: Re: Bolt keep shutting down
Michael, thanks a lot.
I changed the code to use try-with-resources, but the problem persists. Any other ideas?
________________________________
From: Kazansky, Michael <mi...@jpmchase.com>
Sent: June 28, 2016 21:47:48
To: user@storm.apache.org
Subject: RE: Bolt keep shutting down
try {
    writer = Files.newBufferedWriter(file, charset, StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.APPEND);
    writer.write(firstline, 0, firstline.length());
    writer.flush();
} catch (IOException x) {
    System.err.format("prepare() - IOException: %s%n", x);
}
You forgot to close your writer. Alternatively, use try-with-resources so the writer is closed automatically.
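A minimal try-with-resources sketch for the one-off header write, using a hypothetical helper name. Note that try-with-resources closes the writer at the end of the block, so it suits one-shot writes like the header; a writer that execute() keeps using must instead stay open and be closed in cleanup().

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class HeaderWriter {
    /** Appends firstline to file; the writer is closed (and flushed) even if write() throws. */
    static void appendHeader(Path file, String firstline) throws IOException {
        try (BufferedWriter writer = Files.newBufferedWriter(file, StandardCharsets.US_ASCII,
                StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.APPEND)) {
            writer.write(firstline);
        } // writer.close() runs here automatically
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("ref", ".txt");
        appendHeader(file, "name executor elapsed(ms)\n");
        System.out.print(new String(Files.readAllBytes(file), StandardCharsets.US_ASCII));
    }
}
```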
Thanks,
Michael Kazansky
From: Yang Ruoxue [mailto:yesimsure@hotmail.com]
Sent: Tuesday, June 28, 2016 9:44 AM
To: user@storm.apache.org
Subject: Bolt keep shutting down
Hi all,
Can anyone help me with my problem?
I set up a 3-worker cluster on 4 nodes with Storm 1.0.0. My job generates messages in a spout, each carrying an incrementing id and the timestamp at which it was generated; a bolt then calculates the delay and throughput and writes the results to a local file. However, once the topology is submitted to the cluster, my bolt keeps shutting down and restarting on another node (from node A to B to A to B to …).
Here is the log:
2016-06-28 19:10:16.352 o.a.s.d.worker [INFO] Worker 3f9fa92d-e386-4f55-aed6-4044400c47f4 for storm ref-2-1467110596 on 2d6dc803-4242-4de3-82d6-902d36f47d98:6703 has finished loading
2016-06-28 19:10:17.001 o.a.s.d.worker [INFO] All connections are ready for worker 2d6dc803-4242-4de3-82d6-902d36f47d98:6703 with id 3f9fa92d-e386-4f55-aed6-4044400c47f4
2016-06-28 19:10:17.029 o.a.s.d.executor [INFO] Preparing bolt simple:(2)
2016-06-28 19:10:17.041 o.a.s.d.executor [INFO] Prepared bolt simple:(2)
2016-06-28 19:10:17.041 o.a.s.d.executor [INFO] Preparing bolt __system:(-1)
2016-06-28 19:10:17.048 o.a.s.d.executor [INFO] Prepared bolt __system:(-1)
2016-06-28 19:10:55.211 o.a.s.s.o.a.z.ClientCnxn [INFO] Client session timed out, have not heard from server in 13702ms for sessionid 0x2558b61a1be0057, closing socket connection and attempting reconnect
2016-06-28 19:10:55.218 STDERR [INFO] java.lang.OutOfMemoryError: GC overhead limit exceeded
2016-06-28 19:10:55.221 STDERR [INFO] Dumping heap to artifacts/heapdump ...
2016-06-28 19:10:55.222 STDERR [INFO] Unable to create artifacts/heapdump: File exists
2016-06-28 19:10:56.893 o.a.s.s.o.a.c.f.s.ConnectionStateManager [INFO] State change: SUSPENDED
2016-06-28 19:10:58.505 o.a.s.s.o.a.z.ClientCnxn [INFO] Opening socket connection to server cherry02.db.ict.ac.cn/10.12.0.82:2181. Will not attempt to authenticate using SASL (unknown error)
2016-06-28 19:10:58.505 o.a.s.s.o.a.z.ClientCnxn [INFO] Socket connection established to cherry02.db.ict.ac.cn/10.12.0.82:2181, initiating session
2016-06-28 19:10:59.571 o.a.s.c.zookeeper-state-factory [WARN] Received event :disconnected::none: with disconnected Writer Zookeeper.
2016-06-28 19:11:00.514 o.a.s.s.o.a.z.ClientCnxn [INFO] Session establishment complete on server cherry02.db.ict.ac.cn/10.12.0.82:2181, sessionid = 0x2558b61a1be0057, negotiated timeout = 20000
2016-06-28 19:11:00.515 o.a.s.s.o.a.c.f.s.ConnectionStateManager [INFO] State change: RECONNECTED
2016-06-28 19:10:55.212 o.a.s.m.n.StormServerHandler [ERROR] server errors in handling the request
java.lang.OutOfMemoryError: GC overhead limit exceeded
    at java.util.Arrays.copyOf(Arrays.java:2219) ~[?:1.7.0_40]
    at java.util.ArrayList.grow(ArrayList.java:242) ~[?:1.7.0_40]
    at java.util.ArrayList.ensureExplicitCapacity(ArrayList.java:216) ~[?:1.7.0_40]
    at java.util.ArrayList.ensureCapacityInternal(ArrayList.java:208) ~[?:1.7.0_40]
    at java.util.ArrayList.add(ArrayList.java:440) ~[?:1.7.0_40]
    at org.apache.storm.utils.ListDelegate.add(ListDelegate.java:73) ~[storm-core-1.0.0.jar:1.0.0]
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:134) ~[kryo-3.0.3.jar:?]
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:40) ~[kryo-3.0.3.jar:?]
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:689) ~[kryo-3.0.3.jar:?]
    at org.apache.storm.serialization.KryoValuesDeserializer.deserializeFrom(KryoValuesDeserializer.java:37) ~[storm-core-1.0.0.jar:1.0.0]
    at org.apache.storm.serialization.KryoTupleDeserializer.deserialize(KryoTupleDeserializer.java:50) ~[storm-core-1.0.0.jar:1.0.0]
    at org.apache.storm.messaging.DeserializingConnectionCallback.recv(DeserializingConnectionCallback.java:56) ~[storm-core-1.0.0.jar:1.0.0]
    at org.apache.storm.messaging.netty.Server.enqueue(Server.java:133) ~[storm-core-1.0.0.jar:1.0.0]
    at org.apache.storm.messaging.netty.Server.received(Server.java:254) ~[storm-core-1.0.0.jar:1.0.0]
    at org.apache.storm.messaging.netty.StormServerHandler.messageReceived(StormServerHandler.java:61) ~[storm-core-1.0.0.jar:1.0.0]
    at org.apache.storm.shade.org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70) ~[storm-core-1.0.0.jar:1.0.0]
    at org.apache.storm.shade.org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) ~[storm-core-1.0.0.jar:1.0.0]
    at org.apache.storm.shade.org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791) ~[storm-core-1.0.0.jar:1.0.0]
    at org.apache.storm.shade.org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296) ~[storm-core-1.0.0.jar:1.0.0]
    at org.apache.storm.shade.org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:462) ~[storm-core-1.0.0.jar:1.0.0]
    at org.apache.storm.shade.org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:443) ~[storm-core-1.0.0.jar:1.0.0]
    at org.apache.storm.shade.org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70) ~[storm-core-1.0.0.jar:1.0.0]
    at org.apache.storm.shade.org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) ~[storm-core-1.0.0.jar:1.0.0]
    at org.apache.storm.shade.org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559) ~[storm-core-1.0.0.jar:1.0.0]
    at org.apache.storm.shade.org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268) ~[storm-core-1.0.0.jar:1.0.0]
    at org.apache.storm.shade.org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255) ~[storm-core-1.0.0.jar:1.0.0]
    at org.apache.storm.shade.org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88) ~[storm-core-1.0.0.jar:1.0.0]
    at org.apache.storm.shade.org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108) ~[storm-core-1.0.0.jar:1.0.0]
    at org.apache.storm.shade.org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:318) ~[storm-core-1.0.0.jar:1.0.0]
    at org.apache.storm.shade.org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89) ~[storm-core-1.0.0.jar:1.0.0]
    at org.apache.storm.shade.org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178) ~[storm-core-1.0.0.jar:1.0.0]
2016-06-28 19:11:01.557 STDIO [ERROR] Halting due to Out Of Memory Error...Netty-server-localhost-6703-worker-1
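One detail in this log matters for the heap-dump analysis: "Unable to create artifacts/heapdump: File exists" means the JVM did not write a new dump, because a file with that exact name was already there. A hedged sketch of one workaround, with hypothetical helper and directory names: point -XX:HeapDumpPath at a directory instead of a file. HotSpot then names each dump java_pid<pid>.hprof inside that directory, so an old dump no longer blocks a new one (unless a pid is reused). The directory must exist and be writable on every supervisor node; deleting or renaming the old artifacts/heapdump before resubmitting is the simpler alternative.

```java
public class HeapDumpOpts {
    /**
     * Builds worker JVM options that raise the heap and write heap dumps into a
     * directory rather than a fixed file name.
     */
    static String workerChildopts(int heapMb, String dumpDir) {
        return "-Xmx" + heapMb + "m"
                + " -XX:+HeapDumpOnOutOfMemoryError"
                + " -XX:HeapDumpPath=" + dumpDir;
    }

    public static void main(String[] args) {
        // Hypothetical dump directory; create it on each node before the worker starts.
        System.out.println(workerChildopts(1024, "/tmp/storm-heapdumps"));
        // prints "-Xmx1024m -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/storm-heapdumps"
    }
}
```

The resulting string could go into worker.childopts (storm.yaml) or topology.worker.childopts.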
And here is the code:
// Imports needed at the top of the enclosing file (Storm 1.0.0 package names):
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

// ARY and INTERVAL are constants of the enclosing class (not shown in this excerpt).
public static class BaseBolt extends BaseRichBolt {

    OutputCollector collector;

    TopologyContext context;
    String name; // component id
    int executor;

    BufferedWriter writer = null;
    long filestamp;

    private long count = 0;
    long begin;
    long last;
    double delay_sum = 0; // double: a float running sum loses precision after many tuples

    public BaseBolt(long stp) {
        filestamp = stp;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("msgid", "stamp"));
    } // declareOutputFields

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        this.context = context;
        name = context.getThisComponentId();
        executor = context.getThisTaskIndex();

        Path file = Paths.get(System.getProperty("user.home"), "ref" + filestamp + name + executor);
        Charset charset = Charset.forName("US-ASCII");
        String firstline = "--------------------------------------------------\nname executor elapsed(ms) elapsed_sec(ms) count(tuples) delay(ms) avg_delay rate(tuple/s)\n";
        try {
            // The writer stays open for execute() and is closed in cleanup().
            writer = Files.newBufferedWriter(file, charset, StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.APPEND);
            writer.write(firstline, 0, firstline.length());
            writer.flush();
        } catch (IOException x) {
            System.err.format("prepare() - IOException: %s%n", x);
        }

        begin = System.currentTimeMillis();
        last = begin;
    } // prepare

    @Override
    public void cleanup() {
        // Storm does not guarantee that cleanup() is called on a cluster (workers may be killed).
        long end = System.currentTimeMillis();
        System.out.println("whole_elapsed_time: " + (end - begin) / ARY);
        if (writer == null) {
            return; // prepare() failed to open the file
        }
        try {
            writer.close();
        } catch (IOException x) {
            System.err.format("cleanup() - IOException: %s%n", x);
        }
    } // cleanup

    public long getMsgId(Tuple tuple) {
        return tuple.getLongByField("msgid").longValue();
    }

    public void baseExecution(long msgid, long stamp) {
        collector.emit(new Values(msgid, stamp));

        // count
        count++;

        // delay
        long crt = System.currentTimeMillis();
        float delay = (crt - stamp);
        // avg_delay
        delay_sum += delay;
        double avg_delay = delay_sum / count;

        long interval = (crt - last);
        float interval_sec = (crt - last) / ARY;
        if (interval - INTERVAL * ARY >= 0) {

            // elapsed
            long elapsed = (crt - begin); // ms
            float elapsed_sec = (crt - begin) / ARY;
            // rate
            float rate = count / elapsed_sec; // tuple/s

            // output
            String s = name + " " + executor + " " + elapsed + " " + elapsed_sec + " " + count + " " + delay + " " + avg_delay + " " + rate + "\n";
            try {
                writer.write(s, 0, s.length());
                writer.flush(); // push each report line to the file immediately
            } catch (IOException x) {
                System.err.format("IOException: %s%n", x);
            }

            last = crt;
        } // if
    } // baseExecution

    @Override
    public void execute(Tuple tuple) {
        long msgid = getMsgId(tuple);
        long stamp = tuple.getLongByField("stamp").longValue();
        baseExecution(msgid, stamp);
        // BaseRichBolt does not ack automatically; unacked tuples time out and may be replayed.
        collector.ack(tuple);
    } // execute

} // BaseBolt
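The delay and throughput arithmetic in baseExecution() can be pulled out of the bolt so it can be checked without a cluster. The sketch below is a hypothetical helper, not the original code: it assumes ARY converts milliseconds to seconds (i.e. 1000), and it keeps the running delay sum in a double rather than a float.

```java
/** Hypothetical helper isolating the delay/throughput math from baseExecution(). */
public class DelayStats {
    private final long beginMs;
    private long count = 0;
    private double delaySumMs = 0; // double keeps precision over millions of tuples

    DelayStats(long beginMs) {
        this.beginMs = beginMs;
    }

    /** Records one tuple generated at stampMs and observed at nowMs; returns its delay in ms. */
    long record(long stampMs, long nowMs) {
        long delay = nowMs - stampMs;
        count++;
        delaySumMs += delay;
        return delay;
    }

    long count() {
        return count;
    }

    /** Average delay in ms over all recorded tuples (0 if none). */
    double avgDelayMs() {
        return count == 0 ? 0 : delaySumMs / count;
    }

    /** Throughput in tuples per second since beginMs, assuming 1000 ms per second. */
    double ratePerSec(long nowMs) {
        double elapsedSec = (nowMs - beginMs) / 1000.0;
        return elapsedSec <= 0 ? 0 : count / elapsedSec;
    }

    public static void main(String[] args) {
        DelayStats stats = new DelayStats(0);
        stats.record(100, 110); // delay 10 ms
        stats.record(200, 230); // delay 30 ms
        System.out.println(stats.avgDelayMs());    // prints "20.0"
        System.out.println(stats.ratePerSec(2000)); // prints "1.0" (2 tuples in 2 s)
    }
}
```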
Thanks for any help!
yesimsure
This communication is for informational purposes only. It is not intended as an offer or solicitation for the purchase or sale of any financial instrument or as an official confirmation of any transaction. All market prices, data and other information are not warranted as to completeness or accuracy and are subject to change without notice. Any comments or statements made herein do not necessarily reflect those of JPMorgan Chase & Co., its subsidiaries and affiliates (collectively, "JPMC"). This transmission may contain information that is proprietary, privileged, confidential and/or exempt from disclosure under applicable law. If you are not the intended recipient, you are hereby notified that any disclosure, copying, distribution, or use of the information contained herein (including any reliance thereon) is STRICTLY PROHIBITED. If you received this transmission in error, please immediately contact the sender and destroy the material in its entirety, whether in electronic or hard copy format. Although this transmission and any attachments are believed to be free of any virus or other defect that might affect any computer system into which it is received and opened, it is the responsibility of the recipient to ensure that it is virus free and no responsibility is accepted by JPMC for any loss or damage arising in any way from its use. Please note that any electronic communication that is conducted within or through JPMC's systems is subject to interception, monitoring, review, retention and external production in accordance with JPMC's policy and local laws, rules and regulations; may be stored or otherwise processed in countries other than the country in which you are located; and will be treated in accordance with JPMC policies and applicable laws and regulations. Please refer to http://www.jpmorgan.com/pages/disclosures for disclosures relating to European legal entities.