Posted to user@storm.apache.org by Sa Li <sa...@gmail.com> on 2015/03/05 19:10:11 UTC

java.lang.OutOfMemoryError: GC overhead limit exceeded

Hi, All

I have been running a Trident topology on a production server. The code
looks like this:

topology.newStream("spoutInit", kafkaSpout)
                .each(new Fields("str"),
                        new JsonObjectParse(),
                        new Fields("eventType", "event"))
                .parallelismHint(pHint)
                .groupBy(new Fields("event"))
                .persistentAggregate(PostgresqlState.newFactory(config),
                        new Fields("eventType"),
                        new EventUpdater(),
                        new Fields("eventWord"))
        ;

        Config conf = new Config();
        conf.registerMetricsConsumer(LoggingMetricsConsumer.class, 1);

Basically, it does something simple: it reads data from Kafka, parses
it into separate fields, and writes the result into a Postgres DB. But
in the Storm UI I see the error "java.lang.OutOfMemoryError: GC
overhead limit exceeded". It always happens in the same worker on each
node, the one on port 6703. I understand this is because, by default,
the JVM is configured to throw this error if it is spending more than
*98% of the total time in GC and less than 2% of the heap is recovered
after each GC*.
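
To get a rough feel for how close a JVM is to that condition, the standard management beans can report cumulative GC time. The following is only a sketch: the class name GcOverhead is made up for illustration, and it measures the average since JVM start, whereas the JVM's own check looks at recent collections, so treat the number as an indicator rather than the exact trigger.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;

// Hypothetical helper, not part of Storm: reports the share of JVM uptime spent in GC.
final class GcOverhead {

    /** Fraction of elapsed time spent in GC, given cumulative GC millis and uptime millis. */
    static double gcFraction(long gcMillis, long uptimeMillis) {
        if (uptimeMillis <= 0) {
            return 0.0;
        }
        return (double) gcMillis / uptimeMillis;
    }

    /** Sums collection time over all collectors; undefined (-1) values are skipped. */
    static long totalGcMillis() {
        long total = 0;
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            long t = gc.getCollectionTime();
            if (t > 0) {
                total += t;
            }
        }
        return total;
    }

    public static void main(String[] args) {
        long uptime = ManagementFactory.getRuntimeMXBean().getUptime();
        long gc = totalGcMillis();
        System.out.printf("GC time: %d ms of %d ms uptime (%.1f%%)%n",
                gc, uptime, 100.0 * gcFraction(gc, uptime));
    }
}
```

Calling something like this periodically from inside a worker (for example from a metrics hook) would show whether GC time climbs steadily before the error appears.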

I am not sure what the exact cause of the memory problem is. Is it OK to
simply increase the heap? Here is my storm.yaml:

supervisor.slots.ports:
     - 6700
     - 6701
     - 6702
     - 6703

nimbus.childopts: "-Xmx1024m -Djava.net.preferIPv4Stack=true"
ui.childopts: "-Xmx768m -Djava.net.preferIPv4Stack=true"
supervisor.childopts: "-Djava.net.preferIPv4Stack=true"
worker.childopts: "-Xmx768m -Djava.net.preferIPv4Stack=true"


Has anyone had similar issues, and what is the best way to overcome them?


thanks in advance

AL

Re: java.lang.OutOfMemoryError: GC overhead limit exceeded

Posted by Nathan Leung <nc...@gmail.com>.
It depends on your application. You can use a profiler such as yourkit or
jvisualvm etc to get an idea of how much memory you are using.
On Mar 5, 2015 1:22 PM, "Sa Li" <sa...@gmail.com> wrote:

> Thanks, Nathan. How much should it be in general?
>
> On Thu, Mar 5, 2015 at 10:15 AM, Nathan Leung <nc...@gmail.com> wrote:
>
>> Your worker is allocated a maximum of 768mb of heap. It's quite possible
>> that this is not enough. Try increasing Xmx in worker.childopts.
>

Re: real time warehouse loads

Posted by "P. Taylor Goetz" <pt...@gmail.com>.
What you've read about Trident's throughput is wrong. Of course it depends on what you actually do in your topology (it's possible to shoot yourself in the foot and kill performance with both the core and Trident APIs), but Trident can achieve nearly twice the throughput of a core API topology at the expense of additional latency.

In a simple benchmark I ran on EC2 with 3 supervisor nodes (m1.large, so not a huge amount of horsepower), the core Storm topology was able to process about 150k tuples per second with 80 ms latency, while Trident processed about 300k tuples per second with about 250 ms latency.
(The topologies were tuned to balance throughput and latency.)

Trident performance isn't any worse than the core API, but it is different owing to the fact that it processes data in micro batches.

-Taylor
> On Mar 7, 2015, at 3:41 PM, Adaryl Bob Wakefield, MBA <ad...@hotmail.com> wrote:
> 
> I’m actually planning on using the in-memory database MemSQL. Creating a file and then ingesting it seems like we’re back to batch processing. I know the definition of real time varies and any improvement over 24 hours is a good thing, but I’d like to get as close to the actual event happening as possible.
>
> I’ve been studying Storm, Samza, and Spark Streaming. The literature says that Storm is good for ETL, but I’ve also read that the Trident abstraction has a large negative impact on throughput.
>
> So MemSQL boasts rapid ingestion. Back to my original question: is the method for loading data really just a run-of-the-mill INSERT statement? No other magic than that?
>  
> Adaryl "Bob" Wakefield, MBA
> Principal
> Mass Street Analytics, LLC
> 913.938.6685
> www.linkedin.com/in/bobwakefieldmba
> Twitter: @BobLovesData
>  
> From: Palmer, Cliff A. (NE)
> Sent: Saturday, March 07, 2015 10:44 AM
> To: user@storm.apache.org
> Subject: RE: real time warehouse loads
>  
> Bob, if "real time" means "up to a few minutes is acceptable" then I'd recommend you use storm to do any pre-load processing and write the result to a text/csv/etc file in a directory.  Then use a separate utility (most databases have something that does this) to load data from the files you create into the database.
> This sounds slower, but remember that establishing a connection to a database to run a SQL INSERT has noticeable latency.  It's also true that each connection (usually) takes a port/socket and memory, and is often a separate OS task, so you are consuming resources that you would probably want storm using.
> There are other solutions for something closer to real time, but they require an in-memory database or "fun with caching" which will require specialized expertise.
> HTH
>  
> From: Adaryl "Bob" Wakefield, MBA [adaryl.wakefield@hotmail.com]
> Sent: Friday, March 06, 2015 7:54 PM
> To: user@storm.apache.org
> Subject: real time warehouse loads
> 
> I’m looking at storm as a method to load data warehouses in real time. I am not that familiar with Java. I’m curious about the actual mechanism to load records into tables. Is it just a matter of feeding the final result of processing into an INSERT INTO SQL statement, or is it more complicated than that? It seems to me that hammering the database with SQL statements of real time data is a bit inefficient.
>  
> Adaryl "Bob" Wakefield, MBA
> Principal
> Mass Street Analytics, LLC
> 913.938.6685
> www.linkedin.com/in/bobwakefieldmba
> Twitter: @BobLovesData

Re: real time warehouse loads

Posted by "Adaryl \"Bob\" Wakefield, MBA" <ad...@hotmail.com>.
I’m actually planning on using the in-memory database MemSQL. Creating a file and then ingesting it seems like we’re back to batch processing. I know the definition of real time varies and any improvement over 24 hours is a good thing, but I’d like to get as close to the actual event happening as possible.

I’ve been studying Storm, Samza, and Spark Streaming. The literature says that Storm is good for ETL, but I’ve also read that the Trident abstraction has a large negative impact on throughput.

So MemSQL boasts rapid ingestion. Back to my original question: is the method for loading data really just a run-of-the-mill INSERT statement? No other magic than that?

Adaryl "Bob" Wakefield, MBA
Principal
Mass Street Analytics, LLC
913.938.6685
www.linkedin.com/in/bobwakefieldmba
Twitter: @BobLovesData

RE: real time warehouse loads

Posted by "Palmer, Cliff A. (NE)" <Cl...@gd-ms.com>.
Bob, if "real time" means "up to a few minutes is acceptable" then I'd recommend you use storm to do any pre-load processing and write the result to a text/csv/etc file in a directory.  Then use a separate utility (most databases have something that does this) to load data from the files you create into the database.

This sounds slower, but remember that establishing a connection to a database to run a SQL INSERT has noticeable latency.  It's also true that each connection (usually) takes a port/socket and memory, and is often a separate OS task, so you are consuming resources that you would probably want storm using.
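
One common way to reduce the per-INSERT cost described above, without going through files, is to reuse a single connection and group rows into batches. The sketch below is illustrative only: BatchBuffer, the events table, and the payload column are hypothetical names, and how much a JDBC batch actually saves depends on the driver and database.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper: collects items and hands them to a sink in fixed-size batches.
final class BatchBuffer<T> {
    private final int batchSize;
    private final Consumer<List<T>> sink;
    private final List<T> pending = new ArrayList<>();

    BatchBuffer(int batchSize, Consumer<List<T>> sink) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
        this.sink = sink;
    }

    /** Buffers one item and flushes automatically once the batch is full. */
    void add(T item) {
        pending.add(item);
        if (pending.size() >= batchSize) {
            flush();
        }
    }

    /** Sends whatever is buffered to the sink; does nothing if the buffer is empty. */
    void flush() {
        if (pending.isEmpty()) {
            return;
        }
        sink.accept(new ArrayList<>(pending));
        pending.clear();
    }

    /** Example JDBC sink: one prepared statement per batch on an already-open connection. */
    static void insertEvents(Connection conn, List<String> events) throws SQLException {
        // Table and column names are assumptions for illustration.
        try (PreparedStatement ps =
                     conn.prepareStatement("INSERT INTO events (payload) VALUES (?)")) {
            for (String event : events) {
                ps.setString(1, event);
                ps.addBatch();
            }
            ps.executeBatch();
        }
    }
}
```

A caller would create the buffer once with a sink that wraps insertEvents (converting SQLException as needed) and call flush() on a timer or at the end of each micro-batch, so rows never wait longer than the latency budget allows.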

There are other solutions for something closer to real time, but they require an in-memory database or "fun with caching" which will require specialized expertise.

HTH



real time warehouse loads

Posted by "Adaryl \"Bob\" Wakefield, MBA" <ad...@hotmail.com>.
I’m looking at storm as a method to load data warehouses in real time. I am not that familiar with Java. I’m curious about the actual mechanism to load records into tables. Is it just a matter of feeding the final result of processing into an INSERT INTO SQL statement, or is it more complicated than that? It seems to me that hammering the database with SQL statements of real time data is a bit inefficient.

Adaryl "Bob" Wakefield, MBA
Principal
Mass Street Analytics, LLC
913.938.6685
www.linkedin.com/in/bobwakefieldmba
Twitter: @BobLovesData

RE: java.lang.OutOfMemoryError: GC overhead limit exceeded

Posted by Ra...@DellTeam.com.
The previous reply was from March 10th, so Bill may have resolved this issue, but here is some info that may be useful to other folks in the future.

You can set the JVM flag -XX:+HeapDumpOnOutOfMemoryError to get a heap dump on OOM.

You can then analyze the heap dump in a tool like VisualVM, which has a leak detector to tell you where you are spending all your memory.

Make sure you set the worker childopts -Xmx to a reasonable value like 2G so that you can comfortably analyze the heap dump.

You can also turn on jmxremote monitoring and use tools like Java Mission Control (ships with the JDK) to do flight recordings that give you a wealth of information on what's happening in your JVM, thread dumps at periodic times and such.
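
After changing worker.childopts it is easy to end up with a worker that never picked up the new flags. A small check like the one below, run inside the worker JVM, can confirm what the process actually received. This is a sketch; the class name JvmFlagCheck is invented for illustration and nothing here is Storm-specific.

```java
import java.lang.management.ManagementFactory;
import java.util.List;

// Hypothetical helper: inspects the flags and heap limit of the current JVM.
final class JvmFlagCheck {

    /** True if the exact flag string appears among the JVM input arguments. */
    static boolean hasFlag(List<String> jvmArgs, String flag) {
        return jvmArgs.contains(flag);
    }

    /** Maximum heap the JVM will attempt to use, in megabytes. */
    static long maxHeapMb() {
        return Runtime.getRuntime().maxMemory() / (1024L * 1024L);
    }

    public static void main(String[] args) {
        List<String> jvmArgs = ManagementFactory.getRuntimeMXBean().getInputArguments();
        System.out.println("Max heap (MB): " + maxHeapMb());
        System.out.println("Heap dump on OOM enabled: "
                + hasFlag(jvmArgs, "-XX:+HeapDumpOnOutOfMemoryError"));
    }
}
```

Logging these two values once at worker startup makes it obvious whether a later OOM happened under the old 768m limit or the new one.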

From: Binh Nguyen Van [mailto:binhnv80@gmail.com]
Sent: Wednesday, May 27, 2015 3:03 PM
To: user@storm.apache.org
Subject: Re: java.lang.OutOfMemoryError: GC overhead limit exceeded

Not sure if you fixed the issue, but I think the problem may come from the max spout pending setting.
You are using Trident, and there this value is the max number of pending "BATCHES", not the number
of "tuples". So let's say your topic has 10 partitions, max spout pending is set to 10, and the max
fetch size is set to 1MB: then you can have 10*10*1 = 100MB of input data in your topology at any moment,
and this will blow up your heap really quickly.
I think starting with max spout pending set to 1 and then tuning it is the better way to go.

Hope this helps
-Binh

On Tue, Mar 10, 2015 at 3:56 AM, Brunner, Bill <bi...@baml.com> wrote:
Once you’ve profiled your app, you should also play around with different garbage collectors.  Considering you’re reaching max heap, I assume your tuples are probably pretty large.  If that’s the case and you’re using the CMS garbage collector, you’re going to blow out your heap regularly.  I found with large tuples and/or memory-intensive computations that the old parallel GC works best because it compacts the old gen every time it collects. CMS doesn’t, and on each sweep it tries to jam more into the heap until it can’t any longer and then blows up.  There is also a great article by Michael Noll about Storm’s message buffers and how to tweak them depending on your needs:  http://www.michael-noll.com/blog/2013/06/21/understanding-storm-internal-message-buffers/



From: Sa Li [mailto:sa.in.vanc@gmail.com]
Sent: Monday, March 09, 2015 10:15 PM
To: user@storm.apache.org
Subject: Re: java.lang.OutOfMemoryError: GC overhead limit exceeded


I have not done that yet, not quite familiar with this, but I will try to do that tomorrow, thanks.
On Mar 9, 2015 7:10 PM, "Nathan Leung" <nc...@gmail.com> wrote:
Have you profiled your spout / bolt logic as recommended earlier in this thread?

On Mon, Mar 9, 2015 at 9:49 PM, Sa Li <sa...@gmail.com> wrote:

You are right. I have already increased the heap in the yaml to 2 GB for each worker, but I still have the issue, so I suspect I may be running into some other cause. Receive/send buffer sizes, maybe? Also, before I see the GC overhead error in the Storm UI, I come across other errors in the worker log as well, such as Netty connection errors and null pointers, as I showed in another post.

Thanks
On Mar 9, 2015 5:36 PM, "Nathan Leung" <nc...@gmail.com> wrote:
I still think you should try running with a larger heap.  :)  Max spout pending determines how many tuples can be pending (tuple tree is not fully acked) per spout task.  If you have many spout tasks per worker this can be a large amount of memory.  It also depends on how big your tuples are.

On Mon, Mar 9, 2015 at 6:14 PM, Sa Li <sa...@gmail.com> wrote:
Hi, Nathan

We have played around with max spout pending in dev. If we set it to 10, it is OK, but if we set it to more than 50, GC overhead errors start to appear. In the end we are writing tuples into a PostgreSQL DB, and the highest write speed into the DB is around 40K records/minute, which is very slow; maybe that is why tuples accumulate in memory before being written to the DB. But I think 10 is too small. Does that mean only 10 tuples are allowed in flight?

thanks

AL

On Fri, Mar 6, 2015 at 7:39 PM, Nathan Leung <nc...@gmail.com> wrote:
I've not modified netty so I can't comment on that.  I would set max spout pending; try 1000 at first.  This will limit the number of tuples that you can have in flight simultaneously and therefore limit the amount of memory used by these tuples and their processing.

On Fri, Mar 6, 2015 at 7:03 PM, Sa Li <sa...@gmail.com> wrote:
Hi, Nathan

The log size of that Kafka topic is 23515541, and each record is about 3K. I checked the yaml file and I don't have max spout pending set, so I assume it should be the default: topology.max.spout.pending: null

Should I set it to a certain value? Also, I sometimes see java.nio.channels.ClosedChannelException: null, or b.s.d.worker [ERROR] Error on initialization of server mk-worker.
Does this mean I should add

storm.messaging.netty.server_worker_threads: 1
storm.messaging.netty.client_worker_threads: 1
storm.messaging.netty.buffer_size: 5242880 #5MB buffer
storm.messaging.netty.max_retries: 30
storm.messaging.netty.max_wait_ms: 1000
storm.messaging.netty.min_wait_ms: 100

to the yaml, and modify the values?



thanks



On Fri, Mar 6, 2015 at 2:22 PM, Nathan Leung <nc...@gmail.com> wrote:

How much data do you have in Kafka? How is your max spout pending set? If you have a high max spout pending (or if you emit unanchored tuples) you could be using up a lot of memory.
On Mar 6, 2015 5:14 PM, "Sa Li" <sa...@gmail.com> wrote:
Hi, Nathan

I have hit a strange issue: when I set spoutConf.forceFromStart=true, it quickly runs into the GC overhead limit, even though I have already increased the heap size. If I remove this setting, it works fine. I was thinking that maybe the kafkaSpout is consuming data much faster than the data is being written into the Postgres DB, so the data quickly fills the memory and exhausts the heap. But I ran the same test on my DEV cluster and it works fine, even with spoutConf.forceFromStart=true. I checked the Storm config for DEV and production, and they are the same.

Any hints?

thanks

AL


On Thu, Mar 5, 2015 at 3:26 PM, Nathan Leung <nc...@gmail.com> wrote:
I don't see anything glaring.  I would try increasing heap size.  It could be that you're right on the threshold of what you've allocated and you just need more memory.

On Thu, Mar 5, 2015 at 5:41 PM, Sa Li <sa...@gmail.com> wrote:
Hi, All,

I have more or less located where the problem comes from: in my run command I specify the clientid of TridentKafkaConfig. If I keep the clientid I used before, it causes the GC error; otherwise it is completely OK. Here is the code:


if (parameters.containsKey("clientid")) {
    logger.info("topic=>" + parameters.get("clientid") + "/" + parameters.get("topic"));
    spoutConf = new TridentKafkaConfig(zk, parameters.get("topic"), parameters.get("clientid"));

Any idea about this error?



Thanks



AL

On Thu, Mar 5, 2015 at 12:02 PM, Sa Li <sa...@gmail.com> wrote:
Sorry, continuing from my last message:

2015-03-05T11:48:08.418-0800 b.s.util [ERROR] Async loop died!
java.lang.RuntimeException: java.lang.RuntimeException: Remote address is not reachable. We will close this client Netty-Client-complicated-laugh/10.100.98.103:6703
        at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) ~[storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) ~[storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) ~[storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.disruptor$consume_loop_STAR_$fn__1460.invoke(disruptor.clj:94) ~[storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463) ~[storm-core-0.9.3.jar:0.9.3]
        at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
        at java.lang.Thread.run(Thread.java:745) [na:1.7.0_65]
Caused by: java.lang.RuntimeException: Remote address is not reachable. We will close this client Netty-Client-complicated-laugh/10.100.98.103:6703
        at backtype.storm.messaging.netty.Client.connect(Client.java:171) ~[storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.messaging.netty.Client.send(Client.java:194) ~[storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.utils.TransferDrainer.send(TransferDrainer.java:54) ~[storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.daemon.worker$mk_transfer_tuples_handler$fn__3730$fn__3731.invoke(worker.clj:330) ~[storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.daemon.worker$mk_transfer_tuples_handler$fn__3730.invoke(worker.clj:328) ~[storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.disruptor$clojure_handler$reify__1447.onEvent(disruptor.clj:58) ~[storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125) ~[storm-core-0.9.3.jar:0.9.3]
        ... 6 common frames omitted
2015-03-05T11:48:08.423-0800 b.s.util [ERROR] Halting process: ("Async loop died!")
java.lang.RuntimeException: ("Async loop died!")
        at backtype.storm.util$exit_process_BANG_.doInvoke(util.clj:325) [storm-core-0.9.3.jar:0.9.3]
        at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.5.1.jar:na]
        at backtype.storm.disruptor$consume_loop_STAR_$fn__1458.invoke(disruptor.clj:92) [storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.util$async_loop$fn__464.invoke(util.clj:473) [storm-core-0.9.3.jar:0.9.3]
        at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
        at java.lang.Thread.run(Thread.java:745) [na:1.7.0_65]
2015-03-05T11:48:08.425-0800 b.s.d.worker [INFO] Shutting down worker eventsStreamerv1-48-1425499636 0673ece0-cea2-4185-9b3e-6c49ad585576 6703
2015-03-05T11:48:08.426-0800 b.s.m.n.Client [INFO] Closing Netty Client Netty-Client-beloved-judge/10.100.98.104:6703

I suspect this is caused by my EventUpdater, which writes data in batches:


// Trident reducer used by persistentAggregate; the state per group is a List<String>.
static class EventUpdater implements ReducerAggregator<List<String>> {

    @Override
    public List<String> init() {
        // No state until the first tuple of a group arrives.
        return null;
    }

    @Override
    public List<String> reduce(List<String> curr, TridentTuple tuple) {
        List<String> updated;

        if (curr == null) {
            // First tuple for this group: seed the list with its event (field index 1).
            String event = (String) tuple.getValue(1);
            System.out.println("===:" + event + ":");
            updated = Lists.newArrayList(event);
        } else {
            // Later tuples are only logged; the existing list is kept unchanged.
            System.out.println("===+" + tuple + ":");
            updated = curr;
        }
        return updated;
    }
}
What do you think?

Thanks

On Thu, Mar 5, 2015 at 11:57 AM, Sa Li <sa...@gmail.com> wrote:
Thank you very much for the reply. Here is the error I saw in the production server's worker-6703.log:


On Thu, Mar 5, 2015 at 11:31 AM, Nathan Leung <nc...@gmail.com> wrote:
Yeah, then in this case maybe you can install JDK / Yourkit in the remote machines and run the tools over X or something.  I'm assuming this is a development cluster (not live / production) and that installing debugging tools and running remote UIs etc is not a problem.  :)

On Thu, Mar 5, 2015 at 1:52 PM, Andrew Xor <an...@gmail.com> wrote:
Nathan, I think that if he wants to profile a bolt running in a worker on a different cluster node than the one the profiling tool runs on, he won't be able to attach to the process, since it lives on a different physical machine (well, now that I think about it more, it can be done via remote debugging, but that's just a pain in the ***).
Regards,
A.

On Thu, Mar 5, 2015 at 8:46 PM, Nathan Leung <nc...@gmail.com> wrote:

You don't need to change your code. As Andrew mentioned you can get a lot of mileage by profiling your logic in a standalone program. For jvisualvm, you can just run your program (a loop that runs for a long time is best) then attach to the running process with jvisualvm.  It's pretty straightforward to use and you can also find good guides with a Google search.
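
A standalone harness for this can be very small. The sketch below is an assumption about how one might do it, not something from the thread: ProfileHarness is a made-up name, and the string work inside main is only a stand-in for the real parse/aggregate logic. It keeps the work running long enough to attach jvisualvm or another profiler.

```java
// Hypothetical harness: runs a piece of logic in a loop so a profiler can attach to it.
final class ProfileHarness {

    /** Runs the work repeatedly for at least durationMillis and returns the iteration count. */
    static long runFor(long durationMillis, Runnable work) {
        long start = System.nanoTime();
        long budgetNanos = durationMillis * 1_000_000L;
        long iterations = 0;
        do {
            work.run();
            iterations++;
        } while (System.nanoTime() - start < budgetNanos);
        return iterations;
    }

    public static void main(String[] args) {
        // Stand-in workload; replace with a call into the spout/bolt logic under test.
        StringBuilder scratch = new StringBuilder();
        long n = runFor(10 * 60 * 1000L, () -> {
            scratch.setLength(0);
            scratch.append("{\"eventType\":\"click\"}".toUpperCase());
        });
        System.out.println("iterations: " + n);
    }
}
```

Feeding the loop with records captured from the real topic tends to give a more realistic allocation profile than synthetic input.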
On Mar 5, 2015 1:43 PM, "Andrew Xor" <an...@gmail.com> wrote:
Well... detecting memory leaks in Java is a bit tricky, as Java does a lot for you. Generally though, as long as you avoid needless allocations, do not hold on to references you no longer need, and close any resources that you do not use, you should be fine... but a profiler such as the ones mentioned by Nathan will tell you the whole truth. YourKit is awesome and has a free trial, so go ahead and test drive it. I am pretty sure that you need a working jar (or compilable code that has a main function in it) in order to profile it, although profiling your bolts and spouts is a bit trickier. Hopefully your algorithm (or portions of it) can be put in a sample test program that can be executed locally for you to profile.
Hope this helped. Regards,
A.

On Thu, Mar 5, 2015 at 8:33 PM, Sa Li <sa...@gmail.com> wrote:

On Thu, Mar 5, 2015 at 10:26 AM, Andrew Xor <an...@gmail.com> wrote:
Unfortunately that is not fixed, it depends on the computations and data-structures you have; in my case for example I use more than 2GB since I need to keep a large matrix in memory... having said that, in most cases it should be relatively easy to estimate how much memory you are going to need and use that... or if that's not possible you can just increase it and try the "set and see" approach. Check for memory leaks as well... (unclosed resources and so on...!)
Regards.

A.

On Thu, Mar 5, 2015 at 8:21 PM, Sa Li <sa...@gmail.com> wrote:
Thanks, Nathan. How much should it be in general?

On Thu, Mar 5, 2015 at 10:15 AM, Nathan Leung <nc...@gmail.com> wrote:

Your worker is allocated a maximum of 768mb of heap. It's quite possible that this is not enough. Try increasing Xmx in worker.childopts.

Re: java.lang.OutOfMemoryError: GC overhead limit exceeded

Posted by Binh Nguyen Van <bi...@gmail.com>.
Not sure if you fixed the issue, but I think the problem may come from the
max spout pending setting.
You are using Trident, and there this value is the max number of pending
"BATCHES", not the number of "tuples". So let's say your topic has 10
partitions, max spout pending is set to 10, and the max fetch size is set
to 1MB: then you can have 10*10*1 = 100MB of input data in your topology at
any moment, and this will blow up your heap really quickly.
I think starting with max spout pending set to 1 and then tuning it is the
better way to go.
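
The arithmetic above can be written down as a tiny helper to try other settings. This is only a back-of-the-envelope sketch: InFlightEstimate is a made-up name, and it counts raw fetched bytes only, while deserialized tuples and intermediate objects typically take more heap than that.

```java
// Hypothetical helper: upper bound on raw input bytes in flight for a Trident Kafka spout.
final class InFlightEstimate {

    /** partitions * pending batches * max fetch size per partition, in bytes. */
    static long maxInFlightBytes(int partitions, int maxSpoutPending, long fetchSizeBytes) {
        return (long) partitions * maxSpoutPending * fetchSizeBytes;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        // The example from this message: 10 partitions, 10 pending batches, 1 MB fetch size.
        long bytes = maxInFlightBytes(10, 10, mb);
        System.out.println(bytes / mb + " MB"); // prints "100 MB"
    }
}
```

Comparing the result with the worker's -Xmx gives a quick sanity check before raising max spout pending.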

Hope this helps
-Binh

On Tue, Mar 10, 2015 at 3:56 AM, Brunner, Bill <bi...@baml.com>
wrote:

>  Once you’ve profiled your app, you should also play around with
> different garbage collectors.  Considering you’re reaching max heap, I
> assume your tuples are probably pretty large.  If that’s the case and
> you’re using the CMS garbage collector, you’re going to blow out your heap
> regularly.  I found with large tuples and/or memory intensive computations
> that the old parallel GC works best because it compacts the old gen every
> time it collects. CMS doesn’t, and on each sweep it tries to jam more into the
> heap until it can’t any longer and then blows up.  There is also a great
> article by Michael Noll about Storm’s message buffers and how to tweak
> them depending on your needs.
> http://www.michael-noll.com/blog/2013/06/21/understanding-storm-internal-message-buffers/
>
>
>
>
>
>
>
> *From:* Sa Li [mailto:sa.in.vanc@gmail.com]
> *Sent:* Monday, March 09, 2015 10:15 PM
> *To:* user@storm.apache.org
> *Subject:* Re: java.lang.OutOfMemoryError: GC overhead limit exceeded
>
>
>
> I have not done that yet, not quite familiar with this, but I will try to
> do that tomorrow, thanks.
>
> On Mar 9, 2015 7:10 PM, "Nathan Leung" <nc...@gmail.com> wrote:
>
> Have you profiled your spout / bolt logic as recommended earlier in this
> thread?
>
>
>
> On Mon, Mar 9, 2015 at 9:49 PM, Sa Li <sa...@gmail.com> wrote:
>
> You are right , I have already increased the heap in yaml to 2 G for each
> worker, but still have the issue, so I doubt I may running into some other
> causes,  receive,send buffer size? And in general, before I see the GC
> overhead in storm ui,  I came cross other errors in worker log as well,
> like Netty connection, null pointer,etc, as I show in another post.
>
> Thanks
>
> On Mar 9, 2015 5:36 PM, "Nathan Leung" <nc...@gmail.com> wrote:
>
> I still think you should try running with a larger heap.  :)  Max spout
> pending determines how many tuples can be pending (tuple tree is not fully
> acked) per spout task.  If you have many spout tasks per worker this can be
> a large amount of memory.  It also depends on how big your tuples are.
>
>
>
> On Mon, Mar 9, 2015 at 6:14 PM, Sa Li <sa...@gmail.com> wrote:
>
> Hi, Nathan
>
>
>
> We have played around with max spout pending in dev. If we set it to 10, it
> is OK, but if we set it to more than 50, GC overhead errors start to appear.
> We ultimately write the tuples into PostgreSQL, and the highest write speed
> into the DB is around 40K records/minute, which is quite slow; maybe that is
> why tuples accumulate in memory before being dumped into the DB. But I think
> 10 is too small. Does that mean only 10 tuples are allowed in flight?
>
>
>
> thanks
>
>
>
> AL
>
>
>
> On Fri, Mar 6, 2015 at 7:39 PM, Nathan Leung <nc...@gmail.com> wrote:
>
> I've not modified netty so I can't comment on that.  I would set max spout
> pending; try 1000 at first.  This will limit the number of tuples that you
> can have in flight simultaneously and therefore limit the amount of memory
> used by these tuples and their processing.
>
>
>
> On Fri, Mar 6, 2015 at 7:03 PM, Sa Li <sa...@gmail.com> wrote:
>
> Hi, Nathan
>
>
>
> The log size of that Kafka topic is 23515541, and each record is about 3K.
> I checked the yaml file and I don't have max spout pending set, so I assume
> it is the default: topology.max.spout.pending: null
>
> Should I set it to a certain value? Also, I sometimes see
> java.nio.channels.ClosedChannelException: null, or
> b.s.d.worker [ERROR] Error on initialization of server mk-worker
>
> Does this mean I should add
>
> storm.messaging.netty.server_worker_threads: 1
> storm.messaging.netty.client_worker_threads: 1
> storm.messaging.netty.buffer_size: 5242880 #5MB buffer
> storm.messaging.netty.max_retries: 30
> storm.messaging.netty.max_wait_ms: 1000
> storm.messaging.netty.min_wait_ms: 100
>
> to the yaml and modify the values?
>
>
>
>
>
> thanks
>
>
>
>
>
>
>
> On Fri, Mar 6, 2015 at 2:22 PM, Nathan Leung <nc...@gmail.com> wrote:
>
> How much data do you have in Kafka? How is your max spout pending set? If
> you have a high max spout pending (or if you emit unanchored tuples) you
> could be using up a lot of memory.
>
> On Mar 6, 2015 5:14 PM, "Sa Li" <sa...@gmail.com> wrote:
>
> Hi, Nathan
>
>
>
> I have run into a strange issue: when I set spoutConf.forceFromStart=true,
> the topology quickly hits the GC overhead limit even though I have already
> increased the heap size, but if I remove this setting it works fine. I was
> thinking that maybe the kafkaSpout consumes data much faster than the data
> can be written into postgresDB, so the data quickly fills the memory and
> exhausts the heap. But when I ran the same test on my DEV cluster it worked
> fine, even with spoutConf.forceFromStart=true. I checked the Storm config
> for DEV and production, and they are the same.
>
>
>
> Any hints?
>
>
>
> thanks
>
>
>
> AL
>
>
>
>
>
> On Thu, Mar 5, 2015 at 3:26 PM, Nathan Leung <nc...@gmail.com> wrote:
>
> I don't see anything glaring.  I would try increasing heap size.  It could
> be that you're right on the threshold of what you've allocated and you just
> need more memory.
>
>
>
> On Thu, Mar 5, 2015 at 5:41 PM, Sa Li <sa...@gmail.com> wrote:
>
> Hi, All,
>
> I have roughly located where the problem comes from. In my run command I
> specify the clientid for TridentKafkaConfig; if I keep the same clientid I
> used before, it causes the GC error, otherwise everything is OK.
> Here is the code:
>
> if (parameters.containsKey("clientid")) {
>     logger.info("topic=>" + parameters.get("clientid") + "/" + parameters.get("topic"));
>     spoutConf = new TridentKafkaConfig(zk, parameters.get("topic"), parameters.get("clientid"));
>
> Any idea about this error?
>
>
>
> Thanks
>
>
>
> AL
>
>
>
> On Thu, Mar 5, 2015 at 12:02 PM, Sa Li <sa...@gmail.com> wrote:
>
> Sorry, continue last thread:
>
>
>
> 2015-03-05T11:48:08.418-0800 b.s.util [ERROR] Async loop died!
>
> java.lang.RuntimeException: java.lang.RuntimeException: Remote address is
> not reachable. We will close this client Netty-Client-complicated-laugh/
> 10.100.98.103:6703
>
>         at
> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128)
> ~[storm-core-0.9.3.jar:0.9.3]
>
>         at
> backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99)
> ~[storm-core-0.9.3.jar:0.9.3]
>
>         at
> backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80)
> ~[storm-core-0.9.3.jar:0.9.3]
>
>         at
> backtype.storm.disruptor$consume_loop_STAR_$fn__1460.invoke(disruptor.clj:94)
> ~[storm-core-0.9.3.jar:0.9.3]
>
>         at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463)
> ~[storm-core-0.9.3.jar:0.9.3]
>
>         at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
>
>         at java.lang.Thread.run(Thread.java:745) [na:1.7.0_65]
>
> Caused by: java.lang.RuntimeException: Remote address is not reachable. We
> will close this client Netty-Client-complicated-laugh/10.100.98.103:6703
>
>         at backtype.storm.messaging.netty.Client.connect(Client.java:171)
> ~[storm-core-0.9.3.jar:0.9.3]
>
>         at backtype.storm.messaging.netty.Client.send(Client.java:194)
> ~[storm-core-0.9.3.jar:0.9.3]
>
>         at
> backtype.storm.utils.TransferDrainer.send(TransferDrainer.java:54)
> ~[storm-core-0.9.3.jar:0.9.3]
>
>         at
> backtype.storm.daemon.worker$mk_transfer_tuples_handler$fn__3730$fn__3731.invoke(worker.clj:330)
> ~[storm-core-0.9.3.jar:0.9.3]
>
>         at
> backtype.storm.daemon.worker$mk_transfer_tuples_handler$fn__3730.invoke(worker.clj:328)
> ~[storm-core-0.9.3.jar:0.9.3]
>
>         at
> backtype.storm.disruptor$clojure_handler$reify__1447.onEvent(disruptor.clj:58)
> ~[storm-core-0.9.3.jar:0.9.3]
>
>         at
> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125)
> ~[storm-core-0.9.3.jar:0.9.3]
>
>         ... 6 common frames omitted
>
> 2015-03-05T11:48:08.423-0800 b.s.util [ERROR] Halting process: ("Async
> loop died!")
>
> java.lang.RuntimeException: ("Async loop died!")
>
>         at backtype.storm.util$exit_process_BANG_.doInvoke(util.clj:325)
> [storm-core-0.9.3.jar:0.9.3]
>
>         at clojure.lang.RestFn.invoke(RestFn.java:423)
> [clojure-1.5.1.jar:na]
>
>         at
> backtype.storm.disruptor$consume_loop_STAR_$fn__1458.invoke(disruptor.clj:92)
> [storm-core-0.9.3.jar:0.9.3]
>
>         at backtype.storm.util$async_loop$fn__464.invoke(util.clj:473)
> [storm-core-0.9.3.jar:0.9.3]
>
>         at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
>
>         at java.lang.Thread.run(Thread.java:745) [na:1.7.0_65]
>
> 2015-03-05T11:48:08.425-0800 b.s.d.worker [INFO] Shutting down worker
> eventsStreamerv1-48-1425499636 0673ece0-cea2-4185-9b3e-6c49ad585576 6703
>
> 2015-03-05T11:48:08.426-0800 b.s.m.n.Client [INFO] Closing Netty Client
> Netty-Client-beloved-judge/10.100.98.104:6703
>
>
>
> I suspect this is caused by my EventUpdater, which writes data in batches:
>
> static class EventUpdater implements ReducerAggregator<List<String>> {
>
>     @Override
>     public List<String> init() {
>         return null;
>     }
>
>     @Override
>     public List<String> reduce(List<String> curr, TridentTuple tuple) {
>         List<String> updated = null;
>
>         if (curr == null) {
>             String event = (String) tuple.getValue(1);
>             System.out.println("===:" + event + ":");
>             updated = Lists.newArrayList(event);
>         } else {
>             System.out.println("===+" + tuple + ":");
>             updated = curr;
>         }
>         // System.out.println("(())");
>         return updated;
>     }
> }
>
> What do you think?
>
>
>
> THanks
>
>
>
> On Thu, Mar 5, 2015 at 11:57 AM, Sa Li <sa...@gmail.com> wrote:
>
> Thank you very much for the reply, here is error I saw in production
> server worker-6703.log,
>
>
>
>
>
> On Thu, Mar 5, 2015 at 11:31 AM, Nathan Leung <nc...@gmail.com> wrote:
>
> Yeah, then in this case maybe you can install JDK / Yourkit in the remote
> machines and run the tools over X or something.  I'm assuming this is a
> development cluster (not live / production) and that installing debugging
> tools and running remote UIs etc is not a problem.  :)
>
>
>
> On Thu, Mar 5, 2015 at 1:52 PM, Andrew Xor <an...@gmail.com>
> wrote:
>
> Nathan I think that if he wants to profile a bolt per se that runs in a
> worker that resides in a different cluster node than the one the profiling
> tool runs he won't be able to attach the process since it resides in a
> different physical machine, me thinks (well, now that I think of it better
> it can be done... via remote debugging but that's just a pain in the ***).
>
> Regards,
>
> A.
>
>
>
> On Thu, Mar 5, 2015 at 8:46 PM, Nathan Leung <nc...@gmail.com> wrote:
>
> You don't need to change your code. As Andrew mentioned you can get a lot
> of mileage by profiling your logic in a standalone program. For jvisualvm,
> you can just run your program (a loop that runs for a long time is best)
> then attach to the running process with jvisualvm.  It's pretty
> straightforward to use and you can also find good guides with a Google
> search.
>
> On Mar 5, 2015 1:43 PM, "Andrew Xor" <an...@gmail.com> wrote:
>
> Well...  detecting memory leaks in Java is a bit tricky as Java does a lot
> for you. Generally though, as long as you avoid using the "new" operator and
> close any resources that you do not use you should be fine... but a
> profiler such as the ones mentioned by Nathan will tell you the whole
> truth. YourKit is awesome and has a free trial, go ahead and test drive it.
> I am pretty sure that you need a working jar (or compilable code that has a
> main function in it) in order to profile it, although profiling your bolts
> and spouts is a bit trickier. Hopefully your algorithm (or portions of it)
> can be put in a sample test program that can be executed locally for you to
> profile it.
>
> Hope this helped. Regards,
>
> A.
>
> On Thu, Mar 5, 2015 at 8:33 PM, Sa Li <sa...@gmail.com> wrote:
>
>
>
> On Thu, Mar 5, 2015 at 10:26 AM, Andrew Xor <an...@gmail.com>
> wrote:
>
> Unfortunately that is not fixed, it depends on the computations and
> data-structures you have; in my case for example I use more than 2GB since
> I need to keep a large matrix in memory... having said that, in most cases
> it should be relatively easy to estimate how much memory you are going to
> need and use that... or if that's not possible you can just increase it and
> try the "set and see" approach. Check for memory leaks as well... (unclosed
> resources and so on...!)
>
> Regards.
>
> A.
>
>
>
> On Thu, Mar 5, 2015 at 8:21 PM, Sa Li <sa...@gmail.com> wrote:
>
> Thanks, Nathan. How much should it be in general?
>
>
>
> On Thu, Mar 5, 2015 at 10:15 AM, Nathan Leung <nc...@gmail.com> wrote:
>
> Your worker is allocated a maximum of 768 MB of heap. It's quite possible
> that this is not enough. Try increasing Xmx in worker.childopts.
>
> On Mar 5, 2015 1:10 PM, "Sa Li" <sa...@gmail.com> wrote:
>
> Hi, All
>
>
>
> I have been running a trident topology on production server, code is like
> this:
>
>
>
> topology.newStream("spoutInit", kafkaSpout)
>                 .each(new Fields("str"),
>                         new JsonObjectParse(),
>                         new Fields("eventType", "event"))
>                 .parallelismHint(pHint)
>                 .groupBy(new Fields("event"))
>                 .persistentAggregate(PostgresqlState.newFactory(config), new Fields("eventType"), new EventUpdater(), new Fields("eventWord"))
>         ;
>
>         Config conf = new Config();
>         conf.registerMetricsConsumer(LoggingMetricsConsumer.class, 1);
>
> Basically, it does simple things: it gets data from Kafka, parses it into different fields, and writes it into postgresDB. But in the Storm UI I saw the error "java.lang.OutOfMemoryError: GC overhead limit exceeded". It always happens in the same worker on each node, 6703. I understand this is because by default the JVM is configured to throw this error if you are spending more than *98% of the total time in GC and after the GC less than 2% of the heap is recovered*.
>
> I am not sure what the exact cause of the memory leak is. Is it OK to simply increase the heap? Here is my storm.yaml:
>
> supervisor.slots.ports:
>      - 6700
>      - 6701
>      - 6702
>      - 6703
>
> nimbus.childopts: "-Xmx1024m -Djava.net.preferIPv4Stack=true"
> ui.childopts: "-Xmx768m -Djava.net.preferIPv4Stack=true"
> supervisor.childopts: "-Djava.net.preferIPv4Stack=true"
> worker.childopts: "-Xmx768m -Djava.net.preferIPv4Stack=true"
>
>
>
> Has anyone had similar issues, and what would be the best way to overcome them?
>
>
>
>
>
>  thanks in advance
>
>
>
>  AL
>
>   ------------------------------
> This message, and any attachments, is for the intended recipient(s) only,
> may contain information that is privileged, confidential and/or proprietary
> and subject to important terms and conditions available at
> http://www.bankofamerica.com/emaildisclaimer. If you are not the intended
> recipient, please delete this message.
>

RE: java.lang.OutOfMemoryError: GC overhead limit exceeded

Posted by "Brunner, Bill" <bi...@baml.com>.
Once you’ve profiled your app, you should also play around with different garbage collectors. Considering you’re reaching max heap, I assume your tuples are probably pretty large. If that’s the case and you’re using the CMS garbage collector, you’re going to blow out your heap regularly. I found with large tuples and/or memory intensive computations that the old parallel GC works the best because it compacts old gen every time it collects… CMS doesn’t, and each sweep it tries to jam more into the heap until it can’t any longer and then blows up. There is also a great article by Michael Noll about Storm’s message buffers and how to tweak them depending on your needs: http://www.michael-noll.com/blog/2013/06/21/understanding-storm-internal-message-buffers/
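
Before switching collectors it may help to confirm which ones the worker JVM is actually running and how much time they take. The following is only a minimal sketch using the standard java.lang.management API; the class name GcReport is made up for illustration. The HotSpot flags mentioned in the comment are the Java 7-era options, and passing them through worker.childopts is an assumption about your setup, not something verified on this cluster.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.List;

public class GcReport {

    // Builds one line per collector active in this JVM: its name, how many
    // collections it has run, and the accumulated collection time in ms.
    // With -XX:+UseParallelOldGC or -XX:+UseConcMarkSweepGC on the command
    // line, the reported collector names change accordingly.
    static List<String> describeCollectors() {
        List<String> lines = new ArrayList<String>();
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            lines.add(gc.getName()
                    + ": collections=" + gc.getCollectionCount()
                    + ", timeMs=" + gc.getCollectionTime());
        }
        return lines;
    }

    public static void main(String[] args) {
        for (String line : describeCollectors()) {
            System.out.println(line);
        }
    }
}
```

Run with the same JVM options as the worker, this prints which collectors are in effect. A collection time that keeps climbing while little heap is freed would match the "GC overhead limit exceeded" symptom.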



From: Sa Li [mailto:sa.in.vanc@gmail.com]
Sent: Monday, March 09, 2015 10:15 PM
To: user@storm.apache.org
Subject: Re: java.lang.OutOfMemoryError: GC overhead limit exceeded


I have not done that yet; I am not very familiar with profiling, but I will try it tomorrow. Thanks.
On Mar 9, 2015 7:10 PM, "Nathan Leung" <nc...@gmail.com> wrote:
Have you profiled your spout / bolt logic as recommended earlier in this thread?

On Mon, Mar 9, 2015 at 9:49 PM, Sa Li <sa...@gmail.com> wrote:

You are right. I have already increased the heap in the yaml to 2 GB for each worker but still have the issue, so I suspect I may be running into some other cause, perhaps the receive/send buffer size? In general, before I see the GC overhead error in the Storm UI, I come across other errors in the worker log as well, such as Netty connection errors and null pointer exceptions, as I showed in another post.

Thanks
On Mar 9, 2015 5:36 PM, "Nathan Leung" <nc...@gmail.com> wrote:
I still think you should try running with a larger heap.  :)  Max spout pending determines how many tuples can be pending (tuple tree is not fully acked) per spout task.  If you have many spout tasks per worker this can be a large amount of memory.  It also depends on how big your tuples are.
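
The arithmetic behind that can be sketched as follows. This is a rough lower bound only: it counts raw payload and ignores per-object overhead, serialization buffers and anything the bolts allocate. The 3K record size comes from this thread; the class name, the method name and the figure of 4 spout tasks per worker are hypothetical.

```java
public class PendingMemoryEstimate {

    // Raw payload bytes held by un-acked tuples in one worker:
    // pending tuples per spout task * spout tasks in the worker * bytes per tuple.
    static long pendingBytes(int maxSpoutPending, int spoutTasksPerWorker, long bytesPerTuple) {
        return (long) maxSpoutPending * spoutTasksPerWorker * bytesPerTuple;
    }

    public static void main(String[] args) {
        long bytesPerTuple = 3L * 1024; // about 3K per record, as reported in this thread
        int tasksPerWorker = 4;         // hypothetical value, adjust to your topology

        for (int pending : new int[] {10, 50, 1000}) {
            long bytes = pendingBytes(pending, tasksPerWorker, bytesPerTuple);
            System.out.println("max spout pending " + pending
                    + " -> about " + (bytes / 1024) + " KB of raw payload");
        }
    }
}
```

On these assumptions even 1000 pending 3K tuples comes to only about 12 MB of raw payload, far below a 768 MB heap. As far as I understand, in a Trident topology the pending limit counts batches rather than single tuples, and a batch can hold many Kafka messages, which might explain why a value of 50 already causes trouble; treat that as something to verify against your Storm version.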

On Mon, Mar 9, 2015 at 6:14 PM, Sa Li <sa...@gmail.com> wrote:
Hi, Nathan

We have played around with max spout pending in dev. If we set it to 10, it is OK, but if we set it to more than 50, GC overhead errors start to appear. We ultimately write the tuples into PostgreSQL, and the highest write speed into the DB is around 40K records/minute, which is quite slow; maybe that is why tuples accumulate in memory before being dumped into the DB. But I think 10 is too small. Does that mean only 10 tuples are allowed in flight?

thanks

AL

On Fri, Mar 6, 2015 at 7:39 PM, Nathan Leung <nc...@gmail.com> wrote:
I've not modified netty so I can't comment on that.  I would set max spout pending; try 1000 at first.  This will limit the number of tuples that you can have in flight simultaneously and therefore limit the amount of memory used by these tuples and their processing.

On Fri, Mar 6, 2015 at 7:03 PM, Sa Li <sa...@gmail.com> wrote:
Hi, Nathan

The log size of that Kafka topic is 23515541, and each record is about 3K. I checked the yaml file and I don't have max spout pending set, so I assume it is the default: topology.max.spout.pending: null

Should I set it to a certain value? Also, I sometimes see java.nio.channels.ClosedChannelException: null, or b.s.d.worker [ERROR] Error on initialization of server mk-worker
Does this mean I should add

storm.messaging.netty.server_worker_threads: 1
storm.messaging.netty.client_worker_threads: 1
storm.messaging.netty.buffer_size: 5242880 #5MB buffer
storm.messaging.netty.max_retries: 30
storm.messaging.netty.max_wait_ms: 1000
storm.messaging.netty.min_wait_ms: 100

to the yaml and modify the values?



thanks



On Fri, Mar 6, 2015 at 2:22 PM, Nathan Leung <nc...@gmail.com> wrote:

How much data do you have in Kafka? How is your max spout pending set? If you have a high max spout pending (or if you emit unanchored tuples) you could be using up a lot of memory.
On Mar 6, 2015 5:14 PM, "Sa Li" <sa...@gmail.com> wrote:
Hi, Nathan

I have run into a strange issue: when I set spoutConf.forceFromStart=true, the topology quickly hits the GC overhead limit even though I have already increased the heap size, but if I remove this setting it works fine. I was thinking that maybe the kafkaSpout consumes data much faster than the data can be written into postgresDB, so the data quickly fills the memory and exhausts the heap. But when I ran the same test on my DEV cluster it worked fine, even with spoutConf.forceFromStart=true. I checked the Storm config for DEV and production, and they are the same.

Any hints?

thanks

AL


On Thu, Mar 5, 2015 at 3:26 PM, Nathan Leung <nc...@gmail.com> wrote:
I don't see anything glaring.  I would try increasing heap size.  It could be that you're right on the threshold of what you've allocated and you just need more memory.

On Thu, Mar 5, 2015 at 5:41 PM, Sa Li <sa...@gmail.com> wrote:
Hi, All,

I have roughly located where the problem comes from. In my run command I specify the clientid for TridentKafkaConfig; if I keep the same clientid I used before, it causes the GC error, otherwise everything is OK. Here is the code:


if (parameters.containsKey("clientid")) {
    logger.info("topic=>" + parameters.get("clientid") + "/" + parameters.get("topic"));
    spoutConf = new TridentKafkaConfig(zk, parameters.get("topic"), parameters.get("clientid"));

Any idea about this error?



Thanks



AL

On Thu, Mar 5, 2015 at 12:02 PM, Sa Li <sa...@gmail.com> wrote:
Sorry, continuing the last thread:

2015-03-05T11:48:08.418-0800 b.s.util [ERROR] Async loop died!
java.lang.RuntimeException: java.lang.RuntimeException: Remote address is not reachable. We will close this client Netty-Client-complicated-laugh/10.100.98.103:6703
        at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) ~[storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) ~[storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) ~[storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.disruptor$consume_loop_STAR_$fn__1460.invoke(disruptor.clj:94) ~[storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463) ~[storm-core-0.9.3.jar:0.9.3]
        at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
        at java.lang.Thread.run(Thread.java:745) [na:1.7.0_65]
Caused by: java.lang.RuntimeException: Remote address is not reachable. We will close this client Netty-Client-complicated-laugh/10.100.98.103:6703
        at backtype.storm.messaging.netty.Client.connect(Client.java:171) ~[storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.messaging.netty.Client.send(Client.java:194) ~[storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.utils.TransferDrainer.send(TransferDrainer.java:54) ~[storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.daemon.worker$mk_transfer_tuples_handler$fn__3730$fn__3731.invoke(worker.clj:330) ~[storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.daemon.worker$mk_transfer_tuples_handler$fn__3730.invoke(worker.clj:328) ~[storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.disruptor$clojure_handler$reify__1447.onEvent(disruptor.clj:58) ~[storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125) ~[storm-core-0.9.3.jar:0.9.3]
        ... 6 common frames omitted
2015-03-05T11:48:08.423-0800 b.s.util [ERROR] Halting process: ("Async loop died!")
java.lang.RuntimeException: ("Async loop died!")
        at backtype.storm.util$exit_process_BANG_.doInvoke(util.clj:325) [storm-core-0.9.3.jar:0.9.3]
        at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.5.1.jar:na]
        at backtype.storm.disruptor$consume_loop_STAR_$fn__1458.invoke(disruptor.clj:92) [storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.util$async_loop$fn__464.invoke(util.clj:473) [storm-core-0.9.3.jar:0.9.3]
        at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
        at java.lang.Thread.run(Thread.java:745) [na:1.7.0_65]
2015-03-05T11:48:08.425-0800 b.s.d.worker [INFO] Shutting down worker eventsStreamerv1-48-1425499636 0673ece0-cea2-4185-9b3e-6c49ad585576 6703
2015-03-05T11:48:08.426-0800 b.s.m.n.Client [INFO] Closing Netty Client Netty-Client-beloved-judge/10.100.98.104:6703

I suspect this is caused by my EventUpdater, which writes data in batches:


static class EventUpdater implements ReducerAggregator<List<String>> {

    @Override
    public List<String> init() {
        return null;
    }

    @Override
    public List<String> reduce(List<String> curr, TridentTuple tuple) {
        List<String> updated = null;

        if (curr == null) {
            String event = (String) tuple.getValue(1);
            System.out.println("===:" + event + ":");
            updated = Lists.newArrayList(event);
        } else {
            System.out.println("===+" + tuple + ":");
            updated = curr;
        }
        // System.out.println("(())");
        return updated;
    }
}
What do you think?

Thanks

On Thu, Mar 5, 2015 at 11:57 AM, Sa Li <sa...@gmail.com> wrote:
Thank you very much for the reply. Here is the error I saw in the production server's worker-6703.log:


On Thu, Mar 5, 2015 at 11:31 AM, Nathan Leung <nc...@gmail.com> wrote:
Yeah, then in this case maybe you can install the JDK / YourKit on the remote machines and run the tools over X or something.  I'm assuming this is a development cluster (not live / production) and that installing debugging tools and running remote UIs etc is not a problem.  :)

On Thu, Mar 5, 2015 at 1:52 PM, Andrew Xor <an...@gmail.com> wrote:
Nathan, I think that if he wants to profile a bolt that runs in a worker on a different cluster node than the one the profiling tool runs on, he won't be able to attach to the process since it resides on a different physical machine, methinks (well, now that I think about it more, it can be done... via remote debugging, but that's just a pain in the ***).
Regards,
A.

On Thu, Mar 5, 2015 at 8:46 PM, Nathan Leung <nc...@gmail.com> wrote:

You don't need to change your code. As Andrew mentioned you can get a lot of mileage by profiling your logic in a standalone program. For jvisualvm, you can just run your program (a loop that runs for a long time is best) then attach to the running process with jvisualvm.  It's pretty straightforward to use and you can also find good guides with a Google search.
On Mar 5, 2015 1:43 PM, "Andrew Xor" <an...@gmail.com> wrote:

Well...  detecting memory leaks in Java is a bit tricky as Java does a lot for you. Generally though, as long as you avoid using the "new" operator and close any resources that you do not use you should be fine... but a profiler such as the ones mentioned by Nathan will tell you the whole truth. YourKit is awesome and has a free trial, go ahead and test drive it. I am pretty sure that you need a working jar (or compilable code that has a main function in it) in order to profile it, although profiling your bolts and spouts is a bit trickier. Hopefully your algorithm (or portions of it) can be put in a sample test program that can be executed locally for you to profile it.
Hope this helped. Regards,
A.

On Thu, Mar 5, 2015 at 8:33 PM, Sa Li <sa...@gmail.com> wrote:

On Thu, Mar 5, 2015 at 10:26 AM, Andrew Xor <an...@gmail.com> wrote:
Unfortunately that is not fixed, it depends on the computations and data-structures you have; in my case for example I use more than 2GB since I need to keep a large matrix in memory... having said that, in most cases it should be relatively easy to estimate how much memory you are going to need and use that... or if that's not possible you can just increase it and try the "set and see" approach. Check for memory leaks as well... (unclosed resources and so on...!)
Regards.

A.

On Thu, Mar 5, 2015 at 8:21 PM, Sa Li <sa...@gmail.com> wrote:
Thanks, Nathan. How much should it be in general?

On Thu, Mar 5, 2015 at 10:15 AM, Nathan Leung <nc...@gmail.com> wrote:

Your worker is allocated a maximum of 768 MB of heap. It's quite possible that this is not enough. Try increasing Xmx in worker.childopts.
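
After changing -Xmx it can be worth confirming that the new limit actually reached the JVM. A minimal sketch using only the standard Runtime API; the class name HeapCheck is made up for illustration, and calling it from a bolt's prepare method or from a small standalone test is just one possible way to use it.

```java
public class HeapCheck {

    // Maximum heap this JVM will attempt to use, in megabytes. With -Xmx768m
    // this is usually somewhat below 768, since some collectors reserve part
    // of the heap.
    static long maxHeapMb() {
        return Runtime.getRuntime().maxMemory() / (1024L * 1024L);
    }

    public static void main(String[] args) {
        System.out.println("max heap (MB): " + maxHeapMb());
    }
}
```

If the printed value does not move after editing storm.yaml, the supervisors probably have not been restarted with the new worker.childopts, or the topology overrides the setting.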
On Mar 5, 2015 1:10 PM, "Sa Li" <sa...@gmail.com> wrote:
Hi, All

I have been running a Trident topology on a production server. The code looks like this:


topology.newStream("spoutInit", kafkaSpout)
                .each(new Fields("str"),
                        new JsonObjectParse(),
                        new Fields("eventType", "event"))
                .parallelismHint(pHint)
                .groupBy(new Fields("event"))
                .persistentAggregate(PostgresqlState.newFactory(config), new Fields("eventType"), new EventUpdater(), new Fields("eventWord"))
        ;

        Config conf = new Config();
        conf.registerMetricsConsumer(LoggingMetricsConsumer.class, 1);

Basically, it does simple things: it gets data from Kafka, parses it into different fields, and writes it into postgresDB. But in the Storm UI I saw the error "java.lang.OutOfMemoryError: GC overhead limit exceeded". It always happens in the same worker on each node, 6703. I understand this is because by default the JVM is configured to throw this error if you are spending more than 98% of the total time in GC and after the GC less than 2% of the heap is recovered.

I am not sure what the exact cause of the memory leak is. Is it OK to simply increase the heap? Here is my storm.yaml:

supervisor.slots.ports:
     - 6700
     - 6701
     - 6702
     - 6703

nimbus.childopts: "-Xmx1024m -Djava.net.preferIPv4Stack=true"
ui.childopts: "-Xmx768m -Djava.net.preferIPv4Stack=true"
supervisor.childopts: "-Djava.net.preferIPv4Stack=true"
worker.childopts: "-Xmx768m -Djava.net.preferIPv4Stack=true"



Has anyone had similar issues, and what would be the best way to overcome them?

Thanks in advance

AL


Re: java.lang.OutOfMemoryError: GC overhead limit exceeded

Posted by Sa Li <sa...@gmail.com>.
I have not done that yet; I am not very familiar with profiling, but I will
try it tomorrow. Thanks.
On Mar 9, 2015 7:10 PM, "Nathan Leung" <nc...@gmail.com> wrote:

> Have you profiled your spout / bolt logic as recommended earlier in this
> thread?
>
> On Mon, Mar 9, 2015 at 9:49 PM, Sa Li <sa...@gmail.com> wrote:
>
>> You are right. I have already increased the heap in the yaml to 2 GB for
>> each worker but still have the issue, so I suspect I may be running into
>> some other cause, perhaps the receive/send buffer size? In general, before
>> I see the GC overhead error in the Storm UI, I come across other errors in
>> the worker log as well, such as Netty connection errors and null pointer
>> exceptions, as I showed in another post.
>>
>> Thanks
>> On Mar 9, 2015 5:36 PM, "Nathan Leung" <nc...@gmail.com> wrote:
>>
>>> I still think you should try running with a larger heap.  :)  Max spout
>>> pending determines how many tuples can be pending (tuple tree is not fully
>>> acked) per spout task.  If you have many spout tasks per worker this can be
>>> a large amount of memory.  It also depends on how big your tuples are.
>>>
>>> On Mon, Mar 9, 2015 at 6:14 PM, Sa Li <sa...@gmail.com> wrote:
>>>
>>>> Hi, Nathan
>>>>
>>>> We have played around with max spout pending in dev. If we set it to 10,
>>>> it is OK, but if we set it to more than 50, GC overhead errors start to
>>>> appear. We ultimately write the tuples into PostgreSQL, and the highest
>>>> write speed into the DB is around 40K records/minute, which is quite slow;
>>>> maybe that is why tuples accumulate in memory before being dumped into the
>>>> DB. But I think 10 is too small. Does that mean only 10 tuples are allowed
>>>> in flight?
>>>>
>>>> thanks
>>>>
>>>> AL
>>>>
>>>> On Fri, Mar 6, 2015 at 7:39 PM, Nathan Leung <nc...@gmail.com> wrote:
>>>>
>>>>> I've not modified netty so I can't comment on that.  I would set max
>>>>> spout pending; try 1000 at first.  This will limit the number of tuples
>>>>> that you can have in flight simultaneously and therefore limit the amount
>>>>> of memory used by these tuples and their processing.
>>>>>
>>>>> On Fri, Mar 6, 2015 at 7:03 PM, Sa Li <sa...@gmail.com> wrote:
>>>>>
>>>>>> Hi, Nathan
>>>>>>
>>>>>> The log size of that Kafka topic is 23515541, and each record is about
>>>>>> 3K. I checked the yaml file and I don't have max spout pending set, so I
>>>>>> assume it is the default: topology.max.spout.pending: null
>>>>>>
>>>>>> Should I set it to a certain value? Also, I sometimes see
>>>>>> java.nio.channels.ClosedChannelException: null, or  b.s.d.worker
>>>>>> [ERROR] Error on initialization of server mk-worker
>>>>>> Does this mean I should add
>>>>>> storm.messaging.netty.server_worker_threads: 1
>>>>>> storm.messaging.netty.client_worker_threads: 1
>>>>>> storm.messaging.netty.buffer_size: 5242880 #5MB buffer
>>>>>> storm.messaging.netty.max_retries: 30
>>>>>> storm.messaging.netty.max_wait_ms: 1000
>>>>>> storm.messaging.netty.min_wait_ms: 100
>>>>>>
>>>>>> to the yaml and modify the values?
>>>>>>
>>>>>>
>>>>>>
>>>>>> thanks
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Fri, Mar 6, 2015 at 2:22 PM, Nathan Leung <nc...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> How much data do you have in Kafka? How is your max spout pending
>>>>>>> set? If you have a high max spout pending (or if you emit unanchored
>>>>>>> tuples) you could be using up a lot of memory.
>>>>>>> On Mar 6, 2015 5:14 PM, "Sa Li" <sa...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi, Nathan
>>>>>>>>
>>>>>>>> I have run into a strange issue: when I set
>>>>>>>> spoutConf.forceFromStart=true, the topology quickly hits the GC
>>>>>>>> overhead limit even though I have already increased the heap size, but
>>>>>>>> if I remove this setting it works fine. I was thinking that maybe the
>>>>>>>> kafkaSpout consumes data much faster than the data can be written into
>>>>>>>> postgresDB, so the data quickly fills the memory and exhausts the heap.
>>>>>>>> But when I ran the same test on my DEV cluster it worked fine, even with
>>>>>>>> spoutConf.forceFromStart=true. I checked the Storm config for DEV and
>>>>>>>> production, and they are the same.
>>>>>>>>
>>>>>>>> Any hints?
>>>>>>>>
>>>>>>>> thanks
>>>>>>>>
>>>>>>>> AL
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Mar 5, 2015 at 3:26 PM, Nathan Leung <nc...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> I don't see anything glaring.  I would try increasing heap size.
>>>>>>>>> It could be that you're right on the threshold of what you've allocated and
>>>>>>>>> you just need more memory.
>>>>>>>>>
>>>>>>>>> On Thu, Mar 5, 2015 at 5:41 PM, Sa Li <sa...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi, All,
>>>>>>>>>> ,
>>>>>>>>>> I kind locate where the problem come from, in my running command,
>>>>>>>>>> I will specify the clientid of TridentKafkaConfig, if I keep the clientid
>>>>>>>>>> as the one I used before, it will cause GC error, otherwise I am completely
>>>>>>>>>> OK. Here is the code:
>>>>>>>>>>
>>>>>>>>>> if (parameters.containsKey("clientid")) {
>>>>>>>>>>     logger.info("topic=>" + parameters.get("clientid") + "/" + parameters.get("topic"));
>>>>>>>>>>     spoutConf = new TridentKafkaConfig(zk, parameters.get("topic"), parameters.get("clientid"));
>>>>>>>>>>
>>>>>>>>>> Any idea about this error?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Thanks
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> AL
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, Mar 5, 2015 at 12:02 PM, Sa Li <sa...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Sorry, continue last thread:
>>>>>>>>>>>
>>>>>>>>>>> 2015-03-05T11:48:08.418-0800 b.s.util [ERROR] Async loop died!
>>>>>>>>>>> java.lang.RuntimeException: java.lang.RuntimeException: Remote
>>>>>>>>>>> address is not reachable. We will close this client
>>>>>>>>>>> Netty-Client-complicated-laugh/10.100.98.103:6703
>>>>>>>>>>>         at
>>>>>>>>>>> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128)
>>>>>>>>>>> ~[storm-core-0.9.3.jar:0.9.3]
>>>>>>>>>>>         at
>>>>>>>>>>> backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99)
>>>>>>>>>>> ~[storm-core-0.9.3.jar:0.9.3]
>>>>>>>>>>>         at
>>>>>>>>>>> backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80)
>>>>>>>>>>> ~[storm-core-0.9.3.jar:0.9.3]
>>>>>>>>>>>         at
>>>>>>>>>>> backtype.storm.disruptor$consume_loop_STAR_$fn__1460.invoke(disruptor.clj:94)
>>>>>>>>>>> ~[storm-core-0.9.3.jar:0.9.3]
>>>>>>>>>>>         at
>>>>>>>>>>> backtype.storm.util$async_loop$fn__464.invoke(util.clj:463)
>>>>>>>>>>> ~[storm-core-0.9.3.jar:0.9.3]
>>>>>>>>>>>         at clojure.lang.AFn.run(AFn.java:24)
>>>>>>>>>>> [clojure-1.5.1.jar:na]
>>>>>>>>>>>         at java.lang.Thread.run(Thread.java:745) [na:1.7.0_65]
>>>>>>>>>>> Caused by: java.lang.RuntimeException: Remote address is not
>>>>>>>>>>> reachable. We will close this client Netty-Client-complicated-laugh/
>>>>>>>>>>> 10.100.98.103:6703
>>>>>>>>>>>         at
>>>>>>>>>>> backtype.storm.messaging.netty.Client.connect(Client.java:171)
>>>>>>>>>>> ~[storm-core-0.9.3.jar:0.9.3]
>>>>>>>>>>>         at
>>>>>>>>>>> backtype.storm.messaging.netty.Client.send(Client.java:194)
>>>>>>>>>>> ~[storm-core-0.9.3.jar:0.9.3]
>>>>>>>>>>>         at
>>>>>>>>>>> backtype.storm.utils.TransferDrainer.send(TransferDrainer.java:54)
>>>>>>>>>>> ~[storm-core-0.9.3.jar:0.9.3]
>>>>>>>>>>>         at
>>>>>>>>>>> backtype.storm.daemon.worker$mk_transfer_tuples_handler$fn__3730$fn__3731.invoke(worker.clj:330)
>>>>>>>>>>> ~[storm-core-0.9.3.jar:0.9.3]
>>>>>>>>>>>         at
>>>>>>>>>>> backtype.storm.daemon.worker$mk_transfer_tuples_handler$fn__3730.invoke(worker.clj:328)
>>>>>>>>>>> ~[storm-core-0.9.3.jar:0.9.3]
>>>>>>>>>>>         at
>>>>>>>>>>> backtype.storm.disruptor$clojure_handler$reify__1447.onEvent(disruptor.clj:58)
>>>>>>>>>>> ~[storm-core-0.9.3.jar:0.9.3]
>>>>>>>>>>>         at
>>>>>>>>>>> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125)
>>>>>>>>>>> ~[storm-core-0.9.3.jar:0.9.3]
>>>>>>>>>>>         ... 6 common frames omitted
>>>>>>>>>>> 2015-03-05T11:48:08.423-0800 b.s.util [ERROR] Halting process:
>>>>>>>>>>> ("Async loop died!")
>>>>>>>>>>> java.lang.RuntimeException: ("Async loop died!")
>>>>>>>>>>>         at
>>>>>>>>>>> backtype.storm.util$exit_process_BANG_.doInvoke(util.clj:325)
>>>>>>>>>>> [storm-core-0.9.3.jar:0.9.3]
>>>>>>>>>>>         at clojure.lang.RestFn.invoke(RestFn.java:423)
>>>>>>>>>>> [clojure-1.5.1.jar:na]
>>>>>>>>>>>         at
>>>>>>>>>>> backtype.storm.disruptor$consume_loop_STAR_$fn__1458.invoke(disruptor.clj:92)
>>>>>>>>>>> [storm-core-0.9.3.jar:0.9.3]
>>>>>>>>>>>         at
>>>>>>>>>>> backtype.storm.util$async_loop$fn__464.invoke(util.clj:473)
>>>>>>>>>>> [storm-core-0.9.3.jar:0.9.3]
>>>>>>>>>>>         at clojure.lang.AFn.run(AFn.java:24)
>>>>>>>>>>> [clojure-1.5.1.jar:na]
>>>>>>>>>>>         at java.lang.Thread.run(Thread.java:745) [na:1.7.0_65]
>>>>>>>>>>> 2015-03-05T11:48:08.425-0800 b.s.d.worker [INFO] Shutting down
>>>>>>>>>>> worker eventsStreamerv1-48-1425499636 0673ece0-cea2-4185-9b3e-6c49ad585576
>>>>>>>>>>> 6703
>>>>>>>>>>> 2015-03-05T11:48:08.426-0800 b.s.m.n.Client [INFO] Closing Netty
>>>>>>>>>>> Client Netty-Client-beloved-judge/10.100.98.104:6703
>>>>>>>>>>>
>>>>>>>>>>> I doubt this is caused by my eventUpfater, which write data in
>>>>>>>>>>> batch
>>>>>>>>>>>
>>>>>>>>>>> static class EventUpdater implements ReducerAggregator<List<String>> {
>>>>>>>>>>>
>>>>>>>>>>>             @Override
>>>>>>>>>>>             public List<String> init(){
>>>>>>>>>>>                      return null;
>>>>>>>>>>>             }
>>>>>>>>>>>
>>>>>>>>>>>             @Override
>>>>>>>>>>>             public List<String> reduce(List<String> curr, TridentTuple tuple) {
>>>>>>>>>>>                    List<String> updated = null ;
>>>>>>>>>>>
>>>>>>>>>>>                    if ( curr == null ) {
>>>>>>>>>>>                                     String event = (String) tuple.getValue(1);
>>>>>>>>>>>                                     System.out.println("===:" + event + ":");
>>>>>>>>>>>                                     updated = Lists.newArrayList(event);
>>>>>>>>>>>                    } else {
>>>>>>>>>>>                                     System.out.println("===+" +  tuple + ":");
>>>>>>>>>>>                                     updated = curr ;
>>>>>>>>>>>                    }
>>>>>>>>>>> //              System.out.println("(())");
>>>>>>>>>>>               return updated ;
>>>>>>>>>>>             }
>>>>>>>>>>>         }
>>>>>>>>>>>
>>>>>>>>>>> How do you think
>>>>>>>>>>>
>>>>>>>>>>> THanks
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Mar 5, 2015 at 11:57 AM, Sa Li <sa...@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Thank you very much for the reply, here is error I saw in
>>>>>>>>>>>> production server worker-6703.log,
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Mar 5, 2015 at 11:31 AM, Nathan Leung <
>>>>>>>>>>>> ncleung@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Yeah, then in this case maybe you can install JDK / Yourkit in
>>>>>>>>>>>>> the remote machines and run the tools over X or something.  I'm assuming
>>>>>>>>>>>>> this is a development cluster (not live / production) and that installing
>>>>>>>>>>>>> debugging tools and running remote UIs etc is not a problem.  :)
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, Mar 5, 2015 at 1:52 PM, Andrew Xor <
>>>>>>>>>>>>> andreas.grammenos@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Nathan I think that if he wants to profile a bolt per se that
>>>>>>>>>>>>>> runs in a worker that resides in a different cluster node than the one the
>>>>>>>>>>>>>> profiling tool runs he won't be able to attach the process since it resides
>>>>>>>>>>>>>> in a different physical machine, me thinks (well, now that I think of it
>>>>>>>>>>>>>> better it can be done... via remote debugging but that's just a pain in the
>>>>>>>>>>>>>> ***).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> A.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Thu, Mar 5, 2015 at 8:46 PM, Nathan Leung <
>>>>>>>>>>>>>> ncleung@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> You don't need to change your code. As Andrew mentioned you
>>>>>>>>>>>>>>> can get a lot of mileage by profiling your logic in a standalone program.
>>>>>>>>>>>>>>> For jvisualvm, you can just run your program (a loop that runs for a long
>>>>>>>>>>>>>>> time is best) then attach to the running process with jvisualvm.  It's
>>>>>>>>>>>>>>> pretty straightforward to use and you can also find good guides with a
>>>>>>>>>>>>>>> Google search.
>>>>>>>>>>>>>>> On Mar 5, 2015 1:43 PM, "Andrew Xor" <
>>>>>>>>>>>>>>> andreas.grammenos@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> ​
>>>>>>>>>>>>>>>> Well...  detecting memory leaks in Java is a bit tricky as
>>>>>>>>>>>>>>>> Java does a lot for you. Generally though, as long as you avoid using "new"
>>>>>>>>>>>>>>>> operator and close any resources that you do not use you should be fine...
>>>>>>>>>>>>>>>> but a Profiler such as the ones mentioned by Nathan will tell you the whole
>>>>>>>>>>>>>>>> truth. YourKit is awesome and has a free trial, go ahead and test drive it.
>>>>>>>>>>>>>>>> I am pretty sure that you need a working jar (or compilable code that has a
>>>>>>>>>>>>>>>> main function in it) in order to profile it, although if you want to
>>>>>>>>>>>>>>>> profile your bolts and spouts is a bit tricker. Hopefully your algorithm
>>>>>>>>>>>>>>>> (or portions of it) can be put in a sample test program that is able to be
>>>>>>>>>>>>>>>> executed locally for you to profile it.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hope this helped. Regards,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> A.
>>>>>>>>>>>>>>>> ​
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Thu, Mar 5, 2015 at 8:33 PM, Sa Li <sa.in.vanc@gmail.com
>>>>>>>>>>>>>>>> > wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Thu, Mar 5, 2015 at 10:26 AM, Andrew Xor <
>>>>>>>>>>>>>>>>> andreas.grammenos@gmail.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Unfortunately that is not fixed, it depends on the
>>>>>>>>>>>>>>>>>> computations and data-structures you have; in my case for example I use
>>>>>>>>>>>>>>>>>> more than 2GB since I need to keep a large matrix in memory... having said
>>>>>>>>>>>>>>>>>> that, in most cases it should be relatively easy to estimate how much
>>>>>>>>>>>>>>>>>> memory you are going to need and use that... or if that's not possible you
>>>>>>>>>>>>>>>>>> can just increase it and try the "set and see" approach. Check for memory
>>>>>>>>>>>>>>>>>> leaks as well... (unclosed resources and so on...!)
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Regards.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> ​A.​
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Thu, Mar 5, 2015 at 8:21 PM, Sa Li <
>>>>>>>>>>>>>>>>>> sa.in.vanc@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Thanks, Nathan. How much is should be in general?
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Thu, Mar 5, 2015 at 10:15 AM, Nathan Leung <
>>>>>>>>>>>>>>>>>>> ncleung@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Your worker is allocated a maximum of 768mb of heap.
>>>>>>>>>>>>>>>>>>>> It's quite possible that this is not enough. Try increasing Xmx i
>>>>>>>>>>>>>>>>>>>> worker.childopts.
>>>>>>>>>>>>>>>>>>>> On Mar 5, 2015 1:10 PM, "Sa Li" <sa...@gmail.com>
>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Hi, All
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> I have been running a trident topology on production
>>>>>>>>>>>>>>>>>>>>> server, code is like this:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> topology.newStream("spoutInit", kafkaSpout)
>>>>>>>>>>>>>>>>>>>>>                 .each(new Fields("str"),
>>>>>>>>>>>>>>>>>>>>>                         new JsonObjectParse(),
>>>>>>>>>>>>>>>>>>>>>                         new Fields("eventType", "event"))
>>>>>>>>>>>>>>>>>>>>>                 .parallelismHint(pHint)
>>>>>>>>>>>>>>>>>>>>>                 .groupBy(new Fields("event"))
>>>>>>>>>>>>>>>>>>>>>                 .persistentAggregate(PostgresqlState.newFactory(config), new Fields("eventType"), new EventUpdater(), new Fields("eventWord"))
>>>>>>>>>>>>>>>>>>>>>         ;
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>         Config conf = new Config();
>>>>>>>>>>>>>>>>>>>>>         conf.registerMetricsConsumer(LoggingMetricsConsumer.class, 1);
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Basically, it does simple things to get data from kafka, parse to different field and write into postgresDB. But in storm UI, I did see such error, "java.lang.OutOfMemoryError: GC overhead limit exceeded". It all happens in same worker of each node - 6703. I understand this is because by default the JVM is configured to throw this error if you are spending more than *98% of the total time in GC and after the GC less than 2% of the heap is recovered*.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> I am not sure what is exact cause for memory leak, is it OK by simply increase the heap? Here is my storm.yaml:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> supervisor.slots.ports:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>      - 6700
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>      - 6701
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>      - 6702
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>      - 6703
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> nimbus.childopts: "-Xmx1024m -Djava.net.preferIPv4Stack=true"
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> ui.childopts: "-Xmx768m -Djava.net.preferIPv4Stack=true"
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> supervisor.childopts: "-Djava.net.preferIPv4Stack=true"
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> worker.childopts: "-Xmx768m -Djava.net.preferIPv4Stack=true"
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Anyone has similar issues, and what will be the best
>>>>>>>>>>>>>>>>>>>>> way to overcome?
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> thanks in advance
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> AL
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>
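
A quick way to confirm that a new -Xmx value in worker.childopts has actually reached the worker JVM is to print the heap limit from inside the process. The following is a minimal sketch, not Storm code: the class name HeapCheck and its methods are made up for illustration, and it relies only on java.lang.Runtime. Logging the same two values from inside a running topology is an assumption about where you would place the calls.

```java
// Sketch: report the heap limit the JVM is actually running with.
// HeapCheck, maxHeapMb and usedFraction are illustrative names, not part of Storm.
class HeapCheck {

    // Maximum heap the JVM will attempt to use, in megabytes.
    static long maxHeapMb() {
        return Runtime.getRuntime().maxMemory() / (1024L * 1024L);
    }

    // Fraction of the maximum heap currently occupied (0.0 to 1.0).
    static double usedFraction() {
        Runtime rt = Runtime.getRuntime();
        long used = rt.totalMemory() - rt.freeMemory();
        return (double) used / rt.maxMemory();
    }

    public static void main(String[] args) {
        System.out.println("max heap (MB): " + maxHeapMb());
        System.out.println("used fraction: " + usedFraction());
    }
}
```

If the reported maximum still reads roughly 768 MB after editing storm.yaml, the new setting has probably not reached the workers (for example, the supervisors were not restarted).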

Re: java.lang.OutOfMemoryError: GC overhead limit exceeded

Posted by Nathan Leung <nc...@gmail.com>.
Have you profiled your spout / bolt logic as recommended earlier in this
thread?

On Mon, Mar 9, 2015 at 9:49 PM, Sa Li <sa...@gmail.com> wrote:

> You are right. I have already increased the heap in the yaml to 2 GB for each
> worker, but I still have the issue, so I suspect I may be running into some
> other cause, perhaps the receive/send buffer size? Also, before I see the GC
> overhead error in the Storm UI, I come across other errors in the worker log
> as well, such as Netty connection errors and null pointer exceptions, as I
> showed in another post.
>
> Thanks
> On Mar 9, 2015 5:36 PM, "Nathan Leung" <nc...@gmail.com> wrote:
>
>> I still think you should try running with a larger heap.  :)  Max spout
>> pending determines how many tuples can be pending (tuple tree is not fully
>> acked) per spout task.  If you have many spout tasks per worker this can be
>> a large amount of memory.  It also depends on how big your tuples are.
>>
>> On Mon, Mar 9, 2015 at 6:14 PM, Sa Li <sa...@gmail.com> wrote:
>>
>>> Hi, Nathan
>>>
>>> We have played around with max spout pending in dev. If we set it to 10, it
>>> is OK, but if we set it to more than 50, GC overhead errors start to appear.
>>> We ultimately write the tuples into PostgreSQL; the highest write speed
>>> into the DB is around 40K records/minute, which is very slow, so
>>> maybe that is why tuples accumulate in memory before being written to the
>>> DB. But I think 10 is too small. Does that mean only 10 tuples are allowed
>>> in flight?
>>>
>>> thanks
>>>
>>> AL
>>>
>>> On Fri, Mar 6, 2015 at 7:39 PM, Nathan Leung <nc...@gmail.com> wrote:
>>>
>>>> I've not modified netty so I can't comment on that.  I would set max
>>>> spout pending; try 1000 at first.  This will limit the number of tuples
>>>> that you can have in flight simultaneously and therefore limit the amount
>>>> of memory used by these tuples and their processing.
>>>>
>>>> On Fri, Mar 6, 2015 at 7:03 PM, Sa Li <sa...@gmail.com> wrote:
>>>>
>>>>> Hi, Nathan
>>>>>
>>>>> The log size of that Kafka topic is 23515541, and each record is about 3K.
>>>>> I checked the yaml file; I don't have max spout pending set, so I
>>>>> assume it is the default: topology.max.spout.pending: null
>>>>>
>>>>> Should I set it to a certain value? Also, I sometimes see
>>>>> java.nio.channels.ClosedChannelException: null, or  b.s.d.worker
>>>>> [ERROR] Error on initialization of server mk-worker
>>>>> Does this mean I should add
>>>>> storm.messaging.netty.server_worker_threads: 1
>>>>> storm.messaging.netty.client_worker_threads: 1
>>>>> storm.messaging.netty.buffer_size: 5242880 #5MB buffer
>>>>> storm.messaging.netty.max_retries: 30
>>>>> storm.messaging.netty.max_wait_ms: 1000
>>>>> storm.messaging.netty.min_wait_ms: 100
>>>>>
>>>>> to the yaml and modify the values?
>>>>>
>>>>>
>>>>>
>>>>> thanks
>>>>>
>>>>>
>>>>>
>>>>> On Fri, Mar 6, 2015 at 2:22 PM, Nathan Leung <nc...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> How much data do you have in Kafka? How is your max spout pending
>>>>>> set? If you have a high max spout pending (or if you emit unanchored
>>>>>> tuples) you could be using up a lot of memory.
>>>>>> On Mar 6, 2015 5:14 PM, "Sa Li" <sa...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi, Nathan
>>>>>>>
>>>>>>> I have run into a strange issue. When I set
>>>>>>> spoutConf.forceFromStart=true, it quickly hits the GC overhead limit,
>>>>>>> even though I have already increased the heap size, but if I remove this
>>>>>>> setting it works fine. I was thinking maybe the kafkaSpout is consuming
>>>>>>> data much faster than the data is being written into PostgreSQL, so data
>>>>>>> quickly fills the memory and exhausts the heap. But I did the same test on
>>>>>>> my DEV cluster and it works fine, even when I set
>>>>>>> spoutConf.forceFromStart=true. I checked the Storm config for DEV and
>>>>>>> production; they are the same.
>>>>>>>
>>>>>>> Any hints?
>>>>>>>
>>>>>>> thanks
>>>>>>>
>>>>>>> AL
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Mar 5, 2015 at 3:26 PM, Nathan Leung <nc...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> I don't see anything glaring.  I would try increasing heap size.
>>>>>>>> It could be that you're right on the threshold of what you've allocated and
>>>>>>>> you just need more memory.
>>>>>>>>
>>>>>>>> On Thu, Mar 5, 2015 at 5:41 PM, Sa Li <sa...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi, All,
>>>>>>>>>
>>>>>>>>> I have roughly located where the problem comes from. In my run command
>>>>>>>>> I specify the clientid of TridentKafkaConfig. If I keep the same clientid
>>>>>>>>> I used before, it causes the GC error; otherwise everything is
>>>>>>>>> OK. Here is the code:
>>>>>>>>>
>>>>>>>>> if (parameters.containsKey("clientid")) {
>>>>>>>>>     logger.info("topic=>" + parameters.get("clientid") + "/" + parameters.get("topic"));
>>>>>>>>>     spoutConf = new TridentKafkaConfig(zk, parameters.get("topic"), parameters.get("clientid"));
>>>>>>>>>
>>>>>>>>> Any idea about this error?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> AL
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Mar 5, 2015 at 12:02 PM, Sa Li <sa...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Sorry, continuing from my last message:
>>>>>>>>>>
>>>>>>>>>> 2015-03-05T11:48:08.418-0800 b.s.util [ERROR] Async loop died!
>>>>>>>>>> java.lang.RuntimeException: java.lang.RuntimeException: Remote
>>>>>>>>>> address is not reachable. We will close this client
>>>>>>>>>> Netty-Client-complicated-laugh/10.100.98.103:6703
>>>>>>>>>>         at
>>>>>>>>>> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128)
>>>>>>>>>> ~[storm-core-0.9.3.jar:0.9.3]
>>>>>>>>>>         at
>>>>>>>>>> backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99)
>>>>>>>>>> ~[storm-core-0.9.3.jar:0.9.3]
>>>>>>>>>>         at
>>>>>>>>>> backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80)
>>>>>>>>>> ~[storm-core-0.9.3.jar:0.9.3]
>>>>>>>>>>         at
>>>>>>>>>> backtype.storm.disruptor$consume_loop_STAR_$fn__1460.invoke(disruptor.clj:94)
>>>>>>>>>> ~[storm-core-0.9.3.jar:0.9.3]
>>>>>>>>>>         at
>>>>>>>>>> backtype.storm.util$async_loop$fn__464.invoke(util.clj:463)
>>>>>>>>>> ~[storm-core-0.9.3.jar:0.9.3]
>>>>>>>>>>         at clojure.lang.AFn.run(AFn.java:24)
>>>>>>>>>> [clojure-1.5.1.jar:na]
>>>>>>>>>>         at java.lang.Thread.run(Thread.java:745) [na:1.7.0_65]
>>>>>>>>>> Caused by: java.lang.RuntimeException: Remote address is not
>>>>>>>>>> reachable. We will close this client Netty-Client-complicated-laugh/
>>>>>>>>>> 10.100.98.103:6703
>>>>>>>>>>         at
>>>>>>>>>> backtype.storm.messaging.netty.Client.connect(Client.java:171)
>>>>>>>>>> ~[storm-core-0.9.3.jar:0.9.3]
>>>>>>>>>>         at
>>>>>>>>>> backtype.storm.messaging.netty.Client.send(Client.java:194)
>>>>>>>>>> ~[storm-core-0.9.3.jar:0.9.3]
>>>>>>>>>>         at
>>>>>>>>>> backtype.storm.utils.TransferDrainer.send(TransferDrainer.java:54)
>>>>>>>>>> ~[storm-core-0.9.3.jar:0.9.3]
>>>>>>>>>>         at
>>>>>>>>>> backtype.storm.daemon.worker$mk_transfer_tuples_handler$fn__3730$fn__3731.invoke(worker.clj:330)
>>>>>>>>>> ~[storm-core-0.9.3.jar:0.9.3]
>>>>>>>>>>         at
>>>>>>>>>> backtype.storm.daemon.worker$mk_transfer_tuples_handler$fn__3730.invoke(worker.clj:328)
>>>>>>>>>> ~[storm-core-0.9.3.jar:0.9.3]
>>>>>>>>>>         at
>>>>>>>>>> backtype.storm.disruptor$clojure_handler$reify__1447.onEvent(disruptor.clj:58)
>>>>>>>>>> ~[storm-core-0.9.3.jar:0.9.3]
>>>>>>>>>>         at
>>>>>>>>>> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125)
>>>>>>>>>> ~[storm-core-0.9.3.jar:0.9.3]
>>>>>>>>>>         ... 6 common frames omitted
>>>>>>>>>> 2015-03-05T11:48:08.423-0800 b.s.util [ERROR] Halting process:
>>>>>>>>>> ("Async loop died!")
>>>>>>>>>> java.lang.RuntimeException: ("Async loop died!")
>>>>>>>>>>         at
>>>>>>>>>> backtype.storm.util$exit_process_BANG_.doInvoke(util.clj:325)
>>>>>>>>>> [storm-core-0.9.3.jar:0.9.3]
>>>>>>>>>>         at clojure.lang.RestFn.invoke(RestFn.java:423)
>>>>>>>>>> [clojure-1.5.1.jar:na]
>>>>>>>>>>         at
>>>>>>>>>> backtype.storm.disruptor$consume_loop_STAR_$fn__1458.invoke(disruptor.clj:92)
>>>>>>>>>> [storm-core-0.9.3.jar:0.9.3]
>>>>>>>>>>         at
>>>>>>>>>> backtype.storm.util$async_loop$fn__464.invoke(util.clj:473)
>>>>>>>>>> [storm-core-0.9.3.jar:0.9.3]
>>>>>>>>>>         at clojure.lang.AFn.run(AFn.java:24)
>>>>>>>>>> [clojure-1.5.1.jar:na]
>>>>>>>>>>         at java.lang.Thread.run(Thread.java:745) [na:1.7.0_65]
>>>>>>>>>> 2015-03-05T11:48:08.425-0800 b.s.d.worker [INFO] Shutting down
>>>>>>>>>> worker eventsStreamerv1-48-1425499636 0673ece0-cea2-4185-9b3e-6c49ad585576
>>>>>>>>>> 6703
>>>>>>>>>> 2015-03-05T11:48:08.426-0800 b.s.m.n.Client [INFO] Closing Netty
>>>>>>>>>> Client Netty-Client-beloved-judge/10.100.98.104:6703
>>>>>>>>>>
>>>>>>>>>> I suspect this is caused by my EventUpdater, which writes data in
>>>>>>>>>> batches:
>>>>>>>>>>
>>>>>>>>>> static class EventUpdater implements ReducerAggregator<List<String>> {
>>>>>>>>>>
>>>>>>>>>>     @Override
>>>>>>>>>>     public List<String> init() {
>>>>>>>>>>         return null;
>>>>>>>>>>     }
>>>>>>>>>>
>>>>>>>>>>     @Override
>>>>>>>>>>     public List<String> reduce(List<String> curr, TridentTuple tuple) {
>>>>>>>>>>         List<String> updated = null;
>>>>>>>>>>
>>>>>>>>>>         if (curr == null) {
>>>>>>>>>>             // First tuple for this group: start the list with its event.
>>>>>>>>>>             String event = (String) tuple.getValue(1);
>>>>>>>>>>             System.out.println("===:" + event + ":");
>>>>>>>>>>             updated = Lists.newArrayList(event);
>>>>>>>>>>         } else {
>>>>>>>>>>             // Later tuples are only logged; the list is left unchanged.
>>>>>>>>>>             System.out.println("===+" + tuple + ":");
>>>>>>>>>>             updated = curr;
>>>>>>>>>>         }
>>>>>>>>>>         // System.out.println("(())");
>>>>>>>>>>         return updated;
>>>>>>>>>>     }
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> What do you think?
>>>>>>>>>>
>>>>>>>>>> Thanks
>>>>>>>>>>
>>>>>>>>>> On Thu, Mar 5, 2015 at 11:57 AM, Sa Li <sa...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Thank you very much for the reply. Here is the error I saw in the
>>>>>>>>>>> production server's worker-6703.log:
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Mar 5, 2015 at 11:31 AM, Nathan Leung <ncleung@gmail.com
>>>>>>>>>>> > wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Yeah, then in this case maybe you can install JDK / Yourkit in
>>>>>>>>>>>> the remote machines and run the tools over X or something.  I'm assuming
>>>>>>>>>>>> this is a development cluster (not live / production) and that installing
>>>>>>>>>>>> debugging tools and running remote UIs etc is not a problem.  :)
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Mar 5, 2015 at 1:52 PM, Andrew Xor <
>>>>>>>>>>>> andreas.grammenos@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Nathan I think that if he wants to profile a bolt per se that
>>>>>>>>>>>>> runs in a worker that resides in a different cluster node than the one the
>>>>>>>>>>>>> profiling tool runs he won't be able to attach the process since it resides
>>>>>>>>>>>>> in a different physical machine, me thinks (well, now that I think of it
>>>>>>>>>>>>> better it can be done... via remote debugging but that's just a pain in the
>>>>>>>>>>>>> ***).
>>>>>>>>>>>>>
>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>
>>>>>>>>>>>>> A.
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, Mar 5, 2015 at 8:46 PM, Nathan Leung <
>>>>>>>>>>>>> ncleung@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> You don't need to change your code. As Andrew mentioned you
>>>>>>>>>>>>>> can get a lot of mileage by profiling your logic in a standalone program.
>>>>>>>>>>>>>> For jvisualvm, you can just run your program (a loop that runs for a long
>>>>>>>>>>>>>> time is best) then attach to the running process with jvisualvm.  It's
>>>>>>>>>>>>>> pretty straightforward to use and you can also find good guides with a
>>>>>>>>>>>>>> Google search.
>>>>>>>>>>>>>> On Mar 5, 2015 1:43 PM, "Andrew Xor" <
>>>>>>>>>>>>>> andreas.grammenos@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Well... detecting memory leaks in Java is a bit tricky, as
>>>>>>>>>>>>>>> Java does a lot for you. Generally though, as long as you avoid using the "new"
>>>>>>>>>>>>>>> operator and close any resources that you do not use, you should be fine...
>>>>>>>>>>>>>>> but a profiler such as the ones mentioned by Nathan will tell you the whole
>>>>>>>>>>>>>>> truth. YourKit is awesome and has a free trial; go ahead and test drive it.
>>>>>>>>>>>>>>> I am pretty sure that you need a working jar (or compilable code that has a
>>>>>>>>>>>>>>> main function in it) in order to profile it, although profiling
>>>>>>>>>>>>>>> your bolts and spouts is a bit trickier. Hopefully your algorithm
>>>>>>>>>>>>>>> (or portions of it) can be put in a sample test program that can be
>>>>>>>>>>>>>>> executed locally for you to profile.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hope this helped. Regards,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> A.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Thu, Mar 5, 2015 at 8:33 PM, Sa Li <sa...@gmail.com>
>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Thu, Mar 5, 2015 at 10:26 AM, Andrew Xor <
>>>>>>>>>>>>>>>> andreas.grammenos@gmail.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Unfortunately that is not fixed, it depends on the
>>>>>>>>>>>>>>>>> computations and data-structures you have; in my case for example I use
>>>>>>>>>>>>>>>>> more than 2GB since I need to keep a large matrix in memory... having said
>>>>>>>>>>>>>>>>> that, in most cases it should be relatively easy to estimate how much
>>>>>>>>>>>>>>>>> memory you are going to need and use that... or if that's not possible you
>>>>>>>>>>>>>>>>> can just increase it and try the "set and see" approach. Check for memory
>>>>>>>>>>>>>>>>> leaks as well... (unclosed resources and so on...!)
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Regards.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> A.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Thu, Mar 5, 2015 at 8:21 PM, Sa Li <
>>>>>>>>>>>>>>>>> sa.in.vanc@gmail.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Thanks, Nathan. How much should it be in general?
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Thu, Mar 5, 2015 at 10:15 AM, Nathan Leung <
>>>>>>>>>>>>>>>>>> ncleung@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Your worker is allocated a maximum of 768mb of heap.
>>>>>>>>>>>>>>>>>>> It's quite possible that this is not enough. Try increasing Xmx in
>>>>>>>>>>>>>>>>>>> worker.childopts.
>>>>>>>>>>>>>>>>>>> On Mar 5, 2015 1:10 PM, "Sa Li" <sa...@gmail.com>
>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Hi, All
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> I have been running a trident topology on production
>>>>>>>>>>>>>>>>>>>> server, code is like this:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> topology.newStream("spoutInit", kafkaSpout)
>>>>>>>>>>>>>>>>>>>>                 .each(new Fields("str"),
>>>>>>>>>>>>>>>>>>>>                         new JsonObjectParse(),
>>>>>>>>>>>>>>>>>>>>                         new Fields("eventType", "event"))
>>>>>>>>>>>>>>>>>>>>                 .parallelismHint(pHint)
>>>>>>>>>>>>>>>>>>>>                 .groupBy(new Fields("event"))
>>>>>>>>>>>>>>>>>>>>                 .persistentAggregate(PostgresqlState.newFactory(config), new Fields("eventType"), new EventUpdater(), new Fields("eventWord"))
>>>>>>>>>>>>>>>>>>>>         ;
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>         Config conf = new Config();
>>>>>>>>>>>>>>>>>>>>         conf.registerMetricsConsumer(LoggingMetricsConsumer.class, 1);
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Basically, it does simple things to get data from kafka, parse to different field and write into postgresDB. But in storm UI, I did see such error, "java.lang.OutOfMemoryError: GC overhead limit exceeded". It all happens in same worker of each node - 6703. I understand this is because by default the JVM is configured to throw this error if you are spending more than *98% of the total time in GC and after the GC less than 2% of the heap is recovered*.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> I am not sure what is exact cause for memory leak, is it OK by simply increase the heap? Here is my storm.yaml:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> supervisor.slots.ports:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>      - 6700
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>      - 6701
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>      - 6702
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>      - 6703
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> nimbus.childopts: "-Xmx1024m -Djava.net.preferIPv4Stack=true"
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> ui.childopts: "-Xmx768m -Djava.net.preferIPv4Stack=true"
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> supervisor.childopts: "-Djava.net.preferIPv4Stack=true"
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> worker.childopts: "-Xmx768m -Djava.net.preferIPv4Stack=true"
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Anyone has similar issues, and what will be the best
>>>>>>>>>>>>>>>>>>>> way to overcome?
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> thanks in advance
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> AL
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>
>>>>
>>>
>>
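
On the question of whether a max spout pending of 10 means only 10 tuples in flight: in Trident the limit is generally counted in batches rather than individual tuples, so the memory held can be much larger than the number suggests. The rough arithmetic below is a sketch under stated assumptions: a 1 MB Kafka fetch per partition per batch (believed to be the storm-kafka default for fetchSizeBytes; verify against your version), the roughly 3 KB record size mentioned in the thread, and a hypothetical 5 partitions. InFlightEstimate and its methods are illustrative names, and the result counts raw payload only, not Java object overhead.

```java
// Sketch: back-of-envelope estimate of data held by pending Trident batches.
// All names and the example numbers in main() are assumptions for illustration.
class InFlightEstimate {

    // Tuples in one batch if every partition contributes one full fetch.
    static long tuplesPerBatch(long fetchBytes, long recordBytes, int partitions) {
        return (fetchBytes / recordBytes) * partitions;
    }

    // Raw payload bytes held when maxSpoutPending batches are in flight.
    static long pendingBytes(int maxSpoutPending, long fetchBytes, int partitions) {
        return (long) maxSpoutPending * fetchBytes * partitions;
    }

    public static void main(String[] args) {
        long fetchBytes = 1024L * 1024L;  // assumed fetch size per partition
        long recordBytes = 3L * 1024L;    // ~3 KB per record, from the thread
        int partitions = 5;               // hypothetical partition count

        System.out.println("tuples per batch: "
                + tuplesPerBatch(fetchBytes, recordBytes, partitions));
        for (int pending : new int[] {10, 50, 1000}) {
            long mb = pendingBytes(pending, fetchBytes, partitions) / (1024L * 1024L);
            System.out.println("max pending " + pending + " -> ~" + mb + " MB raw payload");
        }
    }
}
```

With these assumed numbers, 10 pending batches hold about 50 MB of raw payload and 50 hold about 250 MB, before any deserialization overhead, which would be consistent with a small worker heap coming under pressure well before the value reaches 1000.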

Re: java.lang.OutOfMemoryError: GC overhead limit exceeded

Posted by Sa Li <sa...@gmail.com>.
You are right. I have already increased the heap in the yaml to 2 GB for each
worker, but I still have the issue, so I suspect there is some other cause;
perhaps the receive/send buffer sizes? Also, before the GC overhead error
shows up in the Storm UI, I see other errors in the worker log, such as Netty
connection errors and null pointer exceptions, as I showed in another post.

Thanks
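
One way to check both points from inside a worker JVM is sketched below. It
prints the maximum heap the JVM actually received (to confirm that the 2 GB
-Xmx setting took effect) and the share of uptime spent in garbage
collection. This is only a sketch built on the standard java.lang.management
API; the class name HeapCheck and its two helper methods are made up for
illustration, and the whole-uptime GC fraction is a coarse stand-in for the
windowed measurement behind the "GC overhead limit" check.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;

// Hypothetical helper, not part of Storm: call it from a bolt's prepare()
// or run it standalone with the same JVM options as the worker.
class HeapCheck {

    /** Maximum heap the JVM will try to use, in megabytes (reflects -Xmx). */
    static long maxHeapMb() {
        return Runtime.getRuntime().maxMemory() / (1024 * 1024);
    }

    /** Fraction of JVM uptime spent in GC so far, clamped to [0.0, 1.0]. */
    static double gcTimeFraction() {
        long gcMillis = 0;
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            long t = gc.getCollectionTime(); // -1 when the collector does not report it
            if (t > 0) {
                gcMillis += t;
            }
        }
        long uptimeMillis = ManagementFactory.getRuntimeMXBean().getUptime();
        if (uptimeMillis <= 0) {
            return 0.0;
        }
        return Math.min(1.0, (double) gcMillis / uptimeMillis);
    }

    public static void main(String[] args) {
        System.out.println("max heap MB      = " + maxHeapMb());
        System.out.println("GC time fraction = " + gcTimeFraction());
    }
}
```

If the reported maximum heap is still close to 768 MB rather than roughly
2 GB, the new setting probably did not reach the worker JVM.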
On Mar 9, 2015 5:36 PM, "Nathan Leung" <nc...@gmail.com> wrote:

> I still think you should try running with a larger heap.  :)  Max spout
> pending determines how many tuples can be pending (tuple tree is not fully
> acked) per spout task.  If you have many spout tasks per worker this can be
> a large amount of memory.  It also depends on how big your tuples are.

Re: java.lang.OutOfMemoryError: GC overhead limit exceeded

Posted by Nathan Leung <nc...@gmail.com>.
I still think you should try running with a larger heap.  :)  Max spout
pending determines how many tuples can be pending (tuple tree is not fully
acked) per spout task.  If you have many spout tasks per worker this can be
a large amount of memory.  It also depends on how big your tuples are.
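
The relationship described above can be put into rough numbers. The sketch
below is a back-of-the-envelope estimate, not Storm code: it multiplies max
spout pending by the number of spout tasks in a worker and by the tuple
size. The value 1000 and the 3 KB record size come from elsewhere in this
thread; 4 spout tasks per worker is purely an assumed figure, and the class
and method names are hypothetical.

```java
// Hypothetical estimate of the raw payload held by pending tuples.
class InFlightEstimate {

    /**
     * Raw payload bytes that can be pending in one worker:
     * maxSpoutPending (per spout task) x spout tasks in the worker x tuple size.
     */
    static long pendingBytesPerWorker(int maxSpoutPending,
                                      int spoutTasksPerWorker,
                                      long bytesPerTuple) {
        return (long) maxSpoutPending * spoutTasksPerWorker * bytesPerTuple;
    }

    public static void main(String[] args) {
        // 1000 pending per task, 4 spout tasks (assumed), 3 KB per tuple.
        long bytes = pendingBytesPerWorker(1000, 4, 3 * 1024);
        System.out.println(bytes + " bytes, about "
                + bytes / (1024.0 * 1024.0) + " MB of raw payload");
    }
}
```

Actual heap use is higher than this figure, since each tuple also carries
object overhead and downstream components hold their own copies while the
tuple tree is being processed.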

On Mon, Mar 9, 2015 at 6:14 PM, Sa Li <sa...@gmail.com> wrote:

> Hi, Nathan
>
> We have experimented with max spout pending in dev. If we set it to 10, it
> is OK, but if we set it to more than 50, the GC overhead error starts to
> appear. The topology ultimately writes tuples into PostgreSQL, and the
> highest write rate we get is around 40K records/minute, which is very slow;
> maybe that is why tuples accumulate in memory before being written to the
> DB. But I think 10 is too small. Does that mean only 10 tuples are allowed
> in flight?
>
> thanks
>
> AL
>
> On Fri, Mar 6, 2015 at 7:39 PM, Nathan Leung <nc...@gmail.com> wrote:
>
>> I've not modified netty so I can't comment on that.  I would set max
>> spout pending; try 1000 at first.  This will limit the number of tuples
>> that you can have in flight simultaneously and therefore limit the amount
>> of memory used by these tuples and their processing.
>>
>> On Fri, Mar 6, 2015 at 7:03 PM, Sa Li <sa...@gmail.com> wrote:
>>
>>> Hi, Nathan
>>>
>>> The log size of that Kafka topic is 23515541, and each record is about 3K.
>>> I checked the yaml file and I don't have max spout pending set, so I assume
>>> it is the default: topology.max.spout.pending: null
>>>
>>> Should I set it to a certain value? Also, I sometimes see
>>> java.nio.channels.ClosedChannelException: null, or b.s.d.worker
>>> [ERROR] Error on initialization of server mk-worker.
>>> Does this mean I should add
>>> storm.messaging.netty.server_worker_threads: 1
>>> storm.messaging.netty.client_worker_threads: 1
>>> storm.messaging.netty.buffer_size: 5242880 #5MB buffer
>>> storm.messaging.netty.max_retries: 30
>>> storm.messaging.netty.max_wait_ms: 1000
>>> storm.messaging.netty.min_wait_ms: 100
>>>
>>> to the yaml and modify the values?
>>>
>>>
>>>
>>> thanks
>>>
>>>
>>>
>>> On Fri, Mar 6, 2015 at 2:22 PM, Nathan Leung <nc...@gmail.com> wrote:
>>>
>>>> How much data do you have in Kafka? How is your max spout pending set?
>>>> If you have a high max spout pending (or if you emit unanchored tuples) you
>>>> could be using up a lot of memory.
>>>> On Mar 6, 2015 5:14 PM, "Sa Li" <sa...@gmail.com> wrote:
>>>>
>>>>> Hi, Nathan
>>>>>
>>>>> I have hit a strange issue: when I set spoutConf.forceFromStart=true,
>>>>> the topology quickly runs into the GC overhead limit, even though I have
>>>>> already increased the heap size, but if I remove this setting it works
>>>>> fine. I was thinking that maybe the kafkaSpout consumes data much faster
>>>>> than it can be written into postgresDB, so the data quickly fills the
>>>>> memory and exhausts the heap. But when I ran the same test on my DEV
>>>>> cluster it worked fine, even with spoutConf.forceFromStart=true. I
>>>>> checked the Storm config for DEV and production, and they are the same.
>>>>>
>>>>> Any hints?
>>>>>
>>>>> thanks
>>>>>
>>>>> AL
>>>>>
>>>>>
>>>>> On Thu, Mar 5, 2015 at 3:26 PM, Nathan Leung <nc...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> I don't see anything glaring.  I would try increasing heap size.  It
>>>>>> could be that you're right on the threshold of what you've allocated and
>>>>>> you just need more memory.
>>>>>>
>>>>>> On Thu, Mar 5, 2015 at 5:41 PM, Sa Li <sa...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi, All,
>>>>>>>
>>>>>>> I have roughly located where the problem comes from. In my run command
>>>>>>> I specify the clientid of TridentKafkaConfig. If I keep the same
>>>>>>> clientid I used before, it causes the GC error; otherwise it is
>>>>>>> completely OK. Here is the code:
>>>>>>>
>>>>>>> if (parameters.containsKey("clientid")) {
>>>>>>>     logger.info("topic=>" + parameters.get("clientid") + "/" + parameters.get("topic"));
>>>>>>>     spoutConf = new TridentKafkaConfig(zk, parameters.get("topic"), parameters.get("clientid"));
>>>>>>>
>>>>>>> Any idea about this error?
>>>>>>>
>>>>>>>
>>>>>>> Thanks
>>>>>>>
>>>>>>>
>>>>>>> AL
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Mar 5, 2015 at 12:02 PM, Sa Li <sa...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Sorry, continuing from my last message:
>>>>>>>>
>>>>>>>> 2015-03-05T11:48:08.418-0800 b.s.util [ERROR] Async loop died!
>>>>>>>> java.lang.RuntimeException: java.lang.RuntimeException: Remote address is not reachable. We will close this client Netty-Client-complicated-laugh/10.100.98.103:6703
>>>>>>>>         at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) ~[storm-core-0.9.3.jar:0.9.3]
>>>>>>>>         at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) ~[storm-core-0.9.3.jar:0.9.3]
>>>>>>>>         at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) ~[storm-core-0.9.3.jar:0.9.3]
>>>>>>>>         at backtype.storm.disruptor$consume_loop_STAR_$fn__1460.invoke(disruptor.clj:94) ~[storm-core-0.9.3.jar:0.9.3]
>>>>>>>>         at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463) ~[storm-core-0.9.3.jar:0.9.3]
>>>>>>>>         at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
>>>>>>>>         at java.lang.Thread.run(Thread.java:745) [na:1.7.0_65]
>>>>>>>> Caused by: java.lang.RuntimeException: Remote address is not reachable. We will close this client Netty-Client-complicated-laugh/10.100.98.103:6703
>>>>>>>>         at backtype.storm.messaging.netty.Client.connect(Client.java:171) ~[storm-core-0.9.3.jar:0.9.3]
>>>>>>>>         at backtype.storm.messaging.netty.Client.send(Client.java:194) ~[storm-core-0.9.3.jar:0.9.3]
>>>>>>>>         at backtype.storm.utils.TransferDrainer.send(TransferDrainer.java:54) ~[storm-core-0.9.3.jar:0.9.3]
>>>>>>>>         at backtype.storm.daemon.worker$mk_transfer_tuples_handler$fn__3730$fn__3731.invoke(worker.clj:330) ~[storm-core-0.9.3.jar:0.9.3]
>>>>>>>>         at backtype.storm.daemon.worker$mk_transfer_tuples_handler$fn__3730.invoke(worker.clj:328) ~[storm-core-0.9.3.jar:0.9.3]
>>>>>>>>         at backtype.storm.disruptor$clojure_handler$reify__1447.onEvent(disruptor.clj:58) ~[storm-core-0.9.3.jar:0.9.3]
>>>>>>>>         at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125) ~[storm-core-0.9.3.jar:0.9.3]
>>>>>>>>         ... 6 common frames omitted
>>>>>>>> 2015-03-05T11:48:08.423-0800 b.s.util [ERROR] Halting process: ("Async loop died!")
>>>>>>>> java.lang.RuntimeException: ("Async loop died!")
>>>>>>>>         at backtype.storm.util$exit_process_BANG_.doInvoke(util.clj:325) [storm-core-0.9.3.jar:0.9.3]
>>>>>>>>         at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.5.1.jar:na]
>>>>>>>>         at backtype.storm.disruptor$consume_loop_STAR_$fn__1458.invoke(disruptor.clj:92) [storm-core-0.9.3.jar:0.9.3]
>>>>>>>>         at backtype.storm.util$async_loop$fn__464.invoke(util.clj:473) [storm-core-0.9.3.jar:0.9.3]
>>>>>>>>         at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
>>>>>>>>         at java.lang.Thread.run(Thread.java:745) [na:1.7.0_65]
>>>>>>>> 2015-03-05T11:48:08.425-0800 b.s.d.worker [INFO] Shutting down worker eventsStreamerv1-48-1425499636 0673ece0-cea2-4185-9b3e-6c49ad585576 6703
>>>>>>>> 2015-03-05T11:48:08.426-0800 b.s.m.n.Client [INFO] Closing Netty Client Netty-Client-beloved-judge/10.100.98.104:6703
>>>>>>>>
>>>>>>>> I suspect this is caused by my EventUpdater, which writes data in batches:
>>>>>>>>
>>>>>>>> static class EventUpdater implements ReducerAggregator<List<String>> {
>>>>>>>>
>>>>>>>>     @Override
>>>>>>>>     public List<String> init() {
>>>>>>>>         return null;
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     @Override
>>>>>>>>     public List<String> reduce(List<String> curr, TridentTuple tuple) {
>>>>>>>>         List<String> updated = null;
>>>>>>>>
>>>>>>>>         if (curr == null) {
>>>>>>>>             // First tuple of the group: start the list with its event.
>>>>>>>>             String event = (String) tuple.getValue(1);
>>>>>>>>             System.out.println("===:" + event + ":");
>>>>>>>>             updated = Lists.newArrayList(event);
>>>>>>>>         } else {
>>>>>>>>             // Later tuples: the existing list is returned unchanged.
>>>>>>>>             System.out.println("===+" + tuple + ":");
>>>>>>>>             updated = curr;
>>>>>>>>         }
>>>>>>>>         // System.out.println("(())");
>>>>>>>>         return updated;
>>>>>>>>     }
>>>>>>>> }
>>>>>>>>
>>>>>>>> What do you think?
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>>
>>>>>>>> On Thu, Mar 5, 2015 at 11:57 AM, Sa Li <sa...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Thank you very much for the reply. Here is the error I saw in the
>>>>>>>>> production server's worker-6703.log:
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Mar 5, 2015 at 11:31 AM, Nathan Leung <nc...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Yeah, then in this case maybe you can install JDK / Yourkit in
>>>>>>>>>> the remote machines and run the tools over X or something.  I'm assuming
>>>>>>>>>> this is a development cluster (not live / production) and that installing
>>>>>>>>>> debugging tools and running remote UIs etc is not a problem.  :)
>>>>>>>>>>
>>>>>>>>>> On Thu, Mar 5, 2015 at 1:52 PM, Andrew Xor <
>>>>>>>>>> andreas.grammenos@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Nathan I think that if he wants to profile a bolt per se that
>>>>>>>>>>> runs in a worker that resides in a different cluster node than the one the
>>>>>>>>>>> profiling tool runs he won't be able to attach the process since it resides
>>>>>>>>>>> in a different physical machine, me thinks (well, now that I think of it
>>>>>>>>>>> better it can be done... via remote debugging but that's just a pain in the
>>>>>>>>>>> ***).
>>>>>>>>>>>
>>>>>>>>>>> Regards,
>>>>>>>>>>>
>>>>>>>>>>> A.
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Mar 5, 2015 at 8:46 PM, Nathan Leung <nc...@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> You don't need to change your code. As Andrew mentioned you can
>>>>>>>>>>>> get a lot of mileage by profiling your logic in a standalone program. For
>>>>>>>>>>>> jvisualvm, you can just run your program (a loop that runs for a long time
>>>>>>>>>>>> is best) then attach to the running process with jvisualvm.  It's pretty
>>>>>>>>>>>> straightforward to use and you can also find good guides with a Google
>>>>>>>>>>>> search.
>>>>>>>>>>>> On Mar 5, 2015 1:43 PM, "Andrew Xor" <
>>>>>>>>>>>> andreas.grammenos@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> ​
>>>>>>>>>>>>> Well...  detecting memory leaks in Java is a bit tricky as
>>>>>>>>>>>>> Java does a lot for you. Generally though, as long as you avoid using "new"
>>>>>>>>>>>>> operator and close any resources that you do not use you should be fine...
>>>>>>>>>>>>> but a Profiler such as the ones mentioned by Nathan will tell you the whole
>>>>>>>>>>>>> truth. YourKit is awesome and has a free trial, go ahead and test drive it.
>>>>>>>>>>>>> I am pretty sure that you need a working jar (or compilable code that has a
>>>>>>>>>>>>> main function in it) in order to profile it, although profiling your
>>>>>>>>>>>>> bolts and spouts directly is a bit trickier. Hopefully your algorithm
>>>>>>>>>>>>> (or portions of it) can be put in a sample test program that is able to be
>>>>>>>>>>>>> executed locally for you to profile it.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Hope this helped. Regards,
>>>>>>>>>>>>>
>>>>>>>>>>>>> A.
>>>>>>>>>>>>> ​
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, Mar 5, 2015 at 8:33 PM, Sa Li <sa...@gmail.com>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Thu, Mar 5, 2015 at 10:26 AM, Andrew Xor <
>>>>>>>>>>>>>> andreas.grammenos@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Unfortunately that is not fixed, it depends on the
>>>>>>>>>>>>>>> computations and data-structures you have; in my case for example I use
>>>>>>>>>>>>>>> more than 2GB since I need to keep a large matrix in memory... having said
>>>>>>>>>>>>>>> that, in most cases it should be relatively easy to estimate how much
>>>>>>>>>>>>>>> memory you are going to need and use that... or if that's not possible you
>>>>>>>>>>>>>>> can just increase it and try the "set and see" approach. Check for memory
>>>>>>>>>>>>>>> leaks as well... (unclosed resources and so on...!)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Regards.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> ​A.​
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Thu, Mar 5, 2015 at 8:21 PM, Sa Li <sa...@gmail.com>
>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks, Nathan. How much should it be in general?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Thu, Mar 5, 2015 at 10:15 AM, Nathan Leung <
>>>>>>>>>>>>>>>> ncleung@gmail.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Your worker is allocated a maximum of 768 MB of heap. It's
>>>>>>>>>>>>>>>>> quite possible that this is not enough. Try increasing Xmx in
>>>>>>>>>>>>>>>>> worker.childopts.
>>>>>>>>>>>>>>>>> On Mar 5, 2015 1:10 PM, "Sa Li" <sa...@gmail.com>
>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Hi, All
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I have been running a trident topology on production
>>>>>>>>>>>>>>>>>> server, code is like this:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> topology.newStream("spoutInit", kafkaSpout)
>>>>>>>>>>>>>>>>>>                 .each(new Fields("str"),
>>>>>>>>>>>>>>>>>>                         new JsonObjectParse(),
>>>>>>>>>>>>>>>>>>                         new Fields("eventType", "event"))
>>>>>>>>>>>>>>>>>>                 .parallelismHint(pHint)
>>>>>>>>>>>>>>>>>>                 .groupBy(new Fields("event"))
>>>>>>>>>>>>>>>>>>                 .persistentAggregate(PostgresqlState.newFactory(config), new Fields("eventType"), new EventUpdater(), new Fields("eventWord"))
>>>>>>>>>>>>>>>>>>         ;
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>         Config conf = new Config();
>>>>>>>>>>>>>>>>>>         conf.registerMetricsConsumer(LoggingMetricsConsumer.class, 1);
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Basically, it does simple things: it gets data from Kafka, parses it into different fields, and writes it into PostgreSQL. But in the Storm UI I see this error: "java.lang.OutOfMemoryError: GC overhead limit exceeded". It always happens in the same worker on each node, 6703. I understand this is because by default the JVM is configured to throw this error if you are spending more than *98% of the total time in GC and after the GC less than 2% of the heap is recovered*.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I am not sure what the exact cause of the memory leak is. Is it OK to simply increase the heap? Here is my storm.yaml:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> supervisor.slots.ports:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>      - 6700
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>      - 6701
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>      - 6702
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>      - 6703
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> nimbus.childopts: "-Xmx1024m -Djava.net.preferIPv4Stack=true"
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> ui.childopts: "-Xmx768m -Djava.net.preferIPv4Stack=true"
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> supervisor.childopts: "-Djava.net.preferIPv4Stack=true"
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> worker.childopts: "-Xmx768m -Djava.net.preferIPv4Stack=true"
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Anyone has similar issues, and what will be the best way
>>>>>>>>>>>>>>>>>> to overcome?
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> thanks in advance
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> AL
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>
>>
>

Re: java.lang.OutOfMemoryError: GC overhead limit exceeded

Posted by Sa Li <sa...@gmail.com>.
Hi, Nathan

We have experimented with max spout pending in dev. If we set it to 10,
everything is OK, but if we set it above 50, the GC overhead error starts
to appear. The topology ultimately writes tuples into PostgreSQL, and the
highest write rate we get is around 40K records/minute, which is very
slow; that may be why tuples accumulate in memory before they are flushed
to the DB. But I think 10 is too small. Does that mean only 10 tuples are
allowed in flight?
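
A rough way to reason about that question: as far as I understand Trident,
topology.max.spout.pending limits the number of batches in flight, not
individual tuples, so 10 would mean 10 batches. The sketch below estimates
how much raw Kafka payload those batches could hold. The partition count (4)
and the 1 MiB fetch size per partition are assumptions for illustration, not
values taken from this cluster; the 3K record size is the figure mentioned
elsewhere in this thread.

```java
// Back-of-envelope estimate only; all inputs are illustrative assumptions.
class InFlightEstimate {

    // Upper bound on raw payload bytes referenced by in-flight batches,
    // assuming every batch fetches fetchSizeBytes from every partition.
    static long inFlightBytes(int maxSpoutPending, int partitions, long fetchSizeBytes) {
        return (long) maxSpoutPending * partitions * fetchSizeBytes;
    }

    // Approximate number of records carried by a single batch.
    static long recordsPerBatch(int partitions, long fetchSizeBytes, long recordBytes) {
        return partitions * (fetchSizeBytes / recordBytes);
    }

    public static void main(String[] args) {
        int partitions = 4;                  // assumption: hypothetical partition count
        long fetchSizeBytes = 1024L * 1024L; // assumption: 1 MiB fetched per partition per batch
        long recordBytes = 3L * 1024L;       // roughly 3K per record

        System.out.println("records per batch ~ "
                + recordsPerBatch(partitions, fetchSizeBytes, recordBytes));
        for (int pending : new int[] {10, 50, 1000}) {
            long mib = inFlightBytes(pending, partitions, fetchSizeBytes) / (1024L * 1024L);
            System.out.println("max spout pending " + pending
                    + " -> up to ~" + mib + " MiB of raw payload");
        }
    }
}
```

Deserialized tuples and aggregation state usually take more heap than the
raw payload, so under these assumptions a few dozen pending batches could
plausibly be enough to exhaust a small worker heap.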

thanks

AL

On Fri, Mar 6, 2015 at 7:39 PM, Nathan Leung <nc...@gmail.com> wrote:

> I've not modified netty so I can't comment on that.  I would set max spout
> pending; try 1000 at first.  This will limit the number of tuples that you
> can have in flight simultaneously and therefore limit the amount of memory
> used by these tuples and their processing.

Re: java.lang.OutOfMemoryError: GC overhead limit exceeded

Posted by Nathan Leung <nc...@gmail.com>.
I've not modified netty so I can't comment on that.  I would set max spout
pending; try 1000 at first.  This will limit the number of tuples that you
can have in flight simultaneously and therefore limit the amount of memory
used by these tuples and their processing.
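
To illustrate why a cap bounds memory, here is a toy simulation in plain
Java (not Storm code): a source that emits faster than the sink can ack,
with and without a limit on in-flight items. The rates, the tick model, and
the convention that a cap of 0 means "no limit" (standing in for the null
default) are all made up for the example.

```java
// Toy model of a fast source and a slow sink; not how Storm is implemented.
class PendingCapSimulation {

    // Returns the peak number of in-flight items over the given number of ticks.
    // cap <= 0 is treated as "no limit" (assumption standing in for a null setting).
    static int peakInFlight(int cap, int emitPerTick, int ackPerTick, int ticks) {
        int inFlight = 0;
        int peak = 0;
        for (int t = 0; t < ticks; t++) {
            // The source may only emit while it is below the cap.
            int room = cap > 0 ? Math.max(0, cap - inFlight) : emitPerTick;
            inFlight += Math.min(emitPerTick, room);
            peak = Math.max(peak, inFlight);
            // The slow sink acks a fixed number of items per tick.
            inFlight -= Math.min(ackPerTick, inFlight);
        }
        return peak;
    }

    public static void main(String[] args) {
        // Source emits 100 items per tick, sink acks only 10 per tick.
        System.out.println("no cap:    peak in flight = " + peakInFlight(0, 100, 10, 100));
        System.out.println("cap of 50: peak in flight = " + peakInFlight(50, 100, 10, 100));
    }
}
```

Without a cap the backlog grows for as long as the source outruns the sink;
with a cap it levels off at the cap. In a real topology the cap is the
topology.max.spout.pending setting.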

On Fri, Mar 6, 2015 at 7:03 PM, Sa Li <sa...@gmail.com> wrote:

> Hi, Nathan
>
> The log size of that Kafka topic is 23515541 and each record is about 3K.
> I checked the yaml file and I don't have max spout pending set, so I
> assume it falls back to the default: topology.max.spout.pending: null
>
> Should I set it to a certain value? Also, I sometimes see
> java.nio.channels.ClosedChannelException: null, or b.s.d.worker [ERROR]
> Error on initialization of server mk-worker.
> Does this mean I should add
>
> storm.messaging.netty.server_worker_threads: 1
> storm.messaging.netty.client_worker_threads: 1
> storm.messaging.netty.buffer_size: 5242880 #5MB buffer
> storm.messaging.netty.max_retries: 30
> storm.messaging.netty.max_wait_ms: 1000
> storm.messaging.netty.min_wait_ms: 100
>
> to the yaml and modify the values?
>
>
>
> thanks

Re: java.lang.OutOfMemoryError: GC overhead limit exceeded

Posted by Sa Li <sa...@gmail.com>.
Hi, Nathan

The log size of that Kafka topic is 23515541 and each record is about 3K.
I checked the yaml file and I don't have max spout pending set, so I
assume it falls back to the default: topology.max.spout.pending: null

Should I set it to a certain value? Also, I sometimes see
java.nio.channels.ClosedChannelException: null, or b.s.d.worker [ERROR]
Error on initialization of server mk-worker.
Does this mean I should add

storm.messaging.netty.server_worker_threads: 1
storm.messaging.netty.client_worker_threads: 1
storm.messaging.netty.buffer_size: 5242880 #5MB buffer
storm.messaging.netty.max_retries: 30
storm.messaging.netty.max_wait_ms: 1000
storm.messaging.netty.min_wait_ms: 100

to the yaml and modify the values?



thanks



On Fri, Mar 6, 2015 at 2:22 PM, Nathan Leung <nc...@gmail.com> wrote:

> How much data do you have in Kafka? How is your max spout pending set? If
> you have a high max spout pending (or if you emit unanchored tuples) you
> could be using up a lot of memory.
> On Mar 6, 2015 5:14 PM, "Sa Li" <sa...@gmail.com> wrote:
>
>> Hi, Nathan
>>
>> I have met a strange issue, when I set spoutConf.forceFromStart=true, it
>> will quickly run into GC overhead limit, even I already increase the heap
>> size, but I if I remove this setting
>> it will work fine, I was thinking maybe the kafkaSpout consuming data
>> much faster than the data being written into postgresDB, and data will
>> quick take the memory and causing heap overflow. But I did the same test on
>> my DEV cluster, it will working fine, even I set
>> spoutConf.forceFromStart=true. I check the storm config for DEV and
>> production, they are all same.
>>
>> Any hints?
>>
>> thanks
>>
>> AL
>>
>>
>> On Thu, Mar 5, 2015 at 3:26 PM, Nathan Leung <nc...@gmail.com> wrote:
>>
>>> I don't see anything glaring.  I would try increasing heap size.  It
>>> could be that you're right on the threshold of what you've allocated and
>>> you just need more memory.
>>>
>>> On Thu, Mar 5, 2015 at 5:41 PM, Sa Li <sa...@gmail.com> wrote:
>>>
>>>> Hi, All,
>>>> ,
>>>> I kind locate where the problem come from, in my running command, I
>>>> will specify the clientid of TridentKafkaConfig, if I keep the clientid as
>>>> the one I used before, it will cause GC error, otherwise I am completely
>>>> OK. Here is the code:
>>>>
>>>> if (parameters.containsKey("clientid")) {
>>>>     logger.info("topic=>" + parameters.get("clientid") + "/" + parameters.get("topic"));
>>>>     spoutConf = new TridentKafkaConfig(zk, parameters.get("topic"), parameters.get("clientid"));
>>>>
>>>> Any idea about this error?
>>>>
>>>>
>>>> Thanks
>>>>
>>>>
>>>> AL
>>>>
>>>>
>>>> On Thu, Mar 5, 2015 at 12:02 PM, Sa Li <sa...@gmail.com> wrote:
>>>>
>>>>> Sorry, continue last thread:
>>>>>
>>>>> 2015-03-05T11:48:08.418-0800 b.s.util [ERROR] Async loop died!
>>>>> java.lang.RuntimeException: java.lang.RuntimeException: Remote address
>>>>> is not reachable. We will close this client Netty-Client-complicated-laugh/
>>>>> 10.100.98.103:6703
>>>>>         at
>>>>> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128)
>>>>> ~[storm-core-0.9.3.jar:0.9.3]
>>>>>         at
>>>>> backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99)
>>>>> ~[storm-core-0.9.3.jar:0.9.3]
>>>>>         at
>>>>> backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80)
>>>>> ~[storm-core-0.9.3.jar:0.9.3]
>>>>>         at
>>>>> backtype.storm.disruptor$consume_loop_STAR_$fn__1460.invoke(disruptor.clj:94)
>>>>> ~[storm-core-0.9.3.jar:0.9.3]
>>>>>         at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463)
>>>>> ~[storm-core-0.9.3.jar:0.9.3]
>>>>>         at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
>>>>>         at java.lang.Thread.run(Thread.java:745) [na:1.7.0_65]
>>>>> Caused by: java.lang.RuntimeException: Remote address is not
>>>>> reachable. We will close this client Netty-Client-complicated-laugh/
>>>>> 10.100.98.103:6703
>>>>>         at
>>>>> backtype.storm.messaging.netty.Client.connect(Client.java:171)
>>>>> ~[storm-core-0.9.3.jar:0.9.3]
>>>>>         at backtype.storm.messaging.netty.Client.send(Client.java:194)
>>>>> ~[storm-core-0.9.3.jar:0.9.3]
>>>>>         at
>>>>> backtype.storm.utils.TransferDrainer.send(TransferDrainer.java:54)
>>>>> ~[storm-core-0.9.3.jar:0.9.3]
>>>>>         at
>>>>> backtype.storm.daemon.worker$mk_transfer_tuples_handler$fn__3730$fn__3731.invoke(worker.clj:330)
>>>>> ~[storm-core-0.9.3.jar:0.9.3]
>>>>>         at
>>>>> backtype.storm.daemon.worker$mk_transfer_tuples_handler$fn__3730.invoke(worker.clj:328)
>>>>> ~[storm-core-0.9.3.jar:0.9.3]
>>>>>         at
>>>>> backtype.storm.disruptor$clojure_handler$reify__1447.onEvent(disruptor.clj:58)
>>>>> ~[storm-core-0.9.3.jar:0.9.3]
>>>>>         at
>>>>> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125)
>>>>> ~[storm-core-0.9.3.jar:0.9.3]
>>>>>         ... 6 common frames omitted
>>>>> 2015-03-05T11:48:08.423-0800 b.s.util [ERROR] Halting process: ("Async
>>>>> loop died!")
>>>>> java.lang.RuntimeException: ("Async loop died!")
>>>>>         at
>>>>> backtype.storm.util$exit_process_BANG_.doInvoke(util.clj:325)
>>>>> [storm-core-0.9.3.jar:0.9.3]
>>>>>         at clojure.lang.RestFn.invoke(RestFn.java:423)
>>>>> [clojure-1.5.1.jar:na]
>>>>>         at
>>>>> backtype.storm.disruptor$consume_loop_STAR_$fn__1458.invoke(disruptor.clj:92)
>>>>> [storm-core-0.9.3.jar:0.9.3]
>>>>>         at backtype.storm.util$async_loop$fn__464.invoke(util.clj:473)
>>>>> [storm-core-0.9.3.jar:0.9.3]
>>>>>         at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
>>>>>         at java.lang.Thread.run(Thread.java:745) [na:1.7.0_65]
>>>>> 2015-03-05T11:48:08.425-0800 b.s.d.worker [INFO] Shutting down worker
>>>>> eventsStreamerv1-48-1425499636 0673ece0-cea2-4185-9b3e-6c49ad585576 6703
>>>>> 2015-03-05T11:48:08.426-0800 b.s.m.n.Client [INFO] Closing Netty
>>>>> Client Netty-Client-beloved-judge/10.100.98.104:6703
>>>>>
>>>>> I doubt this is caused by my eventUpfater, which write data in batch
>>>>>
>>>>> static class EventUpdater implements ReducerAggregator<List<String>> {
>>>>>
>>>>>             @Override
>>>>>             public List<String> init(){
>>>>>                      return null;
>>>>>             }
>>>>>
>>>>>             @Override
>>>>>             public List<String> reduce(List<String> curr, TridentTuple tuple) {
>>>>>                    List<String> updated = null ;
>>>>>
>>>>>                    if ( curr == null ) {
>>>>>                                     String event = (String) tuple.getValue(1);
>>>>>                                     System.out.println("===:" + event + ":");
>>>>>                                     updated = Lists.newArrayList(event);
>>>>>                    } else {
>>>>>                                     System.out.println("===+" +  tuple + ":");
>>>>>                                     updated = curr ;
>>>>>                    }
>>>>> //              System.out.println("(())");
>>>>>               return updated ;
>>>>>             }
>>>>>         }
>>>>>
>>>>> How do you think
>>>>>
>>>>> THanks
>>>>>
>>>>> On Thu, Mar 5, 2015 at 11:57 AM, Sa Li <sa...@gmail.com> wrote:
>>>>>
>>>>>> Thank you very much for the reply, here is error I saw in production
>>>>>> server worker-6703.log,
>>>>>>
>>>>>>
>>>>>> On Thu, Mar 5, 2015 at 11:31 AM, Nathan Leung <nc...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Yeah, then in this case maybe you can install JDK / Yourkit in the
>>>>>>> remote machines and run the tools over X or something.  I'm assuming this
>>>>>>> is a development cluster (not live / production) and that installing
>>>>>>> debugging tools and running remote UIs etc is not a problem.  :)
>>>>>>>
>>>>>>> On Thu, Mar 5, 2015 at 1:52 PM, Andrew Xor <
>>>>>>> andreas.grammenos@gmail.com> wrote:
>>>>>>>
>>>>>>>> Nathan I think that if he wants to profile a bolt per se that runs
>>>>>>>> in a worker that resides in a different cluster node than the one the
>>>>>>>> profiling tool runs he won't be able to attach the process since it resides
>>>>>>>> in a different physical machine, me thinks (well, now that I think of it
>>>>>>>> better it can be done... via remote debugging but that's just a pain in the
>>>>>>>> ***).
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>>
>>>>>>>> A.
>>>>>>>>
>>>>>>>> On Thu, Mar 5, 2015 at 8:46 PM, Nathan Leung <nc...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> You don't need to change your code. As Andrew mentioned you can
>>>>>>>>> get a lot of mileage by profiling your logic in a standalone program. For
>>>>>>>>> jvisualvm, you can just run your program (a loop that runs for a long time
>>>>>>>>> is best) then attach to the running process with jvisualvm.  It's pretty
>>>>>>>>> straightforward to use and you can also find good guides with a Google
>>>>>>>>> search.
>>>>>>>>> On Mar 5, 2015 1:43 PM, "Andrew Xor" <an...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Well...  detecting memory leaks in Java is a bit tricky as Java
>>>>>>>>>> does a lot for you. Generally though, as long as you avoid using "new"
>>>>>>>>>> operator and close any resources that you do not use you should be fine...
>>>>>>>>>> but a Profiler such as the ones mentioned by Nathan will tell you the whole
>>>>>>>>>> truth. YourKit is awesome and has a free trial, go ahead and test drive it.
>>>>>>>>>> I am pretty sure that you need a working jar (or compilable code that has a
>>>>>>>>>> main function in it) in order to profile it, although if you want to
>>>>>>>>>> profile your bolts and spouts is a bit tricker. Hopefully your algorithm
>>>>>>>>>> (or portions of it) can be put in a sample test program that is able to be
>>>>>>>>>> executed locally for you to profile it.
>>>>>>>>>>
>>>>>>>>>> Hope this helped. Regards,
>>>>>>>>>>
>>>>>>>>>> A.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, Mar 5, 2015 at 8:33 PM, Sa Li <sa...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Mar 5, 2015 at 10:26 AM, Andrew Xor <
>>>>>>>>>>> andreas.grammenos@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Unfortunately that is not fixed, it depends on the computations
>>>>>>>>>>>> and data-structures you have; in my case for example I use more than 2GB
>>>>>>>>>>>> since I need to keep a large matrix in memory... having said that, in most
>>>>>>>>>>>> cases it should be relatively easy to estimate how much memory you are
>>>>>>>>>>>> going to need and use that... or if that's not possible you can just
>>>>>>>>>>>> increase it and try the "set and see" approach. Check for memory leaks as
>>>>>>>>>>>> well... (unclosed resources and so on...!)
>>>>>>>>>>>>
>>>>>>>>>>>> Regards.
>>>>>>>>>>>>
>>>>>>>>>>>> A.
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Mar 5, 2015 at 8:21 PM, Sa Li <sa...@gmail.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks, Nathan. How much is should be in general?
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, Mar 5, 2015 at 10:15 AM, Nathan Leung <
>>>>>>>>>>>>> ncleung@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Your worker is allocated a maximum of 768mb of heap. It's
>>>>>>>>>>>>>> quite possible that this is not enough. Try increasing Xmx i
>>>>>>>>>>>>>> worker.childopts.
>>>>>>>>>>>>>> On Mar 5, 2015 1:10 PM, "Sa Li" <sa...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi, All
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I have been running a trident topology on production server,
>>>>>>>>>>>>>>> code is like this:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> topology.newStream("spoutInit", kafkaSpout)
>>>>>>>>>>>>>>>                 .each(new Fields("str"),
>>>>>>>>>>>>>>>                         new JsonObjectParse(),
>>>>>>>>>>>>>>>                         new Fields("eventType", "event"))
>>>>>>>>>>>>>>>                 .parallelismHint(pHint)
>>>>>>>>>>>>>>>                 .groupBy(new Fields("event"))
>>>>>>>>>>>>>>>                 .persistentAggregate(PostgresqlState.newFactory(config), new Fields("eventType"), new EventUpdater(), new Fields("eventWord"))
>>>>>>>>>>>>>>>         ;
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>         Config conf = new Config();
>>>>>>>>>>>>>>>         conf.registerMetricsConsumer(LoggingMetricsConsumer.class, 1);
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Basically, it does simple things to get data from kafka, parse to different field and write into postgresDB. But in storm UI, I did see such error, "java.lang.OutOfMemoryError: GC overhead limit exceeded". It all happens in same worker of each node - 6703. I understand this is because by default the JVM is configured to throw this error if you are spending more than *98% of the total time in GC and after the GC less than 2% of the heap is recovered*.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I am not sure what is exact cause for memory leak, is it OK by simply increase the heap? Here is my storm.yaml:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> supervisor.slots.ports:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>      - 6700
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>      - 6701
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>      - 6702
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>      - 6703
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> nimbus.childopts: "-Xmx1024m -Djava.net.preferIPv4Stack=true"
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> ui.childopts: "-Xmx768m -Djava.net.preferIPv4Stack=true"
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> supervisor.childopts: "-Djava.net.preferIPv4Stack=true"
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> worker.childopts: "-Xmx768m -Djava.net.preferIPv4Stack=true"
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Anyone has similar issues, and what will be the best way to
>>>>>>>>>>>>>>> overcome?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> thanks in advance
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> AL
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>

Re: java.lang.OutOfMemoryError: GC overhead limit exceeded

Posted by Nathan Leung <nc...@gmail.com>.
How much data do you have in Kafka? How is your max spout pending set? If
you have a high max spout pending (or if you emit unanchored tuples) you
could be using up a lot of memory.
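
A minimal sketch of why a high pending limit matters, assuming memory held by
un-acked work grows roughly as (pending units) x (records per unit) x (bytes
per record). The class name and the sample numbers are hypothetical; only the
3K record size comes from this thread.

```java
// Back-of-the-envelope bound on payload held while work is still un-acked.
final class InFlightEstimate {

    // maxPending:        limit on un-acked units (tuples or batches), e.g. max spout pending
    // recordsPerPending: records carried by each pending unit (1 for single tuples)
    // bytesPerRecord:    raw size of one record
    static long inFlightBytes(long maxPending, long recordsPerPending, long bytesPerRecord) {
        return maxPending * recordsPerPending * bytesPerRecord;
    }

    public static void main(String[] args) {
        long bytesPerRecord = 3L * 1024; // "about 3K" per record, as reported in the thread

        // Hypothetical settings, for comparison only.
        System.out.println("10 pending x 1000 records:  "
                + inFlightBytes(10, 1000, bytesPerRecord) + " bytes");
        System.out.println("500 pending x 1000 records: "
                + inFlightBytes(500, 1000, bytesPerRecord) + " bytes");
    }
}
```

The second case is already larger than a 768 MB heap before any object
overhead is counted, which is why the pending limit is worth checking first.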
On Mar 6, 2015 5:14 PM, "Sa Li" <sa...@gmail.com> wrote:

> Hi, Nathan
>
> I have met a strange issue, when I set spoutConf.forceFromStart=true, it
> will quickly run into GC overhead limit, even I already increase the heap
> size, but I if I remove this setting
> it will work fine, I was thinking maybe the kafkaSpout consuming data much
> faster than the data being written into postgresDB, and data will quick
> take the memory and causing heap overflow. But I did the same test on my
> DEV cluster, it will working fine, even I set
> spoutConf.forceFromStart=true. I check the storm config for DEV and
> production, they are all same.
>
> Any hints?
>
> thanks
>
> AL
>
>
> On Thu, Mar 5, 2015 at 3:26 PM, Nathan Leung <nc...@gmail.com> wrote:
>
>> I don't see anything glaring.  I would try increasing heap size.  It
>> could be that you're right on the threshold of what you've allocated and
>> you just need more memory.
>>
>> On Thu, Mar 5, 2015 at 5:41 PM, Sa Li <sa...@gmail.com> wrote:
>>
>>> Hi, All,
>>> ,
>>> I kind locate where the problem come from, in my running command, I will
>>> specify the clientid of TridentKafkaConfig, if I keep the clientid as the
>>> one I used before, it will cause GC error, otherwise I am completely OK.
>>> Here is the code:
>>>
>>> if (parameters.containsKey("clientid")) {
>>>     logger.info("topic=>" + parameters.get("clientid") + "/" + parameters.get("topic"));
>>>     spoutConf = new TridentKafkaConfig(zk, parameters.get("topic"), parameters.get("clientid"));
>>>
>>> Any idea about this error?
>>>
>>>
>>> Thanks
>>>
>>>
>>> AL
>>>
>>>
>>> On Thu, Mar 5, 2015 at 12:02 PM, Sa Li <sa...@gmail.com> wrote:
>>>
>>>> Sorry, continue last thread:
>>>>
>>>> 2015-03-05T11:48:08.418-0800 b.s.util [ERROR] Async loop died!
>>>> java.lang.RuntimeException: java.lang.RuntimeException: Remote address
>>>> is not reachable. We will close this client Netty-Client-complicated-laugh/
>>>> 10.100.98.103:6703
>>>>         at
>>>> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128)
>>>> ~[storm-core-0.9.3.jar:0.9.3]
>>>>         at
>>>> backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99)
>>>> ~[storm-core-0.9.3.jar:0.9.3]
>>>>         at
>>>> backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80)
>>>> ~[storm-core-0.9.3.jar:0.9.3]
>>>>         at
>>>> backtype.storm.disruptor$consume_loop_STAR_$fn__1460.invoke(disruptor.clj:94)
>>>> ~[storm-core-0.9.3.jar:0.9.3]
>>>>         at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463)
>>>> ~[storm-core-0.9.3.jar:0.9.3]
>>>>         at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
>>>>         at java.lang.Thread.run(Thread.java:745) [na:1.7.0_65]
>>>> Caused by: java.lang.RuntimeException: Remote address is not reachable.
>>>> We will close this client Netty-Client-complicated-laugh/
>>>> 10.100.98.103:6703
>>>>         at
>>>> backtype.storm.messaging.netty.Client.connect(Client.java:171)
>>>> ~[storm-core-0.9.3.jar:0.9.3]
>>>>         at backtype.storm.messaging.netty.Client.send(Client.java:194)
>>>> ~[storm-core-0.9.3.jar:0.9.3]
>>>>         at
>>>> backtype.storm.utils.TransferDrainer.send(TransferDrainer.java:54)
>>>> ~[storm-core-0.9.3.jar:0.9.3]
>>>>         at
>>>> backtype.storm.daemon.worker$mk_transfer_tuples_handler$fn__3730$fn__3731.invoke(worker.clj:330)
>>>> ~[storm-core-0.9.3.jar:0.9.3]
>>>>         at
>>>> backtype.storm.daemon.worker$mk_transfer_tuples_handler$fn__3730.invoke(worker.clj:328)
>>>> ~[storm-core-0.9.3.jar:0.9.3]
>>>>         at
>>>> backtype.storm.disruptor$clojure_handler$reify__1447.onEvent(disruptor.clj:58)
>>>> ~[storm-core-0.9.3.jar:0.9.3]
>>>>         at
>>>> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125)
>>>> ~[storm-core-0.9.3.jar:0.9.3]
>>>>         ... 6 common frames omitted
>>>> 2015-03-05T11:48:08.423-0800 b.s.util [ERROR] Halting process: ("Async
>>>> loop died!")
>>>> java.lang.RuntimeException: ("Async loop died!")
>>>>         at
>>>> backtype.storm.util$exit_process_BANG_.doInvoke(util.clj:325)
>>>> [storm-core-0.9.3.jar:0.9.3]
>>>>         at clojure.lang.RestFn.invoke(RestFn.java:423)
>>>> [clojure-1.5.1.jar:na]
>>>>         at
>>>> backtype.storm.disruptor$consume_loop_STAR_$fn__1458.invoke(disruptor.clj:92)
>>>> [storm-core-0.9.3.jar:0.9.3]
>>>>         at backtype.storm.util$async_loop$fn__464.invoke(util.clj:473)
>>>> [storm-core-0.9.3.jar:0.9.3]
>>>>         at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
>>>>         at java.lang.Thread.run(Thread.java:745) [na:1.7.0_65]
>>>> 2015-03-05T11:48:08.425-0800 b.s.d.worker [INFO] Shutting down worker
>>>> eventsStreamerv1-48-1425499636 0673ece0-cea2-4185-9b3e-6c49ad585576 6703
>>>> 2015-03-05T11:48:08.426-0800 b.s.m.n.Client [INFO] Closing Netty Client
>>>> Netty-Client-beloved-judge/10.100.98.104:6703
>>>>
>>>> I doubt this is caused by my eventUpfater, which write data in batch
>>>>
>>>> static class EventUpdater implements ReducerAggregator<List<String>> {
>>>>
>>>>             @Override
>>>>             public List<String> init(){
>>>>                      return null;
>>>>             }
>>>>
>>>>             @Override
>>>>             public List<String> reduce(List<String> curr, TridentTuple tuple) {
>>>>                    List<String> updated = null ;
>>>>
>>>>                    if ( curr == null ) {
>>>>                                     String event = (String) tuple.getValue(1);
>>>>                                     System.out.println("===:" + event + ":");
>>>>                                     updated = Lists.newArrayList(event);
>>>>                    } else {
>>>>                                     System.out.println("===+" +  tuple + ":");
>>>>                                     updated = curr ;
>>>>                    }
>>>> //              System.out.println("(())");
>>>>               return updated ;
>>>>             }
>>>>         }
>>>>
>>>> How do you think
>>>>
>>>> THanks
>>>>
>>>> On Thu, Mar 5, 2015 at 11:57 AM, Sa Li <sa...@gmail.com> wrote:
>>>>
>>>>> Thank you very much for the reply, here is error I saw in production
>>>>> server worker-6703.log,
>>>>>
>>>>>
>>>>> On Thu, Mar 5, 2015 at 11:31 AM, Nathan Leung <nc...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Yeah, then in this case maybe you can install JDK / Yourkit in the
>>>>>> remote machines and run the tools over X or something.  I'm assuming this
>>>>>> is a development cluster (not live / production) and that installing
>>>>>> debugging tools and running remote UIs etc is not a problem.  :)
>>>>>>
>>>>>> On Thu, Mar 5, 2015 at 1:52 PM, Andrew Xor <
>>>>>> andreas.grammenos@gmail.com> wrote:
>>>>>>
>>>>>>> Nathan I think that if he wants to profile a bolt per se that runs
>>>>>>> in a worker that resides in a different cluster node than the one the
>>>>>>> profiling tool runs he won't be able to attach the process since it resides
>>>>>>> in a different physical machine, me thinks (well, now that I think of it
>>>>>>> better it can be done... via remote debugging but that's just a pain in the
>>>>>>> ***).
>>>>>>>
>>>>>>> Regards,
>>>>>>>
>>>>>>> A.
>>>>>>>
>>>>>>> On Thu, Mar 5, 2015 at 8:46 PM, Nathan Leung <nc...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> You don't need to change your code. As Andrew mentioned you can get
>>>>>>>> a lot of mileage by profiling your logic in a standalone program. For
>>>>>>>> jvisualvm, you can just run your program (a loop that runs for a long time
>>>>>>>> is best) then attach to the running process with jvisualvm.  It's pretty
>>>>>>>> straightforward to use and you can also find good guides with a Google
>>>>>>>> search.
>>>>>>>> On Mar 5, 2015 1:43 PM, "Andrew Xor" <an...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>>
>>>>>>>>> Well...  detecting memory leaks in Java is a bit tricky as Java
>>>>>>>>> does a lot for you. Generally though, as long as you avoid using "new"
>>>>>>>>> operator and close any resources that you do not use you should be fine...
>>>>>>>>> but a Profiler such as the ones mentioned by Nathan will tell you the whole
>>>>>>>>> truth. YourKit is awesome and has a free trial, go ahead and test drive it.
>>>>>>>>> I am pretty sure that you need a working jar (or compilable code that has a
>>>>>>>>> main function in it) in order to profile it, although if you want to
>>>>>>>>> profile your bolts and spouts is a bit tricker. Hopefully your algorithm
>>>>>>>>> (or portions of it) can be put in a sample test program that is able to be
>>>>>>>>> executed locally for you to profile it.
>>>>>>>>>
>>>>>>>>> Hope this helped. Regards,
>>>>>>>>>
>>>>>>>>> A.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Mar 5, 2015 at 8:33 PM, Sa Li <sa...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, Mar 5, 2015 at 10:26 AM, Andrew Xor <
>>>>>>>>>> andreas.grammenos@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Unfortunately that is not fixed, it depends on the computations
>>>>>>>>>>> and data-structures you have; in my case for example I use more than 2GB
>>>>>>>>>>> since I need to keep a large matrix in memory... having said that, in most
>>>>>>>>>>> cases it should be relatively easy to estimate how much memory you are
>>>>>>>>>>> going to need and use that... or if that's not possible you can just
>>>>>>>>>>> increase it and try the "set and see" approach. Check for memory leaks as
>>>>>>>>>>> well... (unclosed resources and so on...!)
>>>>>>>>>>>
>>>>>>>>>>> Regards.
>>>>>>>>>>>
>>>>>>>>>>> A.
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Mar 5, 2015 at 8:21 PM, Sa Li <sa...@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Thanks, Nathan. How much is should be in general?
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Mar 5, 2015 at 10:15 AM, Nathan Leung <
>>>>>>>>>>>> ncleung@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Your worker is allocated a maximum of 768mb of heap. It's
>>>>>>>>>>>>> quite possible that this is not enough. Try increasing Xmx i
>>>>>>>>>>>>> worker.childopts.
>>>>>>>>>>>>> On Mar 5, 2015 1:10 PM, "Sa Li" <sa...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi, All
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I have been running a trident topology on production server,
>>>>>>>>>>>>>> code is like this:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> topology.newStream("spoutInit", kafkaSpout)
>>>>>>>>>>>>>>                 .each(new Fields("str"),
>>>>>>>>>>>>>>                         new JsonObjectParse(),
>>>>>>>>>>>>>>                         new Fields("eventType", "event"))
>>>>>>>>>>>>>>                 .parallelismHint(pHint)
>>>>>>>>>>>>>>                 .groupBy(new Fields("event"))
>>>>>>>>>>>>>>                 .persistentAggregate(PostgresqlState.newFactory(config), new Fields("eventType"), new EventUpdater(), new Fields("eventWord"))
>>>>>>>>>>>>>>         ;
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>         Config conf = new Config();
>>>>>>>>>>>>>>         conf.registerMetricsConsumer(LoggingMetricsConsumer.class, 1);
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Basically, it does simple things to get data from kafka, parse to different field and write into postgresDB. But in storm UI, I did see such error, "java.lang.OutOfMemoryError: GC overhead limit exceeded". It all happens in same worker of each node - 6703. I understand this is because by default the JVM is configured to throw this error if you are spending more than *98% of the total time in GC and after the GC less than 2% of the heap is recovered*.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I am not sure what is exact cause for memory leak, is it OK by simply increase the heap? Here is my storm.yaml:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> supervisor.slots.ports:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>      - 6700
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>      - 6701
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>      - 6702
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>      - 6703
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> nimbus.childopts: "-Xmx1024m -Djava.net.preferIPv4Stack=true"
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> ui.childopts: "-Xmx768m -Djava.net.preferIPv4Stack=true"
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> supervisor.childopts: "-Djava.net.preferIPv4Stack=true"
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> worker.childopts: "-Xmx768m -Djava.net.preferIPv4Stack=true"
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Anyone has similar issues, and what will be the best way to
>>>>>>>>>>>>>> overcome?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> thanks in advance
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> AL
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: java.lang.OutOfMemoryError: GC overhead limit exceeded

Posted by Sa Li <sa...@gmail.com>.
Hi, Nathan

I have met a strange issue: when I set spoutConf.forceFromStart=true, the
topology quickly runs into the GC overhead limit, even though I have already
increased the heap size. If I remove this setting, it works fine. I was
thinking that maybe the kafkaSpout consumes data much faster than the data
is written into postgresDB, so the data quickly fills the memory and causes
a heap overflow. But I did the same test on my DEV cluster and it works
fine, even when I set spoutConf.forceFromStart=true. I checked the storm
config for DEV and production, and they are the same.
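
The hypothesis above (a fast producer in front of a slow sink) can be
illustrated with a small deterministic simulation. This is only a sketch: the
rates are made up, it does not use any Storm API, and the cap stands in for
the kind of limit that topology.max.spout.pending is meant to provide.

```java
// Toy model: each tick the producer adds work and the sink drains some of it.
final class BacklogSimulation {

    // Returns the largest backlog seen over the given number of ticks.
    // maxPending caps how much un-drained work may exist at once;
    // pass Long.MAX_VALUE to model "no limit".
    static long peakBacklog(long produceRate, long consumeRate, int ticks, long maxPending) {
        long backlog = 0;
        long peak = 0;
        for (int t = 0; t < ticks; t++) {
            long room = Math.max(maxPending - backlog, 0);
            backlog += Math.min(produceRate, room);    // producer is throttled by the cap
            peak = Math.max(peak, backlog);
            backlog -= Math.min(consumeRate, backlog); // sink drains what it can
        }
        return peak;
    }

    public static void main(String[] args) {
        // Hypothetical rates: the producer is 10x faster than the sink.
        System.out.println("uncapped peak: " + peakBacklog(1000, 100, 10, Long.MAX_VALUE));
        System.out.println("capped peak:   " + peakBacklog(1000, 100, 10, 500));
    }
}
```

Without a cap the backlog keeps growing with every tick, while with a cap it
stays at the cap regardless of how long the run lasts. That would also be
consistent with the problem appearing only when the spout replays the whole
topic from the start, although the thread does not confirm this as the cause.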

Any hints?

thanks

AL


On Thu, Mar 5, 2015 at 3:26 PM, Nathan Leung <nc...@gmail.com> wrote:

> I don't see anything glaring.  I would try increasing heap size.  It could
> be that you're right on the threshold of what you've allocated and you just
> need more memory.
>
> On Thu, Mar 5, 2015 at 5:41 PM, Sa Li <sa...@gmail.com> wrote:
>
>> Hi, All,
>> ,
>> I kind locate where the problem come from, in my running command, I will
>> specify the clientid of TridentKafkaConfig, if I keep the clientid as the
>> one I used before, it will cause GC error, otherwise I am completely OK.
>> Here is the code:
>>
>> if (parameters.containsKey("clientid")) {
>>     logger.info("topic=>" + parameters.get("clientid") + "/" + parameters.get("topic"));
>>     spoutConf = new TridentKafkaConfig(zk, parameters.get("topic"), parameters.get("clientid"));
>>
>> Any idea about this error?
>>
>>
>> Thanks
>>
>>
>> AL
>>
>>
>> On Thu, Mar 5, 2015 at 12:02 PM, Sa Li <sa...@gmail.com> wrote:
>>
>>> Sorry, continue last thread:
>>>
>>> 2015-03-05T11:48:08.418-0800 b.s.util [ERROR] Async loop died!
>>> java.lang.RuntimeException: java.lang.RuntimeException: Remote address
>>> is not reachable. We will close this client Netty-Client-complicated-laugh/
>>> 10.100.98.103:6703
>>>         at
>>> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128)
>>> ~[storm-core-0.9.3.jar:0.9.3]
>>>         at
>>> backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99)
>>> ~[storm-core-0.9.3.jar:0.9.3]
>>>         at
>>> backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80)
>>> ~[storm-core-0.9.3.jar:0.9.3]
>>>         at
>>> backtype.storm.disruptor$consume_loop_STAR_$fn__1460.invoke(disruptor.clj:94)
>>> ~[storm-core-0.9.3.jar:0.9.3]
>>>         at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463)
>>> ~[storm-core-0.9.3.jar:0.9.3]
>>>         at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
>>>         at java.lang.Thread.run(Thread.java:745) [na:1.7.0_65]
>>> Caused by: java.lang.RuntimeException: Remote address is not reachable.
>>> We will close this client Netty-Client-complicated-laugh/
>>> 10.100.98.103:6703
>>>         at
>>> backtype.storm.messaging.netty.Client.connect(Client.java:171)
>>> ~[storm-core-0.9.3.jar:0.9.3]
>>>         at backtype.storm.messaging.netty.Client.send(Client.java:194)
>>> ~[storm-core-0.9.3.jar:0.9.3]
>>>         at
>>> backtype.storm.utils.TransferDrainer.send(TransferDrainer.java:54)
>>> ~[storm-core-0.9.3.jar:0.9.3]
>>>         at
>>> backtype.storm.daemon.worker$mk_transfer_tuples_handler$fn__3730$fn__3731.invoke(worker.clj:330)
>>> ~[storm-core-0.9.3.jar:0.9.3]
>>>         at
>>> backtype.storm.daemon.worker$mk_transfer_tuples_handler$fn__3730.invoke(worker.clj:328)
>>> ~[storm-core-0.9.3.jar:0.9.3]
>>>         at
>>> backtype.storm.disruptor$clojure_handler$reify__1447.onEvent(disruptor.clj:58)
>>> ~[storm-core-0.9.3.jar:0.9.3]
>>>         at
>>> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125)
>>> ~[storm-core-0.9.3.jar:0.9.3]
>>>         ... 6 common frames omitted
>>> 2015-03-05T11:48:08.423-0800 b.s.util [ERROR] Halting process: ("Async
>>> loop died!")
>>> java.lang.RuntimeException: ("Async loop died!")
>>>         at backtype.storm.util$exit_process_BANG_.doInvoke(util.clj:325)
>>> [storm-core-0.9.3.jar:0.9.3]
>>>         at clojure.lang.RestFn.invoke(RestFn.java:423)
>>> [clojure-1.5.1.jar:na]
>>>         at
>>> backtype.storm.disruptor$consume_loop_STAR_$fn__1458.invoke(disruptor.clj:92)
>>> [storm-core-0.9.3.jar:0.9.3]
>>>         at backtype.storm.util$async_loop$fn__464.invoke(util.clj:473)
>>> [storm-core-0.9.3.jar:0.9.3]
>>>         at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
>>>         at java.lang.Thread.run(Thread.java:745) [na:1.7.0_65]
>>> 2015-03-05T11:48:08.425-0800 b.s.d.worker [INFO] Shutting down worker
>>> eventsStreamerv1-48-1425499636 0673ece0-cea2-4185-9b3e-6c49ad585576 6703
>>> 2015-03-05T11:48:08.426-0800 b.s.m.n.Client [INFO] Closing Netty Client
>>> Netty-Client-beloved-judge/10.100.98.104:6703
>>>
>>> I doubt this is caused by my eventUpfater, which write data in batch
>>>
>>> static class EventUpdater implements ReducerAggregator<List<String>> {
>>>
>>>             @Override
>>>             public List<String> init(){
>>>                      return null;
>>>             }
>>>
>>>             @Override
>>>             public List<String> reduce(List<String> curr, TridentTuple tuple) {
>>>                    List<String> updated = null ;
>>>
>>>                    if ( curr == null ) {
>>>                                     String event = (String) tuple.getValue(1);
>>>                                     System.out.println("===:" + event + ":");
>>>                                     updated = Lists.newArrayList(event);
>>>                    } else {
>>>                                     System.out.println("===+" +  tuple + ":");
>>>                                     updated = curr ;
>>>                    }
>>> //              System.out.println("(())");
>>>               return updated ;
>>>             }
>>>         }
>>>
>>> How do you think
>>>
>>> THanks
>>>

Re: java.lang.OutOfMemoryError: GC overhead limit exceeded

Posted by Nathan Leung <nc...@gmail.com>.
I don't see anything glaring.  I would try increasing heap size.  It could
be that you're right on the threshold of what you've allocated and you just
need more memory.
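
As a quick check that a larger -Xmx in worker.childopts actually reaches the JVM, a sketch like the one below prints the heap ceiling, the heap in use, and the time spent in GC so far. It uses only the standard java.lang.management API. The class name HeapReport and its helper methods are made up for illustration, and the numbers only mean something if it runs with the same JVM options as the worker.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;

public class HeapReport {

    /** Maximum heap the JVM will try to use, in megabytes (reflects -Xmx). */
    static long maxHeapMb() {
        return Runtime.getRuntime().maxMemory() / (1024 * 1024);
    }

    /** Heap currently in use, in megabytes. */
    static long usedHeapMb() {
        Runtime rt = Runtime.getRuntime();
        return (rt.totalMemory() - rt.freeMemory()) / (1024 * 1024);
    }

    /** Total time spent in garbage collection since JVM start, in milliseconds. */
    static long totalGcMillis() {
        long total = 0;
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            long t = gc.getCollectionTime(); // -1 if the collector does not report it
            if (t > 0) {
                total += t;
            }
        }
        return total;
    }

    public static void main(String[] args) {
        long uptimeMs = ManagementFactory.getRuntimeMXBean().getUptime();
        System.out.println("max heap (MB):  " + maxHeapMb());
        System.out.println("used heap (MB): " + usedHeapMb());
        System.out.println("GC time (ms):   " + totalGcMillis() + " of " + uptimeMs + " ms uptime");
    }
}
```

If used heap stays close to max heap and GC time is a large share of uptime, the worker is probably short on heap rather than slowly leaking.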

On Thu, Mar 5, 2015 at 5:41 PM, Sa Li <sa...@gmail.com> wrote:

> Hi all,
>
> I think I have located where the problem comes from. In my run command I
> specify the clientid for TridentKafkaConfig. If I keep the clientid I
> used before, I get the GC error; with a different clientid it runs fine.
> Here is the code:
>
> if (parameters.containsKey("clientid")) {
>     logger.info("topic=>" + parameters.get("clientid") + "/" + parameters.get("topic"));
>     spoutConf = new TridentKafkaConfig(zk, parameters.get("topic"), parameters.get("clientid"));
>
> Any idea about this error?
>
>
> Thanks
>
>
> AL

Re: java.lang.OutOfMemoryError: GC overhead limit exceeded

Posted by Sa Li <sa...@gmail.com>.
Hi all,

I think I have located where the problem comes from. In my run command I
specify the clientid for TridentKafkaConfig. If I keep the clientid I
used before, I get the GC error; with a different clientid it runs fine.
Here is the code:

if (parameters.containsKey("clientid")) {
    logger.info("topic=>" + parameters.get("clientid") + "/" + parameters.get("topic"));
    spoutConf = new TridentKafkaConfig(zk, parameters.get("topic"), parameters.get("clientid"));

Any idea about this error?


Thanks


AL



Re: java.lang.OutOfMemoryError: GC overhead limit exceeded

Posted by Sa Li <sa...@gmail.com>.
Sorry, continuing from my last message:

2015-03-05T11:48:08.418-0800 b.s.util [ERROR] Async loop died!
java.lang.RuntimeException: java.lang.RuntimeException: Remote address is not reachable. We will close this client Netty-Client-complicated-laugh/10.100.98.103:6703
        at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) ~[storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) ~[storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) ~[storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.disruptor$consume_loop_STAR_$fn__1460.invoke(disruptor.clj:94) ~[storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463) ~[storm-core-0.9.3.jar:0.9.3]
        at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
        at java.lang.Thread.run(Thread.java:745) [na:1.7.0_65]
Caused by: java.lang.RuntimeException: Remote address is not reachable. We will close this client Netty-Client-complicated-laugh/10.100.98.103:6703
        at backtype.storm.messaging.netty.Client.connect(Client.java:171) ~[storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.messaging.netty.Client.send(Client.java:194) ~[storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.utils.TransferDrainer.send(TransferDrainer.java:54) ~[storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.daemon.worker$mk_transfer_tuples_handler$fn__3730$fn__3731.invoke(worker.clj:330) ~[storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.daemon.worker$mk_transfer_tuples_handler$fn__3730.invoke(worker.clj:328) ~[storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.disruptor$clojure_handler$reify__1447.onEvent(disruptor.clj:58) ~[storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125) ~[storm-core-0.9.3.jar:0.9.3]
        ... 6 common frames omitted
2015-03-05T11:48:08.423-0800 b.s.util [ERROR] Halting process: ("Async loop died!")
java.lang.RuntimeException: ("Async loop died!")
        at backtype.storm.util$exit_process_BANG_.doInvoke(util.clj:325) [storm-core-0.9.3.jar:0.9.3]
        at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.5.1.jar:na]
        at backtype.storm.disruptor$consume_loop_STAR_$fn__1458.invoke(disruptor.clj:92) [storm-core-0.9.3.jar:0.9.3]
        at backtype.storm.util$async_loop$fn__464.invoke(util.clj:473) [storm-core-0.9.3.jar:0.9.3]
        at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
        at java.lang.Thread.run(Thread.java:745) [na:1.7.0_65]
2015-03-05T11:48:08.425-0800 b.s.d.worker [INFO] Shutting down worker eventsStreamerv1-48-1425499636 0673ece0-cea2-4185-9b3e-6c49ad585576 6703
2015-03-05T11:48:08.426-0800 b.s.m.n.Client [INFO] Closing Netty Client Netty-Client-beloved-judge/10.100.98.104:6703

I suspect this is caused by my EventUpdater, which writes data in batches:

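import java.util.List;

import com.google.common.collect.Lists;
import storm.trident.operation.ReducerAggregator;
import storm.trident.tuple.TridentTuple;

static class EventUpdater implements ReducerAggregator<List<String>> {

    @Override
    public List<String> init() {
        // No state yet; reduce() creates the list on the first tuple.
        return null;
    }

    @Override
    public List<String> reduce(List<String> curr, TridentTuple tuple) {
        List<String> updated;

        if (curr == null) {
            // First tuple for this group: start a list holding its event.
            String event = (String) tuple.getValue(1);
            System.out.println("===:" + event + ":");
            updated = Lists.newArrayList(event);
        } else {
            // Later tuples are only logged; the existing list is returned unchanged.
            System.out.println("===+" + tuple + ":");
            updated = curr;
        }
        // System.out.println("(())");
        return updated;
    }
}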

How do you think

THanks

On Thu, Mar 5, 2015 at 11:57 AM, Sa Li <sa...@gmail.com> wrote:

> Thank you very much for the reply. Here is the error I saw in
> worker-6703.log on the production server.
>
>
> On Thu, Mar 5, 2015 at 11:31 AM, Nathan Leung <nc...@gmail.com> wrote:
>
>> Yeah, then in this case maybe you can install the JDK / YourKit on the
>> remote machines and run the tools over X or something.  I'm assuming this
>> is a development cluster (not live / production) and that installing
>> debugging tools and running remote UIs etc. is not a problem.  :)
>>
>> On Thu, Mar 5, 2015 at 1:52 PM, Andrew Xor <an...@gmail.com>
>> wrote:
>>
>>> Nathan, I think that if he wants to profile a bolt running in a worker
>>> on a different cluster node than the one where the profiling tool runs,
>>> he won't be able to attach to the process, since it resides on a
>>> different physical machine, methinks (well, now that I think about it,
>>> it can be done via remote debugging, but that's just a pain in the ***).
>>>
>>> Regards,
>>>
>>> A.
>>>
>>> On Thu, Mar 5, 2015 at 8:46 PM, Nathan Leung <nc...@gmail.com> wrote:
>>>
>>>> You don't need to change your code. As Andrew mentioned you can get a
>>>> lot of mileage by profiling your logic in a standalone program. For
>>>> jvisualvm, you can just run your program (a loop that runs for a long time
>>>> is best) then attach to the running process with jvisualvm.  It's pretty
>>>> straightforward to use and you can also find good guides with a Google
>>>> search.
>>>> On Mar 5, 2015 1:43 PM, "Andrew Xor" <an...@gmail.com>
>>>> wrote:
>>>>
>>>>> Well... detecting memory leaks in Java is a bit tricky, as Java does a
>>>>> lot for you. Generally though, as long as you avoid needless use of the
>>>>> "new" operator and close any resources that you no longer use, you should
>>>>> be fine... but a profiler such as the ones mentioned by Nathan will tell
>>>>> you the whole truth. YourKit is awesome and has a free trial, so go ahead
>>>>> and test drive it. I am pretty sure that you need a working jar (or
>>>>> compilable code that has a main function in it) in order to profile it,
>>>>> although profiling your bolts and spouts is a bit trickier. Hopefully your
>>>>> algorithm (or portions of it) can be put in a sample test program that
>>>>> can be executed locally for you to profile.
>>>>>
>>>>> Hope this helped. Regards,
>>>>>
>>>>> A.
>>>>>
>>>>> On Thu, Mar 5, 2015 at 8:33 PM, Sa Li <sa...@gmail.com> wrote:
>>>>>
>>>>>>
>>>>>> On Thu, Mar 5, 2015 at 10:26 AM, Andrew Xor <
>>>>>> andreas.grammenos@gmail.com> wrote:
>>>>>>
>>>>>>> Unfortunately that is not fixed, it depends on the computations and
>>>>>>> data-structures you have; in my case for example I use more than 2GB since
>>>>>>> I need to keep a large matrix in memory... having said that, in most cases
>>>>>>> it should be relatively easy to estimate how much memory you are going to
>>>>>>> need and use that... or if that's not possible you can just increase it and
>>>>>>> try the "set and see" approach. Check for memory leaks as well... (unclosed
>>>>>>> resources and so on...!)
>>>>>>>
>>>>>>> Regards.
>>>>>>>
>>>>>>> A.
>>>>>>>
>>>>>>> On Thu, Mar 5, 2015 at 8:21 PM, Sa Li <sa...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Thanks, Nathan. How much should it be in general?
>>>>>>>>
>>>>>>>> On Thu, Mar 5, 2015 at 10:15 AM, Nathan Leung <nc...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Your worker is allocated a maximum of 768 MB of heap. It's quite
>>>>>>>>> possible that this is not enough. Try increasing Xmx in worker.childopts.

Re: java.lang.OutOfMemoryError: GC overhead limit exceeded

Posted by Sa Li <sa...@gmail.com>.
Thank you very much for the reply. Here is the error I saw in
worker-6703.log on the production server.


On Thu, Mar 5, 2015 at 11:31 AM, Nathan Leung <nc...@gmail.com> wrote:

> Yeah, then in this case maybe you can install the JDK / YourKit on the
> remote machines and run the tools over X or something.  I'm assuming this
> is a development cluster (not live / production) and that installing
> debugging tools and running remote UIs etc. is not a problem.  :)
>
> On Thu, Mar 5, 2015 at 1:52 PM, Andrew Xor <an...@gmail.com>
> wrote:
>
>> Nathan, I think that if he wants to profile a bolt running in a worker
>> on a different cluster node than the one where the profiling tool runs,
>> he won't be able to attach to the process, since it resides on a
>> different physical machine, methinks (well, now that I think about it,
>> it can be done via remote debugging, but that's just a pain in the ***).
>>
>> Regards,
>>
>> A.
>>
>> On Thu, Mar 5, 2015 at 8:46 PM, Nathan Leung <nc...@gmail.com> wrote:
>>
>>> You don't need to change your code. As Andrew mentioned you can get a
>>> lot of mileage by profiling your logic in a standalone program. For
>>> jvisualvm, you can just run your program (a loop that runs for a long time
>>> is best) then attach to the running process with jvisualvm.  It's pretty
>>> straightforward to use and you can also find good guides with a Google
>>> search.
>>> On Mar 5, 2015 1:43 PM, "Andrew Xor" <an...@gmail.com>
>>> wrote:
>>>
>>>> Well... detecting memory leaks in Java is a bit tricky, as Java does a
>>>> lot for you. Generally though, as long as you avoid needless use of the
>>>> "new" operator and close any resources that you no longer use, you should
>>>> be fine... but a profiler such as the ones mentioned by Nathan will tell
>>>> you the whole truth. YourKit is awesome and has a free trial, so go ahead
>>>> and test drive it. I am pretty sure that you need a working jar (or
>>>> compilable code that has a main function in it) in order to profile it,
>>>> although profiling your bolts and spouts is a bit trickier. Hopefully your
>>>> algorithm (or portions of it) can be put in a sample test program that
>>>> can be executed locally for you to profile.
>>>>
>>>> Hope this helped. Regards,
>>>>
>>>> A.
>>>>
>>>> On Thu, Mar 5, 2015 at 8:33 PM, Sa Li <sa...@gmail.com> wrote:
>>>>
>>>>>
>>>>> On Thu, Mar 5, 2015 at 10:26 AM, Andrew Xor <
>>>>> andreas.grammenos@gmail.com> wrote:
>>>>>
>>>>>> Unfortunately that is not fixed, it depends on the computations and
>>>>>> data-structures you have; in my case for example I use more than 2GB since
>>>>>> I need to keep a large matrix in memory... having said that, in most cases
>>>>>> it should be relatively easy to estimate how much memory you are going to
>>>>>> need and use that... or if that's not possible you can just increase it and
>>>>>> try the "set and see" approach. Check for memory leaks as well... (unclosed
>>>>>> resources and so on...!)
>>>>>>
>>>>>> Regards.
>>>>>>
>>>>>> A.
>>>>>>
>>>>>> On Thu, Mar 5, 2015 at 8:21 PM, Sa Li <sa...@gmail.com> wrote:
>>>>>>
>>>>>>> Thanks, Nathan. How much should it be in general?
>>>>>>>
>>>>>>> On Thu, Mar 5, 2015 at 10:15 AM, Nathan Leung <nc...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Your worker is allocated a maximum of 768 MB of heap. It's quite
>>>>>>>> possible that this is not enough. Try increasing Xmx in worker.childopts.

Re: java.lang.OutOfMemoryError: GC overhead limit exceeded

Posted by Nathan Leung <nc...@gmail.com>.
Yeah, then in this case maybe you can install the JDK / YourKit on the remote
machines and run the tools over X or something.  I'm assuming this is a
development cluster (not live / production) and that installing debugging
tools and running remote UIs etc. is not a problem.  :)

On Thu, Mar 5, 2015 at 1:52 PM, Andrew Xor <an...@gmail.com>
wrote:

> Nathan, I think that if he wants to profile a bolt running in a worker
> on a different cluster node than the one where the profiling tool runs,
> he won't be able to attach to the process, since it resides on a
> different physical machine, methinks (well, now that I think about it,
> it can be done via remote debugging, but that's just a pain in the ***).
>
> Regards,
>
> A.
>
> On Thu, Mar 5, 2015 at 8:46 PM, Nathan Leung <nc...@gmail.com> wrote:
>
>> You don't need to change your code. As Andrew mentioned you can get a lot
>> of mileage by profiling your logic in a standalone program. For jvisualvm,
>> you can just run your program (a loop that runs for a long time is best)
>> then attach to the running process with jvisualvm.  It's pretty
>> straightforward to use and you can also find good guides with a Google
>> search.
>> On Mar 5, 2015 1:43 PM, "Andrew Xor" <an...@gmail.com> wrote:
>>
>>> Well... detecting memory leaks in Java is a bit tricky, as Java does a
>>> lot for you. Generally though, as long as you avoid needless use of the
>>> "new" operator and close any resources that you no longer use, you should
>>> be fine... but a profiler such as the ones mentioned by Nathan will tell
>>> you the whole truth. YourKit is awesome and has a free trial, so go ahead
>>> and test drive it. I am pretty sure that you need a working jar (or
>>> compilable code that has a main function in it) in order to profile it,
>>> although profiling your bolts and spouts is a bit trickier. Hopefully your
>>> algorithm (or portions of it) can be put in a sample test program that
>>> can be executed locally for you to profile.
>>>
>>> Hope this helped. Regards,
>>>
>>> A.
>>>
>>> On Thu, Mar 5, 2015 at 8:33 PM, Sa Li <sa...@gmail.com> wrote:
>>>
>>>>
>>>> On Thu, Mar 5, 2015 at 10:26 AM, Andrew Xor <
>>>> andreas.grammenos@gmail.com> wrote:
>>>>
>>>>> Unfortunately that is not fixed, it depends on the computations and
>>>>> data-structures you have; in my case for example I use more than 2GB since
>>>>> I need to keep a large matrix in memory... having said that, in most cases
>>>>> it should be relatively easy to estimate how much memory you are going to
>>>>> need and use that... or if that's not possible you can just increase it and
>>>>> try the "set and see" approach. Check for memory leaks as well... (unclosed
>>>>> resources and so on...!)
>>>>>
>>>>> Regards.
>>>>>
>>>>> A.
>>>>>
>>>>> On Thu, Mar 5, 2015 at 8:21 PM, Sa Li <sa...@gmail.com> wrote:
>>>>>
>>>>>> Thanks, Nathan. How much should it be in general?
>>>>>>
>>>>>> On Thu, Mar 5, 2015 at 10:15 AM, Nathan Leung <nc...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Your worker is allocated a maximum of 768 MB of heap. It's quite
>>>>>>> possible that this is not enough. Try increasing Xmx in worker.childopts.

Re: java.lang.OutOfMemoryError: GC overhead limit exceeded

Posted by Andrew Xor <an...@gmail.com>.
Nathan, I think that if he wants to profile a bolt running in a worker
on a different cluster node than the one where the profiling tool runs,
he won't be able to attach to the process, since it resides on a
different physical machine, methinks (well, now that I think about it,
it can be done via remote debugging, but that's just a pain in the ***).

Regards,

A.
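
One way to look at a worker on another node without running a GUI there is to read its heap figures over remote JMX. The sketch below is only an illustration under stated assumptions: it assumes the worker JVM was started with the standard JMX remote system properties (for example -Dcom.sun.management.jmxremote.port=<port> together with -Dcom.sun.management.jmxremote.authenticate=false and -Dcom.sun.management.jmxremote.ssl=false, which is unauthenticated and only suitable for a development cluster), and that each worker on a node gets its own port. The class name RemoteHeapReader and its methods are hypothetical and not part of Storm.

```java
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.net.MalformedURLException;
import javax.management.MBeanServerConnection;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

/** Hypothetical helper: reads the used heap of a remote JVM over JMX/RMI. */
final class RemoteHeapReader {

    /** Builds the usual RMI-based JMX service URL for host:port. */
    static JMXServiceURL serviceUrl(String host, int port) {
        try {
            return new JMXServiceURL(
                    "service:jmx:rmi:///jndi/rmi://" + host + ":" + port + "/jmxrmi");
        } catch (MalformedURLException e) {
            throw new IllegalArgumentException("bad JMX address: " + host + ":" + port, e);
        }
    }

    /** Connects, reads the remote heap usage once, and disconnects. */
    static long remoteUsedHeapBytes(String host, int port) throws IOException {
        JMXConnector connector = JMXConnectorFactory.connect(serviceUrl(host, port));
        try {
            MBeanServerConnection conn = connector.getMBeanServerConnection();
            // Proxy for the remote JVM's standard memory MXBean.
            MemoryMXBean memory = ManagementFactory.newPlatformMXBeanProxy(
                    conn, ManagementFactory.MEMORY_MXBEAN_NAME, MemoryMXBean.class);
            return memory.getHeapMemoryUsage().getUsed();
        } finally {
            connector.close();
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 2) {
            System.out.println("usage: RemoteHeapReader <host> <jmx-port>");
            return;
        }
        long used = remoteUsedHeapBytes(args[0], Integer.parseInt(args[1]));
        System.out.println("remote used heap MB: " + used / (1024 * 1024));
    }
}
```

jvisualvm can attach to the same JMX port from a workstation, so the snippet is mainly useful for scripted checks.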

On Thu, Mar 5, 2015 at 8:46 PM, Nathan Leung <nc...@gmail.com> wrote:

> You don't need to change your code. As Andrew mentioned you can get a lot
> of mileage by profiling your logic in a standalone program. For jvisualvm,
> you can just run your program (a loop that runs for a long time is best)
> then attach to the running process with jvisualvm.  It's pretty
> straightforward to use and you can also find good guides with a Google
> search.
> On Mar 5, 2015 1:43 PM, "Andrew Xor" <an...@gmail.com> wrote:
>
>> Well... detecting memory leaks in Java is a bit tricky, as Java does a
>> lot for you. Generally though, as long as you avoid needless use of the
>> "new" operator and close any resources that you no longer use, you should
>> be fine... but a profiler such as the ones mentioned by Nathan will tell
>> you the whole truth. YourKit is awesome and has a free trial, so go ahead
>> and test drive it. I am pretty sure that you need a working jar (or
>> compilable code that has a main function in it) in order to profile it,
>> although profiling your bolts and spouts is a bit trickier. Hopefully your
>> algorithm (or portions of it) can be put in a sample test program that
>> can be executed locally for you to profile.
>>
>> Hope this helped. Regards,
>>
>> A.
>>
>> On Thu, Mar 5, 2015 at 8:33 PM, Sa Li <sa...@gmail.com> wrote:
>>
>>>
>>> On Thu, Mar 5, 2015 at 10:26 AM, Andrew Xor <andreas.grammenos@gmail.com
>>> > wrote:
>>>
>>>> Unfortunately that is not fixed, it depends on the computations and
>>>> data-structures you have; in my case for example I use more than 2GB since
>>>> I need to keep a large matrix in memory... having said that, in most cases
>>>> it should be relatively easy to estimate how much memory you are going to
>>>> need and use that... or if that's not possible you can just increase it and
>>>> try the "set and see" approach. Check for memory leaks as well... (unclosed
>>>> resources and so on...!)
>>>>
>>>> Regards.
>>>>
>>>> A.
>>>>
>>>> On Thu, Mar 5, 2015 at 8:21 PM, Sa Li <sa...@gmail.com> wrote:
>>>>
>>>>> Thanks, Nathan. How much should it be in general?
>>>>>
>>>>> On Thu, Mar 5, 2015 at 10:15 AM, Nathan Leung <nc...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Your worker is allocated a maximum of 768 MB of heap. It's quite
>>>>>> possible that this is not enough. Try increasing Xmx in worker.childopts.

Re: java.lang.OutOfMemoryError: GC overhead limit exceeded

Posted by Nathan Leung <nc...@gmail.com>.
You don't need to change your code. As Andrew mentioned you can get a lot
of mileage by profiling your logic in a standalone program. For jvisualvm,
you can just run your program (a loop that runs for a long time is best)
then attach to the running process with jvisualvm.  It's pretty
straightforward to use and you can also find good guides with a Google
search.
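
A minimal sketch of such a standalone program follows. It is an illustration only: the class ReducerProfilingHarness and its reduce() method are hypothetical stand-ins written without any Storm dependency, and the real aggregator logic would be pasted in place of reduce().

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical standalone driver for profiling reducer-style logic outside Storm. */
final class ReducerProfilingHarness {

    /** Stand-in for the aggregator step: keeps the first event seen, ignores the rest. */
    static List<String> reduce(List<String> curr, String event) {
        if (curr == null) {
            List<String> created = new ArrayList<String>();
            created.add(event);
            return created;
        }
        return curr;
    }

    /** Feeds the given number of synthetic events through reduce() and returns the final state. */
    static List<String> run(long iterations) {
        List<String> state = null;
        for (long i = 0; i < iterations; i++) {
            state = reduce(state, "event-" + i);
        }
        return state;
    }

    /** Heap currently in use by this JVM, in bytes. */
    static long usedHeapBytes() {
        Runtime rt = Runtime.getRuntime();
        return rt.totalMemory() - rt.freeMemory();
    }

    public static void main(String[] args) {
        // Pass a much larger count to keep the process alive long enough to attach a profiler.
        long iterations = args.length > 0 ? Long.parseLong(args[0]) : 1000000L;
        long reportEvery = Math.max(1L, iterations / 4);
        List<String> state = null;
        for (long i = 0; i < iterations; i++) {
            state = reduce(state, "event-" + i);
            if (i % reportEvery == 0) {
                System.out.println("iteration " + i + ", used heap MB: "
                        + usedHeapBytes() / (1024 * 1024));
            }
        }
        System.out.println("final state size: " + (state == null ? 0 : state.size()));
    }
}
```

While it runs, the process shows up in jvisualvm's list of local applications. A used-heap figure that keeps climbing across iterations would point at the logic under test rather than at Storm itself.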
On Mar 5, 2015 1:43 PM, "Andrew Xor" <an...@gmail.com> wrote:

> Well... detecting memory leaks in Java is a bit tricky, as Java does a
> lot for you. Generally though, as long as you avoid needless use of the
> "new" operator and close any resources that you no longer use, you should
> be fine... but a profiler such as the ones mentioned by Nathan will tell
> you the whole truth. YourKit is awesome and has a free trial, so go ahead
> and test drive it. I am pretty sure that you need a working jar (or
> compilable code that has a main function in it) in order to profile it,
> although profiling your bolts and spouts is a bit trickier. Hopefully your
> algorithm (or portions of it) can be put in a sample test program that
> can be executed locally for you to profile.
>
> Hope this helped. Regards,
>
> A.
>
> On Thu, Mar 5, 2015 at 8:33 PM, Sa Li <sa...@gmail.com> wrote:
>
>>
>> On Thu, Mar 5, 2015 at 10:26 AM, Andrew Xor <an...@gmail.com>
>> wrote:
>>
>>> Unfortunately that is not fixed, it depends on the computations and
>>> data-structures you have; in my case for example I use more than 2GB since
>>> I need to keep a large matrix in memory... having said that, in most cases
>>> it should be relatively easy to estimate how much memory you are going to
>>> need and use that... or if that's not possible you can just increase it and
>>> try the "set and see" approach. Check for memory leaks as well... (unclosed
>>> resources and so on...!)
>>>
>>> Regards.
>>>
>>> A.
>>>
>>> On Thu, Mar 5, 2015 at 8:21 PM, Sa Li <sa...@gmail.com> wrote:
>>>
>>>> Thanks, Nathan. How much should it be in general?
>>>>
>>>> On Thu, Mar 5, 2015 at 10:15 AM, Nathan Leung <nc...@gmail.com>
>>>> wrote:
>>>>
>>>>> Your worker is allocated a maximum of 768 MB of heap. It's quite
>>>>> possible that this is not enough. Try increasing Xmx in worker.childopts.

Re: java.lang.OutOfMemoryError: GC overhead limit exceeded

Posted by Andrew Xor <an...@gmail.com>.
Well... detecting memory leaks in Java is a bit tricky, as Java does a lot
for you. Generally though, as long as you avoid needless use of the "new"
operator and close any resources that you no longer use, you should be
fine... but a profiler such as the ones mentioned by Nathan will tell you
the whole truth. YourKit is awesome and has a free trial, so go ahead and
test drive it. I am pretty sure that you need a working jar (or compilable
code that has a main function in it) in order to profile it, although
profiling your bolts and spouts is a bit trickier. Hopefully your algorithm
(or portions of it) can be put in a sample test program that can be
executed locally for you to profile.
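
As a rough illustration of the two habits mentioned above (closing resources and not holding on to objects forever), here is a small sketch. All names in it (LeakPatterns, TrackedResource, BoundedBuffer) are hypothetical and exist only for this example.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical examples of closing resources and bounding retained state. */
final class LeakPatterns {

    /** Toy resource that records whether close() was called. */
    static final class TrackedResource implements AutoCloseable {
        private boolean closed;

        @Override
        public void close() {
            closed = true;
        }

        boolean isClosed() {
            return closed;
        }
    }

    /** try-with-resources calls close() when the block exits, even on an exception. */
    static TrackedResource useAndClose() {
        TrackedResource handle = new TrackedResource();
        try (TrackedResource r = handle) {
            // ... work with r here ...
        }
        return handle;
    }

    /** Buffer that evicts its oldest entry instead of growing without limit. */
    static final class BoundedBuffer {
        private final int capacity;
        private final Deque<String> items = new ArrayDeque<String>();

        BoundedBuffer(int capacity) {
            this.capacity = capacity;
        }

        void add(String item) {
            if (items.size() == capacity) {
                items.removeFirst();
            }
            items.addLast(item);
        }

        int size() {
            return items.size();
        }
    }

    public static void main(String[] args) {
        System.out.println("closed after try block: " + useAndClose().isClosed());
        BoundedBuffer buffer = new BoundedBuffer(3);
        for (int i = 0; i < 10; i++) {
            buffer.add("event-" + i);
        }
        System.out.println("buffer size after 10 adds: " + buffer.size());
    }
}
```

In a long-running worker, a collection that only ever grows is a more common source of "GC overhead limit exceeded" than the "new" operator as such, since the garbage collector cannot free objects that are still referenced.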

Hope this helped. Regards,

A.

On Thu, Mar 5, 2015 at 8:33 PM, Sa Li <sa...@gmail.com> wrote:

>
> On Thu, Mar 5, 2015 at 10:26 AM, Andrew Xor <an...@gmail.com>
> wrote:
>
>> Unfortunately that is not fixed, it depends on the computations and
>> data-structures you have; in my case for example I use more than 2GB since
>> I need to keep a large matrix in memory... having said that, in most cases
>> it should be relatively easy to estimate how much memory you are going to
>> need and use that... or if that's not possible you can just increase it and
>> try the "set and see" approach. Check for memory leaks as well... (unclosed
>> resources and so on...!)
>>
>> Regards.
>>
>> A.
>>
>> On Thu, Mar 5, 2015 at 8:21 PM, Sa Li <sa...@gmail.com> wrote:
>>
>>> Thanks, Nathan. How much should it be in general?
>>>
>>> On Thu, Mar 5, 2015 at 10:15 AM, Nathan Leung <nc...@gmail.com> wrote:
>>>
>>>> Your worker is allocated a maximum of 768 MB of heap. It's quite
>>>> possible that this is not enough. Try increasing Xmx in worker.childopts.

Re: java.lang.OutOfMemoryError: GC overhead limit exceeded

Posted by Sa Li <sa...@gmail.com>.
Thanks for the answers. I know this is kind of naive, but I still have to
ask: how do I check for memory leaks, specifically in my code? Also, Nathan
mentioned using YourKit and jvisualvm. I am not familiar with such tools;
is there a way to plug a snippet into the code instead, since I am using
Maven to compile my code on the dev server?

Thanks
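
For what it is worth, a snippet along the following lines can be dropped into the code to log heap and GC figures from inside the running JVM, using only the standard java.lang.management API. It is a sketch, not a substitute for a profiler: the class name HeapProbe is hypothetical, and it reports how much memory is in use, not which objects are holding it.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;

/** Hypothetical helper that summarises heap usage and cumulative GC activity. */
final class HeapProbe {

    /** Returns a one-line summary suitable for a log statement. */
    static String snapshot() {
        MemoryUsage heap = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
        long gcCount = 0;
        long gcMillis = 0;
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            // Both getters return -1 when the collector does not report the value.
            gcCount += Math.max(0, gc.getCollectionCount());
            gcMillis += Math.max(0, gc.getCollectionTime());
        }
        // getMax() is -1 when no maximum is defined; it is printed as 0 here.
        return "heapUsedMB=" + heap.getUsed() / (1024 * 1024)
                + " heapMaxMB=" + heap.getMax() / (1024 * 1024)
                + " gcCount=" + gcCount
                + " gcTimeMs=" + gcMillis;
    }

    public static void main(String[] args) {
        System.out.println(snapshot());
    }
}
```

Calling HeapProbe.snapshot() every few thousand tuples, or from a timer thread, and writing the result to the worker log gives a trend over time. If heapUsedMB keeps approaching heapMaxMB while gcTimeMs grows quickly, something is probably being retained, and a heap dump or profiler session is the next step.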

On Thu, Mar 5, 2015 at 10:26 AM, Andrew Xor <an...@gmail.com>
wrote:

> Unfortunately that is not fixed, it depends on the computations and
> data-structures you have; in my case for example I use more than 2GB since
> I need to keep a large matrix in memory... having said that, in most cases
> it should be relatively easy to estimate how much memory you are going to
> need and use that... or if that's not possible you can just increase it and
> try the "set and see" approach. Check for memory leaks as well... (unclosed
> resources and so on...!)
>
> Regards.
>
> A.
>
> On Thu, Mar 5, 2015 at 8:21 PM, Sa Li <sa...@gmail.com> wrote:
>
>> Thanks, Nathan. How much should it be in general?
>>
>> On Thu, Mar 5, 2015 at 10:15 AM, Nathan Leung <nc...@gmail.com> wrote:
>>
>>> Your worker is allocated a maximum of 768 MB of heap. It's quite possible
>>> that this is not enough. Try increasing Xmx in worker.childopts.

Re: java.lang.OutOfMemoryError: GC overhead limit exceeded

Posted by Andrew Xor <an...@gmail.com>.
Unfortunately there is no fixed value; it depends on the computations and
data structures you have. In my case, for example, I use more than 2 GB
since I need to keep a large matrix in memory. Having said that, in most
cases it should be relatively easy to estimate how much memory you are
going to need and use that. If that's not possible, you can just increase
it and try the "set and see" approach. Check for memory leaks as well
(unclosed resources and so on).
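
To illustrate the "unclosed resources" point, here is a minimal sketch in
plain Java. TrackedResource and ResourceDemo are hypothetical names invented
for this example; TrackedResource stands in for something like a database
connection and only counts how many instances are open. It is not how
PostgresqlState or any Storm class is implemented. The sketch contrasts a
manual close(), which is skipped when an exception is thrown, with
try-with-resources, which always closes the handle.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a JDBC connection or similar handle; it only
// counts how many instances are currently open.
class TrackedResource implements AutoCloseable {
    static final AtomicInteger OPEN = new AtomicInteger();

    TrackedResource() {
        OPEN.incrementAndGet();
    }

    void write(String row) {
        if (row == null) {
            throw new IllegalArgumentException("row must not be null");
        }
        // A real implementation would send the row to the database here.
    }

    @Override
    public void close() {
        OPEN.decrementAndGet();
    }
}

class ResourceDemo {
    // Leaky: if write() throws, close() is never reached and the handle stays open.
    static void leakyWrite(String row) {
        TrackedResource r = new TrackedResource();
        r.write(row);
        r.close();
    }

    // Safe: try-with-resources closes the handle even when write() throws.
    static void safeWrite(String row) {
        try (TrackedResource r = new TrackedResource()) {
            r.write(row);
        }
    }
}
```

In a long-running worker, a pattern like leakyWrite can slowly accumulate
open handles and the objects they reference, which is one way heap usage can
creep up over time.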

Regards.

A.

On Thu, Mar 5, 2015 at 8:21 PM, Sa Li <sa...@gmail.com> wrote:

> Thanks, Nathan. How much should it be in general?
>
> On Thu, Mar 5, 2015 at 10:15 AM, Nathan Leung <nc...@gmail.com> wrote:
>
>> Your worker is allocated a maximum of 768 MB of heap. It's quite possible
>> that this is not enough. Try increasing Xmx in worker.childopts.

Re: java.lang.OutOfMemoryError: GC overhead limit exceeded

Posted by Sa Li <sa...@gmail.com>.
Thanks, Nathan. How much should it be in general?

On Thu, Mar 5, 2015 at 10:15 AM, Nathan Leung <nc...@gmail.com> wrote:

> Your worker is allocated a maximum of 768 MB of heap. It's quite possible
> that this is not enough. Try increasing Xmx in worker.childopts.

Re: java.lang.OutOfMemoryError: GC overhead limit exceeded

Posted by Nathan Leung <nc...@gmail.com>.
Your worker is allocated a maximum of 768 MB of heap. It's quite possible
that this is not enough. Try increasing Xmx in worker.childopts.
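
After changing -Xmx, it can be useful to confirm that the worker JVM really
picked up the new limit. The following is a small sketch using only
java.lang.Runtime; HeapLimit is a hypothetical helper name, not a Storm
class, and calling it from the topology's startup code is an assumption. Note
that the value reported by maxMemory() can be somewhat lower than the -Xmx
setting, depending on the garbage collector in use.

```java
// Hypothetical helper (not part of Storm): reports the heap ceiling the
// current JVM is actually running with.
class HeapLimit {

    /** Maximum heap the JVM will attempt to use, in megabytes. */
    static long maxHeapMb() {
        return Runtime.getRuntime().maxMemory() / (1024L * 1024L);
    }

    /** Fraction of the maximum heap currently in use, between 0.0 and 1.0. */
    static double usedFraction() {
        Runtime rt = Runtime.getRuntime();
        long used = rt.totalMemory() - rt.freeMemory();
        return (double) used / rt.maxMemory();
    }

    public static void main(String[] args) {
        System.out.printf("max heap: %d MB, used: %.0f%%%n",
                maxHeapMb(), usedFraction() * 100);
    }
}
```

If the logged maximum still corresponds to the old 768 MB setting, the new
worker.childopts value has probably not reached the worker process yet.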