Posted to dev@storm.apache.org by Sachin Pasalkar <Sa...@symantec.com> on 2015/08/22 16:19:30 UTC

Getting a big memory hit

Hi,

We read a whole file of around 5 MB into memory, and it is sent through Kafka to Storm. The next bolt performs an operation on the file and emits a tuple to the following bolt. After profiling, we found that the file (the bytes of the file) does not get garbage collected. On further investigation we found that backtype.storm.coordination.CoordinatedBolt.CoordinatedOutputCollector.emit(String, Collection<Tuple>, List<Object>) takes the first object and uses it for tracking :(. Can you confirm the reason behind this? Is there any way we can send a different unique id as the first element in the list, or have the unique id of the tuple used as the indicator?

However, for the time being we have changed the scheme assigned to KafkaSpout so that it parses the file and sends out a list of values. Can you also explain why the list approach is used instead of a map, given that we declare the output fields in the getOutputFields() API?

Thanks,
Sachin
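
The list-versus-map question above is not answered anywhere in this thread. As a rough illustration only, and assuming (the thread does not confirm this) that field names are declared once per stream while each tuple carries just its values in order, a name can be resolved to a position through an index built a single time. The sketch below is a self-contained model in plain Java; DeclaredFields, fieldIndex and valueByField are hypothetical names for this example and are not Storm's classes.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class FieldsDemo {
    /** Hypothetical stand-in for a set of declared output fields; not Storm's class. */
    static final class DeclaredFields {
        private final Map<String, Integer> index = new HashMap<>();

        DeclaredFields(String... names) {
            // Build the name -> position index once, when the fields are declared.
            for (int i = 0; i < names.length; i++) {
                index.put(names[i], i);
            }
        }

        /** Position of a declared field name; throws if the name was never declared. */
        int fieldIndex(String name) {
            Integer i = index.get(name);
            if (i == null) {
                throw new IllegalArgumentException("undeclared field: " + name);
            }
            return i;
        }

        /** Reads a value by name from a positional list of values. */
        Object valueByField(List<Object> values, String name) {
            return values.get(fieldIndex(name));
        }
    }

    public static void main(String[] args) {
        DeclaredFields fields = new DeclaredFields("id", "fileName", "lineCount");
        // Each "tuple" is only a list of values in declaration order.
        List<Object> values = Arrays.<Object>asList("req-1", "a.log", 42);
        System.out.println(fields.valueByField(values, "fileName"));
    }
}
```

With this layout a tuple does not carry its own copy of the key names, which may be one reason a list is preferred; that motivation is a guess rather than a documented design decision.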

Re: Getting a big memory hit

Posted by Bobby Evans <ev...@yahoo-inc.com.INVALID>.
I'm not sure what you mean by the concern team.  You can file a JIRA yourself at https://issues.apache.org/jira/ under the STORM project.  I am not convinced that this is a bug, though.  It could just be a documentation issue, but I don't know the code well enough to say what the correct way to use it is.
- Bobby


     On Monday, August 24, 2015 10:37 PM, Sachin Pasalkar <Sa...@symantec.com> wrote:
   

Can you raise it with the concerned team then? If you ask me, if it's done the wrong way, let's get it fixed.

Thanks,
sachin

-----Original Message-----
From: Bobby Evans [mailto:evans@yahoo-inc.com.INVALID] 
Sent: Monday, August 24, 2015 9:40 PM
To: dev@storm.apache.org
Subject: Re: Getting a big memory hit

This is part of the reason why Trident is really preferred over the Batch API.  The first item in the tuple is intended to be a tracking id.  In Trident they are able to hide all of this from you using the TridentTuple, but in the Batch API it does not seem to be hidden as well.  I am honestly not as familiar with the Batch API as I am with Trident, so I don't know how much more help I can be beyond that at this point.
 - Bobby 


    On Monday, August 24, 2015 9:54 AM, Sachin Pasalkar <Sa...@symantec.com> wrote:
  

I don't have a heap dump, but I can point to the code where we see them being cached.

The code below is from the execute(Tuple) method of the backtype.storm.coordination.CoordinatedBolt class (storm-core 0.9.3). As you can see, "Object id = tuple.getValue(0);" takes the first element from the tuple instead of taking the id of the tuple. This id is then saved in the _tracked map, which is a time-based cache. In our case the 0th element is the file's byte data. It stays in the _tracked map until the tuple tree is complete. As we are processing a huge amount of data, we run into an OutOfMemory issue.


    public void execute(Tuple tuple) {
        Object id = tuple.getValue(0);
        TrackingInfo track;
        TupleType type = getTupleType(tuple);
        synchronized(_tracked) {
            track = _tracked.get(id);
            if(track==null) {
                track = new TrackingInfo();
                if(_idStreamSpec==null) track.receivedId = true;
                _tracked.put(id, track);
            }
        }

        if(type==TupleType.ID) {
            synchronized(_tracked) {
                track.receivedId = true;
            }
            checkFinishId(tuple, type);
        } else if(type==TupleType.COORD) {
            int count = (Integer) tuple.getValue(1);
            synchronized(_tracked) {
                track.reportCount++;
                track.expectedTupleCount+=count;
            }
            checkFinishId(tuple, type);
        } else {
            synchronized(_tracked) {
                _delegate.execute(tuple);
            }
        }
    }


Let me know if you want more information from me :)


Thanks,

Sachin


From: Bobby Evans <ev...@yahoo-inc.com.INVALID>
Reply-To: "dev@storm.apache.org" <de...@storm.apache.org>, Bobby Evans <ev...@yahoo-inc.com>
Date: Monday, 24 August 2015 6:50 pm
To: "dev@storm.apache.org" <de...@storm.apache.org>
Subject: Re: Getting a big memory hit

Do you have a heap dump or something that shows exactly which data structure those tuples are being cached in?  In most cases a tuple should just have its tuple id extracted so it can be sent to the acker.  Once it is extracted, GC should happen.

- Bobby


    On Saturday, August 22, 2015 11:16 AM, Sachin Pasalkar <Sa...@symantec.com> wrote:


Hi,

We read a whole file of around 5 MB into memory, and it is sent through Kafka to Storm. The next bolt performs an operation on the file and emits a tuple to the following bolt. After profiling, we found that the file (the bytes of the file) does not get garbage collected. On further investigation we found that backtype.storm.coordination.CoordinatedBolt.CoordinatedOutputCollector.emit(String, Collection<Tuple>, List<Object>) takes the first object and uses it for tracking :(. Can you confirm the reason behind this? Is there any way we can send a different unique id as the first element in the list, or have the unique id of the tuple used as the indicator?

However, for the time being we have changed the scheme assigned to KafkaSpout so that it parses the file and sends out a list of values. Can you also explain why the list approach is used instead of a map, given that we declare the output fields in the getOutputFields() API?

Thanks,
Sachin



  


  

RE: Getting a big memory hit

Posted by Sachin Pasalkar <Sa...@symantec.com>.
Can you raise it with the concerned team then? If you ask me, if it's done the wrong way, let's get it fixed.

Thanks,
sachin


Re: Getting a big memory hit

Posted by Bobby Evans <ev...@yahoo-inc.com.INVALID>.
This is part of the reason why Trident is really preferred over the Batch API.  The first item in the tuple is intended to be a tracking id.  In Trident they are able to hide all of this from you using the TridentTuple, but in the Batch API it does not seem to be hidden as well.  I am honestly not as familiar with the Batch API as I am with Trident, so I don't know how much more help I can be beyond that at this point.
 - Bobby 
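
The id-first convention described above can be sketched as follows. This is a minimal model in plain Java with no Storm dependency: withTrackingId and track are hypothetical helpers, and a UUID string is just one possible choice of small id. track mimics the pattern of keying on the first value, so the map ends up holding the short id rather than the file bytes.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

final class IdFirstDemo {
    /** Builds the value list for one message: a small generated id in slot 0, payload in slot 1. */
    static List<Object> withTrackingId(byte[] payload) {
        List<Object> values = new ArrayList<>();
        values.add(UUID.randomUUID().toString()); // assumption: any small unique value would do
        values.add(payload);
        return values;
    }

    /** Mimics tracking keyed on the first value of a tuple and returns the key that was used. */
    static Object track(Map<Object, Integer> tracked, List<Object> values) {
        Object id = values.get(0);
        tracked.merge(id, 1, Integer::sum); // count tuples seen per id
        return id;
    }

    public static void main(String[] args) {
        Map<Object, Integer> tracked = new HashMap<>();
        byte[] file = "pretend this is a 5 MB file".getBytes(StandardCharsets.UTF_8);
        Object key = track(tracked, withTrackingId(file));
        System.out.println("tracked key type: " + key.getClass().getSimpleName());
    }
}
```

Whether the id is generated in the spout's scheme or in an earlier bolt is left open here; the thread only says that the scheme was changed so that the raw file is no longer the first value.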



Re: Getting a big memory hit

Posted by Sachin Pasalkar <Sa...@symantec.com>.
I don't have a heap dump, but I can point to the code where we see them being cached.

The code below is from the execute(Tuple) method of the backtype.storm.coordination.CoordinatedBolt class (storm-core 0.9.3). As you can see, "Object id = tuple.getValue(0);" takes the first element from the tuple instead of taking the id of the tuple. This id is then saved in the _tracked map, which is a time-based cache. In our case the 0th element is the file's byte data. It stays in the _tracked map until the tuple tree is complete. As we are processing a huge amount of data, we run into an OutOfMemory issue.


    public void execute(Tuple tuple) {
        Object id = tuple.getValue(0);
        TrackingInfo track;
        TupleType type = getTupleType(tuple);
        synchronized(_tracked) {
            track = _tracked.get(id);
            if(track==null) {
                track = new TrackingInfo();
                if(_idStreamSpec==null) track.receivedId = true;
                _tracked.put(id, track);
            }
        }

        if(type==TupleType.ID) {
            synchronized(_tracked) {
                track.receivedId = true;
            }
            checkFinishId(tuple, type);
        } else if(type==TupleType.COORD) {
            int count = (Integer) tuple.getValue(1);
            synchronized(_tracked) {
                track.reportCount++;
                track.expectedTupleCount+=count;
            }
            checkFinishId(tuple, type);
        } else {
            synchronized(_tracked) {
                _delegate.execute(tuple);
            }
        }
    }


Let me know if you want more information from me :)


Thanks,

Sachin
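
To make the retention effect concrete, here is a small self-contained sketch in plain Java. It is not Storm code: a plain HashMap stands in for the _tracked cache, and bytesPinnedByKeys is a hypothetical helper. It shows two things that happen when a byte[] is used as the key: the map keeps every payload strongly reachable, and because Java arrays compare by identity, two copies with equal content become two separate entries.

```java
import java.util.HashMap;
import java.util.Map;

final class RetentionDemo {
    /** Sums the sizes of all byte[] keys still reachable from the tracking map. */
    static long bytesPinnedByKeys(Map<Object, ?> tracked) {
        long total = 0;
        for (Object key : tracked.keySet()) {
            if (key instanceof byte[]) {
                total += ((byte[]) key).length;
            }
        }
        return total;
    }

    public static void main(String[] args) {
        // Stand-in for the tracking map; the real one is described in the thread as a time cache.
        Map<Object, Integer> tracked = new HashMap<>();

        // Two copies of the same file content: equal bytes, distinct objects.
        byte[] first = new byte[5 * 1024 * 1024];
        byte[] second = first.clone();

        tracked.put(first, 1);
        tracked.put(second, 1);

        // Arrays use identity-based equals/hashCode, so both keys are kept.
        System.out.println("entries: " + tracked.size());
        System.out.println("bytes pinned: " + bytesPinnedByKeys(tracked));
    }
}
```

In a real topology the entries would eventually expire from a time-based cache, so this models only the period before expiry.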



Re: Getting a big memory hit

Posted by Bobby Evans <ev...@yahoo-inc.com.INVALID>.
Do you have a heap dump or something that shows exactly which data structure those tuples are being cached in?  In most cases a tuple should just have its tuple id extracted so it can be sent to the acker.  Once it is extracted, GC should happen.

- Bobby 

