Posted to user@storm.apache.org by Petr Janeček <Ja...@seznam.cz> on 2017/01/23 12:52:45 UTC

Running a local cluster under a nondefault classloader

Hello!

The text below is an edited question from Stack Overflow
(http://stackoverflow.com/questions/41745602/). I'm aware that markdown is
not really supported.


A Local Cluster from a web classloader
======================================

I'm trying to run a local cluster from a web container (yes, it's only for 
dev & testing purposes) and am having difficulty with classloaders.


Direct approach
---------------

When I do it the easy and recommended way
(http://storm.apache.org/releases/current/Local-mode.html),

    ILocalCluster localCluster = new LocalCluster();
    localCluster.submitTopology(topologyName, stormConf, topology);

I get rewarded with

    Async loop died!: java.lang.ClassCastException: my.company.storm.bolt.SomeFilteringBolt cannot be cast to org.apache.storm.task.IBolt
        at org.apache.storm.daemon.executor$fn__7953$fn__7966.invoke(executor.clj:787)
        at org.apache.storm.util$async_loop$fn__625.invoke(util.clj:482)
        at clojure.lang.AFn.run(AFn.java:22)
        at java.lang.Thread.run(Thread.java:745)

This is because the classloader used to load and instantiate the
`StormTopology` is an instance of a Jetty `WebAppClassLoader`, but the
(sub)process spawned by `LocalCluster.submitTopology()` apparently uses the
system classloader. I confirmed this by logging the classloader in the
static block of `SomeFilteringBolt`: the class is indeed loaded twice, and
the bolt from the `WebAppClassLoader` obviously cannot be cast to a bolt on
the system classloader later on.
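
To illustrate the failure mode outside of Storm and Jetty, here is a minimal plain-Java sketch. Everything in it is hypothetical: `Payload` stands in for the bolt class, and `IsolatingLoader` is a made-up child-first loader that plays the role of the second classloader. It assumes the compiled `Payload` class file is readable as a resource from its own classloader, which is the case for classes compiled to disk.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Constructor;

// Hypothetical stand-in for a bolt class such as SomeFilteringBolt.
class Payload {
    static {
        // Runs once per classloader that initializes its own copy of the class.
        System.out.println("Payload initialized by " + Payload.class.getClassLoader());
    }
}

// Hypothetical child-first loader: defines its own copy of one class
// from the parent's bytes instead of delegating to the parent.
class IsolatingLoader extends ClassLoader {
    private final String isolatedName;

    IsolatingLoader(ClassLoader parent, String isolatedName) {
        super(parent);
        this.isolatedName = isolatedName;
    }

    @Override
    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        if (!name.equals(isolatedName)) {
            return super.loadClass(name, resolve); // normal parent-first delegation
        }
        synchronized (getClassLoadingLock(name)) {
            Class<?> loaded = findLoadedClass(name);
            if (loaded == null) {
                loaded = defineFromParentBytes(name);
            }
            if (resolve) {
                resolveClass(loaded);
            }
            return loaded;
        }
    }

    private Class<?> defineFromParentBytes(String name) throws ClassNotFoundException {
        String resource = name.replace('.', '/') + ".class";
        try (InputStream in = getParent().getResourceAsStream(resource)) {
            if (in == null) {
                throw new ClassNotFoundException(name);
            }
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            byte[] chunk = new byte[4096];
            for (int n; (n = in.read(chunk)) != -1; ) {
                buffer.write(chunk, 0, n);
            }
            byte[] bytes = buffer.toByteArray();
            return defineClass(name, bytes, 0, bytes.length);
        } catch (IOException e) {
            throw new ClassNotFoundException(name, e);
        }
    }
}

class DuplicateClassDemo {
    // Returns true when an instance from the second loader cannot be cast to Payload.
    static boolean castFails() {
        String name = Payload.class.getName();
        ClassLoader parent = Payload.class.getClassLoader();
        try {
            Class<?> copy = new IsolatingLoader(parent, name).loadClass(name);
            Constructor<?> constructor = copy.getDeclaredConstructor();
            constructor.setAccessible(true); // the copy lives in a different runtime package
            Object instance = constructor.newInstance();
            Payload.class.cast(instance); // same class name, different defining loader
            return false;
        } catch (ClassCastException expected) {
            return true;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        new Payload(); // initializes the application loader's copy
        System.out.println("cast fails: " + castFails());
    }
}
```

The two copies share a name but are distinct runtime types, so the cast is rejected just like in the stack trace above.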

Expected behaviour
------------------

Now, this is surprising to me, as I thought Storm would serialize the
`StormTopology` instance, "send" it locally, deserialize it and run it. If
it did that, it would definitely work. Instead, it seems to use the
provided `StormTopology` instance directly, which is problematic under a
different classloader.
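
For clarity, this is the kind of round trip I had in mind, as a plain-Java sketch rather than anything Storm is known to do. `LoaderAwareObjectInputStream` and `RoundTrip` are hypothetical names; the only real API used is `ObjectInputStream.resolveClass`, overridden so that classes are resolved against a chosen classloader.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

// Hypothetical helper: resolves deserialized classes against a given loader.
class LoaderAwareObjectInputStream extends ObjectInputStream {
    private final ClassLoader loader;

    LoaderAwareObjectInputStream(InputStream in, ClassLoader loader) throws IOException {
        super(in);
        this.loader = loader;
    }

    @Override
    protected Class<?> resolveClass(ObjectStreamClass desc)
            throws IOException, ClassNotFoundException {
        try {
            return Class.forName(desc.getName(), false, loader);
        } catch (ClassNotFoundException e) {
            // Falls back to the default lookup, which also handles primitive types.
            return super.resolveClass(desc);
        }
    }
}

class RoundTrip {
    // Serializes the value and rebuilds it with classes taken from the target loader.
    static Object copyInto(Serializable value, ClassLoader target) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in = new LoaderAwareObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()), target)) {
                return in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

With something like `RoundTrip.copyInto(topology, ClassLoader.getSystemClassLoader())`, the resulting object graph would consist of classes defined by the system classloader, provided the same classes are visible there.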

What I have tried since
-----------------------

I tried setting these to `true` to force Storm to serialize my topology 
locally. No change.

- TOPOLOGY_TESTING_ALWAYS_TRY_SERIALIZE
  (http://storm.apache.org/releases/current/javadocs/org/apache/storm/Config.html#TOPOLOGY_TESTING_ALWAYS_TRY_SERIALIZE)
- STORM_LOCAL_MODE_ZMQ
  (http://storm.apache.org/releases/current/javadocs/org/apache/storm/Config.html#STORM_LOCAL_MODE_ZMQ)


I tried running the LocalCluster under the system classloader:

    ClassLoader originalClassloader = Thread.currentThread().getContextClassLoader();
    try {
        Thread.currentThread().setContextClassLoader(ClassLoader.getSystemClassLoader());
            
        Config topologyConf = createTopologyConfig();
        Map<String, Object> stormConf = createStormConfig(topologyConf);
        StormTopology topology = createTopology(topologyConf);
            
        ILocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology(topologyName, stormConf, topology);
    } finally {
        Thread.currentThread().setContextClassLoader(originalClassloader);
    }

This actually got me a bit further:

    Thread  died: java.lang.ExceptionInInitializerError
        at clojure.core__init.__init0(Unknown Source)
        at clojure.core__init.<clinit>(Unknown Source)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:348)
        at clojure.lang.RT.classForName(RT.java:2154)
        at clojure.lang.RT.classForName(RT.java:2163)
        at clojure.lang.RT.loadClassForName(RT.java:2182)
        at clojure.lang.RT.load(RT.java:436)
        at clojure.lang.RT.load(RT.java:412)
        at clojure.lang.RT.doInit(RT.java:454)
        at clojure.lang.RT.<clinit>(RT.java:330)
        at clojure.lang.Namespace.<init>(Namespace.java:34)
        at clojure.lang.Namespace.findOrCreate(Namespace.java:176)
        at clojure.lang.Var.internPrivate(Var.java:151)
        at org.apache.storm.LocalCluster.<clinit>(Unknown Source)
        at my.company.storm.LocalTopologyRunner.startTopology(LocalTopologyRunner.java:146)
        ... 10 more
    Caused by: java.lang.IllegalStateException: Attempting to call unbound fn: #'clojure.core/refer
        at clojure.lang.Var$Unbound.throwArity(Var.java:43)
        at clojure.lang.AFn.invoke(AFn.java:32)
        at clojure.lang.Var.invoke(Var.java:379)
        at clojure.lang.RT.doInit(RT.java:467)
        at clojure.lang.RT.<clinit>(RT.java:330)
        ... 18 more

Wat?!

The question
------------

How can I run a Storm topology in local mode safely from a classloader other
than the system classloader?

I'm running on Apache Storm 1.0.1, Jetty 8.1, Java 8u112 x64, Windows 7 x64.