Posted to dev@storm.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2014/05/02 18:08:17 UTC

[jira] [Commented] (STORM-113) TridentUtils.thriftDeserialize() is used unsafely when running in local mode.

    [ https://issues.apache.org/jira/browse/STORM-113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13987849#comment-13987849 ] 

ASF GitHub Bot commented on STORM-113:
--------------------------------------

Github user revans2 commented on the pull request:

    https://github.com/apache/incubator-storm/pull/26#issuecomment-42048346
  
    +1 the code changes look good to me.


> TridentUtils.thriftDeserialize() is used unsafely when running in local mode.
> -----------------------------------------------------------------------------
>
>                 Key: STORM-113
>                 URL: https://issues.apache.org/jira/browse/STORM-113
>             Project: Apache Storm (Incubating)
>          Issue Type: Bug
>            Reporter: James Xu
>
> https://github.com/nathanmarz/storm/issues/500
> When running a Trident topology with multiple workers in local mode, each worker runs as a thread in a single JVM. These worker threads call TridentUtils.thriftDeserialize() concurrently when loading the (serialized) topology. However, this method is not thread-safe. In particular, it uses a static reference to a TDeserializer, which in turn wraps a stateful TMemoryInputTransport. The end result is sporadic, nondeterministic Thrift exceptions when starting the workers.
> This is not difficult to fix, but I'm not sure which approach the committers want to take. I made the deserializer (and serializer) ThreadLocal variables.
> ----------
> juhoautio: Thanks for explaining the problem! This issue occasionally made my test fail* when submitting a topology to a local cluster.
> For now I set the number of workers and the max task parallelism to 1, but eventually I'd like to enable some parallelism. I'm not sure whether both parameters matter, but here's a code extract for reference:
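> import backtype.storm.Config;
> import backtype.storm.LocalCluster;
> // ...
> Config conf = new Config();
> conf.setNumWorkers(1);
> conf.setMaxTaskParallelism(1);
> // ...
> LocalCluster cluster = new LocalCluster();
> cluster.submitTopology(.., conf, ..); // topology name and topology elided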
> With this configuration the problem hasn't happened any more.
> *) Stack trace for the error (it happens occasionally when the number of workers is set to 2):
> 2629 [Thread-9] ERROR backtype.storm.daemon.worker  - Error on initialization of server mk-worker
> java.lang.RuntimeException: org.apache.thrift7.TException: Negative length: -72553216
>     at storm.trident.util.TridentUtils.thriftDeserialize(TridentUtils.java:111)
>     at storm.trident.planner.PartitionNode.readObject(PartitionNode.java:33)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:606)
>     at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1891)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>     at java.util.HashMap.readObject(HashMap.java:1183)
>     at sun.reflect.GeneratedMethodAccessor29.invoke(Unknown Source)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:606)
>     at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1891)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1913)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1913)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>     at backtype.storm.utils.Utils.deserialize(Utils.java:64)
>     at backtype.storm.utils.Utils.getSetComponentObject(Utils.java:199)
>     at backtype.storm.daemon.task$get_task_object.invoke(task.clj:56)
>     at backtype.storm.daemon.task$mk_task_data$fn__3766.invoke(task.clj:158)
>     at backtype.storm.util$assoc_apply_self.invoke(util.clj:731)
>     at backtype.storm.daemon.task$mk_task_data.invoke(task.clj:152)
>     at backtype.storm.daemon.task$mk_task.invoke(task.clj:163)
>     at backtype.storm.daemon.executor$mk_executor$fn__3922.invoke(executor.clj:267)
>     at clojure.core$map$fn__4087.invoke(core.clj:2432)
>     at clojure.lang.LazySeq.sval(LazySeq.java:42)
>     at clojure.lang.LazySeq.seq(LazySeq.java:60)
>     at clojure.lang.RT.seq(RT.java:473)
>     at clojure.core$seq.invoke(core.clj:133)
>     at clojure.core.protocols$seq_reduce.invoke(protocols.clj:30)
>     at clojure.core.protocols$fn__5875.invoke(protocols.clj:54)
>     at clojure.core.protocols$fn__5828$G__5823__5841.invoke(protocols.clj:13)
>     at clojure.core$reduce.invoke(core.clj:6030)
>     at clojure.core$into.invoke(core.clj:6077)
>     at backtype.storm.daemon.executor$mk_executor.invoke(executor.clj:267)
>     at backtype.storm.daemon.worker$fn__4348$exec_fn__1228__auto____4349$iter__4354__4358$fn__4359.invoke(worker.clj:354)
>     at clojure.lang.LazySeq.sval(LazySeq.java:42)
>     at clojure.lang.LazySeq.seq(LazySeq.java:60)
>     at clojure.lang.Cons.next(Cons.java:39)
>     at clojure.lang.LazySeq.next(LazySeq.java:92)
>     at clojure.lang.RT.next(RT.java:587)
>     at clojure.core$next.invoke(core.clj:64)
>     at clojure.core$dorun.invoke(core.clj:2726)
>     at clojure.core$doall.invoke(core.clj:2741)
>     at backtype.storm.daemon.worker$fn__4348$exec_fn__1228__auto____4349.invoke(worker.clj:354)
>     at clojure.lang.AFn.applyToHelper(AFn.java:185)
>     at clojure.lang.AFn.applyTo(AFn.java:151)
>     at clojure.core$apply.invoke(core.clj:601)
>     at backtype.storm.daemon.worker$fn__4348$mk_worker__4404.doInvoke(worker.clj:323)
>     at clojure.lang.RestFn.invoke(RestFn.java:512)
>     at backtype.storm.daemon.supervisor$fn__4807.invoke(supervisor.clj:467)
>     at clojure.lang.MultiFn.invoke(MultiFn.java:177)
>     at backtype.storm.daemon.supervisor$sync_processes$iter__4684__4688$fn__4689.invoke(supervisor.clj:249)
>     at clojure.lang.LazySeq.sval(LazySeq.java:42)
>     at clojure.lang.LazySeq.seq(LazySeq.java:60)
>     at clojure.lang.RT.seq(RT.java:473)
>     at clojure.core$seq.invoke(core.clj:133)
>     at clojure.core$dorun.invoke(core.clj:2725)
>     at clojure.core$doall.invoke(core.clj:2741)
>     at backtype.storm.daemon.supervisor$sync_processes.invoke(supervisor.clj:237)
>     at clojure.lang.AFn.applyToHelper(AFn.java:161)
>     at clojure.lang.AFn.applyTo(AFn.java:151)
>     at clojure.core$apply.invoke(core.clj:603)
>     at clojure.core$partial$fn__4070.doInvoke(core.clj:2343)
>     at clojure.lang.RestFn.invoke(RestFn.java:397)
>     at backtype.storm.event$event_manager$fn__2507.invoke(event.clj:24)
>     at clojure.lang.AFn.run(AFn.java:24)
>     at java.lang.Thread.run(Thread.java:724)
> Caused by: org.apache.thrift7.TException: Negative length: -72553216
>     at org.apache.thrift7.protocol.TBinaryProtocol.checkReadLength(TBinaryProtocol.java:388)
>     at org.apache.thrift7.protocol.TBinaryProtocol.readBinary(TBinaryProtocol.java:363)
>     at org.apache.thrift7.protocol.TProtocolUtil.skip(TProtocolUtil.java:102)
>     at org.apache.thrift7.protocol.TProtocolUtil.skip(TProtocolUtil.java:60)
>     at backtype.storm.generated.Grouping.readValue(Grouping.java:353)
>     at org.apache.thrift7.TUnion.read(TUnion.java:135)
>     at org.apache.thrift7.TDeserializer.deserialize(TDeserializer.java:69)
>     at storm.trident.util.TridentUtils.thriftDeserialize(TridentUtils.java:108)
>     ... 79 more
> 2653 [Thread-9] INFO  backtype.storm.util  - Halting process: ("Error on initialization")
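
The issue description attributes the sporadic failures to a stateful TMemoryInputTransport shared through a static TDeserializer, and the root cause in the trace is TBinaryProtocol rejecting a negative length. The single-threaded Java sketch below replays one possible interleaving that yields that kind of error. SharedMemoryInput and readLength are hypothetical stand-ins, not Thrift or Storm code: the first models a transport holding one buffer and one read cursor, the second models a binary protocol that reads a 4-byte big-endian length and refuses negative values.

```java
import java.nio.ByteBuffer;

/** Hypothetical illustration (not Storm/Thrift code) of two readers sharing one stateful input. */
public class SharedTransportRace {

    /** Stand-in for a stateful memory transport: one buffer, one cursor. */
    static final class SharedMemoryInput {
        private byte[] buf = new byte[0];
        private int pos;

        /** Hands a new message to the transport and rewinds the cursor. */
        void reset(byte[] bytes) {
            buf = bytes;
            pos = 0;
        }

        byte readByte() {
            return buf[pos++];
        }

        /** Reads a 4-byte big-endian integer at the cursor. */
        int readI32() {
            int v = ByteBuffer.wrap(buf, pos, 4).getInt();
            pos += 4;
            return v;
        }
    }

    /** Mimics a binary protocol's length check. */
    static int readLength(SharedMemoryInput in) {
        int len = in.readI32();
        if (len < 0) {
            throw new IllegalStateException("Negative length: " + len);
        }
        return len;
    }

    /** Replays one bad interleaving of readers A and B on a shared input. */
    static String replayInterleaving() {
        SharedMemoryInput shared = new SharedMemoryInput();

        // Reader A's message: a type byte, a 4-byte length (3), then 3 data bytes.
        byte[] messageA = {11, 0, 0, 0, 3, 'a', 'b', 'c'};
        // Reader B's message: a type byte, then a byte with its high bit set.
        byte[] messageB = {11, (byte) 0x80, 0, 0, 0, 0};

        shared.reset(messageA);
        shared.readByte();      // A consumes its type byte (cursor now 1)
        shared.reset(messageB); // B swaps in its own buffer and rewinds the cursor
        shared.readByte();      // B consumes its type byte (cursor now 1)

        try {
            readLength(shared); // A reads "its" length, but from B's bytes
            return "no error";
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(replayInterleaving());
    }
}
```

Neither message is malformed on its own; the bogus length comes purely from two readers sharing one cursor, which is consistent with the failure being intermittent.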

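The reporter's fix was to make the deserializer (and serializer) ThreadLocal. Below is a minimal Java sketch of that pattern, assuming Java 8+ for ThreadLocal.withInitial. StatefulReader is a hypothetical stand-in for a TDeserializer-like object that keeps state between calls; in TridentUtils the ThreadLocal would hold a TDeserializer instead, and this is not the code from the pull request.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ThreadLocalDeserializer {

    /** Hypothetical stand-in for a stateful deserializer: keeps a buffer and cursor as fields. */
    static final class StatefulReader {
        private byte[] buf;
        private int pos;

        /** Decodes a message laid out as [4-byte big-endian length][UTF-8 payload]. */
        String read(byte[] message) {
            buf = message;
            pos = 0;
            int len = ByteBuffer.wrap(buf, pos, 4).getInt();
            pos += 4;
            if (len < 0) {
                throw new IllegalStateException("Negative length: " + len);
            }
            String s = new String(buf, pos, len, StandardCharsets.UTF_8);
            pos += len;
            return s;
        }
    }

    // One reader per thread, created lazily on the first get() in that thread.
    private static final ThreadLocal<StatefulReader> READER =
            ThreadLocal.withInitial(StatefulReader::new);

    static String deserialize(byte[] message) {
        return READER.get().read(message);
    }

    static byte[] encode(String s) {
        byte[] payload = s.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + payload.length)
                .putInt(payload.length)
                .put(payload)
                .array();
    }

    /** Runs `threads` tasks concurrently, each decoding its own message `perThread` times; returns the count of correct decodes. */
    static int runConcurrently(int threads, int perThread) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<Integer>> results = new ArrayList<>();
            for (int t = 0; t < threads; t++) {
                String expected = "worker-" + t;
                byte[] message = encode(expected);
                results.add(pool.submit(() -> {
                    int ok = 0;
                    for (int i = 0; i < perThread; i++) {
                        if (expected.equals(deserialize(message))) {
                            ok++;
                        }
                    }
                    return ok;
                }));
            }
            int total = 0;
            for (Future<Integer> f : results) {
                total += f.get();
            }
            return total;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runConcurrently(4, 10_000));
    }
}
```

Allocating a fresh deserializer on every call, or guarding a shared one with synchronized, would also remove the race; the ThreadLocal variant keeps one reusable instance per thread without locking, at the cost of one instance that lives as long as each thread.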


--
This message was sent by Atlassian JIRA
(v6.2#6252)