You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2016/12/19 00:54:13 UTC

[3/8] storm git commit: STORM-2217: Make DRPC pure java with Jersey and prepare for separating classpaths.

STORM-2217: Make DRPC pure java with Jersey and prepare for separating classpaths.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ea44062f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ea44062f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ea44062f

Branch: refs/heads/master
Commit: ea44062fdaa60cbc8faff54a26862848a527b25c
Parents: 607ba8a
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Thu Nov 24 14:43:31 2016 -0600
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Mon Dec 5 08:09:28 2016 -0600

----------------------------------------------------------------------
 bin/storm.py                                    |   2 +-
 pom.xml                                         |  16 +
 storm-core/pom.xml                              |  12 +
 .../src/clj/org/apache/storm/daemon/drpc.clj    |  82 --
 .../src/jvm/org/apache/storm/LocalDRPC.java     |  29 +-
 .../jvm/org/apache/storm/daemon/DrpcServer.java | 357 -------
 .../org/apache/storm/daemon/StormCommon.java    |   5 +-
 .../daemon/drpc/BlockingOutstandingRequest.java |  67 ++
 .../jvm/org/apache/storm/daemon/drpc/DRPC.java  | 215 ++++
 .../apache/storm/daemon/drpc/DRPCServer.java    | 168 ++++
 .../apache/storm/daemon/drpc/DRPCThrift.java    |  58 ++
 .../storm/daemon/drpc/OutstandingRequest.java   |  58 ++
 .../storm/daemon/drpc/RequestFactory.java       |  24 +
 .../webapp/AuthorizationExceptionMapper.java    |  39 +
 .../daemon/drpc/webapp/DRPCApplication.java     |  47 +
 .../daemon/drpc/webapp/DRPCExceptionMapper.java |  61 ++
 .../storm/daemon/drpc/webapp/DRPCResource.java  |  63 ++
 .../daemon/drpc/webapp/ReqContextFilter.java    |  69 ++
 .../storm/drpc/DRPCInvocationsClient.java       |  17 +
 .../storm/generated/DRPCExceptionType.java      |  68 ++
 .../storm/generated/DRPCExecutionException.java | 124 ++-
 .../generated/DistributedRPCInvocations.java    | 969 +++++++++++++++++++
 .../apache/storm/security/auth/AuthUtils.java   |  14 +-
 .../apache/storm/security/auth/ReqContext.java  |  14 +-
 .../authorizer/DRPCSimpleACLAuthorizer.java     |   5 +-
 .../src/jvm/org/apache/storm/ui/UIHelpers.java  |  68 +-
 .../src/jvm/org/apache/storm/utils/Utils.java   |   1 +
 .../py/storm/DistributedRPCInvocations-remote   |   7 +
 .../src/py/storm/DistributedRPCInvocations.py   | 209 ++++
 storm-core/src/py/storm/ttypes.py               |  35 +-
 storm-core/src/storm.thrift                     |   9 +
 .../test/clj/org/apache/storm/drpc_test.clj     |  28 -
 .../storm/security/auth/drpc_auth_test.clj      | 320 ------
 .../storm/daemon/drpc/DRPCServerTest.java       | 214 ++++
 .../org/apache/storm/daemon/drpc/DRPCTest.java  | 252 +++++
 35 files changed, 2887 insertions(+), 839 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/ea44062f/bin/storm.py
----------------------------------------------------------------------
diff --git a/bin/storm.py b/bin/storm.py
index c48a2b9..a2dff80 100755
--- a/bin/storm.py
+++ b/bin/storm.py
@@ -759,7 +759,7 @@ def drpc():
         "-Dlog4j.configurationFile=" + os.path.join(get_log4j2_conf_dir(), "cluster.xml")
     ]
     exec_storm_class(
-        "org.apache.storm.daemon.drpc",
+        "org.apache.storm.daemon.drpc.DRPCServer",
         jvmtype="-server",
         daemonName="drpc",
         jvmopts=jvmopts,

http://git-wip-us.apache.org/repos/asf/storm/blob/ea44062f/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 12f6e91..6f1ba86 100644
--- a/pom.xml
+++ b/pom.xml
@@ -298,6 +298,7 @@
         <wagonVersion>1.0</wagonVersion>
         <qpid.version>0.32</qpid.version>
         <eventhubs.client.version>1.0.1</eventhubs.client.version>
+        <jersey.version>2.24.1</jersey.version>
     </properties>
 
     <modules>
@@ -565,6 +566,21 @@
     <dependencyManagement>
         <dependencies>
             <dependency>
+                <groupId>org.glassfish.jersey.core</groupId>
+                <artifactId>jersey-server</artifactId>
+                <version>${jersey.version}</version>
+            </dependency> 
+            <dependency>
+                <groupId>org.glassfish.jersey.containers</groupId> 
+                <artifactId>jersey-container-servlet-core</artifactId>
+                <version>${jersey.version}</version>
+            </dependency> 
+             <dependency>
+                <groupId>org.glassfish.jersey.containers</groupId> 
+                <artifactId>jersey-container-jetty-http</artifactId>
+                <version>${jersey.version}</version>
+            </dependency> 
+            <dependency>
                 <groupId>org.hdrhistogram</groupId>
                 <artifactId>HdrHistogram</artifactId>
                 <version>${hdrhistogram.version}</version>

http://git-wip-us.apache.org/repos/asf/storm/blob/ea44062f/storm-core/pom.xml
----------------------------------------------------------------------
diff --git a/storm-core/pom.xml b/storm-core/pom.xml
index b6424d7..9c47914 100644
--- a/storm-core/pom.xml
+++ b/storm-core/pom.xml
@@ -372,6 +372,18 @@
                 </exclusion>
             </exclusions>
         </dependency>
+        <dependency>
+           <groupId>org.glassfish.jersey.core</groupId>
+            <artifactId>jersey-server</artifactId>
+        </dependency> 
+        <dependency>
+            <groupId>org.glassfish.jersey.containers</groupId> 
+            <artifactId>jersey-container-servlet-core</artifactId>
+        </dependency> 
+        <dependency>
+            <groupId>org.glassfish.jersey.containers</groupId> 
+            <artifactId>jersey-container-jetty-http</artifactId>
+        </dependency>
     </dependencies>
     <build>
         <sourceDirectory>src/jvm</sourceDirectory>

http://git-wip-us.apache.org/repos/asf/storm/blob/ea44062f/storm-core/src/clj/org/apache/storm/daemon/drpc.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/drpc.clj b/storm-core/src/clj/org/apache/storm/daemon/drpc.clj
deleted file mode 100644
index 872407c..0000000
--- a/storm-core/src/clj/org/apache/storm/daemon/drpc.clj
+++ /dev/null
@@ -1,82 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-
-(ns org.apache.storm.daemon.drpc
-  (:import [org.apache.storm.security.auth AuthUtils ReqContext]
-           [org.apache.storm.daemon DrpcServer]
-           [org.apache.storm.metric StormMetricsRegistry])
-  (:import [org.apache.storm.utils Utils])
-  (:import [org.apache.storm.utils ConfigUtils])
-  (:use [org.apache.storm config log util])
-  (:use [org.apache.storm.ui helpers])
-  (:use compojure.core)
-  (:use ring.middleware.reload)
-  (:require [compojure.handler :as handler])
-  (:gen-class))
-
-(def drpc:num-execute-http-requests (StormMetricsRegistry/registerMeter "drpc:num-execute-http-requests"))
-
-(defn handle-request [handler]
-  (fn [request]
-    (handler request)))
-
-(defn populate-context!
-  "Populate the Storm RequestContext from an servlet-request. This should be called in each handler"
-  [http-creds-handler servlet-request]
-    (when http-creds-handler
-      (.populateContext http-creds-handler (ReqContext/context) servlet-request)))
-
-(defn webapp [handler http-creds-handler]
-  (.mark drpc:num-execute-http-requests)
-  (->
-    (routes
-      (POST "/drpc/:func" [:as {:keys [body servlet-request]} func & m]
-        (let [args (slurp body)]
-          (populate-context! http-creds-handler servlet-request)
-          (.execute handler func args)))
-      (POST "/drpc/:func/" [:as {:keys [body servlet-request]} func & m]
-        (let [args (slurp body)]
-          (populate-context! http-creds-handler servlet-request)
-          (.execute handler func args)))
-      (GET "/drpc/:func/:args" [:as {:keys [servlet-request]} func args & m]
-          (populate-context! http-creds-handler servlet-request)
-          (.execute handler func args))
-      (GET "/drpc/:func/" [:as {:keys [servlet-request]} func & m]
-          (populate-context! http-creds-handler servlet-request)
-          (.execute handler func ""))
-      (GET "/drpc/:func" [:as {:keys [servlet-request]} func & m]
-          (populate-context! http-creds-handler servlet-request)
-          (.execute handler func "")))
-    (wrap-reload '[org.apache.storm.daemon.drpc])
-    handle-request))
-
-
-(defn launch-server!
-  ([]
-    (let [conf (clojurify-structure (ConfigUtils/readStormConfig))
-          drpc-http-port (int (conf DRPC-HTTP-PORT))
-          drpc-server (DrpcServer. conf)
-          http-creds-handler (AuthUtils/GetDrpcHttpCredentialsPlugin conf)]
-      (when (> drpc-http-port 0)
-        (let [app (-> (webapp drpc-server http-creds-handler)
-                    requests-middleware)]
-          (.setHttpServlet drpc-server (ring.util.servlet/servlet app))))
-      (.launchServer drpc-server)))
-)
-
-(defn -main []
-  (Utils/setupDefaultUncaughtExceptionHandler)
-  (launch-server!))

http://git-wip-us.apache.org/repos/asf/storm/blob/ea44062f/storm-core/src/jvm/org/apache/storm/LocalDRPC.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/LocalDRPC.java b/storm-core/src/jvm/org/apache/storm/LocalDRPC.java
index e3f34d1..685c35f 100644
--- a/storm-core/src/jvm/org/apache/storm/LocalDRPC.java
+++ b/storm-core/src/jvm/org/apache/storm/LocalDRPC.java
@@ -19,7 +19,8 @@ package org.apache.storm;
 
 import java.util.Map;
 
-import org.apache.storm.daemon.DrpcServer;
+import org.apache.storm.daemon.drpc.DRPC;
+import org.apache.storm.daemon.drpc.DRPCThrift;
 import org.apache.storm.generated.AuthorizationException;
 import org.apache.storm.generated.DRPCExecutionException;
 import org.apache.storm.generated.DRPCRequest;
@@ -36,13 +37,13 @@ import org.apache.thrift.TException;
  */
 public class LocalDRPC implements ILocalDRPC {
 
-    private final DrpcServer handler;
+    private final DRPC drpc;
     private final String serviceId;
 
     public LocalDRPC() {
         Map<String, Object> conf = ConfigUtils.readStormConfig();
-        handler = new DrpcServer(conf);
-        serviceId = ServiceRegistry.registerService(handler);
+        drpc = new DRPC(conf);
+        serviceId = ServiceRegistry.registerService(new DRPCThrift(drpc));
     }
 
     @Override
@@ -52,32 +53,38 @@ public class LocalDRPC implements ILocalDRPC {
 
     @Override
     public void result(String id, String result) throws AuthorizationException, TException {
-        handler.result(id, result);
+        drpc.returnResult(id, result);
     }
 
     @Override
     public String execute(String functionName, String funcArgs) throws DRPCExecutionException, AuthorizationException, TException {
-        return handler.execute(functionName, funcArgs);
+        return drpc.executeBlocking(functionName, funcArgs);
     }
 
     @Override
     public void failRequest(String id) throws AuthorizationException, TException {
-        handler.failRequest(id);
+        drpc.failRequest(id, null);
     }
+    
 
     @Override
-    public void shutdown() {
-        close();
+    public void failRequestV2(String id, DRPCExecutionException e) throws AuthorizationException, TException {
+        drpc.failRequest(id, e);
     }
 
     @Override
     public DRPCRequest fetchRequest(String functionName) throws AuthorizationException, TException {
-        return handler.fetchRequest(functionName);
+        return drpc.fetchRequest(functionName);
     }
 
     @Override
     public void close() {
         ServiceRegistry.unregisterService(this.serviceId);
-        this.handler.close();
+        drpc.close();
+    }
+    
+    @Override
+    public void shutdown() {
+        close();
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/ea44062f/storm-core/src/jvm/org/apache/storm/daemon/DrpcServer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/DrpcServer.java b/storm-core/src/jvm/org/apache/storm/daemon/DrpcServer.java
deleted file mode 100644
index 21308fd..0000000
--- a/storm-core/src/jvm/org/apache/storm/daemon/DrpcServer.java
+++ /dev/null
@@ -1,357 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.daemon;
-
-import com.codahale.metrics.Meter;
-import com.google.common.collect.ImmutableMap;
-import org.apache.commons.lang.StringUtils;
-import org.apache.storm.Config;
-import org.apache.storm.generated.*;
-import org.apache.storm.logging.ThriftAccessLogger;
-import org.apache.storm.metric.StormMetricsRegistry;
-import org.apache.storm.security.auth.*;
-import org.apache.storm.security.auth.authorizer.DRPCAuthorizerBase;
-import org.apache.storm.ui.FilterConfiguration;
-import org.apache.storm.ui.IConfigurator;
-import org.apache.storm.ui.UIHelpers;
-import org.apache.storm.utils.Time;
-import org.apache.storm.utils.Utils;
-import org.apache.storm.utils.VersionInfo;
-import org.apache.thrift.TException;
-import org.eclipse.jetty.server.Server;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.servlet.Servlet;
-import java.security.Principal;
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicInteger;
-
-public class DrpcServer implements DistributedRPC.Iface, DistributedRPCInvocations.Iface, AutoCloseable {
-
-    private static final Logger LOG = LoggerFactory.getLogger(DrpcServer.class);
-    private final Long timeoutCheckSecs = 5L;
-
-    private Map conf;
-
-    private ThriftServer handlerServer;
-    private ThriftServer invokeServer;
-    private IHttpCredentialsPlugin httpCredsHandler;
-
-    private Thread clearThread;
-
-    private IAuthorizer authorizer;
-
-    private Servlet httpServlet;
-
-    private AtomicInteger ctr = new AtomicInteger(0);
-    private ConcurrentHashMap<String, ConcurrentLinkedQueue<DRPCRequest>> requestQueues = new ConcurrentHashMap<String, ConcurrentLinkedQueue<DRPCRequest>>();
-
-
-    private static class InternalRequest {
-        public final Semaphore sem;
-        public final int startTimeSecs;
-        public final String function;
-        public final DRPCRequest request;
-        public volatile Object result;
-
-        public InternalRequest(String function, DRPCRequest request) {
-            sem = new Semaphore(0);
-            startTimeSecs = Time.currentTimeSecs();
-            this.function = function;
-            this.request = request;
-        }
-    }
-
-    private ConcurrentHashMap<String, InternalRequest> outstandingRequests = new ConcurrentHashMap<>();
-
-    private final static Meter meterHttpRequests = StormMetricsRegistry.registerMeter("drpc:num-execute-http-requests");
-    private final static Meter meterExecuteCalls = StormMetricsRegistry.registerMeter("drpc:num-execute-calls");
-    private final static Meter meterResultCalls = StormMetricsRegistry.registerMeter("drpc:num-result-calls");
-    private final static Meter meterFailRequestCalls = StormMetricsRegistry.registerMeter("drpc:num-failRequest-calls");
-    private final static Meter meterFetchRequestCalls = StormMetricsRegistry.registerMeter("drpc:num-fetchRequest-calls");
-    private final static Meter meterShutdownCalls = StormMetricsRegistry.registerMeter("drpc:num-shutdown-calls");
-
-    public DrpcServer(Map conf) {
-        this.conf = conf;
-        this.authorizer = mkAuthorizationHandler((String) (this.conf.get(Config.DRPC_AUTHORIZER)));
-        initClearThread();
-    }
-
-    public void setHttpServlet(Servlet httpServlet) {
-        this.httpServlet = httpServlet;
-    }
-
-    private ThriftServer initHandlerServer(final DrpcServer service) throws Exception {
-        int port = (int) conf.get(Config.DRPC_PORT);
-        if (port > 0) {
-            handlerServer = new ThriftServer(conf, new DistributedRPC.Processor<DistributedRPC.Iface>(service), ThriftConnectionType.DRPC);
-        }
-        return handlerServer;
-    }
-
-    private ThriftServer initInvokeServer(final DrpcServer service) throws Exception {
-        invokeServer = new ThriftServer(conf, new DistributedRPCInvocations.Processor<DistributedRPCInvocations.Iface>(service),
-                ThriftConnectionType.DRPC_INVOCATIONS);
-        return invokeServer;
-    }
-
-    private void initHttp() throws Exception {
-        LOG.info("Starting  RPC Http servers...");
-        Integer drpcHttpPort = (Integer) conf.get(Config.DRPC_HTTP_PORT);
-        if (drpcHttpPort != null && drpcHttpPort > 0) {
-            String filterClass = (String) (conf.get(Config.DRPC_HTTP_FILTER));
-            Map<String, String> filterParams = (Map<String, String>) (conf.get(Config.DRPC_HTTP_FILTER_PARAMS));
-            FilterConfiguration filterConfiguration = new FilterConfiguration(filterClass, filterParams);
-            final List<FilterConfiguration> filterConfigurations = Arrays.asList(filterConfiguration);
-            final Integer httpsPort = Utils.getInt(conf.get(Config.DRPC_HTTPS_PORT), 0);
-            final String httpsKsPath = (String) (conf.get(Config.DRPC_HTTPS_KEYSTORE_PATH));
-            final String httpsKsPassword = (String) (conf.get(Config.DRPC_HTTPS_KEYSTORE_PASSWORD));
-            final String httpsKsType = (String) (conf.get(Config.DRPC_HTTPS_KEYSTORE_TYPE));
-            final String httpsKeyPassword = (String) (conf.get(Config.DRPC_HTTPS_KEY_PASSWORD));
-            final String httpsTsPath = (String) (conf.get(Config.DRPC_HTTPS_TRUSTSTORE_PATH));
-            final String httpsTsPassword = (String) (conf.get(Config.DRPC_HTTPS_TRUSTSTORE_PASSWORD));
-            final String httpsTsType = (String) (conf.get(Config.DRPC_HTTPS_TRUSTSTORE_TYPE));
-            final Boolean httpsWantClientAuth = (Boolean) (conf.get(Config.DRPC_HTTPS_WANT_CLIENT_AUTH));
-            final Boolean httpsNeedClientAuth = (Boolean) (conf.get(Config.DRPC_HTTPS_NEED_CLIENT_AUTH));
-
-            UIHelpers.stormRunJetty(drpcHttpPort, new IConfigurator() {
-                @Override
-                public void execute(Server s) {
-                    UIHelpers.configSsl(s, httpsPort, httpsKsPath, httpsKsPassword, httpsKsType, httpsKeyPassword, httpsTsPath, httpsTsPassword, httpsTsType,
-                            httpsNeedClientAuth, httpsWantClientAuth);
-                    UIHelpers.configFilter(s, httpServlet, filterConfigurations);
-                }
-            });
-        }
-
-    }
-
-    private void initThrift() throws Exception {
-
-        handlerServer = initHandlerServer(this);
-        invokeServer = initInvokeServer(this);
-        httpCredsHandler = AuthUtils.GetDrpcHttpCredentialsPlugin(conf);
-        Utils.addShutdownHookWithForceKillIn1Sec(new Runnable() {
-            @Override
-            public void run() {
-                if (handlerServer != null)
-                    handlerServer.stop();
-                invokeServer.stop();
-            }
-        });
-        LOG.info("Starting Distributed RPC servers...");
-        new Thread(new Runnable() {
-
-            @Override
-            public void run() {
-                invokeServer.serve();
-            }
-        }).start();
-
-        StormMetricsRegistry.startMetricsReporters(conf);
-
-        if (handlerServer != null)
-            handlerServer.serve();
-    }
-
-    private void initClearThread() {
-        clearThread = Utils.asyncLoop(new Callable() {
-
-            @Override
-            public Object call() throws Exception {
-                for (Map.Entry<String, InternalRequest> e : outstandingRequests.entrySet()) {
-                    InternalRequest internalRequest = e.getValue();
-                    if (Time.deltaSecs(internalRequest.startTimeSecs) > Utils.getInt(conf.get(Config.DRPC_REQUEST_TIMEOUT_SECS))) {
-                        String id = e.getKey();
-                        Semaphore sem = internalRequest.sem;
-                        if (sem != null) {
-                            String func = internalRequest.function;
-                            acquireQueue(func).remove(internalRequest.request);
-                            LOG.warn("Timeout DRPC request id: {} start at {}", id, e.getValue());
-                            sem.release();
-                        }
-                        cleanup(id);
-                    }
-                }
-                return getTimeoutCheckSecs();
-            }
-        });
-    }
-
-    public Long getTimeoutCheckSecs() {
-        return timeoutCheckSecs;
-    }
-
-    public void launchServer() throws Exception {
-        LOG.info("Starting drpc server for storm version {}", VersionInfo.getVersion());
-        initHttp();
-        initThrift();
-    }
-
-    @Override
-    public void close() {
-        meterShutdownCalls.mark();
-        clearThread.interrupt();
-    }
-
-    public void cleanup(String id) {
-        outstandingRequests.remove(id);
-    }
-
-    @Override
-    public String execute(String functionName, String funcArgs) throws DRPCExecutionException, AuthorizationException, org.apache.thrift.TException {
-        meterExecuteCalls.mark();
-        LOG.debug("Received DRPC request for {} ({}) at {} ", functionName, funcArgs, System.currentTimeMillis());
-        Map<String, String> map = new HashMap<>();
-        map.put(DRPCAuthorizerBase.FUNCTION_NAME, functionName);
-        checkAuthorization(authorizer, map, "execute", functionName);
-
-        int newid = 0;
-        int orig = 0;
-        do {
-            orig = ctr.get();
-            newid = (orig + 1) % 1000000000;
-        } while (!ctr.compareAndSet(orig, newid));
-        String strid = String.valueOf(newid);
-
-        DRPCRequest req = new DRPCRequest(funcArgs, strid);
-        InternalRequest internalRequest = new InternalRequest(functionName, req);
-        this.outstandingRequests.put(strid, internalRequest);
-        ConcurrentLinkedQueue<DRPCRequest> queue = acquireQueue(functionName);
-        queue.add(req);
-        LOG.debug("Waiting for DRPC request for {} {} at {}", functionName, funcArgs, System.currentTimeMillis());
-        try {
-            internalRequest.sem.acquire();
-        } catch (InterruptedException e) {
-            LOG.error("acquire fail ", e);
-        }
-        LOG.debug("Acquired for DRPC request for {} {} at {}", functionName, funcArgs, System.currentTimeMillis());
-
-        Object result = internalRequest.result;
-
-        LOG.debug("Returning for DRPC request for " + functionName + " " + funcArgs + " at " + (System.currentTimeMillis()));
-
-        this.cleanup(strid);
-
-        if (result instanceof DRPCExecutionException) {
-            throw (DRPCExecutionException) result;
-        }
-        if (result == null) {
-            throw new DRPCExecutionException("Request timed out");
-        }
-        return (String) result;
-    }
-
-    @Override
-    public void result(String id, String result) throws AuthorizationException, TException {
-        meterResultCalls.mark();
-        InternalRequest internalRequest = this.outstandingRequests.get(id);
-        if (internalRequest != null) {
-            Map<String, String> map = ImmutableMap.of(DRPCAuthorizerBase.FUNCTION_NAME, internalRequest.function);
-            checkAuthorization(authorizer, map, "result", internalRequest.function);
-            Semaphore sem = internalRequest.sem;
-            LOG.debug("Received result {} for {} at {}", result, id, System.currentTimeMillis());
-            if (sem != null) {
-                internalRequest.result = result;
-                sem.release();
-            }
-        }
-    }
-
-    @Override
-    public DRPCRequest fetchRequest(String functionName) throws AuthorizationException, TException {
-        meterFetchRequestCalls.mark();
-        Map<String, String> map = new HashMap<>();
-        map.put(DRPCAuthorizerBase.FUNCTION_NAME, functionName);
-        checkAuthorization(authorizer, map, "fetchRequest", functionName);
-        ConcurrentLinkedQueue<DRPCRequest> queue = acquireQueue(functionName);
-        DRPCRequest req = queue.poll();
-        if (req != null) {
-            LOG.debug("Fetched request for {} at {}", functionName, System.currentTimeMillis());
-            return req;
-        } else {
-            return new DRPCRequest("", "");
-        }
-    }
-
-    @Override
-    public void failRequest(String id) throws AuthorizationException, TException {
-        meterFailRequestCalls.mark();
-        InternalRequest internalRequest = this.outstandingRequests.get(id);
-        if (internalRequest != null) {
-            Map<String, String> map = new HashMap<>();
-            map.put(DRPCAuthorizerBase.FUNCTION_NAME, internalRequest.function);
-            checkAuthorization(authorizer, map, "failRequest", internalRequest.function);
-            Semaphore sem = internalRequest.sem;
-            if (sem != null) {
-                internalRequest.result = new DRPCExecutionException("Request failed");
-                sem.release();
-            }
-        }
-    }
-
-    protected ConcurrentLinkedQueue<DRPCRequest> acquireQueue(String function) {
-        ConcurrentLinkedQueue<DRPCRequest> reqQueue = requestQueues.get(function);
-        if (reqQueue == null) {
-            reqQueue = new ConcurrentLinkedQueue<>();
-            ConcurrentLinkedQueue<DRPCRequest> old = requestQueues.putIfAbsent(function, reqQueue);
-            if (old != null) {
-                reqQueue = old;
-            }
-        }
-        return reqQueue;
-    }
-
-    private void checkAuthorization(IAuthorizer aclHandler, Map mapping, String operation, ReqContext reqContext, String function) throws AuthorizationException {
-        if (reqContext != null) {
-            ThriftAccessLogger.logAccessFunction(reqContext.requestID(), reqContext.remoteAddress(), reqContext.principal(), operation, function);
-        }
-        if (aclHandler != null) {
-            if (reqContext == null)
-                reqContext = ReqContext.context();
-            if (!aclHandler.permit(reqContext, operation, mapping)) {
-                Principal principal = reqContext.principal();
-                String user = (principal != null) ? principal.getName() : "unknown";
-                throw new AuthorizationException("DRPC request '" + operation + "' for '" + user + "' user is not authorized");
-            }
-        }
-    }
-
-    private void checkAuthorization(IAuthorizer aclHandler, Map mapping, String operation, String function) throws AuthorizationException {
-        checkAuthorization(aclHandler, mapping, operation, ReqContext.context(), function);
-    }
-
-    // TO be replaced by Common.mkAuthorizationHandler
-    private IAuthorizer mkAuthorizationHandler(String klassname) {
-        IAuthorizer authorizer = null;
-        Class aznClass = null;
-        if (StringUtils.isNotBlank(klassname)) {
-            try {
-                aznClass = Class.forName(klassname);
-                authorizer = (IAuthorizer) aznClass.newInstance();
-                authorizer.prepare(conf);
-            } catch (Exception e) {
-                LOG.error("mkAuthorizationHandler failed!", e);
-            }
-        }
-        LOG.debug("authorization class name: {} class: {} handler: {}", klassname, aznClass, authorizer);
-        return authorizer;
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/ea44062f/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java b/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java
index 874b607..90753a7 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java
@@ -17,6 +17,7 @@
  */
 package org.apache.storm.daemon;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.storm.Config;
 import org.apache.storm.Constants;
 import org.apache.storm.Thrift;
@@ -552,8 +553,8 @@ public class StormCommon {
     protected IAuthorizer mkAuthorizationHandlerImpl(String klassName, Map conf)
             throws ClassNotFoundException, IllegalAccessException, InstantiationException {
         IAuthorizer aznHandler = null;
-        if (klassName != null) {
-            Class aznClass = Class.forName(klassName);
+        if (StringUtils.isNotBlank(klassName)) {
+            Class<?> aznClass = Class.forName(klassName);
             if (aznClass != null) {
                 aznHandler = (IAuthorizer) aznClass.newInstance();
                 if (aznHandler != null) {

http://git-wip-us.apache.org/repos/asf/storm/blob/ea44062f/storm-core/src/jvm/org/apache/storm/daemon/drpc/BlockingOutstandingRequest.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/drpc/BlockingOutstandingRequest.java b/storm-core/src/jvm/org/apache/storm/daemon/drpc/BlockingOutstandingRequest.java
new file mode 100644
index 0000000..619af65
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/drpc/BlockingOutstandingRequest.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.drpc;
+
+import java.util.concurrent.Semaphore;
+
+import org.apache.storm.generated.DRPCExceptionType;
+import org.apache.storm.generated.DRPCExecutionException;
+import org.apache.storm.generated.DRPCRequest;
+
+/**
+ * An {@link OutstandingRequest} whose submitter blocks in {@link #getResult()}
+ * until another thread calls {@link #returnResult(String)} or
+ * {@link #fail(DRPCExecutionException)}.
+ */
+public class BlockingOutstandingRequest extends OutstandingRequest {
+    /** Factory that creates a new blocking request for a function and its thrift request. */
+    public static final RequestFactory<BlockingOutstandingRequest> FACTORY = 
+            (function, request) -> new BlockingOutstandingRequest(function, request);
+    //Starts with zero permits; one permit is released per returnResult/fail call.
+    private final Semaphore _sem;
+    private volatile String _result = null;
+    private volatile DRPCExecutionException _e = null;
+
+    public BlockingOutstandingRequest(String function, DRPCRequest req) {
+        super(function, req);
+        _sem = new Semaphore(0);
+    }
+
+    /**
+     * Wait for the request to complete and return its result.
+     *
+     * @return the result passed to {@link #returnResult(String)}
+     * @throws DRPCExecutionException the exception passed to {@link #fail(DRPCExecutionException)},
+     *     or an INTERNAL_ERROR exception when the wait ended with neither a result nor a failure
+     *     recorded (for example because the waiting thread was interrupted)
+     */
+    public String getResult() throws DRPCExecutionException {
+        try {
+            _sem.acquire();
+        } catch (InterruptedException e) {
+            //Keep the interrupt visible to the caller instead of silently dropping it.
+            //We still fall through so that a result/failure that is already recorded is reported.
+            Thread.currentThread().interrupt();
+        }
+
+        //Read each volatile once so a concurrent update cannot change the answer mid-method.
+        final String result = _result;
+        if (result != null) {
+            return result;
+        }
+
+        DRPCExecutionException failure = _e;
+        if (failure == null) {
+            failure = new DRPCExecutionException("Internal Error: No Result and No Exception");
+            failure.set_type(DRPCExceptionType.INTERNAL_ERROR);
+        }
+        throw failure;
+    }
+
+    /** Record the result and wake up the thread blocked in {@link #getResult()}. */
+    @Override
+    public void returnResult(String result) {
+        _result = result;
+        _sem.release();
+    }
+
+    /** Record the failure and wake up the thread blocked in {@link #getResult()}. */
+    @Override
+    public void fail(DRPCExecutionException e) {
+        _e = e;
+        _sem.release();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/ea44062f/storm-core/src/jvm/org/apache/storm/daemon/drpc/DRPC.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/drpc/DRPC.java b/storm-core/src/jvm/org/apache/storm/daemon/drpc/DRPC.java
new file mode 100644
index 0000000..7ddb6a9
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/drpc/DRPC.java
@@ -0,0 +1,215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.drpc;
+
+import java.security.Principal;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.storm.Config;
+import org.apache.storm.daemon.StormCommon;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.DRPCExceptionType;
+import org.apache.storm.generated.DRPCExecutionException;
+import org.apache.storm.generated.DRPCRequest;
+import org.apache.storm.logging.ThriftAccessLogger;
+import org.apache.storm.metric.StormMetricsRegistry;
+import org.apache.storm.security.auth.IAuthorizer;
+import org.apache.storm.security.auth.ReqContext;
+import org.apache.storm.security.auth.authorizer.DRPCAuthorizerBase;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.codahale.metrics.Meter;
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * In-memory bookkeeping for DRPC requests.
+ *
+ * <p>{@link #execute} registers a request and queues it under its function name,
+ * {@link #fetchRequest} hands queued requests out, and {@link #returnResult} /
+ * {@link #failRequest} complete them. A timer periodically fails requests that are
+ * older than the configured timeout, and {@link #close()} fails everything that is
+ * still outstanding.
+ */
+public class DRPC implements AutoCloseable {
+    private static final Logger LOG = LoggerFactory.getLogger(DRPC.class);
+    //Returned by fetchRequest when there is nothing queued for the function.
+    private static final DRPCRequest NOTHING_REQUEST = new DRPCRequest("","");
+    private static final DRPCExecutionException TIMED_OUT = new DRPCExecutionException("Timed Out");
+    private static final DRPCExecutionException SHUT_DOWN = new DRPCExecutionException("Server Shutting Down");
+    private static final DRPCExecutionException DEFAULT_FAILED = new DRPCExecutionException("Request failed");
+    static {
+        TIMED_OUT.set_type(DRPCExceptionType.SERVER_TIMEOUT);
+        SHUT_DOWN.set_type(DRPCExceptionType.SERVER_SHUTDOWN);
+        DEFAULT_FAILED.set_type(DRPCExceptionType.FAILED_REQUEST);
+    }
+    private static final Meter meterServerTimedOut = StormMetricsRegistry.registerMeter("drpc:num-server-timedout-requests");
+    private static final Meter meterExecuteCalls = StormMetricsRegistry.registerMeter("drpc:num-execute-calls");
+    private static final Meter meterResultCalls = StormMetricsRegistry.registerMeter("drpc:num-result-calls");
+    private static final Meter meterFailRequestCalls = StormMetricsRegistry.registerMeter("drpc:num-failRequest-calls");
+    private static final Meter meterFetchRequestCalls = StormMetricsRegistry.registerMeter("drpc:num-fetchRequest-calls");
+
+    /** Same as {@link StormCommon#mkAuthorizationHandler} but rethrows any failure unchecked. */
+    private static IAuthorizer mkAuthorizationHandler(String klassname, Map<String, Object> conf) {
+        try {
+            return StormCommon.mkAuthorizationHandler(klassname, conf);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Log the access (when a request context is available) and ask the authorizer,
+     * if there is one, whether the operation is permitted for the function.
+     *
+     * @param reqContext the context of the caller, may be null
+     * @param auth the authorizer to consult, null means everything is allowed
+     * @param operation the DRPC operation being performed
+     * @param function the DRPC function the operation applies to
+     * @throws AuthorizationException if the authorizer rejects the operation
+     */
+    @VisibleForTesting
+    static void checkAuthorization(ReqContext reqContext, IAuthorizer auth, String operation, String function) throws AuthorizationException {
+        if (reqContext != null) {
+            ThriftAccessLogger.logAccessFunction(reqContext.requestID(), reqContext.remoteAddress(), reqContext.principal(), operation, function);
+        }
+        if (auth != null) {
+            Map<String, String> map = new HashMap<>();
+            map.put(DRPCAuthorizerBase.FUNCTION_NAME, function);
+            if (!auth.permit(reqContext, operation, map)) {
+                //reqContext is allowed to be null above, so it must not be dereferenced blindly here.
+                Principal principal = (reqContext != null) ? reqContext.principal() : null;
+                String user = (principal != null) ? principal.getName() : "unknown";
+                throw new AuthorizationException("DRPC request '" + operation + "' for '" + user + "' user is not authorized");
+            }
+        }
+    }
+
+    //Waiting to be fetched
+    private final ConcurrentHashMap<String, ConcurrentLinkedQueue<OutstandingRequest>> _queues = 
+            new ConcurrentHashMap<>();
+    //Waiting to be returned
+    private final ConcurrentHashMap<String, OutstandingRequest> _requests = 
+            new ConcurrentHashMap<>();
+    private final Timer _timer = new Timer();
+    private final AtomicLong _ctr = new AtomicLong(0);
+    private final IAuthorizer _auth;
+
+    /**
+     * Create an instance from the storm configuration, using
+     * {@link Config#DRPC_AUTHORIZER} and {@link Config#DRPC_REQUEST_TIMEOUT_SECS}
+     * (600 seconds when unset).
+     */
+    public DRPC(Map<String, Object> conf) {
+        //1000L: do the seconds -> ms conversion in long arithmetic so large timeouts cannot overflow an int.
+        this(mkAuthorizationHandler((String)conf.get(Config.DRPC_AUTHORIZER), conf),
+                Utils.getInt(conf.get(Config.DRPC_REQUEST_TIMEOUT_SECS), 600) * 1000L);
+    }
+
+    /**
+     * @param auth the authorizer to use, null disables authorization checks
+     * @param timeoutMs how long a request may stay outstanding before it is failed as timed out
+     * @throws IllegalArgumentException if timeoutMs is negative
+     */
+    public DRPC(IAuthorizer auth, long timeoutMs) {
+        if (timeoutMs < 0) {
+            //The timer thread was already created by the field initializer, don't leak it.
+            _timer.cancel();
+            throw new IllegalArgumentException("DRPC request timeout must not be negative: " + timeoutMs);
+        }
+        _auth = auth;
+        //Timer rejects a period of 0, which timeoutMs/2 produces for timeouts of 0 or 1 ms.
+        final long sweepIntervalMs = Math.max(1L, timeoutMs / 2);
+        _timer.scheduleAtFixedRate(new TimerTask() {
+            @Override
+            public void run() {
+                cleanupAll(timeoutMs, TIMED_OUT);
+            }
+        }, sweepIntervalMs, sweepIntervalMs);
+    }
+
+    private void checkAuthorization(String operation, String function) throws AuthorizationException {
+        checkAuthorization(ReqContext.context(), _auth, operation, function);
+    }
+
+    /** Forget the request and, if nobody fetched it yet, take it out of its function queue. */
+    private void cleanup(String id) {
+        OutstandingRequest req = _requests.remove(id);
+        if (req != null && !req.wasFetched()) {
+            //execute() registers the request before it creates/looks up the queue, so a
+            //concurrent sweep can get here while there is no queue for the function yet.
+            ConcurrentLinkedQueue<OutstandingRequest> queue = _queues.get(req.getFunction());
+            if (queue != null) {
+                queue.remove(req);
+            }
+        }
+    }
+
+    /**
+     * Fail and forget every request that is older than timeoutMs.
+     * NOTE: the timed-out meter is marked for every request failed here, including the
+     * ones failed with SHUT_DOWN from {@link #close()}.
+     */
+    private void cleanupAll(long timeoutMs, DRPCExecutionException exp) {
+        for (Entry<String, OutstandingRequest> e : _requests.entrySet()) {
+            OutstandingRequest req = e.getValue();
+            if (req.isTimedOut(timeoutMs)) {
+                req.fail(exp);
+                cleanup(e.getKey());
+                meterServerTimedOut.mark();
+            }
+        }
+    }
+
+    private String nextId() {
+        return String.valueOf(_ctr.incrementAndGet());
+    }
+
+    /** Get the queue of not yet fetched requests for a function, creating it on first use. */
+    private ConcurrentLinkedQueue<OutstandingRequest> getQueue(String function) {
+        ConcurrentLinkedQueue<OutstandingRequest> queue = _queues.get(function);
+        if (queue == null) {
+            ConcurrentLinkedQueue<OutstandingRequest> created = new ConcurrentLinkedQueue<>();
+            //putIfAbsent returns the queue that won the race, or null if ours was installed.
+            ConcurrentLinkedQueue<OutstandingRequest> existing = _queues.putIfAbsent(function, created);
+            queue = (existing != null) ? existing : created;
+        }
+        return queue;
+    }
+
+    /**
+     * Complete an outstanding request with a result. Unknown ids are ignored.
+     *
+     * @throws AuthorizationException if the caller may not return results for the request's function
+     */
+    public void returnResult(String id, String result) throws AuthorizationException {
+        meterResultCalls.mark();
+        LOG.debug("Got a result {} {}", id, result);
+        OutstandingRequest req = _requests.get(id);
+        if (req != null) {
+            checkAuthorization("result", req.getFunction());
+            req.returnResult(result);
+        }
+    }
+
+    /**
+     * Take the next queued request for a function.
+     *
+     * @return the request, or a request with an empty id and empty arguments when nothing is queued
+     * @throws AuthorizationException if the caller may not fetch requests for the function
+     */
+    public DRPCRequest fetchRequest(String functionName) throws AuthorizationException {
+        meterFetchRequestCalls.mark();
+        checkAuthorization("fetchRequest", functionName);
+        OutstandingRequest req = getQueue(functionName).poll();
+        if (req == null) {
+            return NOTHING_REQUEST;
+        }
+        req.fetched();
+        return req.getRequest();
+    }
+
+    /**
+     * Fail an outstanding request. Unknown ids are ignored.
+     *
+     * @param e the failure to report, null means a generic FAILED_REQUEST failure
+     * @throws AuthorizationException if the caller may not fail requests for the request's function
+     */
+    public void failRequest(String id, DRPCExecutionException e) throws AuthorizationException {
+        meterFailRequestCalls.mark();
+        LOG.debug("Got a fail {}", id);
+        OutstandingRequest req = _requests.get(id);
+        if (req != null) {
+            checkAuthorization("failRequest", req.getFunction());
+            req.fail((e != null) ? e : DEFAULT_FAILED);
+        }
+    }
+
+    /**
+     * Register a new request and queue it to be fetched.
+     *
+     * @param factory creates the request object that will be notified about the outcome
+     * @return the newly created request
+     * @throws AuthorizationException if the caller may not execute the function
+     */
+    public <T extends OutstandingRequest> T execute(String functionName, String funcArgs, RequestFactory<T> factory) throws AuthorizationException {
+        meterExecuteCalls.mark();
+        checkAuthorization("execute", functionName);
+        String id = nextId();
+        LOG.debug("Execute {} {}", functionName, funcArgs);
+        T req = factory.mkRequest(functionName, new DRPCRequest(funcArgs, id));
+        _requests.put(id, req);
+        getQueue(functionName).add(req);
+        return req;
+    }
+
+    /**
+     * Execute a request and block until it has a result, fails, times out or the
+     * server shuts down. The request is always forgotten again before returning.
+     */
+    public String executeBlocking(String functionName, String funcArgs) throws DRPCExecutionException, AuthorizationException {
+        BlockingOutstandingRequest req = execute(functionName, funcArgs, BlockingOutstandingRequest.FACTORY);
+        try {
+            LOG.debug("Waiting for result {} {}",functionName, funcArgs);
+            return req.getResult();
+        } finally {
+            cleanup(req.getRequest().get_request_id());
+        }
+    }
+
+    /** Stop the timeout timer and fail every outstanding request with SERVER_SHUTDOWN. */
+    @Override
+    public void close() {
+        _timer.cancel();
+        //A timeout of 0 makes every outstanding request count as timed out.
+        cleanupAll(0, SHUT_DOWN);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/ea44062f/storm-core/src/jvm/org/apache/storm/daemon/drpc/DRPCServer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/drpc/DRPCServer.java b/storm-core/src/jvm/org/apache/storm/daemon/drpc/DRPCServer.java
new file mode 100644
index 0000000..34dba76
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/drpc/DRPCServer.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.drpc;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.storm.Config;
+import org.apache.storm.daemon.drpc.webapp.DRPCApplication;
+import org.apache.storm.generated.DistributedRPC;
+import org.apache.storm.generated.DistributedRPCInvocations;
+import org.apache.storm.metric.StormMetricsRegistry;
+import org.apache.storm.security.auth.ThriftConnectionType;
+import org.apache.storm.security.auth.ThriftServer;
+import org.apache.storm.ui.FilterConfiguration;
+import org.apache.storm.ui.UIHelpers;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.Utils;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.servlet.ServletContextHandler;
+import org.eclipse.jetty.servlet.ServletHolder;
+import org.glassfish.jersey.servlet.ServletContainer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.codahale.metrics.Meter;
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * The DRPC daemon: wires a {@link DRPC} instance to the thrift handler server,
+ * the thrift invocations server and, when an HTTP port is configured, a jetty
+ * server running the jersey {@link DRPCApplication}.
+ */
+public class DRPCServer implements AutoCloseable {
+    private static final Logger LOG = LoggerFactory.getLogger(DRPCServer.class);
+    private final static Meter meterShutdownCalls = StormMetricsRegistry.registerMeter("drpc:num-shutdown-calls");
+
+    /** Create the client facing thrift server, or return null when port is null or not positive. */
+    private static ThriftServer mkHandlerServer(final DistributedRPC.Iface service, Integer port, Map<String, Object> conf) {
+        ThriftServer ret = null;
+        if (port != null && port > 0) {
+            ret = new ThriftServer(conf, new DistributedRPC.Processor<>(service),
+                    ThriftConnectionType.DRPC);
+        }
+        return ret;
+    }
+
+    /** Create the thrift server used to fetch requests and return results. */
+    private static ThriftServer mkInvokeServer(final DistributedRPCInvocations.Iface service, int port, Map<String, Object> conf) {
+        return new ThriftServer(conf, new DistributedRPCInvocations.Processor<>(service),
+                ThriftConnectionType.DRPC_INVOCATIONS);
+    }
+
+    /**
+     * Create (but do not start) the HTTP server.
+     *
+     * @return the configured server, or null when {@link Config#DRPC_HTTP_PORT} is unset or not positive
+     */
+    private static Server mkHttpServer(Map<String, Object> conf, DRPC drpc) {
+        //Read the port like every other port in this class instead of hard casting it to Integer.
+        Integer drpcHttpPort = Utils.getInt(conf.get(Config.DRPC_HTTP_PORT), null);
+        Server ret = null;
+        if (drpcHttpPort != null && drpcHttpPort > 0) {
+            LOG.info("Starting RPC HTTP servers...");
+            String filterClass = (String) (conf.get(Config.DRPC_HTTP_FILTER));
+            @SuppressWarnings("unchecked")
+            Map<String, String> filterParams = (Map<String, String>) (conf.get(Config.DRPC_HTTP_FILTER_PARAMS));
+            FilterConfiguration filterConfiguration = new FilterConfiguration(filterClass, filterParams);
+            final List<FilterConfiguration> filterConfigurations = Arrays.asList(filterConfiguration);
+            final Integer httpsPort = Utils.getInt(conf.get(Config.DRPC_HTTPS_PORT), 0);
+            final String httpsKsPath = (String) (conf.get(Config.DRPC_HTTPS_KEYSTORE_PATH));
+            final String httpsKsPassword = (String) (conf.get(Config.DRPC_HTTPS_KEYSTORE_PASSWORD));
+            final String httpsKsType = (String) (conf.get(Config.DRPC_HTTPS_KEYSTORE_TYPE));
+            final String httpsKeyPassword = (String) (conf.get(Config.DRPC_HTTPS_KEY_PASSWORD));
+            final String httpsTsPath = (String) (conf.get(Config.DRPC_HTTPS_TRUSTSTORE_PATH));
+            final String httpsTsPassword = (String) (conf.get(Config.DRPC_HTTPS_TRUSTSTORE_PASSWORD));
+            final String httpsTsType = (String) (conf.get(Config.DRPC_HTTPS_TRUSTSTORE_TYPE));
+            final Boolean httpsWantClientAuth = (Boolean) (conf.get(Config.DRPC_HTTPS_WANT_CLIENT_AUTH));
+            final Boolean httpsNeedClientAuth = (Boolean) (conf.get(Config.DRPC_HTTPS_NEED_CLIENT_AUTH));
+
+            //TODO a better way to do this would be great.
+            //The jersey application is instantiated by class name, so the DRPC instance is handed over statically.
+            DRPCApplication.setup(drpc);
+            ret = UIHelpers.jettyCreateServer(drpcHttpPort, null, httpsPort);
+
+            UIHelpers.configSsl(ret, httpsPort, httpsKsPath, httpsKsPassword, httpsKsType, httpsKeyPassword, httpsTsPath, httpsTsPassword, httpsTsType,
+                    httpsNeedClientAuth, httpsWantClientAuth);
+
+            ServletContextHandler context = new ServletContextHandler(ServletContextHandler.NO_SESSIONS);
+            context.setContextPath("/");
+            ret.setHandler(context);
+
+            ServletHolder jerseyServlet = context.addServlet(ServletContainer.class, "/*");
+            jerseyServlet.setInitOrder(1);
+            jerseyServlet.setInitParameter("javax.ws.rs.Application", DRPCApplication.class.getName());
+
+            UIHelpers.configFilters(context, filterConfigurations);
+            UIHelpers.addRequestContextFilter(context, Config.DRPC_HTTP_CREDS_PLUGIN, conf);
+        }
+        return ret;
+    }
+
+    private final DRPC _drpc;
+    //null when the thrift handler port is not configured
+    private final ThriftServer _handlerServer;
+    private final ThriftServer _invokeServer;
+    //null when the HTTP port is not configured
+    private final Server _httpServer;
+    private boolean _closed = false;
+
+    /** Build all of the servers from the storm configuration without starting them. */
+    public DRPCServer(Map<String, Object> conf) {
+        _drpc = new DRPC(conf);
+        DRPCThrift thrift = new DRPCThrift(_drpc);
+        _handlerServer = mkHandlerServer(thrift, Utils.getInt(conf.get(Config.DRPC_PORT), null), conf);
+        _invokeServer = mkInvokeServer(thrift, Utils.getInt(conf.get(Config.DRPC_INVOCATIONS_PORT), 3773), conf);
+        _httpServer = mkHttpServer(conf, _drpc);
+    }
+
+    /**
+     * Start all of the servers and block the calling thread, serving the thrift handler
+     * server when there is one and otherwise waiting on the HTTP server.
+     *
+     * @throws IllegalStateException if neither the thrift handler port nor the HTTP port is enabled
+     */
+    @VisibleForTesting
+    void start() throws Exception {
+        //Fail with a clear message before any thread is started, instead of a
+        //NullPointerException on _httpServer.join() after the invocation server is up.
+        if (_handlerServer == null && _httpServer == null) {
+            throw new IllegalStateException("Neither " + Config.DRPC_PORT + " nor " + Config.DRPC_HTTP_PORT
+                    + " is enabled, there is nothing to accept DRPC requests on");
+        }
+
+        LOG.info("Starting Distributed RPC servers...");
+        new Thread(() -> _invokeServer.serve()).start();
+
+        if (_httpServer != null) {
+            _httpServer.start();
+        }
+
+        if (_handlerServer != null) {
+            _handlerServer.serve();
+        } else {
+            _httpServer.join();
+        }
+    }
+
+    /** Stop the thrift servers and close the DRPC instance. Only the first call has an effect. */
+    @Override
+    public synchronized void close() {
+        if (!_closed) {
+            //This is kind of useless...
+            meterShutdownCalls.mark();
+
+            if (_handlerServer != null) {
+                _handlerServer.stop();
+            }
+
+            if (_invokeServer != null) {
+                _invokeServer.stop();
+            }
+
+            //TODO this is causing issues...
+            //if (_httpServer != null) {
+            //    _httpServer.destroy();
+            //}
+
+            _drpc.close();
+            _closed  = true;
+        }
+    }
+
+    public static void main(String [] args) throws Exception {
+        Utils.setupDefaultUncaughtExceptionHandler();
+        Map<String, Object> conf = ConfigUtils.readStormConfig();
+        try (DRPCServer server = new DRPCServer(conf)) {
+            Utils.addShutdownHookWithForceKillIn1Sec(() -> server.close());
+            StormMetricsRegistry.startMetricsReporters(conf);
+            server.start();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/ea44062f/storm-core/src/jvm/org/apache/storm/daemon/drpc/DRPCThrift.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/drpc/DRPCThrift.java b/storm-core/src/jvm/org/apache/storm/daemon/drpc/DRPCThrift.java
new file mode 100644
index 0000000..78e008d
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/drpc/DRPCThrift.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.drpc;
+
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.DRPCExecutionException;
+import org.apache.storm.generated.DRPCRequest;
+import org.apache.storm.generated.DistributedRPC;
+import org.apache.storm.generated.DistributedRPCInvocations;
+
+/**
+ * Thrift facing adapter for {@link DRPC}. It implements both the client side
+ * ({@link DistributedRPC.Iface}) and the invocations side
+ * ({@link DistributedRPCInvocations.Iface}) interfaces and forwards every call
+ * to the wrapped DRPC instance.
+ */
+public class DRPCThrift implements DistributedRPC.Iface, DistributedRPCInvocations.Iface {
+    private final DRPC _drpc;
+
+    public DRPCThrift(DRPC drpc) {
+        _drpc = drpc;
+    }
+
+    /** Forwards to {@link DRPC#returnResult(String, String)}. */
+    @Override
+    public void result(String id, String result) throws AuthorizationException {
+        _drpc.returnResult(id, result);
+    }
+
+    /** Forwards to {@link DRPC#fetchRequest(String)}. */
+    @Override
+    public DRPCRequest fetchRequest(String functionName) throws AuthorizationException {
+        return _drpc.fetchRequest(functionName);
+    }
+
+    /** Fails the request without a specific exception (null is passed on to DRPC). */
+    @Override
+    public void failRequest(String id) throws AuthorizationException {
+        _drpc.failRequest(id, null);
+    }
+
+    /** Fails the request with the exception supplied by the caller. */
+    @Override
+    public void failRequestV2(String id, DRPCExecutionException e) throws AuthorizationException {
+        _drpc.failRequest(id, e);
+    }
+
+    /** Forwards to {@link DRPC#executeBlocking(String, String)}. */
+    @Override
+    public String execute(String functionName, String funcArgs)
+            throws DRPCExecutionException, AuthorizationException {
+        return _drpc.executeBlocking(functionName, funcArgs);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/ea44062f/storm-core/src/jvm/org/apache/storm/daemon/drpc/OutstandingRequest.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/drpc/OutstandingRequest.java b/storm-core/src/jvm/org/apache/storm/daemon/drpc/OutstandingRequest.java
new file mode 100644
index 0000000..6e86936
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/drpc/OutstandingRequest.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.drpc;
+
+import org.apache.storm.generated.DRPCExecutionException;
+import org.apache.storm.generated.DRPCRequest;
+import org.apache.storm.utils.Time;
+
+/**
+ * Base class for a DRPC request that has been submitted but not yet completed.
+ * It remembers when it was created, which function it is for and whether it
+ * has been fetched; subclasses decide how the outcome is delivered.
+ */
+public abstract class OutstandingRequest {
+    private final long _start;
+    private final String _function;
+    private final DRPCRequest _req;
+    private volatile boolean _fetched = false;
+
+    /**
+     * @param function the name of the DRPC function the request is for
+     * @param req the request itself
+     */
+    public OutstandingRequest(String function, DRPCRequest req) {
+        _start = Time.currentTimeMillis();
+        _function = function;
+        _req = req;
+    }
+
+    /** @return the name of the function this request is for */
+    public String getFunction() {
+        return _function;
+    }
+
+    /** @return the request that was passed to the constructor */
+    public DRPCRequest getRequest() {
+        return _req;
+    }
+
+    /** Mark this request as fetched. */
+    public void fetched() {
+        _fetched = true;
+    }
+
+    /** @return true once {@link #fetched()} has been called */
+    public boolean wasFetched() {
+        return _fetched;
+    }
+
+    /**
+     * @param timeoutMs the maximum age of a request in ms
+     * @return true if at least timeoutMs have passed since this request was created
+     */
+    public boolean isTimedOut(long timeoutMs) {
+        final long deadline = _start + timeoutMs;
+        return deadline <= Time.currentTimeMillis();
+    }
+
+    /** Complete the request successfully with the given result. */
+    public abstract void returnResult(String result);
+
+    /** Complete the request with a failure. */
+    public abstract void fail(DRPCExecutionException e);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/ea44062f/storm-core/src/jvm/org/apache/storm/daemon/drpc/RequestFactory.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/drpc/RequestFactory.java b/storm-core/src/jvm/org/apache/storm/daemon/drpc/RequestFactory.java
new file mode 100644
index 0000000..139e243
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/drpc/RequestFactory.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.drpc;
+
+import org.apache.storm.generated.DRPCRequest;
+
+/**
+ * Creates the {@link OutstandingRequest} used to track a single DRPC call.
+ *
+ * @param <T> the concrete type of request that is created
+ */
+@FunctionalInterface
+public interface RequestFactory<T extends OutstandingRequest> {
+    /**
+     * @param function the name of the DRPC function being called
+     * @param req the request to track
+     * @return a new outstanding request for function and req
+     */
+    T mkRequest(String function, DRPCRequest req);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/ea44062f/storm-core/src/jvm/org/apache/storm/daemon/drpc/webapp/AuthorizationExceptionMapper.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/drpc/webapp/AuthorizationExceptionMapper.java b/storm-core/src/jvm/org/apache/storm/daemon/drpc/webapp/AuthorizationExceptionMapper.java
new file mode 100644
index 0000000..75c3100
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/drpc/webapp/AuthorizationExceptionMapper.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.drpc.webapp;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.ws.rs.core.Response;
+import javax.ws.rs.ext.ExceptionMapper;
+import javax.ws.rs.ext.Provider;
+
+import org.apache.storm.generated.AuthorizationException;
+import org.json.simple.JSONValue;
+
+/**
+ * Turns an {@link AuthorizationException} into an HTTP 403 response with a
+ * JSON body holding "error" and "errorMessage" entries.
+ */
+@Provider
+public class AuthorizationExceptionMapper implements ExceptionMapper<AuthorizationException> {
+    @Override
+    public Response toResponse(AuthorizationException ex) {
+        final Map<String, String> payload = new HashMap<>();
+        payload.put("error", "Not Authorized");
+        payload.put("errorMessage", ex.get_msg());
+        final String json = JSONValue.toJSONString(payload);
+        return Response.status(403)
+                .entity(json)
+                .type("application/json")
+                .build();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/ea44062f/storm-core/src/jvm/org/apache/storm/daemon/drpc/webapp/DRPCApplication.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/drpc/webapp/DRPCApplication.java b/storm-core/src/jvm/org/apache/storm/daemon/drpc/webapp/DRPCApplication.java
new file mode 100644
index 0000000..4a0ce8c
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/drpc/webapp/DRPCApplication.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.drpc.webapp;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import javax.ws.rs.ApplicationPath;
+import javax.ws.rs.core.Application;
+
+import org.apache.storm.daemon.drpc.DRPC;
+
+/**
+ * JAX-RS application for the DRPC HTTP endpoint. It registers the DRPC resource
+ * and the exception mappers as singletons. The {@link DRPC} instance handed to the
+ * resource is whatever was last passed to {@link #setup(DRPC)} at construction time.
+ */
+@ApplicationPath("")
+public class DRPCApplication extends Application {
+    //Static because the resource is built in the no-arg constructor and needs it there.
+    private static DRPC _drpc;
+    private final Set<Object> singletons = new HashSet<>();
+
+    /** Set the DRPC instance that subsequently created applications will serve. */
+    public static void setup(DRPC drpc) {
+        _drpc = drpc;
+    }
+
+    public DRPCApplication() {
+        singletons.add(new DRPCResource(_drpc));
+        singletons.add(new DRPCExceptionMapper());
+        singletons.add(new AuthorizationExceptionMapper());
+    }
+
+    @Override
+    public Set<Object> getSingletons() {
+        return singletons;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/ea44062f/storm-core/src/jvm/org/apache/storm/daemon/drpc/webapp/DRPCExceptionMapper.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/drpc/webapp/DRPCExceptionMapper.java b/storm-core/src/jvm/org/apache/storm/daemon/drpc/webapp/DRPCExceptionMapper.java
new file mode 100644
index 0000000..60cfc93
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/drpc/webapp/DRPCExceptionMapper.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.drpc.webapp;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.ResponseBuilder;
+import javax.ws.rs.ext.ExceptionMapper;
+import javax.ws.rs.ext.Provider;
+
+import org.apache.storm.generated.DRPCExecutionException;
+import org.json.simple.JSONValue;
+
+/**
+ * Translates a {@link DRPCExecutionException} into an HTTP response. The exception type selects
+ * the status code and the body is a JSON object with "error" and "errorMessage" entries.
+ */
+@Provider
+public class DRPCExceptionMapper implements ExceptionMapper<DRPCExecutionException> {
+
+    /**
+     * Build the response for a failed DRPC request.
+     * @param ex the exception to translate
+     * @return a JSON response with status 400 for FAILED_REQUEST, 503 for SERVER_SHUTDOWN,
+     *     504 for SERVER_TIMEOUT and 500 for INTERNAL_ERROR or when no type is set
+     */
+    @Override
+    public Response toResponse(DRPCExecutionException ex) {
+        ResponseBuilder builder = Response.status(500);
+        //type is an optional field, and switching on a null enum throws a NullPointerException,
+        //so an exception without a type simply keeps the default 500.
+        if (ex.is_set_type()) {
+            switch (ex.get_type()) {
+                case FAILED_REQUEST:
+                    builder.status(400);
+                    break;
+                case SERVER_SHUTDOWN:
+                    builder.status(503); //Not available
+                    break;
+                case SERVER_TIMEOUT:
+                    builder.status(504); //proxy timeout
+                    break;
+                case INTERNAL_ERROR:
+                    //fall through on purpose
+                default:
+                    //Empty (Still 500)
+                    break;
+            }
+        }
+        Map<String, String> body = new HashMap<>();
+        //TODO I would love to standardize this...
+        body.put("error", ex.is_set_type() ? ex.get_type().toString() : "Internal Error");
+        body.put("errorMessage", ex.get_msg());
+        return builder.entity(JSONValue.toJSONString(body)).type("application/json").build();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/ea44062f/storm-core/src/jvm/org/apache/storm/daemon/drpc/webapp/DRPCResource.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/drpc/webapp/DRPCResource.java b/storm-core/src/jvm/org/apache/storm/daemon/drpc/webapp/DRPCResource.java
new file mode 100644
index 0000000..d6490af
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/drpc/webapp/DRPCResource.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.drpc.webapp;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.core.Context;
+
+import org.apache.storm.daemon.drpc.DRPC;
+import org.apache.storm.metric.StormMetricsRegistry;
+import org.apache.thrift.TException;
+
+import com.codahale.metrics.Meter;
+
+/**
+ * HTTP front end for DRPC. Every handler counts the request on the HTTP meter and then
+ * returns whatever {@code DRPC.executeBlocking} returns for the requested function.
+ */
+@Path("/drpc/")
+public class DRPCResource {
+    private static final Meter meterHttpRequests = StormMetricsRegistry.registerMeter("drpc:num-execute-http-requests");
+    private final DRPC _drpc;
+
+    public DRPCResource(DRPC drpc) {
+        _drpc = drpc;
+    }
+
+    //Shared by all handlers: mark the meter, then run the function and block for its result.
+    private String execute(String func, String args) throws TException {
+        meterHttpRequests.mark();
+        return _drpc.executeBlocking(func, args);
+    }
+
+    //TODO put in some better exception mapping...
+    //TODO move populateContext to a filter...
+    /** Run {@code func} with the request entity as its arguments. */
+    @POST
+    @Path("/{func}")
+    public String post(@PathParam("func") String func, String args, @Context HttpServletRequest request) throws TException {
+        return execute(func, args);
+    }
+
+    /** Run {@code func} with the arguments taken from the second path segment. */
+    @GET
+    @Path("/{func}/{args}")
+    public String get(@PathParam("func") String func, @PathParam("args") String args, @Context HttpServletRequest request) throws TException {
+        return execute(func, args);
+    }
+
+    /** Run {@code func} with an empty argument string. */
+    @GET
+    @Path("/{func}")
+    public String get(@PathParam("func") String func, @Context HttpServletRequest request) throws TException {
+        return execute(func, "");
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/ea44062f/storm-core/src/jvm/org/apache/storm/daemon/drpc/webapp/ReqContextFilter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/drpc/webapp/ReqContextFilter.java b/storm-core/src/jvm/org/apache/storm/daemon/drpc/webapp/ReqContextFilter.java
new file mode 100644
index 0000000..521901a
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/drpc/webapp/ReqContextFilter.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.drpc.webapp;
+import java.io.IOException;
+
+import javax.servlet.Filter;
+import javax.servlet.FilterChain;
+import javax.servlet.FilterConfig;
+import javax.servlet.ServletException;
+import javax.servlet.ServletRequest;
+import javax.servlet.ServletResponse;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.storm.security.auth.IHttpCredentialsPlugin;
+import org.apache.storm.security.auth.ReqContext;
+
+/**
+ * Servlet filter that populates the Storm {@link ReqContext} from each incoming HTTP request
+ * before handing the request on to the rest of the filter chain.
+ */
+public class ReqContextFilter implements Filter {
+    private final IHttpCredentialsPlugin _httpCredsHandler;
+
+    /**
+     * @param httpCredsHandler plugin used to populate the context; when null the context
+     *     is left untouched
+     */
+    public ReqContextFilter(IHttpCredentialsPlugin httpCredsHandler) {
+        _httpCredsHandler = httpCredsHandler;
+    }
+
+    /**
+     * Populate the Storm ReqContext from a servlet request. Does nothing when no
+     * credentials plugin was configured.
+     * @param request the request to read the credentials from
+     */
+    public void populateContext(HttpServletRequest request) {
+        if (_httpCredsHandler != null) {
+            _httpCredsHandler.populateContext(ReqContext.context(), request);
+        }
+    }
+
+    @Override
+    public void init(FilterConfig config) throws ServletException {
+        //NOOP
+        //We could add in configs through the web.xml if we wanted something stand alone here...
+    }
+
+    /**
+     * Entry point called by the servlet container. Only HTTP requests are supported; the casts
+     * are intentionally unchecked so that anything else fails instead of slipping through
+     * without a populated context.
+     */
+    @Override
+    public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException {
+        handle((HttpServletRequest)request, (HttpServletResponse)response, chain);
+    }
+
+    /**
+     * Populate the context for a non-null request and then continue down the chain.
+     * @param request the HTTP request, may be null
+     * @param response the HTTP response
+     * @param chain the remaining filters
+     */
+    public void handle(HttpServletRequest request, HttpServletResponse response, FilterChain chain) throws IOException, ServletException{
+        if (request != null) {
+            populateContext(request);
+        }
+        chain.doFilter(request, response);
+    }
+
+    @Override
+    public void destroy() {
+        //NOOP
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/ea44062f/storm-core/src/jvm/org/apache/storm/drpc/DRPCInvocationsClient.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/drpc/DRPCInvocationsClient.java b/storm-core/src/jvm/org/apache/storm/drpc/DRPCInvocationsClient.java
index 9d01e81..956999e 100644
--- a/storm-core/src/jvm/org/apache/storm/drpc/DRPCInvocationsClient.java
+++ b/storm-core/src/jvm/org/apache/storm/drpc/DRPCInvocationsClient.java
@@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import org.apache.storm.generated.DRPCRequest;
 import org.apache.storm.generated.DistributedRPCInvocations;
 import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.DRPCExecutionException;
 import org.apache.storm.security.auth.ThriftClient;
 import org.apache.storm.security.auth.ThriftConnectionType;
 import org.apache.thrift.transport.TTransportException;
@@ -110,4 +111,20 @@ public class DRPCInvocationsClient extends ThriftClient implements DistributedRP
     public DistributedRPCInvocations.Client getClient() {
         return client.get();
     }
+
+    @Override //Reports request id as failed with ex by delegating to the cached Thrift client
+    public void failRequestV2(String id, DRPCExecutionException ex) throws AuthorizationException, TException {
+        DistributedRPCInvocations.Client c = client.get(); //snapshot of the currently cached client
+        try {
+            if (c == null) {
+                throw new TException("Client is not connected..."); //ends up in the TException handler below
+            }
+            c.failRequestV2(id, ex);
+        } catch(AuthorizationException aze) {
+            throw aze; //rethrown as is; the cached client is kept
+        } catch(TException e) {
+            client.compareAndSet(c, null); //drop the cached client, but only if it is still the one used here
+            throw e;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/ea44062f/storm-core/src/jvm/org/apache/storm/generated/DRPCExceptionType.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/generated/DRPCExceptionType.java b/storm-core/src/jvm/org/apache/storm/generated/DRPCExceptionType.java
new file mode 100644
index 0000000..2ccf1b1
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/generated/DRPCExceptionType.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Autogenerated by Thrift Compiler (0.9.3)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ *  @generated
+ */
+package org.apache.storm.generated;
+
+
+import java.util.Map;
+import java.util.HashMap;
+import org.apache.thrift.TEnum;
+
+public enum DRPCExceptionType implements org.apache.thrift.TEnum {
+  INTERNAL_ERROR(0),
+  SERVER_SHUTDOWN(1),
+  SERVER_TIMEOUT(2),
+  FAILED_REQUEST(3);
+
+  private final int value; // integer id of this constant, returned by getValue()
+
+  private DRPCExceptionType(int value) {
+    this.value = value;
+  }
+
+  /**
+   * Get the integer value of this enum value, as defined in the Thrift IDL.
+   */
+  public int getValue() {
+    return value;
+  }
+
+  /**
+   * Find the enum type by its integer value, as defined in the Thrift IDL.
+   * @return null if the value is not found.
+   */
+  public static DRPCExceptionType findByValue(int value) { 
+    switch (value) {
+      case 0:
+        return INTERNAL_ERROR;
+      case 1:
+        return SERVER_SHUTDOWN;
+      case 2:
+        return SERVER_TIMEOUT;
+      case 3:
+        return FAILED_REQUEST;
+      default:
+        return null; // any value other than 0 to 3
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/ea44062f/storm-core/src/jvm/org/apache/storm/generated/DRPCExecutionException.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/generated/DRPCExecutionException.java b/storm-core/src/jvm/org/apache/storm/generated/DRPCExecutionException.java
index c7ba94f..fd701ff 100644
--- a/storm-core/src/jvm/org/apache/storm/generated/DRPCExecutionException.java
+++ b/storm-core/src/jvm/org/apache/storm/generated/DRPCExecutionException.java
@@ -56,6 +56,7 @@ public class DRPCExecutionException extends TException implements org.apache.thr
   private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("DRPCExecutionException");
 
   private static final org.apache.thrift.protocol.TField MSG_FIELD_DESC = new org.apache.thrift.protocol.TField("msg", org.apache.thrift.protocol.TType.STRING, (short)1);
+  private static final org.apache.thrift.protocol.TField TYPE_FIELD_DESC = new org.apache.thrift.protocol.TField("type", org.apache.thrift.protocol.TType.I32, (short)2);
 
   private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
   static {
@@ -64,10 +65,16 @@ public class DRPCExecutionException extends TException implements org.apache.thr
   }
 
   private String msg; // required
+  private DRPCExceptionType type; // optional
 
   /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
   public enum _Fields implements org.apache.thrift.TFieldIdEnum {
-    MSG((short)1, "msg");
+    MSG((short)1, "msg"),
+    /**
+     * 
+     * @see DRPCExceptionType
+     */
+    TYPE((short)2, "type");
 
     private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
 
@@ -84,6 +91,8 @@ public class DRPCExecutionException extends TException implements org.apache.thr
       switch(fieldId) {
         case 1: // MSG
           return MSG;
+        case 2: // TYPE
+          return TYPE;
         default:
           return null;
       }
@@ -124,11 +133,14 @@ public class DRPCExecutionException extends TException implements org.apache.thr
   }
 
   // isset id assignments
+  private static final _Fields optionals[] = {_Fields.TYPE};
   public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
   static {
     Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
     tmpMap.put(_Fields.MSG, new org.apache.thrift.meta_data.FieldMetaData("msg", org.apache.thrift.TFieldRequirementType.REQUIRED, 
         new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+    tmpMap.put(_Fields.TYPE, new org.apache.thrift.meta_data.FieldMetaData("type", org.apache.thrift.TFieldRequirementType.OPTIONAL, 
+        new org.apache.thrift.meta_data.EnumMetaData(org.apache.thrift.protocol.TType.ENUM, DRPCExceptionType.class)));
     metaDataMap = Collections.unmodifiableMap(tmpMap);
     org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(DRPCExecutionException.class, metaDataMap);
   }
@@ -150,6 +162,9 @@ public class DRPCExecutionException extends TException implements org.apache.thr
     if (other.is_set_msg()) {
       this.msg = other.msg;
     }
+    if (other.is_set_type()) {
+      this.type = other.type;
+    }
   }
 
   public DRPCExecutionException deepCopy() {
@@ -159,6 +174,7 @@ public class DRPCExecutionException extends TException implements org.apache.thr
   @Override
   public void clear() {
     this.msg = null;
+    this.type = null;
   }
 
   public String get_msg() {
@@ -184,6 +200,37 @@ public class DRPCExecutionException extends TException implements org.apache.thr
     }
   }
 
+  /**
+   * Returns the exception type, or null when no type has been set.
+   * @see DRPCExceptionType
+   */
+  public DRPCExceptionType get_type() {
+    return this.type;
+  }
+
+  /**
+   * Sets the exception type; passing null leaves the field unset.
+   * @see DRPCExceptionType
+   */
+  public void set_type(DRPCExceptionType type) {
+    this.type = type;
+  }
+
+  public void unset_type() { // clears the optional type field
+    this.type = null;
+  }
+
+  /** Returns true if field type is set (has been assigned a value) and false otherwise */
+  public boolean is_set_type() {
+    return this.type != null; // a non-null value is what marks the field as set
+  }
+
+  public void set_type_isSet(boolean value) {
+    if (!value) { // only false has an effect; true leaves the field as it is
+      this.type = null;
+    }
+  }
+
   public void setFieldValue(_Fields field, Object value) {
     switch (field) {
     case MSG:
@@ -194,6 +241,14 @@ public class DRPCExecutionException extends TException implements org.apache.thr
       }
       break;
 
+    case TYPE:
+      if (value == null) {
+        unset_type();
+      } else {
+        set_type((DRPCExceptionType)value);
+      }
+      break;
+
     }
   }
 
@@ -202,6 +257,9 @@ public class DRPCExecutionException extends TException implements org.apache.thr
     case MSG:
       return get_msg();
 
+    case TYPE:
+      return get_type();
+
     }
     throw new IllegalStateException();
   }
@@ -215,6 +273,8 @@ public class DRPCExecutionException extends TException implements org.apache.thr
     switch (field) {
     case MSG:
       return is_set_msg();
+    case TYPE:
+      return is_set_type();
     }
     throw new IllegalStateException();
   }
@@ -241,6 +301,15 @@ public class DRPCExecutionException extends TException implements org.apache.thr
         return false;
     }
 
+    boolean this_present_type = true && this.is_set_type();
+    boolean that_present_type = true && that.is_set_type();
+    if (this_present_type || that_present_type) {
+      if (!(this_present_type && that_present_type))
+        return false;
+      if (!this.type.equals(that.type))
+        return false;
+    }
+
     return true;
   }
 
@@ -253,6 +322,11 @@ public class DRPCExecutionException extends TException implements org.apache.thr
     if (present_msg)
       list.add(msg);
 
+    boolean present_type = true && (is_set_type());
+    list.add(present_type);
+    if (present_type)
+      list.add(type.getValue());
+
     return list.hashCode();
   }
 
@@ -274,6 +348,16 @@ public class DRPCExecutionException extends TException implements org.apache.thr
         return lastComparison;
       }
     }
+    lastComparison = Boolean.valueOf(is_set_type()).compareTo(other.is_set_type());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (is_set_type()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.type, other.type);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
     return 0;
   }
 
@@ -301,6 +385,16 @@ public class DRPCExecutionException extends TException implements org.apache.thr
       sb.append(this.msg);
     }
     first = false;
+    if (is_set_type()) {
+      if (!first) sb.append(", ");
+      sb.append("type:");
+      if (this.type == null) {
+        sb.append("null");
+      } else {
+        sb.append(this.type);
+      }
+      first = false;
+    }
     sb.append(")");
     return sb.toString();
   }
@@ -356,6 +450,14 @@ public class DRPCExecutionException extends TException implements org.apache.thr
               org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
             }
             break;
+          case 2: // TYPE
+            if (schemeField.type == org.apache.thrift.protocol.TType.I32) {
+              struct.type = org.apache.storm.generated.DRPCExceptionType.findByValue(iprot.readI32());
+              struct.set_type_isSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            }
+            break;
           default:
             org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
         }
@@ -374,6 +476,13 @@ public class DRPCExecutionException extends TException implements org.apache.thr
         oprot.writeString(struct.msg);
         oprot.writeFieldEnd();
       }
+      if (struct.type != null) {
+        if (struct.is_set_type()) {
+          oprot.writeFieldBegin(TYPE_FIELD_DESC);
+          oprot.writeI32(struct.type.getValue());
+          oprot.writeFieldEnd();
+        }
+      }
       oprot.writeFieldStop();
       oprot.writeStructEnd();
     }
@@ -392,6 +501,14 @@ public class DRPCExecutionException extends TException implements org.apache.thr
     public void write(org.apache.thrift.protocol.TProtocol prot, DRPCExecutionException struct) throws org.apache.thrift.TException {
       TTupleProtocol oprot = (TTupleProtocol) prot;
       oprot.writeString(struct.msg); // msg is written unconditionally
+      BitSet optionals = new BitSet();
+      if (struct.is_set_type()) {
+        optionals.set(0); // bit 0 flags that a type value follows
+      }
+      oprot.writeBitSet(optionals, 1); // one bit, covering the single optional field
+      if (struct.is_set_type()) {
+        oprot.writeI32(struct.type.getValue());
+      }
     }
 
     @Override
@@ -399,6 +516,11 @@ public class DRPCExecutionException extends TException implements org.apache.thr
       TTupleProtocol iprot = (TTupleProtocol) prot;
       struct.msg = iprot.readString();
       struct.set_msg_isSet(true);
+      BitSet incoming = iprot.readBitSet(1);
+      if (incoming.get(0)) {
+        struct.type = org.apache.storm.generated.DRPCExceptionType.findByValue(iprot.readI32());
+        struct.set_type_isSet(true);
+      }
     }
   }