Posted to user@storm.apache.org by "Yang R.X." <ye...@hotmail.com> on 2016/07/01 08:34:11 UTC

Re: Bolt keep shutting down

Hi Michael,


I missed your note below the log earlier. Increasing the heap size with "-Xmx1024m" really helped!

Thanks a lot!


________________________________
From: Kazansky, Michael <mi...@jpmchase.com>
Sent: June 28, 2016 23:14
To: user@storm.apache.org
Subject: RE: Bolt keep shutting down


I would look at the other parts of your code and also analyze the heap dump. It looks like you have a memory leak.
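For getting a heap dump to analyze, here is a minimal sketch, assuming a HotSpot/OpenJDK JVM where `com.sun.management.HotSpotDiagnosticMXBean` is available. The class name `HeapDumper`, the helper `uniqueDumpPath`, and the file naming scheme are made up for illustration. A timestamped file name is used because `dumpHeap` refuses to overwrite an existing file.

```java
import com.sun.management.HotSpotDiagnosticMXBean;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper, not part of Storm.
final class HeapDumper {

    // Builds a dump path that differs per call so an old dump never blocks a new one.
    static Path uniqueDumpPath(String dir, long timestampMillis) {
        return Paths.get(dir, "heapdump-" + timestampMillis + ".hprof");
    }

    // Writes a heap dump of the current JVM; fails with IOException if the file exists.
    static void dump(Path target) throws IOException {
        HotSpotDiagnosticMXBean bean =
                ManagementFactory.getPlatformMXBean(HotSpotDiagnosticMXBean.class);
        bean.dumpHeap(target.toString(), true); // true = only reachable (live) objects
    }

    public static void main(String[] args) throws IOException {
        Path target = uniqueDumpPath(System.getProperty("java.io.tmpdir"),
                System.currentTimeMillis());
        dump(target);
        System.out.println("Heap dump written to " + target);
    }
}
```

The resulting .hprof file can then be opened in a heap analyzer such as Eclipse MAT or VisualVM to see which objects dominate the heap.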



569 2016-06-28 19:10:55.218 STDERR [INFO] java.lang.OutOfMemoryError: GC overhead limit exceeded

 570 2016-06-28 19:10:55.221 STDERR [INFO] Dumping heap to artifacts/heapdump ...

 571 2016-06-28 19:10:55.222 STDERR [INFO] Unable to create artifacts/heapdump: File exists

 572 2016-06-28 19:10:56.893 o.a.s.s.o.a.c.f.s.ConnectionStateManager [INFO] State change: SUSPENDED

 573 2016-06-28 19:10:58.505 o.a.s.s.o.a.z.ClientCnxn [INFO] Opening socket connection to server cherry02.db.ict.ac.cn/10.12.0.82:2181. Will not attempt to authenticate using SASL (unknown error)

 574 2016-06-28 19:10:58.505 o.a.s.s.o.a.z.ClientCnxn [INFO] Socket connection established to cherry02.db.ict.ac.cn/10.12.0.82:2181, initiating session

 575 2016-06-28 19:10:59.571 o.a.s.c.zookeeper-state-factory [WARN] Received event :disconnected::none: with disconnected Writer Zookeeper.

 576 2016-06-28 19:11:00.514 o.a.s.s.o.a.z.ClientCnxn [INFO] Session establishment complete on server cherry02.db.ict.ac.cn/10.12.0.82:2181, sessionid = 0x2558b61a1be0057, negotiated timeout = 20000

 577 2016-06-28 19:11:00.515 o.a.s.s.o.a.c.f.s.ConnectionStateManager [INFO] State change: RECONNECTED

 578 2016-06-28 19:10:55.212 o.a.s.m.n.StormServerHandler [ERROR] server errors in handling the request

 579 java.lang.OutOfMemoryError: GC overhead limit exceeded



java.lang.OutOfMemoryError: GC overhead limit exceeded



This error is thrown "if too much time is being spent in garbage collection: if more than 98% of the total time is spent in garbage collection and less than 2% of the heap is recovered, an OutOfMemoryError will be thrown."
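To get a rough feel for how much time a JVM spends in garbage collection, the standard `java.lang.management` beans can be polled. This is only a sketch: it reports the cumulative GC time since JVM start as a fraction of uptime, which is coarser than the check the JVM itself performs before throwing the error. The class name `GcOverhead` is made up for illustration.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;

// Hypothetical helper for illustration only.
final class GcOverhead {

    // Sums the accumulated collection time (ms) over all collectors.
    // A collector that does not support the metric reports -1 and is skipped.
    static long totalGcMillis() {
        long total = 0;
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            long t = gc.getCollectionTime();
            if (t > 0) {
                total += t;
            }
        }
        return total;
    }

    // Fraction of JVM uptime spent in GC so far (0.0 if uptime is not yet measurable).
    static double gcTimeFraction() {
        long uptime = ManagementFactory.getRuntimeMXBean().getUptime();
        return uptime <= 0 ? 0.0 : (double) totalGcMillis() / uptime;
    }

    public static void main(String[] args) {
        System.out.printf("GC time so far: %d ms (%.2f%% of uptime)%n",
                totalGcMillis(), gcTimeFraction() * 100.0);
    }
}
```

A value that climbs toward 100% while the heap stays nearly full is consistent with the situation described by the error message.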



Try increasing the heap size via "-Xmx1024m" (or more).
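In Storm, JVM flags for workers are normally passed through the `worker.childopts` setting in storm.yaml or the per-topology `topology.worker.childopts` key. The sketch below uses a plain `HashMap` as a stand-in for the topology configuration object (an assumption, so that the example runs without Storm on the classpath) and shows how to check the heap limit the current JVM actually received. The class name `WorkerHeap` and its methods are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for illustration only.
final class WorkerHeap {

    // Builds the -Xmx flag for the given heap size in megabytes.
    static String heapOpt(int megabytes) {
        if (megabytes <= 0) {
            throw new IllegalArgumentException("heap size must be positive");
        }
        return "-Xmx" + megabytes + "m";
    }

    // Maximum heap the running JVM will attempt to use, in megabytes.
    static long maxHeapMb() {
        return Runtime.getRuntime().maxMemory() / (1024L * 1024L);
    }

    public static void main(String[] args) {
        // Stand-in for the topology configuration map (assumption).
        Map<String, Object> conf = new HashMap<>();
        conf.put("topology.worker.childopts", heapOpt(1024));
        System.out.println("Configured worker options: " + conf.get("topology.worker.childopts"));
        System.out.println("Max heap of this JVM: " + maxHeapMb() + " MB");
    }
}
```

Logging `maxHeapMb()` from inside a bolt's prepare() is one way to confirm that the setting reached the worker JVM.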



Thanks,

Michael Kazansky



From: Yang Ruoxue [mailto:yesimsure@hotmail.com]
Sent: Tuesday, June 28, 2016 10:51 AM
To: user@storm.apache.org
Subject: Re: Bolt keep shutting down



Michael, thanks a lot.



I changed the code to use try-with-resources, but the problem persists. Any other ideas?



________________________________

From: Kazansky, Michael <mi...@jpmchase.com>
Sent: June 28, 2016 21:47:48
To: user@storm.apache.org
Subject: RE: Bolt keep shutting down



    try {
        writer = Files.newBufferedWriter(file, charset, StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.APPEND);
        writer.write(firstline, 0, firstline.length());
        writer.flush();
    } catch (IOException x) {
        System.err.format("prepare() - IOException: %s%n", x);
    }



You forgot to close your writer. Alternatively, use try-with-resources so that the writer is closed automatically.
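A minimal sketch of the try-with-resources form, assuming the writer is only needed inside one method. The class name `HeaderWriter` and the method `appendLine` are made up for illustration. If the writer has to stay open across several calls, closing it explicitly in a cleanup method is the usual alternative.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical helper for illustration only.
final class HeaderWriter {

    // Appends one line to the file. The writer is closed (and flushed)
    // automatically when the try block exits, even if write() throws.
    static void appendLine(Path file, String line) throws IOException {
        try (BufferedWriter writer = Files.newBufferedWriter(
                file, StandardCharsets.US_ASCII,
                StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.APPEND)) {
            writer.write(line);
            writer.newLine();
        }
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("ref", ".log");
        appendLine(file, "name executor count");
        appendLine(file, "simple 2 100");
        System.out.println(Files.readAllLines(file, StandardCharsets.US_ASCII));
        Files.delete(file);
    }
}
```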



Thanks,

Michael Kazansky



From: Yang Ruoxue [mailto:yesimsure@hotmail.com]
Sent: Tuesday, June 28, 2016 9:44 AM
To: user@storm.apache.org
Subject: Bolt keep shutting down



Hi all,

    Can anyone help me with my problem?
    I set up a 3-worker cluster on 4 nodes with Storm 1.0.0. My job generates messages in a spout, each carrying an incrementing id and the timestamp at which it was generated. A bolt then calculates the delay and throughput and writes the results to a local file. However, once the topology is submitted to the cluster, my bolt keeps shutting down and restarting on another node (from node A to B to A to B to …).



Here is the log:



 562 2016-06-28 19:10:16.352 o.a.s.d.worker [INFO] Worker 3f9fa92d-e386-4f55-aed6-4044400c47f4 for storm ref-2-1467110596 on 2d6dc803-4242-4de3-82d6-902d36f47d98:6703 has finished loading

 563 2016-06-28 19:10:17.001 o.a.s.d.worker [INFO] All connections are ready for worker 2d6dc803-4242-4de3-82d6-902d36f47d98:6703 with id 3f9fa92d-e386-4f55-aed6-4044400c47f4

 564 2016-06-28 19:10:17.029 o.a.s.d.executor [INFO] Preparing bolt simple:(2)

 565 2016-06-28 19:10:17.041 o.a.s.d.executor [INFO] Prepared bolt simple:(2)

 566 2016-06-28 19:10:17.041 o.a.s.d.executor [INFO] Preparing bolt __system:(-1)

 567 2016-06-28 19:10:17.048 o.a.s.d.executor [INFO] Prepared bolt __system:(-1)

 568 2016-06-28 19:10:55.211 o.a.s.s.o.a.z.ClientCnxn [INFO] Client session timed out, have not heard from server in 13702ms for sessionid 0x2558b61a1be0057, closing socket connection and attempting reconnect

 569 2016-06-28 19:10:55.218 STDERR [INFO] java.lang.OutOfMemoryError: GC overhead limit exceeded

 570 2016-06-28 19:10:55.221 STDERR [INFO] Dumping heap to artifacts/heapdump ...

 571 2016-06-28 19:10:55.222 STDERR [INFO] Unable to create artifacts/heapdump: File exists

 572 2016-06-28 19:10:56.893 o.a.s.s.o.a.c.f.s.ConnectionStateManager [INFO] State change: SUSPENDED

 573 2016-06-28 19:10:58.505 o.a.s.s.o.a.z.ClientCnxn [INFO] Opening socket connection to server cherry02.db.ict.ac.cn/10.12.0.82:2181. Will not attempt to authenticate using SASL (unknown error)

 574 2016-06-28 19:10:58.505 o.a.s.s.o.a.z.ClientCnxn [INFO] Socket connection established to cherry02.db.ict.ac.cn/10.12.0.82:2181, initiating session

 575 2016-06-28 19:10:59.571 o.a.s.c.zookeeper-state-factory [WARN] Received event :disconnected::none: with disconnected Writer Zookeeper.

 576 2016-06-28 19:11:00.514 o.a.s.s.o.a.z.ClientCnxn [INFO] Session establishment complete on server cherry02.db.ict.ac.cn/10.12.0.82:2181, sessionid = 0x2558b61a1be0057, negotiated timeout = 20000

 577 2016-06-28 19:11:00.515 o.a.s.s.o.a.c.f.s.ConnectionStateManager [INFO] State change: RECONNECTED

 578 2016-06-28 19:10:55.212 o.a.s.m.n.StormServerHandler [ERROR] server errors in handling the request

 579 java.lang.OutOfMemoryError: GC overhead limit exceeded

 580         at java.util.Arrays.copyOf(Arrays.java:2219) ~[?:1.7.0_40]

 581         at java.util.ArrayList.grow(ArrayList.java:242) ~[?:1.7.0_40]

 582         at java.util.ArrayList.ensureExplicitCapacity(ArrayList.java:216) ~[?:1.7.0_40]

 583         at java.util.ArrayList.ensureCapacityInternal(ArrayList.java:208) ~[?:1.7.0_40]

 584         at java.util.ArrayList.add(ArrayList.java:440) ~[?:1.7.0_40]

 585         at org.apache.storm.utils.ListDelegate.add(ListDelegate.java:73) ~[storm-core-1.0.0.jar:1.0.0]

 586         at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:134) ~[kryo-3.0.3.jar:?]

 587         at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:40) ~[kryo-3.0.3.jar:?]

 588         at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:689) ~[kryo-3.0.3.jar:?]

 589         at org.apache.storm.serialization.KryoValuesDeserializer.deserializeFrom(KryoValuesDeserializer.java:37) ~[storm-core-1.0.0.jar:1.0.0]

 590         at org.apache.storm.serialization.KryoTupleDeserializer.deserialize(KryoTupleDeserializer.java:50) ~[storm-core-1.0.0.jar:1.0.0]

 591         at org.apache.storm.messaging.DeserializingConnectionCallback.recv(DeserializingConnectionCallback.java:56) ~[storm-core-1.0.0.jar:1.0.0]

 592         at org.apache.storm.messaging.netty.Server.enqueue(Server.java:133) ~[storm-core-1.0.0.jar:1.0.0]

 593         at org.apache.storm.messaging.netty.Server.received(Server.java:254) ~[storm-core-1.0.0.jar:1.0.0]

 594         at org.apache.storm.messaging.netty.StormServerHandler.messageReceived(StormServerHandler.java:61) ~[storm-core-1.0.0.jar:1.0.0]

 595         at org.apache.storm.shade.org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70) ~[storm-core-1.0.0.jar:1.0.0]



 596         at org.apache.storm.shade.org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) ~[storm-core-1.0.0.jar:1.0.0]

 597         at org.apache.storm.shade.org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791) ~[storm-core-1.0.0.jar:1.0.0]

 598         at org.apache.storm.shade.org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296) ~[storm-core-1.0.0.jar:1.0.0]

 599         at org.apache.storm.shade.org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:462) ~[storm-core-1.0.0.jar:1.0.0]

 600         at org.apache.storm.shade.org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:443) ~[storm-core-1.0.0.jar:1.0.0]

 601

 602         at org.apache.storm.shade.org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70) ~[storm-core-1.0.0.jar:1.0.0]

 603         at org.apache.storm.shade.org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) ~[storm-core-1.0.0.jar:1.0.0]

 604         at org.apache.storm.shade.org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559) ~[storm-core-1.0.0.jar:1.0.0]

 605         at org.apache.storm.shade.org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268) ~[storm-core-1.0.0.jar:1.0.0]

 606         at org.apache.storm.shade.org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255) ~[storm-core-1.0.0.jar:1.0.0]

 607         at org.apache.storm.shade.org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88) ~[storm-core-1.0.0.jar:1.0.0]

 608         at org.apache.storm.shade.org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108) ~[storm-core-1.0.0.jar:1.0.0]

 609         at org.apache.storm.shade.org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:318) ~[storm-core-1.0.0.jar:1.0.0]

 610         at org.apache.storm.shade.org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89) ~[storm-core-1.0.0.jar:1.0.0]

 611         at org.apache.storm.shade.org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178) ~[storm-core-1.0.0.jar:1.0.0]

 612 2016-06-28 19:11:01.557 STDIO [ERROR] Halting due to Out Of Memory Error...Netty-server-localhost-6703-worker-1



And here is the code:

  // Imports needed at the top of the enclosing source file:
  //   java.io.BufferedWriter, java.io.IOException,
  //   java.nio.charset.Charset, java.nio.file.Files, java.nio.file.Path,
  //   java.nio.file.Paths, java.nio.file.StandardOpenOption, java.util.Map,
  //   org.apache.storm.task.OutputCollector, org.apache.storm.task.TopologyContext,
  //   org.apache.storm.topology.OutputFieldsDeclarer,
  //   org.apache.storm.topology.base.BaseRichBolt,
  //   org.apache.storm.tuple.Fields, org.apache.storm.tuple.Tuple,
  //   org.apache.storm.tuple.Values
  // ARY and INTERVAL are constants of the enclosing class, not shown in this excerpt.
  public static class BaseBolt extends BaseRichBolt {

      OutputCollector collector;
      TopologyContext context;
      String name;   // component id
      int executor;  // task index within the component

      BufferedWriter writer = null;
      long filestamp;

      private long count = 0;  // tuples processed so far
      long begin;              // time prepare() finished (ms)
      long last;               // time of the last report line (ms)
      float delay_sum = 0;     // sum of all per-tuple delays (ms)

      public BaseBolt(long stp) {
          filestamp = stp;
      }

      @Override
      public void declareOutputFields(OutputFieldsDeclarer declarer) {
          declarer.declare(new Fields("msgid", "stamp"));
      }

      @Override
      public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
          this.collector = collector;
          this.context = context;
          name = context.getThisComponentId();
          executor = context.getThisTaskIndex();

          // One output file per task, stored in the user's home directory.
          Path file = Paths.get(System.getProperty("user.home"), "ref" + filestamp + name + executor);
          Charset charset = Charset.forName("US-ASCII");
          String firstline = "--------------------------------------------------\nname executor elapsed(ms) elapsed_sec(ms) count(tuples) delay(ms) avg_delay rate(tuple/s)\n";
          try {
              // The writer stays open for baseExecution() and is closed in cleanup().
              writer = Files.newBufferedWriter(file, charset, StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.APPEND);
              writer.write(firstline, 0, firstline.length());
              writer.flush();
          } catch (IOException x) {
              System.err.format("prepare() - IOException: %s%n", x);
          }

          begin = System.currentTimeMillis();
          last = begin;
      }

      @Override
      public void cleanup() {
          long end = System.currentTimeMillis();
          System.out.println("whole_elapased_time: " + (end - begin) / ARY);
          if (writer == null) {
              return; // prepare() could not open the file, nothing to close
          }
          try {
              writer.close();
          } catch (IOException x) {
              System.err.format("cleanup() - IOException: %s%n", x);
          }
      }

      public long getMsgId(Tuple tuple) {
          return tuple.getLongByField("msgid").longValue();
      }

      public void baseExecution(long msgid, long stamp) {
          // Pass the tuple on unchanged.
          collector.emit(new Values(msgid, stamp));

          count++;

          // Delay of this tuple and running average delay, both in ms.
          long crt = System.currentTimeMillis();
          float delay = (crt - stamp);
          delay_sum += delay;
          float avg_delay = delay_sum / count;

          // Write one report line every INTERVAL * ARY milliseconds.
          long interval = (crt - last);
          if (interval >= INTERVAL * ARY) {
              long elapsed = (crt - begin); // ms
              float elapsed_sec = (crt - begin) / ARY;
              float rate = count / elapsed_sec; // tuples/s

              String s = name + " " + executor + " " + elapsed + " " + elapsed_sec + " " + count + " " + delay + " " + avg_delay + " " + rate + "\n";
              try {
                  writer.write(s, 0, s.length());
                  writer.flush();
              } catch (IOException x) {
                  System.err.format("IOException: %s%n", x);
              }

              last = crt;
          }
      }

      @Override
      public void execute(Tuple tuple) {
          long msgid = getMsgId(tuple);
          long stamp = tuple.getLongByField("stamp").longValue();
          baseExecution(msgid, stamp);
      }

  } // BaseBolt
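The delay and rate arithmetic in the bolt can be sketched on its own, without Storm. The class `DelayStats` below is hypothetical, and `MS_PER_SEC` stands in for `ARY` on the assumption that it converts milliseconds to seconds (the thread does not show its declaration). Using a `double` divisor avoids the truncation that would occur if `ARY` were an integer type.

```java
// Hypothetical stand-alone version of the bolt's statistics, for illustration only.
final class DelayStats {

    // Assumption: plays the role of ARY (milliseconds per second).
    private static final double MS_PER_SEC = 1000.0;

    private final long beginMs;  // start of the measurement window
    private long count;          // tuples recorded so far
    private double delaySumMs;   // sum of per-tuple delays

    DelayStats(long beginMs) {
        this.beginMs = beginMs;
    }

    // Records one tuple that was stamped at stampMs and received at nowMs.
    void record(long stampMs, long nowMs) {
        count++;
        delaySumMs += nowMs - stampMs;
    }

    // Average delay in ms over all recorded tuples (0.0 if none yet).
    double avgDelayMs() {
        return count == 0 ? 0.0 : delaySumMs / count;
    }

    // Tuples per second since beginMs (0.0 if no time has elapsed).
    double ratePerSec(long nowMs) {
        double elapsedSec = (nowMs - beginMs) / MS_PER_SEC;
        return elapsedSec <= 0 ? 0.0 : count / elapsedSec;
    }

    public static void main(String[] args) {
        DelayStats stats = new DelayStats(0L);
        stats.record(100L, 150L);
        stats.record(200L, 230L);
        System.out.println("avg delay (ms): " + stats.avgDelayMs());
        System.out.println("rate (tuples/s): " + stats.ratePerSec(500L));
    }
}
```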





Thanks for any help!





yesimsure

This communication is for informational purposes only. It is not intended as an offer or solicitation for the purchase or sale of any financial instrument or as an official confirmation of any transaction. All market prices, data and other information are not warranted as to completeness or accuracy and are subject to change without notice. Any comments or statements made herein do not necessarily reflect those of JPMorgan Chase & Co., its subsidiaries and affiliates (collectively, "JPMC"). This transmission may contain information that is proprietary, privileged, confidential and/or exempt from disclosure under applicable law. If you are not the intended recipient, you are hereby notified that any disclosure, copying, distribution, or use of the information contained herein (including any reliance thereon) is STRICTLY PROHIBITED. If you received this transmission in error, please immediately contact the sender and destroy the material in its entirety, whether in electronic or hard copy format. Although this transmission and any attachments are believed to be free of any virus or other defect that might affect any computer system into which it is received and opened, it is the responsibility of the recipient to ensure that it is virus free and no responsibility is accepted by JPMC for any loss or damage arising in any way from its use. Please note that any electronic communication that is conducted within or through JPMC's systems is subject to interception, monitoring, review, retention and external production in accordance with JPMC's policy and local laws, rules and regulations; may be stored or otherwise processed in countries other than the country in which you are located; and will be treated in accordance with JPMC policies and applicable laws and regulations. Please refer to http://www.jpmorgan.com/pages/disclosures for disclosures relating to European legal entities.
