Posted to dev@storm.apache.org by "James Xu (JIRA)" <ji...@apache.org> on 2013/12/14 17:09:06 UTC

[jira] [Created] (STORM-113) TridentUtils.thriftDeserialize() is used unsafely when running in local mode.

James Xu created STORM-113:
------------------------------

             Summary: TridentUtils.thriftDeserialize() is used unsafely when running in local mode.
                 Key: STORM-113
                 URL: https://issues.apache.org/jira/browse/STORM-113
             Project: Apache Storm (Incubating)
          Issue Type: Bug
            Reporter: James Xu


https://github.com/nathanmarz/storm/issues/500

When running a Trident topology with multiple workers in local mode, each worker runs as a thread in a single JVM. These worker threads call TridentUtils.thriftDeserialize() concurrently when loading the (serialized) topology. However, this method is not thread-safe: it uses a static reference to a TDeserializer, which in turn uses an instance of TMemoryInputTransport, which is stateful. The result is sporadic, nondeterministic Thrift exceptions when the workers start.
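To make the failure mode concrete, here is a minimal, self-contained Java sketch. `SharedReader` is a hypothetical stand-in for a TDeserializer wrapping a TMemoryInputTransport; it is not Thrift's API, and it throws IllegalStateException where Thrift would throw TException. Like a memory transport, it keeps one buffer and one read position, and `reset()` replaces both. One bad interleaving of two worker threads is replayed sequentially so the outcome is deterministic: worker A's length prefix is read from worker B's bytes, which is one plausible way a "Negative length" error like the one in the trace below can arise.

```java
import java.nio.ByteBuffer;

class SharedReaderDemo {
    // Hypothetical stand-in for a deserializer with a stateful memory transport:
    // one buffer and one read position shared by every caller.
    static final class SharedReader {
        private ByteBuffer buf = ByteBuffer.allocate(0);

        // Point the reader at a new message (analogous to resetting the transport).
        void reset(byte[] bytes) {
            buf = ByteBuffer.wrap(bytes);
        }

        // Read a 4-byte big-endian length prefix, then that many payload bytes.
        byte[] readBinary() {
            int len = buf.getInt();
            if (len < 0) {
                throw new IllegalStateException("Negative length: " + len);
            }
            byte[] out = new byte[len];
            buf.get(out);
            return out;
        }
    }

    // A single static instance, mirroring the static TDeserializer described above.
    static final SharedReader READER = new SharedReader();

    static String demo() {
        // Worker A's well-formed message: length 3, then "abc".
        byte[] messageA = ByteBuffer.allocate(7).putInt(3).put(new byte[] {'a', 'b', 'c'}).array();
        // Worker B's message, whose first four bytes do not form a valid length prefix.
        byte[] messageB = {(byte) 0xFB, 0x00, 0x00, 0x00, 0x01, 0x02};

        // Replay one bad interleaving sequentially:
        READER.reset(messageA); // worker A starts decoding its message
        READER.reset(messageB); // worker B resets the shared reader before A reads
        try {
            READER.readBinary(); // worker A now reads B's bytes as its length prefix
            return "no error";
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

With real threads the interleaving, and therefore the garbage value, depends on timing, which is why the exceptions are sporadic.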

This is not difficult to fix, but I'm not sure which approach the committers want to take. I made the deserializer (and the serializer) ThreadLocal variables.
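The ThreadLocal approach mentioned above could look roughly like the sketch below. It is not the actual patch: `StatefulDecoder` is a hypothetical stand-in for Thrift's TDeserializer (it keeps a buffer and read position the way a memory transport does), so the example runs with the JDK alone (Java 8+ for `ThreadLocal.withInitial` and lambdas). In Storm's case the supplier would presumably construct a TDeserializer instead. Each thread lazily gets its own decoder, so concurrent callers never share mutable transport state.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ThreadLocalDecoderDemo {
    // Hypothetical stand-in for TDeserializer + TMemoryInputTransport:
    // not thread-safe, because each decode() overwrites the shared buffer field.
    static final class StatefulDecoder {
        private ByteBuffer buf = ByteBuffer.allocate(0);

        String decode(byte[] bytes) {
            buf = ByteBuffer.wrap(bytes); // reset the "transport"
            int len = buf.getInt();       // 4-byte length prefix
            if (len < 0) {
                throw new IllegalStateException("Negative length: " + len);
            }
            byte[] payload = new byte[len];
            buf.get(payload);
            return new String(payload, StandardCharsets.UTF_8);
        }
    }

    // One decoder per thread instead of one static decoder for the whole JVM.
    private static final ThreadLocal<StatefulDecoder> DECODER =
            ThreadLocal.withInitial(StatefulDecoder::new);

    static String decode(byte[] bytes) {
        return DECODER.get().decode(bytes);
    }

    // Length-prefixed encoding matching what StatefulDecoder expects.
    static byte[] encode(String s) {
        byte[] payload = s.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + payload.length).putInt(payload.length).put(payload).array();
    }

    // Decode many messages from several threads at once; returns the number of mismatches.
    static int runConcurrently(int threads, int messagesPerThread) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<Integer>> results = new ArrayList<>();
            for (int t = 0; t < threads; t++) {
                final int id = t;
                Callable<Integer> task = () -> {
                    int mismatches = 0;
                    for (int i = 0; i < messagesPerThread; i++) {
                        String expected = "worker-" + id + "-msg-" + i;
                        if (!expected.equals(decode(encode(expected)))) {
                            mismatches++;
                        }
                    }
                    return mismatches;
                };
                results.add(pool.submit(task));
            }
            int total = 0;
            for (Future<Integer> f : results) {
                total += f.get();
            }
            return total;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("mismatches: " + runConcurrently(4, 10_000));
    }
}
```

A ThreadLocal keeps one decoder alive per thread for that thread's lifetime, which suits long-lived worker threads. Creating a fresh deserializer on every call, or synchronizing on the single static instance, would also be thread-safe; the trade-off is allocation cost versus lock contention.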

----------
juhoautio: Thanks for explaining the problem! This issue occasionally made my test fail* when I tried to submit a topology to a local cluster.

For now I've set the number of workers and the max task parallelism to 1, but eventually I'd like to enable some parallelism. I'm not sure whether both parameters matter, but here's a code extract for reference:

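import backtype.storm.Config;
import backtype.storm.LocalCluster;

// Workaround: a single worker and no task parallelism
Config conf = new Config();
conf.setNumWorkers(1);
conf.setMaxTaskParallelism(1);
// ...
LocalCluster cluster = new LocalCluster();
cluster.submitTopology(.., conf, ..); // arguments: topology name, conf, topology

With this configuration the problem hasn't happened anymore.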

*) Stack trace for the error (it happens occasionally when the number of workers is set to 2):

2629 [Thread-9] ERROR backtype.storm.daemon.worker  - Error on initialization of server mk-worker
java.lang.RuntimeException: org.apache.thrift7.TException: Negative length: -72553216
    at storm.trident.util.TridentUtils.thriftDeserialize(TridentUtils.java:111)
    at storm.trident.planner.PartitionNode.readObject(PartitionNode.java:33)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1891)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    at java.util.HashMap.readObject(HashMap.java:1183)
    at sun.reflect.GeneratedMethodAccessor29.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1891)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1913)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1913)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    at backtype.storm.utils.Utils.deserialize(Utils.java:64)
    at backtype.storm.utils.Utils.getSetComponentObject(Utils.java:199)
    at backtype.storm.daemon.task$get_task_object.invoke(task.clj:56)
    at backtype.storm.daemon.task$mk_task_data$fn__3766.invoke(task.clj:158)
    at backtype.storm.util$assoc_apply_self.invoke(util.clj:731)
    at backtype.storm.daemon.task$mk_task_data.invoke(task.clj:152)
    at backtype.storm.daemon.task$mk_task.invoke(task.clj:163)
    at backtype.storm.daemon.executor$mk_executor$fn__3922.invoke(executor.clj:267)
    at clojure.core$map$fn__4087.invoke(core.clj:2432)
    at clojure.lang.LazySeq.sval(LazySeq.java:42)
    at clojure.lang.LazySeq.seq(LazySeq.java:60)
    at clojure.lang.RT.seq(RT.java:473)
    at clojure.core$seq.invoke(core.clj:133)
    at clojure.core.protocols$seq_reduce.invoke(protocols.clj:30)
    at clojure.core.protocols$fn__5875.invoke(protocols.clj:54)
    at clojure.core.protocols$fn__5828$G__5823__5841.invoke(protocols.clj:13)
    at clojure.core$reduce.invoke(core.clj:6030)
    at clojure.core$into.invoke(core.clj:6077)
    at backtype.storm.daemon.executor$mk_executor.invoke(executor.clj:267)
    at backtype.storm.daemon.worker$fn__4348$exec_fn__1228__auto____4349$iter__4354__4358$fn__4359.invoke(worker.clj:354)
    at clojure.lang.LazySeq.sval(LazySeq.java:42)
    at clojure.lang.LazySeq.seq(LazySeq.java:60)
    at clojure.lang.Cons.next(Cons.java:39)
    at clojure.lang.LazySeq.next(LazySeq.java:92)
    at clojure.lang.RT.next(RT.java:587)
    at clojure.core$next.invoke(core.clj:64)
    at clojure.core$dorun.invoke(core.clj:2726)
    at clojure.core$doall.invoke(core.clj:2741)
    at backtype.storm.daemon.worker$fn__4348$exec_fn__1228__auto____4349.invoke(worker.clj:354)
    at clojure.lang.AFn.applyToHelper(AFn.java:185)
    at clojure.lang.AFn.applyTo(AFn.java:151)
    at clojure.core$apply.invoke(core.clj:601)
    at backtype.storm.daemon.worker$fn__4348$mk_worker__4404.doInvoke(worker.clj:323)
    at clojure.lang.RestFn.invoke(RestFn.java:512)
    at backtype.storm.daemon.supervisor$fn__4807.invoke(supervisor.clj:467)
    at clojure.lang.MultiFn.invoke(MultiFn.java:177)
    at backtype.storm.daemon.supervisor$sync_processes$iter__4684__4688$fn__4689.invoke(supervisor.clj:249)
    at clojure.lang.LazySeq.sval(LazySeq.java:42)
    at clojure.lang.LazySeq.seq(LazySeq.java:60)
    at clojure.lang.RT.seq(RT.java:473)
    at clojure.core$seq.invoke(core.clj:133)
    at clojure.core$dorun.invoke(core.clj:2725)
    at clojure.core$doall.invoke(core.clj:2741)
    at backtype.storm.daemon.supervisor$sync_processes.invoke(supervisor.clj:237)
    at clojure.lang.AFn.applyToHelper(AFn.java:161)
    at clojure.lang.AFn.applyTo(AFn.java:151)
    at clojure.core$apply.invoke(core.clj:603)
    at clojure.core$partial$fn__4070.doInvoke(core.clj:2343)
    at clojure.lang.RestFn.invoke(RestFn.java:397)
    at backtype.storm.event$event_manager$fn__2507.invoke(event.clj:24)
    at clojure.lang.AFn.run(AFn.java:24)
    at java.lang.Thread.run(Thread.java:724)
Caused by: org.apache.thrift7.TException: Negative length: -72553216
    at org.apache.thrift7.protocol.TBinaryProtocol.checkReadLength(TBinaryProtocol.java:388)
    at org.apache.thrift7.protocol.TBinaryProtocol.readBinary(TBinaryProtocol.java:363)
    at org.apache.thrift7.protocol.TProtocolUtil.skip(TProtocolUtil.java:102)
    at org.apache.thrift7.protocol.TProtocolUtil.skip(TProtocolUtil.java:60)
    at backtype.storm.generated.Grouping.readValue(Grouping.java:353)
    at org.apache.thrift7.TUnion.read(TUnion.java:135)
    at org.apache.thrift7.TDeserializer.deserialize(TDeserializer.java:69)
    at storm.trident.util.TridentUtils.thriftDeserialize(TridentUtils.java:108)
    ... 79 more
2653 [Thread-9] INFO  backtype.storm.util  - Halting process: ("Error on initialization")



--
This message was sent by Atlassian JIRA
(v6.1.4#6159)