Posted to user@storm.apache.org by Adam Lewis <su...@gmail.com> on 2014/03/18 20:40:43 UTC

nimbus.thrift.max_buffer_size

Upon upgrading from 0.9.0.1 to 0.9.1-incubating, I'm exceeding the new
thrift max buffer size (nicely logged on the server side, although the
client just gets a broken pipe stack trace from thrift) with an approx 6 MB
message(!).  Increasing the configured limit solves the problem, but I
would have thought the 1MB default should be enough.

Does the storm submitter encode the entire topology as a single thrift
message?  I'm really surprised that the message is coming out so large; my
topology isn't exactly small, but it only has about 20 bolts...does anyone
have any suggestions on how to determine why the message is so large?  Is
this within the realm of what others have seen or am I doing something
wrong?

Thanks,
Adam
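
One way to look into the size question above is to Java-serialize each component on its own and compare the byte counts. The sketch below is a standalone illustration and does not use any Storm classes: `SizeProbe`, `FakeBolt`, and `serializedSize` are hypothetical names, and it assumes each component is serialized separately, so that a large object shared by several components is written once per component.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.LinkedHashMap;
import java.util.Map;

public class SizeProbe {
    // Hypothetical stand-in for a bolt that holds on to a large lookup table.
    static class FakeBolt implements Serializable {
        private static final long serialVersionUID = 1L;
        final String name;
        final byte[] lookupTable; // non-transient, so it is serialized with the bolt

        FakeBolt(String name, byte[] lookupTable) {
            this.name = name;
            this.lookupTable = lookupTable;
        }
    }

    // Java-serializes one object on its own and returns the number of bytes written.
    static int serializedSize(Serializable obj) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    public static void main(String[] args) {
        byte[] shared = new byte[200 * 1024]; // one ~200 KB object shared by every bolt
        Map<String, Serializable> components = new LinkedHashMap<>();
        for (int i = 0; i < 3; i++) {
            components.put("bolt-" + i, new FakeBolt("bolt-" + i, shared));
        }
        long total = 0;
        for (Map.Entry<String, Serializable> e : components.entrySet()) {
            int size = serializedSize(e.getValue());
            total += size;
            System.out.println(e.getKey() + ": " + size + " bytes");
        }
        System.out.println("total: " + total + " bytes");
    }
}
```

If every component reports nearly the same size, that is a hint that the same object graph is being captured by each of them.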

Re: nimbus.thrift.max_buffer_size

Posted by "P. Taylor Goetz" <pt...@gmail.com>.
Cool. I'm going back to the public list to share the knowledge.

If your trident topology is compiling down to 35 bolts, then it sounds like you have a lot of partitioning operations. Is there any way you can reduce that? That's going to introduce a decent amount of network transfer.

And I would definitely try to figure out what's making the serialized topology so big. In that department, the prepare() method is your friend in terms of initializing state.
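
As a rough illustration of the prepare() advice, the sketch below compares a component that builds its state in the constructor with one that keeps the field transient and fills it in later. It is a standalone sketch, not Storm code: `PrepareSketch`, `EagerBolt`, and `LazyBolt` are hypothetical, and the no-argument `prepare()` here only stands in for the real prepare() hook, whose signature differs.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class PrepareSketch {
    // Eager variant: the array is allocated at construction time on the
    // submitting machine, so it becomes part of the serialized component.
    static class EagerBolt implements Serializable {
        private static final long serialVersionUID = 1L;
        private final double[] weights = new double[25_000]; // 200,000 bytes of data
    }

    // Lazy variant: the field is transient and is only filled in by prepare(),
    // a stand-in for the hook that runs after the component is deserialized.
    static class LazyBolt implements Serializable {
        private static final long serialVersionUID = 1L;
        private transient double[] weights;

        void prepare() {
            weights = new double[25_000];
        }

        boolean isReady() {
            return weights != null;
        }
    }

    // Java-serializes one object and returns the number of bytes written.
    static int serializedSize(Serializable obj) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    public static void main(String[] args) {
        System.out.println("eager: " + serializedSize(new EagerBolt()) + " bytes");
        System.out.println("lazy:  " + serializedSize(new LazyBolt()) + " bytes");
    }
}
```

Because transient fields are skipped by Java serialization, the lazy variant stays small even after prepare() has run.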

-Taylor

> On Mar 18, 2014, at 7:37 PM, Adam Lewis <gm...@adamlewis.com> wrote:
> 
> Sure, sharing with the list is a great idea.  I already checked the log excerpts I sent you to ensure they are clean of anything proprietary so all good there.  
> 
> I did a little more digging and it seems there is definitely something...interesting...happening in the compiling process.  My uncompiled topology is pretty small memory-wise (according to the profiler), but once it gets built into the generated storm bolts, it becomes 35 bolts, each of which serializes to about 200 KB...I was struck by how small the variance is amongst the serialized sizes, as if something big-ish is getting duplicated into each bolt.  Some more experimentation revealed that part of the problem may be that I have multiple DRPC spouts in a single topology.  I noticed super-linear growth in serialized topology size for each additional DRPC spout I create.
> 
> I'm thinking I will break each DRPC spout into its own topology, which I've needed to do anyway since right now they seem to block each other from processing even though I don't need requests to separate DRPCs to be wholly ordered in the way I imagine trident would try to do.
> 
> 
>> On Tue, Mar 18, 2014 at 7:24 PM, P. Taylor Goetz <pt...@gmail.com> wrote:
>> Glad I could help.
>> 
>> Trident does a lot in terms of "compiling" down to Storm's primitive structures. The relevant classes are Stream and TridentTopology in the storm.trident package (if my memory serves me correctly, I'm not at a computer...).
>> 
>> It's not for the faint of heart, but once you start to wrap your head around it, it's pretty cool.
>> 
>> Do you mind if I share this thread with the user group? Some of the information could be beneficial to others. At the very least I think we should document what you found... And the Apache line is always "If it didn't happen on the list, it didn't happen."
>> 
>> -Taylor
>> 
>>> On Mar 18, 2014, at 6:04 PM, Adam Lewis <gm...@adamlewis.com> wrote:
>>> 
>>> Hi Taylor,
>>> 
>>> The submitter isn't multithreaded or anything funky...it is basically a typical submitter from the docs that just happens to call submit multiple times.  
>>> 
>>> I did just check the java-serialized topology and it is indeed the culprit (6662137 bytes, 17k larger than the thrift message).  I can now look for any obvious bugs in my topology components' non-transient state, but I'm wondering if there is a more systematic way? These topologies are all trident built, so the indirection that occurs between the trident DSL API and the final storm topology is a bit opaque to me.  The generated thrift classes make this difficult to penetrate from the other direction.  Do you have any suggestion on where I can look to better understand the overall building process?
>>> 
>>> Thanks a lot for your help.
>>> 
>>> Adam
>>> 
>>> 
>>>> On Tue, Mar 18, 2014 at 5:40 PM, P. Taylor Goetz <pt...@gmail.com> wrote:
>>>> Just to give you a little background… the thrift max buffer size was introduced to prevent a situation where a wayward connection (SSH, telnet, security port scanner, etc.) to the nimbus thrift port would cause nimbus to hang.
>>>> 
>>>> When you submit a topology, three things get sent over thrift:
>>>> 
>>>> 1. the topology jar file (big — so it gets uploaded in small chunks)
>>>> 2. the serialized Config object (typically a relatively small bit of JSON)
>>>> 3. the serialized Topology itself (this all depends…)
>>>> 
>>>> My initial guess is that #3 is potentially really big (as you mention). You could test this by serializing the topology to disk to see how big it is.
>>>> 
>>>> My second guess is that something about your submitter app might be overflowing thrift's server-side buffer. Is it multi-threaded and submitting topos in parallel?
>>>> 
>>>> If you submit that topology with the storm jar command, do you still get the error?
>>>> 
>>>> - Taylor
>>>> 
>>>> 
>>>>> On Mar 18, 2014, at 5:17 PM, Adam Lewis <gm...@adamlewis.com> wrote:
>>>>> 
>>>>> I noticed that StormSubmitter has a code path for only doing a single upload of the jar file if it has "already uploaded to master" (StormSubmitter.java:142), but the only case where that will actually happen is if you submit multiple topologies during the lifetime of the VM started with the "storm jar" command (since "submittedJar" is just a private static field).  My main() class (passed to the "storm jar" command) simply delegates to a series of classes which build StormTopology instances, and then calls StormSubmitter.submitTopology(..) in a loop in turn with each of the topologies I've built.
>>>>> 
>>>>> I catch any exceptions inside the loop, so if some topology is already running, I log a "java.lang.RuntimeException: Topology with name `...` already exists on cluster" and continue on in the loop.  This effectively enforces that all my topologies are running, and if I want to redeploy one or more, I kill them in the storm UI and re-run my deployer program (via storm jar).
>>>>> 
>>>>> So...in effect, after running the program the first time and having three topologies deploy and one fail, subsequent runs would only try to deploy that fourth topology not already running.  One potentially important detail of my submitter program is that I reuse the storm Config object across multiple calls to submitTopology, as well as an internal configuration class built from the command line arguments used during topology building.
>>>>> 
>>>>> This approach has been working without any (apparent) ill effects on 0.9.0.1, and also seems to be working on 0.9.1 now that I've increased that buffer to accommodate this 6.6MB thrift message.  But how did that message get to be 6.6MB?  Does that message contain all the serialized bolts which are perhaps inadvertently pulling in something big?
>>>>> 
>>>>> 
>>>>>> On Tue, Mar 18, 2014 at 4:48 PM, P. Taylor Goetz <pt...@gmail.com> wrote:
>>>>>> What happens when you just use the `storm jar` command to submit the topologies separately (i.e. same jar, different topo class name)?
>>>>>> 
>>>>>> Can you give me more details on your submitter program and how it works? How are you “reusing” the uploaded jar file?
>>>>>> 
>>>>>> - Taylor
>>>>>> 
>>>>>>> On Mar 18, 2014, at 4:25 PM, Adam Lewis <gm...@adamlewis.com> wrote:
>>>>>>> 
>>>>>>> It isn't the jar file, but something about the topology itself; I have a submitter program that submits four topologies all from the same jar.  Upon submitting the first topology, the jar is uploaded and topology starts, then the submitter submits two more topologies whilst "reusing" the uploaded jar.  The broken pipe occurs when trying to submit the fourth (large) topology.  That is why I was assuming the large message was actually the encoded topology itself.  This is reproducible and the errors are as follows:
>>>>>>> 
>>>>>>> nimbus.log:
>>>>>>> 
>>>>>>>> 2014-03-18 18:16:39 o.a.t.s.TNonblockingServer [ERROR] Read a frame size of 6644632, which is bigger than the maximum allowable buffer size for ALL connections.
>>>>>>> 
>>>>>>> storm jar console:
>>>>>>> 
>>>>>>>>> 2321 [main] INFO  backtype.storm.StormSubmitter - Uploading topology jar /Users/adam/git/impl/impl-storm/target/impl-storm-0.0.1-SNAPSHOT.jar to assigned location: /mnt/storm/nimbus/inbox/stormjar-04acf27a-a6d1-4a9e-9231-9f4f5f30fd03.jar
>>>>>>>>> 97762 [main] INFO  backtype.storm.StormSubmitter - Successfully uploaded topology jar to assigned location: /mnt/storm/nimbus/inbox/stormjar-04acf27a-a6d1-4a9e-9231-9f4f5f30fd03.jar
>>>>>>>>> 97762 [main] INFO  backtype.storm.StormSubmitter - Submitting topology global__topo_forecastRuntime in distributed mode with conf {"topology.fall.back.on.java.serialization":false,"topology.workers":2,"drpc.servers":["10.118.57.229"],"topology.debug":false,"topology.kryo.register":[{"org.joda.time.DateTime":"de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer"},{"org.joda.time.Interval":"de.javakaffee.kryoserializers.jodatime.JodaIntervalSerializer"},"com.mycompany.data.Simulated","com.mycompany.data.SomeClass1","com.mycompany.ml.SomeClass2","com.mycompany.model.SomeClass3","com.mycompany.model.SomeClass4",{"com.mycompany.ml.SomeClass4":"com.esotericsoftware.kryo.serializers.DefaultSerializers$EnumSerializer"},{"java.math.BigDecimal":"com.esotericsoftware.kryo.serializers.DefaultSerializers$BigDecimalSerializer"},{"java.sql.Date":"de.javakaffee.kryoserializers.DateSerializer"},{"com.tdunning.math.stats.TDigest":"com.mycompany.trident.tdigest.TDigestSerializer"},{"java.lang.Class":"com.esotericsoftware.kryo.serializers.DefaultSerializers$ClassSerializer"},{"java.util.UUID":"de.javakaffee.kryoserializers.UUIDSerializer"},{"com.google.common.collect.RegularImmutableList":"backtype.storm.serialization.SerializableSerializer"}],"topology.max.spout.pending":16,"topology.message.timeout.secs":900,"drpc.request.timeout.secs":45}
>>>>>>>>> java.lang.RuntimeException: org.apache.thrift7.transport.TTransportException: java.net.SocketException: Broken pipe
>>>>>>>>> 	at backtype.storm.StormSubmitter.submitTopology(StormSubmitter.java:112)
>>>>>>>>> 	at backtype.storm.StormSubmitter.submitTopology(StormSubmitter.java:58)
>>>>>>>>> 	at com.mycompany.runtime.DeployStormTopologies.main(DeployStormTopologies.java:92)
>>>>>>>>> Caused by: org.apache.thrift7.transport.TTransportException: java.net.SocketException: Broken pipe
>>>>>>>>> 	at org.apache.thrift7.transport.TIOStreamTransport.write(TIOStreamTransport.java:147)
>>>>>>>>> 	at org.apache.thrift7.transport.TFramedTransport.flush(TFramedTransport.java:157)
>>>>>>>>> 	at org.apache.thrift7.TServiceClient.sendBase(TServiceClient.java:65)
>>>>>>>>> 	at backtype.storm.generated.Nimbus$Client.send_submitTopology(Nimbus.java:156)
>>>>>>>>> 	at backtype.storm.generated.Nimbus$Client.submitTopology(Nimbus.java:145)
>>>>>>>>> 	at backtype.storm.StormSubmitter.submitTopology(StormSubmitter.java:98)
>>>>>>>>> 	... 2 more
>>>>>>>>> Caused by: java.net.SocketException: Broken pipe
>>>>>>>>> 	at java.net.SocketOutputStream.socketWrite0(Native Method)
>>>>>>>>> 	at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109)
>>>>>>>>> 	at java.net.SocketOutputStream.write(SocketOutputStream.java:153)
>>>>>>>>> 	at org.apache.thrift7.transport.TIOStreamTransport.write(TIOStreamTransport.java:145)
>>>>>>>>> 	... 7 more
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>>> On Tue, Mar 18, 2014 at 4:12 PM, P. Taylor Goetz <pt...@gmail.com> wrote:
>>>>>>>> It uploads the file in small (1024*5 bytes) chunks.
>>>>>>>> 
>>>>>>>> Does this happen every time (i.e. reproducible)? What is the size of your topology jar?
>>>>>>>> 
>>>>>>>> Can you post the server-side message? (I want to see the length it output.)
>>>>>>>> 
>>>>>>>> - Taylor
>>>>>>>> 
>>>>>>>>> On Mar 18, 2014, at 3:40 PM, Adam Lewis <su...@gmail.com> wrote:
>>>>>>>>> 
>>>>>>>>> Upon upgrading from 0.9.0.1 to 0.9.1-incubating, I'm exceeding the new thrift max buffer size (nicely logged on the server side, although the client just gets a broken pipe stack trace from thrift) with an approx 6 MB message(!).  Increasing the configured limit solves the problem, but I would have thought the 1MB default should be enough.
>>>>>>>>> 
>>>>>>>>> Does the storm submitter encode the entire topology as a single thrift message?  I'm really surprised that the message is coming out so large, my topology isn't exactly small, but it only has about 20 bolts...does anyone have any suggestions on how to determine why the message is so large?  Is this within the realm of what others have seen or am I doing something wrong?
>>>>>>>>> 
>>>>>>>>> Thanks,
>>>>>>>>> Adam
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>>> 
> 

Re: nimbus.thrift.max_buffer_size

Posted by Adam Lewis <gm...@adamlewis.com>.
It isn't the jar file, but something about the topology itself; I have a
submitter program that submits four topologies all from the same jar.  Upon
submitting the first topology, the jar is uploaded and topology starts,
then the submitter submits two more topologies whilst "reusing" the
uploaded jar.  The broken pipe occurs when trying to submit the fourth
(large) topology.  That is why I was assuming the large message was
actually the encoded topology itself.  This is reproducible and the errors
are as follows:

nimbus.log:

2014-03-18 18:16:39 o.a.t.s.TNonblockingServer [ERROR] Read a frame size of 6644632, which is bigger than the maximum allowable buffer size for ALL connections.


storm jar console:

2321 [main] INFO  backtype.storm.StormSubmitter - Uploading topology jar /Users/adam/git/impl/impl-storm/target/impl-storm-0.0.1-SNAPSHOT.jar to assigned location: /mnt/storm/nimbus/inbox/stormjar-04acf27a-a6d1-4a9e-9231-9f4f5f30fd03.jar
97762 [main] INFO  backtype.storm.StormSubmitter - Successfully uploaded topology jar to assigned location: /mnt/storm/nimbus/inbox/stormjar-04acf27a-a6d1-4a9e-9231-9f4f5f30fd03.jar
97762 [main] INFO  backtype.storm.StormSubmitter - Submitting topology global__topo_forecastRuntime in distributed mode with conf {"topology.fall.back.on.java.serialization":false,"topology.workers":2,"drpc.servers":["10.118.57.229"],"topology.debug":false,"topology.kryo.register":[{"org.joda.time.DateTime":"de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer"},{"org.joda.time.Interval":"de.javakaffee.kryoserializers.jodatime.JodaIntervalSerializer"},"com.mycompany.data.Simulated","com.mycompany.data.SomeClass1","com.mycompany.ml.SomeClass2","com.mycompany.model.SomeClass3","com.mycompany.model.SomeClass4",{"com.mycompany.ml.SomeClass4":"com.esotericsoftware.kryo.serializers.DefaultSerializers$EnumSerializer"},{"java.math.BigDecimal":"com.esotericsoftware.kryo.serializers.DefaultSerializers$BigDecimalSerializer"},{"java.sql.Date":"de.javakaffee.kryoserializers.DateSerializer"},{"com.tdunning.math.stats.TDigest":"com.mycompany.trident.tdigest.TDigestSerializer"},{"java.lang.Class":"com.esotericsoftware.kryo.serializers.DefaultSerializers$ClassSerializer"},{"java.util.UUID":"de.javakaffee.kryoserializers.UUIDSerializer"},{"com.google.common.collect.RegularImmutableList":"backtype.storm.serialization.SerializableSerializer"}],"topology.max.spout.pending":16,"topology.message.timeout.secs":900,"drpc.request.timeout.secs":45}
java.lang.RuntimeException: org.apache.thrift7.transport.TTransportException: java.net.SocketException: Broken pipe
	at backtype.storm.StormSubmitter.submitTopology(StormSubmitter.java:112)
	at backtype.storm.StormSubmitter.submitTopology(StormSubmitter.java:58)
	at com.mycompany.runtime.DeployStormTopologies.main(DeployStormTopologies.java:92)
Caused by: org.apache.thrift7.transport.TTransportException: java.net.SocketException: Broken pipe
	at org.apache.thrift7.transport.TIOStreamTransport.write(TIOStreamTransport.java:147)
	at org.apache.thrift7.transport.TFramedTransport.flush(TFramedTransport.java:157)
	at org.apache.thrift7.TServiceClient.sendBase(TServiceClient.java:65)
	at backtype.storm.generated.Nimbus$Client.send_submitTopology(Nimbus.java:156)
	at backtype.storm.generated.Nimbus$Client.submitTopology(Nimbus.java:145)
	at backtype.storm.StormSubmitter.submitTopology(StormSubmitter.java:98)
	... 2 more
Caused by: java.net.SocketException: Broken pipe
	at java.net.SocketOutputStream.socketWrite0(Native Method)
	at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109)
	at java.net.SocketOutputStream.write(SocketOutputStream.java:153)
	at org.apache.thrift7.transport.TIOStreamTransport.write(TIOStreamTransport.java:145)
	... 7 more

On Tue, Mar 18, 2014 at 4:12 PM, P. Taylor Goetz <pt...@gmail.com> wrote:

> It uploads the file in small (1024*5 bytes) chunks.
>
> Does this happen every time (i.e. reproducible)? What is the size of your
> topology jar?
>
> Can you post the server-side message? (I want to see the length it output.)
>
> - Taylor
>
> On Mar 18, 2014, at 3:40 PM, Adam Lewis <su...@gmail.com> wrote:
>
> Upon upgrading from 0.9.0.1 to 0.9.1-incubating, I'm exceeding the new
> thrift max buffer size (nicely logged on the server side, although the
> client just gets a broken pipe stack trace from thrift) with an approx 6 MB
> message(!).  Increasing the configured limit solves the problem, but I
> would have thought the 1MB default should be enough.
>
> Does the storm submitter encode the entire topology as a single thrift
> message?  I'm really surprised that the message is coming out so large, my
> topology isn't exactly small, but it only has about 20 bolts...does anyone
> have any suggestions on how to determine why the message is so large?  Is
> this within the realm of what others have seen or am I doing something
> wrong?
>
> Thanks,
> Adam
>
>
>

Re: nimbus.thrift.max_buffer_size

Posted by "P. Taylor Goetz" <pt...@gmail.com>.
It uploads the file in small (1024*5 bytes) chunks.
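
To put the chunk size in perspective, here is a small arithmetic sketch. It only uses the 1024*5 figure quoted above; the class name `ChunkSketch`, the helper `chunkCount`, and the 50 MB jar size are made up for illustration.

```java
public class ChunkSketch {
    static final int CHUNK_SIZE = 1024 * 5; // chunk size quoted above, in bytes

    // Number of chunks needed to send a payload of the given length (rounds up).
    static long chunkCount(long payloadBytes) {
        return (payloadBytes + CHUNK_SIZE - 1) / CHUNK_SIZE;
    }

    public static void main(String[] args) {
        long jarBytes = 50L * 1024 * 1024; // hypothetical 50 MB jar
        System.out.println(chunkCount(jarBytes) + " chunks of at most " + CHUNK_SIZE + " bytes");
    }
}
```

Each chunk is 5120 bytes at most, so a single jar chunk is far smaller than a 1MB buffer limit no matter how large the jar is.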

Does this happen every time (i.e. reproducible)? What is the size of your topology jar?

Can you post the server-side message? (I want to see the length it output.)

- Taylor

On Mar 18, 2014, at 3:40 PM, Adam Lewis <su...@gmail.com> wrote:

> Upon upgrading from 0.9.0.1 to 0.9.1-incubating, I'm exceeding the new thrift max buffer size (nicely logged on the server side, although the client just gets a broken pipe stack trace from thrift) with an approx 6 MB message(!).  Increasing the configured limit solves the problem, but I would have thought the 1MB default should be enough.
> 
> Does the storm submitter encode the entire topology as a single thrift message?  I'm really surprised that the message is coming out so large, my topology isn't exactly small, but it only has about 20 bolts...does anyone have any suggestions on how to determine why the message is so large?  Is this within the realm of what others have seen or am I doing something wrong?
> 
> Thanks,
> Adam