Posted to user@storm.apache.org by Petr Janeček <Ja...@seznam.cz> on 2017/01/23 12:52:45 UTC
Running a local cluster under a nondefault classloader
Hello!
The text below is an edited question from Stack Overflow
(http://stackoverflow.com/questions/41745602/). I'm aware that markdown is
not really supported.
A Local Cluster from a web classloader
======================================
I'm trying to run a local cluster from a web container (yes, it's only for
dev & testing purposes) and am having difficulty with classloaders.
Direct approach
---------------
When I do it the easy and recommended
(http://storm.apache.org/releases/current/Local-mode.html) way,
    ILocalCluster localCluster = new LocalCluster();
    localCluster.submitTopology(topologyName, stormConf, topology);
I get rewarded with
    Async loop died!: java.lang.ClassCastException: my.company.storm.bolt.SomeFilteringBolt cannot be cast to org.apache.storm.task.IBolt
        at org.apache.storm.daemon.executor$fn__7953$fn__7966.invoke(executor.clj:787)
        at org.apache.storm.util$async_loop$fn__625.invoke(util.clj:482)
        at clojure.lang.AFn.run(AFn.java:22)
        at java.lang.Thread.run(Thread.java:745)
This is because the classloader used to load and instantiate the
`StormTopology` is an instance of Jetty's `WebAppClassLoader`, but the
(sub)process spawned by `LocalCluster.submitTopology()` apparently uses the
system classloader. I confirmed this by logging the classloader in the
static block of `SomeFilteringBolt`: the class is indeed loaded twice, and
a bolt loaded by the `WebAppClassLoader` obviously cannot be cast to a bolt
type loaded by the system classloader later on.
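To illustrate the mechanism, here is a minimal sketch in plain Java, with no Storm or Jetty involved. `SomeBolt` and `IsolatingLoader` are hypothetical names made up for this example; the loader simply defines its own copy of one class instead of delegating to its parent, which is roughly what a web application classloader does for application classes. The sketch assumes the compiled `.class` file is readable as a classpath resource.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;

class ClassLoaderIdentityDemo {

    /** Hypothetical stand-in for a bolt class; not part of Storm. */
    public static class SomeBolt implements Runnable {
        @Override
        public void run() { }
    }

    /** Defines its own copy of SomeBolt; everything else goes to the parent. */
    static class IsolatingLoader extends ClassLoader {
        private final String isolatedName = SomeBolt.class.getName();

        IsolatingLoader(ClassLoader parent) {
            super(parent);
        }

        @Override
        protected Class<?> loadClass(String name, boolean resolve)
                throws ClassNotFoundException {
            synchronized (getClassLoadingLock(name)) {
                if (!name.equals(isolatedName)) {
                    return super.loadClass(name, resolve);
                }
                Class<?> loaded = findLoadedClass(name);
                if (loaded == null) {
                    byte[] bytes = readClassBytes(name);
                    loaded = defineClass(name, bytes, 0, bytes.length);
                }
                if (resolve) {
                    resolveClass(loaded);
                }
                return loaded;
            }
        }

        /** Reads the class file bytes through the parent loader's resources. */
        private byte[] readClassBytes(String name) throws ClassNotFoundException {
            String resource = name.replace('.', '/') + ".class";
            try (InputStream in = getParent().getResourceAsStream(resource)) {
                if (in == null) {
                    throw new ClassNotFoundException(name);
                }
                ByteArrayOutputStream out = new ByteArrayOutputStream();
                byte[] buffer = new byte[4096];
                int n;
                while ((n = in.read(buffer)) != -1) {
                    out.write(buffer, 0, n);
                }
                return out.toByteArray();
            } catch (IOException e) {
                throw new ClassNotFoundException(name, e);
            }
        }
    }

    /** Loads a second copy of SomeBolt through a fresh IsolatingLoader. */
    static Class<?> loadIsolatedCopy() {
        try {
            ClassLoader isolated = new IsolatingLoader(SomeBolt.class.getClassLoader());
            return isolated.loadClass(SomeBolt.class.getName());
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    /** True if an instance of the isolated copy could be cast to our SomeBolt. */
    static boolean isolatedInstanceIsCompatible() {
        try {
            Object foreign = loadIsolatedCopy().getDeclaredConstructor().newInstance();
            return SomeBolt.class.isInstance(foreign);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Class<?> copy = loadIsolatedCopy();
        System.out.println("same name:  " + copy.getName().equals(SomeBolt.class.getName()));
        System.out.println("same class: " + (copy == SomeBolt.class));
        System.out.println("compatible: " + isolatedInstanceIsCompatible());
    }
}
```

The two `Class` objects share a fully qualified name but are distinct types to the JVM, so a cast between them fails in the same way as in the stack trace above.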
Expected behaviour
------------------
Now, this is surprising to me, as I thought Storm would serialize the
`StormTopology` instance, "send" it locally, deserialize it and run it. If it
did that, it would definitely work. Instead, it seems to use the provided
`StormTopology` instance directly, which is problematic under a different
classloader.
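For clarity, this is the kind of round trip I had in mind, sketched with plain Java serialization. It is not how Storm actually works internally (that is exactly the open question), and `LoaderAwareObjectInputStream`, `serialize` and `deserialize` are hypothetical names for this example only. The point is that deserializing against an explicitly chosen classloader yields objects whose classes belong to that loader.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;

class LoaderAwareSerializationDemo {

    /** Resolves deserialized classes against the given loader first. */
    static class LoaderAwareObjectInputStream extends ObjectInputStream {
        private final ClassLoader loader;

        LoaderAwareObjectInputStream(InputStream in, ClassLoader loader) throws IOException {
            super(in);
            this.loader = loader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc)
                throws IOException, ClassNotFoundException {
            try {
                return Class.forName(desc.getName(), false, loader);
            } catch (ClassNotFoundException e) {
                // Fall back to the default lookup, which also handles primitives.
                return super.resolveClass(desc);
            }
        }
    }

    /** Serializes a value to a byte array. */
    static byte[] serialize(Serializable value) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    /** Deserializes a value, resolving its classes through the given loader. */
    static Object deserialize(byte[] bytes, ClassLoader loader) {
        try (ObjectInputStream in =
                new LoaderAwareObjectInputStream(new ByteArrayInputStream(bytes), loader)) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        ArrayList<String> original = new ArrayList<>(Arrays.asList("spout", "bolt"));
        Object copy = deserialize(serialize(original), ClassLoader.getSystemClassLoader());
        System.out.println("equal content: " + original.equals(copy));
        System.out.println("same instance: " + (copy == original));
    }
}
```

Had the topology gone through a step like this on the receiving side, the bolt instances would have been created from classes visible to the receiving classloader, and the cast would not have been an issue.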
What I have tried since
-----------------------
I tried setting these to `true` to force Storm to serialize my topology
locally. No change.
- TOPOLOGY_TESTING_ALWAYS_TRY_SERIALIZE
  (http://storm.apache.org/releases/current/javadocs/org/apache/storm/Config.html#TOPOLOGY_TESTING_ALWAYS_TRY_SERIALIZE)
- and STORM_LOCAL_MODE_ZMQ
  (http://storm.apache.org/releases/current/javadocs/org/apache/storm/Config.html#STORM_LOCAL_MODE_ZMQ)
I tried running the LocalCluster under the system classloader:
    ClassLoader originalClassloader = Thread.currentThread().getContextClassLoader();
    try {
        Thread.currentThread().setContextClassLoader(ClassLoader.getSystemClassLoader());
        Config topologyConf = createTopologyConfig();
        Map<String, Object> stormConf = createStormConfig(topologyConf);
        StormTopology topology = createTopology(topologyConf);
        ILocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology(topologyName, stormConf, topology);
    } finally {
        Thread.currentThread().setContextClassLoader(originalClassloader);
    }
This actually got me a bit further:
    Thread died: java.lang.ExceptionInInitializerError
        at clojure.core__init.__init0(Unknown Source)
        at clojure.core__init.<clinit>(Unknown Source)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:348)
        at clojure.lang.RT.classForName(RT.java:2154)
        at clojure.lang.RT.classForName(RT.java:2163)
        at clojure.lang.RT.loadClassForName(RT.java:2182)
        at clojure.lang.RT.load(RT.java:436)
        at clojure.lang.RT.load(RT.java:412)
        at clojure.lang.RT.doInit(RT.java:454)
        at clojure.lang.RT.<clinit>(RT.java:330)
        at clojure.lang.Namespace.<init>(Namespace.java:34)
        at clojure.lang.Namespace.findOrCreate(Namespace.java:176)
        at clojure.lang.Var.internPrivate(Var.java:151)
        at org.apache.storm.LocalCluster.<clinit>(Unknown Source)
        at my.company.storm.LocalTopologyRunner.startTopology(LocalTopologyRunner.java:146)
        ... 10 more
    Caused by: java.lang.IllegalStateException: Attempting to call unbound fn: #'clojure.core/refer
        at clojure.lang.Var$Unbound.throwArity(Var.java:43)
        at clojure.lang.AFn.invoke(AFn.java:32)
        at clojure.lang.Var.invoke(Var.java:379)
        at clojure.lang.RT.doInit(RT.java:467)
        at clojure.lang.RT.<clinit>(RT.java:330)
        ... 18 more
Wat?!
The question
------------
How can I run a Storm topology in local mode safely from a classloader other
than the system classloader?
I'm running on Apache Storm 1.0.1, Jetty 8.1, Java 8u112 x64, Windows 7 x64.