You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/03/09 16:53:57 UTC

[04/12] storm git commit: update some tests about drpc

update some tests about drpc


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/802d28e6
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/802d28e6
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/802d28e6

Branch: refs/heads/master
Commit: 802d28e607ce953664fcea7356eac98fb354683f
Parents: 9178b1c
Author: xiaojian.fxj <xi...@alibaba-inc.com>
Authored: Fri Feb 26 20:32:29 2016 +0800
Committer: xiaojian.fxj <xi...@alibaba-inc.com>
Committed: Fri Feb 26 22:02:56 2016 +0800

----------------------------------------------------------------------
 .../org/apache/storm/starter/ManualDRPC.java    |  53 ++-
 .../src/clj/org/apache/storm/LocalDRPC.clj      |  56 ---
 .../src/clj/org/apache/storm/daemon/drpc.clj    | 214 +---------
 .../clj/org/apache/storm/trident/testing.clj    |   2 -
 .../src/jvm/org/apache/storm/LocalDRPC.java     |  80 ++++
 .../jvm/org/apache/storm/LocalDRPCProcess.java  |  87 -----
 .../org/apache/storm/daemon/DrpcProcess.java    | 337 ----------------
 .../jvm/org/apache/storm/daemon/DrpcServer.java | 390 +++++++++++++++++++
 .../test/clj/org/apache/storm/drpc_test.clj     |  29 +-
 .../apache/storm/security/auth/auth_test.clj    |   2 +
 .../storm/security/auth/drpc_auth_test.clj      |   5 +-
 11 files changed, 526 insertions(+), 729 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/802d28e6/examples/storm-starter/src/jvm/org/apache/storm/starter/ManualDRPC.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/ManualDRPC.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/ManualDRPC.java
index 4c9daec..34136a1 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/ManualDRPC.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/ManualDRPC.java
@@ -30,39 +30,36 @@ import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Tuple;
 import org.apache.storm.tuple.Values;
 
-
 public class ManualDRPC {
-  public static class ExclamationBolt extends BaseBasicBolt {
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-      declarer.declare(new Fields("result", "return-info"));
-    }
+    public static class ExclamationBolt extends BaseBasicBolt {
 
-    @Override
-    public void execute(Tuple tuple, BasicOutputCollector collector) {
-      String arg = tuple.getString(0);
-      Object retInfo = tuple.getValue(1);
-      collector.emit(new Values(arg + "!!!", retInfo));
-    }
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("result", "return-info"));
+        }
 
-  }
+        @Override
+        public void execute(Tuple tuple, BasicOutputCollector collector) {
+            String arg = tuple.getString(0);
+            Object retInfo = tuple.getValue(1);
+            collector.emit(new Values(arg + "!!!", retInfo));
+        }
 
-  public static void main(String[] args) {
-    TopologyBuilder builder = new TopologyBuilder();
-    LocalDRPC drpc = new LocalDRPC();
-
-    DRPCSpout spout = new DRPCSpout("exclamation", drpc);
-    builder.setSpout("drpc", spout);
-    builder.setBolt("exclaim", new ExclamationBolt(), 3).shuffleGrouping("drpc");
-    builder.setBolt("return", new ReturnResults(), 3).shuffleGrouping("exclaim");
+    }
 
-    LocalCluster cluster = new LocalCluster();
-    Config conf = new Config();
-    cluster.submitTopology("exclaim", conf, builder.createTopology());
+    public static void main(String[] args) {
+        TopologyBuilder builder = new TopologyBuilder();
+        LocalDRPC drpc = new LocalDRPC();
 
-    System.out.println(drpc.execute("exclamation", "aaa"));
-    System.out.println(drpc.execute("exclamation", "bbb"));
+        DRPCSpout spout = new DRPCSpout("exclamation", drpc);
+        builder.setSpout("drpc", spout);
+        builder.setBolt("exclaim", new ExclamationBolt(), 3).shuffleGrouping("drpc");
+        builder.setBolt("return", new ReturnResults(), 3).shuffleGrouping("exclaim");
 
-  }
+        LocalCluster cluster = new LocalCluster();
+        Config conf = new Config();
+        cluster.submitTopology("exclaim", conf, builder.createTopology());
+        System.out.println(drpc.execute("exclamation", "aaa"));
+        System.out.println(drpc.execute("exclamation", "bbb"));
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/802d28e6/storm-core/src/clj/org/apache/storm/LocalDRPC.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/LocalDRPC.clj b/storm-core/src/clj/org/apache/storm/LocalDRPC.clj
deleted file mode 100644
index 5f2c22f..0000000
--- a/storm-core/src/clj/org/apache/storm/LocalDRPC.clj
+++ /dev/null
@@ -1,56 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-
-(ns org.apache.storm.LocalDRPC
-  (:require [org.apache.storm.daemon [drpc :as drpc]])
-  (:use [org.apache.storm config util])
-  (:import [org.apache.storm.utils InprocMessaging ServiceRegistry ConfigUtils])
-  (:gen-class
-   :init init
-   :implements [org.apache.storm.ILocalDRPC]
-   :constructors {[] []}
-   :state state ))
-
-(defn -init []
-  (let [handler (drpc/service-handler (clojurify-structure (ConfigUtils/readStormConfig)))
-        id (ServiceRegistry/registerService handler)
-        ]
-    [[] {:service-id id :handler handler}]
-    ))
-
-(defn -execute [this func funcArgs]
-  (.execute (:handler (. this state)) func funcArgs)
-  )
-
-(defn -result [this id result]
-  (.result (:handler (. this state)) id result)
-  )
-
-(defn -fetchRequest [this func]
-  (.fetchRequest (:handler (. this state)) func)
-  )
-
-(defn -failRequest [this id]
-  (.failRequest (:handler (. this state)) id)
-  )
-
-(defn -getServiceId [this]
-  (:service-id (. this state)))
-
-(defn -shutdown [this]
-  (ServiceRegistry/unregisterService (:service-id (. this state)))
-  (.shutdown (:handler (. this state)))
-  )

http://git-wip-us.apache.org/repos/asf/storm/blob/802d28e6/storm-core/src/clj/org/apache/storm/daemon/drpc.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/drpc.clj b/storm-core/src/clj/org/apache/storm/daemon/drpc.clj
index 4a835e1..2cb4016 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/drpc.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/drpc.clj
@@ -15,23 +15,11 @@
 ;; limitations under the License.
 
 (ns org.apache.storm.daemon.drpc
-  (:import [org.apache.storm.security.auth AuthUtils ThriftServer ThriftConnectionType ReqContext]
-           [org.apache.storm.ui UIHelpers IConfigurator FilterConfiguration])
-  (:import [org.apache.storm.security.auth.authorizer DRPCAuthorizerBase])
+  (:import [org.apache.storm.security.auth AuthUtils ReqContext]
+           [org.apache.storm.daemon DrpcServer])
   (:import [org.apache.storm.utils Utils])
-  (:import [org.apache.storm.generated DistributedRPC DistributedRPC$Iface DistributedRPC$Processor
-            DRPCRequest DRPCExecutionException DistributedRPCInvocations DistributedRPCInvocations$Iface
-            DistributedRPCInvocations$Processor])
-  (:import [java.util.concurrent Semaphore ConcurrentLinkedQueue
-            ThreadPoolExecutor ArrayBlockingQueue TimeUnit])
-  (:import [org.apache.storm.daemon Shutdownable]
-           [org.apache.storm.utils Time])
-  (:import [java.net InetAddress])
-  (:import [org.apache.storm.generated AuthorizationException]
-           [org.apache.storm.utils VersionInfo ConfigUtils]
-           [org.apache.storm.logging ThriftAccessLogger])
+  (:import [org.apache.storm.utils ConfigUtils])
   (:use [org.apache.storm config log util])
-  (:use [org.apache.storm.daemon common])
   (:use [org.apache.storm.ui helpers])
   (:use compojure.core)
   (:use ring.middleware.reload)
@@ -40,141 +28,6 @@
   (:gen-class))
 
 (defmeter drpc:num-execute-http-requests)
-(defmeter drpc:num-execute-calls)
-(defmeter drpc:num-result-calls)
-(defmeter drpc:num-failRequest-calls)
-(defmeter drpc:num-fetchRequest-calls)
-(defmeter drpc:num-shutdown-calls)
-
-(def STORM-VERSION (VersionInfo/getVersion))
-
-(defn timeout-check-secs [] 5)
-
-(defn acquire-queue [queues-atom function]
-  (swap! queues-atom
-    (fn [amap]
-      (if-not (amap function)
-        (assoc amap function (ConcurrentLinkedQueue.))
-        amap)))
-  (@queues-atom function))
-
-(defn check-authorization
-  ([aclHandler mapping operation context]
-    (if (not-nil? context)
-      (ThriftAccessLogger/logAccess (.requestID context) (.remoteAddress context) (.principal context) operation))
-    (if aclHandler
-      (let [context (or context (ReqContext/context))]
-        (if-not (.permit aclHandler context operation mapping)
-          (let [principal (.principal context)
-                user (if principal (.getName principal) "unknown")]
-              (throw (AuthorizationException.
-                       (str "DRPC request '" operation "' for '"
-                            user "' user is not authorized"))))))))
-  ([aclHandler mapping operation]
-    (check-authorization aclHandler mapping operation (ReqContext/context))))
-
-;; TODO: change this to use TimeCacheMap
-(defn service-handler [conf]
-  (let [drpc-acl-handler (mk-authorization-handler (conf DRPC-AUTHORIZER) conf)
-        ctr (atom 0)
-        id->sem (atom {})
-        id->result (atom {})
-        id->start (atom {})
-        id->function (atom {})
-        id->request (atom {})
-        request-queues (atom {})
-        cleanup (fn [id] (swap! id->sem dissoc id)
-                  (swap! id->result dissoc id)
-                  (swap! id->function dissoc id)
-                  (swap! id->request dissoc id)
-                  (swap! id->start dissoc id))
-        my-ip (.getHostAddress (InetAddress/getLocalHost))
-        clear-thread (Utils/asyncLoop
-                       (fn []
-                         (doseq [[id start] @id->start]
-                           (when (> (Time/deltaSecs start) (conf DRPC-REQUEST-TIMEOUT-SECS))
-                             (when-let [sem (@id->sem id)]
-                               (.remove (acquire-queue request-queues (@id->function id)) (@id->request id))
-                               (log-warn "Timeout DRPC request id: " id " start at " start)
-                               (.release sem))
-                             (cleanup id)))
-                         (timeout-check-secs)))]
-    (reify DistributedRPC$Iface
-      (^String execute
-        [this ^String function ^String args]
-        (mark! drpc:num-execute-calls)
-        (log-debug "Received DRPC request for " function " (" args ") at " (System/currentTimeMillis))
-        (check-authorization drpc-acl-handler
-                             {DRPCAuthorizerBase/FUNCTION_NAME function}
-                             "execute")
-        (let [id (str (swap! ctr (fn [v] (mod (inc v) 1000000000))))
-              ^Semaphore sem (Semaphore. 0)
-              req (DRPCRequest. args id)
-              ^ConcurrentLinkedQueue queue (acquire-queue request-queues function)]
-          (swap! id->start assoc id (Time/currentTimeSecs))
-          (swap! id->sem assoc id sem)
-          (swap! id->function assoc id function)
-          (swap! id->request assoc id req)
-          (.add queue req)
-          (log-debug "Waiting for DRPC result for " function " " args " at " (System/currentTimeMillis))
-          (.acquire sem)
-          (log-debug "Acquired DRPC result for " function " " args " at " (System/currentTimeMillis))
-          (let [result (@id->result id)]
-            (cleanup id)
-            (log-debug "Returning DRPC result for " function " " args " at " (System/currentTimeMillis))
-            (if (instance? DRPCExecutionException result)
-              (throw result)
-              (if (nil? result)
-                (throw (DRPCExecutionException. "Request timed out"))
-                result)))))
-
-      DistributedRPCInvocations$Iface
-
-      (^void result
-        [this ^String id ^String result]
-        (mark! drpc:num-result-calls)
-        (when-let [func (@id->function id)]
-          (check-authorization drpc-acl-handler
-                               {DRPCAuthorizerBase/FUNCTION_NAME func}
-                               "result")
-          (let [^Semaphore sem (@id->sem id)]
-            (log-debug "Received result " result " for " id " at " (System/currentTimeMillis))
-            (when sem
-              (swap! id->result assoc id result)
-              (.release sem)
-              ))))
-
-      (^void failRequest
-        [this ^String id]
-        (mark! drpc:num-failRequest-calls)
-        (when-let [func (@id->function id)]
-          (check-authorization drpc-acl-handler
-                               {DRPCAuthorizerBase/FUNCTION_NAME func}
-                               "failRequest")
-          (let [^Semaphore sem (@id->sem id)]
-            (when sem
-              (swap! id->result assoc id (DRPCExecutionException. "Request failed"))
-              (.release sem)))))
-
-      (^DRPCRequest fetchRequest
-        [this ^String func]
-        (mark! drpc:num-fetchRequest-calls)
-        (check-authorization drpc-acl-handler
-                             {DRPCAuthorizerBase/FUNCTION_NAME func}
-                             "fetchRequest")
-        (let [^ConcurrentLinkedQueue queue (acquire-queue request-queues func)
-              ret (.poll queue)]
-          (if ret
-            (do (log-debug "Fetched request for " func " at " (System/currentTimeMillis))
-              ret)
-            (DRPCRequest. "" ""))))
-
-      Shutdownable
-
-      (shutdown
-        [this]
-        (mark! drpc:num-shutdown-calls)
-        (.interrupt clear-thread)))))
 
 (defn handle-request [handler]
   (fn [request]
@@ -213,65 +66,16 @@
 
 (defn launch-server!
   ([]
-    (log-message "Starting drpc server for storm version '" STORM-VERSION "'")
     (let [conf (clojurify-structure (ConfigUtils/readStormConfig))
-          worker-threads (int (conf DRPC-WORKER-THREADS))
-          queue-size (int (conf DRPC-QUEUE-SIZE))
           drpc-http-port (int (conf DRPC-HTTP-PORT))
-          drpc-port (int (conf DRPC-PORT))
-          drpc-service-handler (service-handler conf)
-          ;; requests and returns need to be on separate thread pools, since calls to
-          ;; "execute" don't unblock until other thrift methods are called. So if
-          ;; 64 threads are calling execute, the server won't accept the result
-          ;; invocations that will unblock those threads
-          handler-server (when (> drpc-port 0)
-                           (ThriftServer. conf
-                             (DistributedRPC$Processor. drpc-service-handler)
-                             ThriftConnectionType/DRPC))
-          invoke-server (ThriftServer. conf
-                          (DistributedRPCInvocations$Processor. drpc-service-handler)
-                          ThriftConnectionType/DRPC_INVOCATIONS)
+          drpc-server (DrpcServer.)
           http-creds-handler (AuthUtils/GetDrpcHttpCredentialsPlugin conf)]
-      (Utils/addShutdownHookWithForceKillIn1Sec (fn []
-                                            (if handler-server (.stop handler-server))
-                                            (.stop invoke-server)))
-      (log-message "Starting Distributed RPC servers...")
-      (future (.serve invoke-server))
       (when (> drpc-http-port 0)
-        (let [app (-> (webapp drpc-service-handler http-creds-handler)
-                    requests-middleware)
-              filter-class (conf DRPC-HTTP-FILTER)
-              filter-params (conf DRPC-HTTP-FILTER-PARAMS)
-              filters-confs [(FilterConfiguration. filter-class filter-params)]
-              https-port (int (or (conf DRPC-HTTPS-PORT) 0))
-              https-ks-path (conf DRPC-HTTPS-KEYSTORE-PATH)
-              https-ks-password (conf DRPC-HTTPS-KEYSTORE-PASSWORD)
-              https-ks-type (conf DRPC-HTTPS-KEYSTORE-TYPE)
-              https-key-password (conf DRPC-HTTPS-KEY-PASSWORD)
-              https-ts-path (conf DRPC-HTTPS-TRUSTSTORE-PATH)
-              https-ts-password (conf DRPC-HTTPS-TRUSTSTORE-PASSWORD)
-              https-ts-type (conf DRPC-HTTPS-TRUSTSTORE-TYPE)
-              https-want-client-auth (conf DRPC-HTTPS-WANT-CLIENT-AUTH)
-              https-need-client-auth (conf DRPC-HTTPS-NEED-CLIENT-AUTH)]
-
-          (UIHelpers/stormRunJetty
-            (int drpc-http-port)
-            (reify IConfigurator (execute [this server]
-                                   (UIHelpers/configSsl server
-                                     https-port
-                                     https-ks-path
-                                     https-ks-password
-                                     https-ks-type
-                                     https-key-password
-                                     https-ts-path
-                                     https-ts-password
-                                     https-ts-type
-                                     https-need-client-auth
-                                     https-want-client-auth)
-                                   (UIHelpers/configFilter server (ring.util.servlet/servlet app) filters-confs))))))
-      (start-metrics-reporters conf)
-      (when handler-server
-        (.serve handler-server)))))
+        (let [app (-> (webapp drpc-server http-creds-handler)
+                    requests-middleware)]
+          (.setHttpServlet drpc-server (ring.util.servlet/servlet app))))
+      (.launchServer drpc-server)))
+)
 
 (defn -main []
   (Utils/setupDefaultUncaughtExceptionHandler)

http://git-wip-us.apache.org/repos/asf/storm/blob/802d28e6/storm-core/src/clj/org/apache/storm/trident/testing.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/trident/testing.clj b/storm-core/src/clj/org/apache/storm/trident/testing.clj
index 0ec5613..3bfcb9c 100644
--- a/storm-core/src/clj/org/apache/storm/trident/testing.clj
+++ b/storm-core/src/clj/org/apache/storm/trident/testing.clj
@@ -14,9 +14,7 @@
 ;; See the License for the specific language governing permissions and
 ;; limitations under the License.
 (ns org.apache.storm.trident.testing
-  (:require [org.apache.storm.LocalDRPC :as LocalDRPC])
   (:import [org.apache.storm.trident.testing FeederBatchSpout FeederCommitterBatchSpout MemoryMapState MemoryMapState$Factory TuplifyArgs])
-  (:require [org.apache.storm [LocalDRPC]])
   (:import [org.apache.storm LocalDRPC])
   (:import [org.apache.storm.tuple Fields])
   (:import [org.apache.storm.generated KillOptions]

http://git-wip-us.apache.org/repos/asf/storm/blob/802d28e6/storm-core/src/jvm/org/apache/storm/LocalDRPC.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/LocalDRPC.java b/storm-core/src/jvm/org/apache/storm/LocalDRPC.java
new file mode 100644
index 0000000..0cc8e43
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/LocalDRPC.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm;
+
+import org.apache.log4j.Logger;
+import org.apache.storm.daemon.DrpcServer;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.DRPCExecutionException;
+import org.apache.storm.generated.DRPCRequest;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.ServiceRegistry;
+import org.apache.storm.utils.Utils;
+import org.apache.thrift.TException;
+
+import java.util.Map;
+
+public class LocalDRPC implements ILocalDRPC {
+    private static final Logger LOG = Logger.getLogger(LocalDRPC.class);
+
+    private DrpcServer handler = new DrpcServer();
+    private Thread thread;
+    private final String serviceId;
+
+    public LocalDRPC() {
+        try {
+            Map conf = ConfigUtils.readStormConfig();
+            handler.launchServer(true, conf);
+        }catch (Exception e){
+            throw Utils.wrapInRuntime(e);
+        }
+
+        serviceId = ServiceRegistry.registerService(handler);
+    }
+
+    @Override
+    public String getServiceId() {
+        return serviceId;
+    }
+
+    @Override
+    public void result(String id, String result) throws AuthorizationException, TException {
+        handler.result(id, result);
+    }
+
+    @Override
+    public String execute(String functionName, String funcArgs) throws DRPCExecutionException, AuthorizationException, TException {
+        return handler.execute(functionName, funcArgs);
+    }
+
+    @Override
+    public void failRequest(String id) throws AuthorizationException, TException {
+        handler.failRequest(id);
+    }
+
+    @Override
+    public void shutdown() {
+        ServiceRegistry.unregisterService(this.serviceId);
+        this.handler.shutdown();
+    }
+
+    @Override
+    public DRPCRequest fetchRequest(String functionName) throws AuthorizationException, TException {
+        return handler.fetchRequest(functionName);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/802d28e6/storm-core/src/jvm/org/apache/storm/LocalDRPCProcess.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/LocalDRPCProcess.java b/storm-core/src/jvm/org/apache/storm/LocalDRPCProcess.java
deleted file mode 100644
index 701fc5b..0000000
--- a/storm-core/src/jvm/org/apache/storm/LocalDRPCProcess.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm;
-
-import org.apache.log4j.Logger;
-import org.apache.storm.daemon.DrpcProcess;
-import org.apache.storm.generated.AuthorizationException;
-import org.apache.storm.generated.DRPCExecutionException;
-import org.apache.storm.generated.DRPCRequest;
-import org.apache.storm.utils.ServiceRegistry;
-import org.apache.thrift.TException;
-
-public class LocalDRPCProcess implements ILocalDRPC {
-    private static final Logger LOG = Logger.getLogger(LocalDRPCProcess.class);
-
-    private DrpcProcess handler = new DrpcProcess();
-    private Thread thread;
-
-    private final String serviceId;
-
-    public LocalDRPCProcess() {
-
-        thread = new Thread(new Runnable() {
-
-            @Override
-            public void run() {
-                LOG.info("Begin to init local Drpc");
-                try {
-                    handler.launchServer();
-                } catch (Exception e) {
-                    LOG.info("Failed to  start local drpc");
-                    System.exit(-1);
-                }
-                LOG.info("Successfully start local drpc");
-            }
-        });
-        thread.start();
-
-        serviceId = ServiceRegistry.registerService(handler);
-    }
-
-    @Override
-    public String getServiceId() {
-        return serviceId;
-    }
-
-    @Override
-    public void result(String id, String result) throws AuthorizationException, TException {
-        handler.result(id, result);
-    }
-
-    @Override
-    public String execute(String functionName, String funcArgs) throws DRPCExecutionException, AuthorizationException, TException {
-        return handler.execute(functionName, funcArgs);
-    }
-
-    @Override
-    public void failRequest(String id) throws AuthorizationException, TException {
-        handler.failRequest(id);
-    }
-
-    @Override
-    public void shutdown() {
-        ServiceRegistry.unregisterService(this.serviceId);
-        this.handler.shutdown();
-    }
-
-    @Override
-    public DRPCRequest fetchRequest(String functionName) throws AuthorizationException, TException {
-        return handler.fetchRequest(functionName);
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/802d28e6/storm-core/src/jvm/org/apache/storm/daemon/DrpcProcess.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/DrpcProcess.java b/storm-core/src/jvm/org/apache/storm/daemon/DrpcProcess.java
deleted file mode 100644
index 528ab9e..0000000
--- a/storm-core/src/jvm/org/apache/storm/daemon/DrpcProcess.java
+++ /dev/null
@@ -1,337 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.daemon;
-
-import com.codahale.metrics.Meter;
-import com.codahale.metrics.MetricRegistry;
-import org.apache.commons.lang.StringUtils;
-import org.apache.storm.Config;
-import org.apache.storm.daemon.metrics.MetricsUtils;
-import org.apache.storm.daemon.metrics.reporters.PreparableReporter;
-import org.apache.storm.generated.*;
-import org.apache.storm.logging.ThriftAccessLogger;
-import org.apache.storm.security.auth.*;
-import org.apache.storm.security.auth.authorizer.DRPCAuthorizerBase;
-import org.apache.storm.utils.ConfigUtils;
-import org.apache.storm.utils.Time;
-import org.apache.storm.utils.Utils;
-import org.apache.storm.utils.VersionInfo;
-import org.apache.thrift.TException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.security.Principal;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicInteger;
-
-public class DrpcProcess implements DistributedRPC.Iface, DistributedRPCInvocations.Iface, Shutdownable {
-
-    private static final Logger LOG = LoggerFactory.getLogger(DrpcProcess.class);
-    private final Integer timeoutCheckSecs = 5;
-
-    private Map conf;
-
-    private ThriftServer handlerServer;
-    private ThriftServer invokeServer;
-    private IHttpCredentialsPlugin httpCredsHandler;
-
-    private Thread clearThread;
-
-    private IAuthorizer authorizer;
-
-    private AtomicInteger ctr = new AtomicInteger(0);
-    private ConcurrentHashMap<String, Semaphore> idtoSem = new ConcurrentHashMap<String, Semaphore>();
-    private ConcurrentHashMap<String, Object> idtoResult = new ConcurrentHashMap<String, Object>();
-    private ConcurrentHashMap<String, Integer> idtoStart = new ConcurrentHashMap<String, Integer>();
-    private ConcurrentHashMap<String, String> idtoFunction = new ConcurrentHashMap<String, String>();
-    private ConcurrentHashMap<String, DRPCRequest> idtoRequest = new ConcurrentHashMap<String, DRPCRequest>();
-    private ConcurrentHashMap<String, ConcurrentLinkedQueue<DRPCRequest>> requestQueues = new ConcurrentHashMap<String, ConcurrentLinkedQueue<DRPCRequest>>();
-
-    private Meter meterHttpRequests = new MetricRegistry().meter("drpc:num-execute-http-requests");
-    private Meter meterExecuteCalls = new MetricRegistry().meter("drpc:num-execute-calls");
-    private Meter meterResultCalls = new MetricRegistry().meter("drpc:num-result-calls");
-    private Meter meterFailRequestCalls = new MetricRegistry().meter("drpc:num-failRequest-calls");
-    private Meter meterFetchRequestCalls = new MetricRegistry().meter("drpc:num-fetchRequest-calls");
-    private Meter meterShutdownCalls = new MetricRegistry().meter("drpc:num-shutdown-calls");
-
-    public DrpcProcess() {
-
-    }
-
-    private ThriftServer initHandlerServer(Map conf, final DrpcProcess service) throws Exception {
-        int port = (int) conf.get(Config.DRPC_PORT);
-        if (port > 0) {
-            handlerServer = new ThriftServer(conf, new DistributedRPC.Processor<DistributedRPC.Iface>(service), ThriftConnectionType.DRPC);
-        }
-        return handlerServer;
-    }
-
-    private ThriftServer initInvokeServer(Map conf, final DrpcProcess service) throws Exception {
-        invokeServer = new ThriftServer(conf, new DistributedRPCInvocations.Processor<DistributedRPCInvocations.Iface>(service),
-                ThriftConnectionType.DRPC_INVOCATIONS);
-        return invokeServer;
-    }
-
-    private void initServer() throws Exception {
-
-        authorizer = mkAuthorizationHandler((String) (conf.get(Config.DRPC_AUTHORIZER)), conf);
-        handlerServer = initHandlerServer(conf, this);
-        invokeServer = initInvokeServer(conf, this);
-        httpCredsHandler = AuthUtils.GetDrpcHttpCredentialsPlugin(conf);
-        Utils.addShutdownHookWithForceKillIn1Sec(new Runnable() {
-            @Override
-            public void run() {
-                if (handlerServer != null) {
-                    handlerServer.stop();
-                } else {
-                    invokeServer.stop();
-                }
-            }
-        });
-        LOG.info("Starting Distributed RPC servers...");
-
-        LOG.info("Starting Distributed RPC servers...");
-        new Thread(new Runnable() {
-
-            @Override
-            public void run() {
-                invokeServer.serve();
-            }
-        }).start();
-        // To be replaced by Common.StartMetricsReporters
-        List<PreparableReporter> reporters = MetricsUtils.getPreparableReporters(conf);
-        for (PreparableReporter reporter : reporters) {
-            reporter.prepare(new MetricRegistry(), conf);
-            reporter.start();
-            LOG.info("Started statistics report plugin...");
-        }
-        if (handlerServer != null)
-            handlerServer.serve();
-    }
-
-    private void webApp(DrpcProcess drpc, IHttpCredentialsPlugin httpCredsHandler){
-        meterExecuteCalls.mark();
-
-    }
-    private void initClearThread() {
-        clearThread = Utils.asyncLoop(new Callable() {
-
-            @Override
-            public Object call() throws Exception {
-                for (Map.Entry<String, Integer> e : idtoStart.entrySet()) {
-                    if (Time.deltaSecs(e.getValue()) > (int) conf.get(Config.DRPC_REQUEST_TIMEOUT_SECS)) {
-                        String id = e.getKey();
-                        Semaphore sem = idtoSem.get(id);
-                        if (sem != null) {
-                            String func = idtoFunction.get(id);
-                            acquireQueue(func).remove(idtoRequest.get(id));
-                            LOG.warn("Timeout DRPC request id: {} start at {}", id, e.getValue());
-                            sem.release();
-                        }
-                        cleanup(id);
-                        LOG.info("Clear request " + id);
-                    }
-                }
-                return timeoutCheckSecs;
-            }
-        });
-    }
-
-    public void launchServer() throws Exception {
-
-        LOG.info("Starting drpc server for storm version {}", VersionInfo.getVersion());
-        conf = ConfigUtils.readStormConfig();
-
-        initClearThread();
-
-        initServer();
-    }
-
-    @Override
-    public void shutdown() {
-        meterShutdownCalls.mark();
-        clearThread.interrupt();
-    }
-
-    public void cleanup(String id) {
-        idtoSem.remove(id);
-        idtoResult.remove(id);
-        idtoStart.remove(id);
-        idtoFunction.remove(id);
-        idtoRequest.remove(id);
-    }
-
-    @Override
-    public String execute(String functionName, String funcArgs) throws DRPCExecutionException, AuthorizationException, org.apache.thrift.TException {
-        meterExecuteCalls.mark();
-        LOG.debug("Received DRPC request for {} {} at {} ", functionName, funcArgs, System.currentTimeMillis());
-        Map<String, String> map = new HashMap<>();
-        map.put(DRPCAuthorizerBase.FUNCTION_NAME, functionName);
-        checkAuthorization(authorizer, map, "execute");
-
-        int idinc = this.ctr.incrementAndGet();
-        int maxvalue = 1000000000;
-        int newid = idinc % maxvalue;
-        if (idinc != newid) {
-            this.ctr.compareAndSet(idinc, newid);
-        }
-
-        String strid = String.valueOf(newid);
-        Semaphore sem = new Semaphore(0);
-
-        DRPCRequest req = new DRPCRequest(funcArgs, strid);
-        this.idtoStart.put(strid, Time.currentTimeSecs());
-        this.idtoSem.put(strid, sem);
-        this.idtoFunction.put(strid, functionName);
-        this.idtoRequest.put(strid, req);
-        ConcurrentLinkedQueue<DRPCRequest> queue = acquireQueue(functionName);
-        queue.add(req);
-        LOG.debug("Waiting for DRPC request for {} {} at {}", functionName, funcArgs, System.currentTimeMillis());
-        try {
-            sem.acquire();
-        } catch (InterruptedException e) {
-            LOG.error("acquire fail ", e);
-        }
-        LOG.debug("Acquired for DRPC request for {} {} at {}", functionName, funcArgs, System.currentTimeMillis());
-
-        Object result = this.idtoResult.get(strid);
-
-        LOG.info("Returning for DRPC request for " + functionName + " " + funcArgs + " at " + (System.currentTimeMillis()));
-
-        this.cleanup(strid);
-
-        if (result instanceof DRPCExecutionException) {
-            throw (DRPCExecutionException) result;
-        }
-        if (result == null) {
-            throw new DRPCExecutionException("Request timed out");
-        }
-        return String.valueOf(result);
-    }
-
-    @Override
-    public void result(String id, String result) throws AuthorizationException, TException {
-        meterResultCalls.mark();
-        String func = this.idtoFunction.get(id);
-        if (func != null) {
-            Map<String, String> map = new HashMap<>();
-            map.put(DRPCAuthorizerBase.FUNCTION_NAME, func);
-            checkAuthorization(authorizer, map, "result");
-            Semaphore sem = this.idtoSem.get(id);
-            LOG.debug("Received result {} for {} at {}", result, id, System.currentTimeMillis());
-            if (sem != null) {
-                this.idtoResult.put(id, result);
-                sem.release();
-            }
-        }
-    }
-
-    @Override
-    public DRPCRequest fetchRequest(String functionName) throws AuthorizationException, TException {
-        meterFetchRequestCalls.mark();
-        Map<String, String> map = new HashMap<>();
-        map.put(DRPCAuthorizerBase.FUNCTION_NAME, functionName);
-        checkAuthorization(authorizer, map, "fetchRequest");
-        ConcurrentLinkedQueue<DRPCRequest> queue = acquireQueue(functionName);
-        DRPCRequest req = queue.poll();
-        if (req != null) {
-            LOG.debug("Fetched request for {} at {}", functionName, System.currentTimeMillis());
-            return req;
-        } else {
-            return new DRPCRequest("", "");
-        }
-    }
-
-    @Override
-    public void failRequest(String id) throws AuthorizationException, TException {
-        meterFailRequestCalls.mark();
-        String func = this.idtoFunction.get(id);
-        if (func != null) {
-            Map<String, String> map = new HashMap<>();
-            map.put(DRPCAuthorizerBase.FUNCTION_NAME, func);
-            checkAuthorization(authorizer, map, "failRequest");
-            Semaphore sem = this.idtoSem.get(id);
-            if (sem != null) {
-                this.idtoResult.put(id, new DRPCExecutionException("Request failed"));
-                sem.release();
-            }
-        }
-    }
-
-    protected ConcurrentLinkedQueue<DRPCRequest> acquireQueue(String function) {
-        ConcurrentLinkedQueue<DRPCRequest> reqQueue = requestQueues.get(function);
-        if (reqQueue == null) {
-            reqQueue = new ConcurrentLinkedQueue<DRPCRequest>();
-            requestQueues.put(function, reqQueue);
-        }
-        return reqQueue;
-    }
-
-    private void checkAuthorization(IAuthorizer aclHandler, Map mapping, String operation, ReqContext reqContext) throws AuthorizationException {
-        if (reqContext != null) {
-            ThriftAccessLogger.logAccess(reqContext.requestID(), reqContext.remoteAddress(), reqContext.principal(), operation);
-        }
-        if (aclHandler != null) {
-            if (reqContext == null)
-                reqContext = ReqContext.context();
-            if (!aclHandler.permit(reqContext, operation, mapping)) {
-                Principal principal = reqContext.principal();
-                String user = (principal != null) ? principal.getName() : "unknown";
-                throw new AuthorizationException("DRPC request '" + operation + "' for '" + user + "' user is not authorized");
-            }
-        }
-    }
-
-    private void checkAuthorization(IAuthorizer aclHandler, Map mapping, String operation) throws AuthorizationException {
-        checkAuthorization(aclHandler, mapping, operation, ReqContext.context());
-    }
-
-    // TO be replaced by Common.mkAuthorizationHandler
-    private IAuthorizer mkAuthorizationHandler(String klassname, Map conf) {
-        IAuthorizer authorizer = null;
-        Class aznClass = null;
-        if (StringUtils.isNotBlank(klassname)) {
-            try {
-                aznClass = Class.forName(klassname);
-                authorizer = (IAuthorizer) aznClass.newInstance();
-                if (authorizer != null) {
-                    authorizer.prepare(conf);
-                }
-            } catch (Exception e) {
-                LOG.error("mkAuthorizationHandler failed!", e);
-            }
-        }
-        LOG.debug("authorization class name: {} class: {} handler: {}", klassname, aznClass, authorizer);
-        return authorizer;
-    }
-
-    public Map getConf() {
-        return conf;
-    }
-
-    public static void main(String[] args) throws Exception {
-
-        Utils.setupDefaultUncaughtExceptionHandler();
-        final DrpcProcess service = new DrpcProcess();
-        service.launchServer();
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/802d28e6/storm-core/src/jvm/org/apache/storm/daemon/DrpcServer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/DrpcServer.java b/storm-core/src/jvm/org/apache/storm/daemon/DrpcServer.java
new file mode 100644
index 0000000..7cee915
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/DrpcServer.java
@@ -0,0 +1,390 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon;
+
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
+import com.sun.net.httpserver.HttpsServer;
+import com.sun.org.apache.bcel.internal.generic.ARRAYLENGTH;
+import org.apache.commons.lang.StringUtils;
+import org.apache.storm.Config;
+import org.apache.storm.daemon.metrics.MetricsUtils;
+import org.apache.storm.daemon.metrics.reporters.PreparableReporter;
+import org.apache.storm.generated.*;
+import org.apache.storm.logging.ThriftAccessLogger;
+import org.apache.storm.security.auth.*;
+import org.apache.storm.security.auth.authorizer.DRPCAuthorizerBase;
+import org.apache.storm.ui.FilterConfiguration;
+import org.apache.storm.ui.IConfigurator;
+import org.apache.storm.ui.UIHelpers;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.utils.VersionInfo;
+import org.apache.thrift.TException;
+import org.eclipse.jetty.server.Server;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.servlet.Servlet;
+import java.security.Principal;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class DrpcServer implements DistributedRPC.Iface, DistributedRPCInvocations.Iface, Shutdownable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DrpcServer.class);
+    private final Long timeoutCheckSecs = 5L;
+
+    private Map conf;
+
+    private ThriftServer handlerServer;
+    private ThriftServer invokeServer;
+    private IHttpCredentialsPlugin httpCredsHandler;
+
+    private Thread clearThread;
+
+    private IAuthorizer authorizer;
+
+    // To be removed after porting drpc.clj
+    private Servlet httpServlet;
+
+    private AtomicInteger ctr = new AtomicInteger(0);
+    private ConcurrentHashMap<String, Semaphore> idtoSem = new ConcurrentHashMap<String, Semaphore>();
+    private ConcurrentHashMap<String, Object> idtoResult = new ConcurrentHashMap<String, Object>();
+    private ConcurrentHashMap<String, Integer> idtoStart = new ConcurrentHashMap<String, Integer>();
+    private ConcurrentHashMap<String, String> idtoFunction = new ConcurrentHashMap<String, String>();
+    private ConcurrentHashMap<String, DRPCRequest> idtoRequest = new ConcurrentHashMap<String, DRPCRequest>();
+    private ConcurrentHashMap<String, ConcurrentLinkedQueue<DRPCRequest>> requestQueues = new ConcurrentHashMap<String, ConcurrentLinkedQueue<DRPCRequest>>();
+
+    private final Meter meterHttpRequests = new MetricRegistry().meter("drpc:num-execute-http-requests");
+    private final Meter meterExecuteCalls = new MetricRegistry().meter("drpc:num-execute-calls");
+    private final Meter meterResultCalls = new MetricRegistry().meter("drpc:num-result-calls");
+    private final Meter meterFailRequestCalls = new MetricRegistry().meter("drpc:num-failRequest-calls");
+    private final Meter meterFetchRequestCalls = new MetricRegistry().meter("drpc:num-fetchRequest-calls");
+    private final Meter meterShutdownCalls = new MetricRegistry().meter("drpc:num-shutdown-calls");
+    
+    public DrpcServer() {
+
+    }
+
+    public IHttpCredentialsPlugin getHttpCredsHandler() {
+        return httpCredsHandler;
+    }
+
+    public void setHttpCredsHandler(IHttpCredentialsPlugin httpCredsHandler) {
+        this.httpCredsHandler = httpCredsHandler;
+    }
+
+    public Servlet getHttpServlet() {
+        return httpServlet;
+    }
+
+    public void setHttpServlet(Servlet httpServlet) {
+        this.httpServlet = httpServlet;
+    }
+
+
+
+    private ThriftServer initHandlerServer(Map conf, final DrpcServer service) throws Exception {
+        int port = (int) conf.get(Config.DRPC_PORT);
+        if (port > 0) {
+            handlerServer = new ThriftServer(conf, new DistributedRPC.Processor<DistributedRPC.Iface>(service), ThriftConnectionType.DRPC);
+        }
+        return handlerServer;
+    }
+
+    private ThriftServer initInvokeServer(Map conf, final DrpcServer service) throws Exception {
+        invokeServer = new ThriftServer(conf, new DistributedRPCInvocations.Processor<DistributedRPCInvocations.Iface>(service),
+                ThriftConnectionType.DRPC_INVOCATIONS);
+        return invokeServer;
+    }
+
+    private void initServer() throws Exception {
+        Integer drpcHttpPort = (Integer) conf.get(Config.DRPC_HTTP_PORT);
+        handlerServer = initHandlerServer(conf, this);
+        invokeServer = initInvokeServer(conf, this);
+        httpCredsHandler = AuthUtils.GetDrpcHttpCredentialsPlugin(conf);
+        Utils.addShutdownHookWithForceKillIn1Sec(new Runnable() {
+            @Override
+            public void run() {
+                if (handlerServer != null) {
+                    handlerServer.stop();
+                } else {
+                    invokeServer.stop();
+                }
+            }
+        });
+        LOG.info("Starting Distributed RPC servers...");
+
+        LOG.info("Starting Distributed RPC servers...");
+        new Thread(new Runnable() {
+
+            @Override
+            public void run() {
+                invokeServer.serve();
+            }
+        }).start();
+        if (drpcHttpPort != null && drpcHttpPort > 0) {
+            String filterClass = (String) (conf.get(Config.DRPC_HTTP_FILTER));
+            Map<String, String> filterParams = (Map<String, String>) (conf.get(Config.DRPC_HTTP_FILTER_PARAMS));
+            FilterConfiguration filterConfiguration = new FilterConfiguration(filterParams, filterClass);
+            final List<FilterConfiguration> filterConfigurations = Arrays.asList(filterConfiguration);
+            final Integer httpsPort = Utils.getInt(conf.get(Config.DRPC_HTTPS_PORT), 0);
+            final String httpsKsPath = (String) (conf.get(Config.DRPC_HTTPS_KEYSTORE_PATH));
+            final String httpsKsPassword = (String) (conf.get(Config.DRPC_HTTPS_KEYSTORE_PASSWORD));
+            final String httpsKsType = (String) (conf.get(Config.DRPC_HTTPS_KEYSTORE_TYPE));
+            final String httpsKeyPassword = (String) (conf.get(Config.DRPC_HTTPS_KEY_PASSWORD));
+            final String httpsTsPath = (String) (conf.get(Config.DRPC_HTTPS_TRUSTSTORE_PATH));
+            final String httpsTsPassword = (String) (conf.get(Config.DRPC_HTTPS_TRUSTSTORE_PASSWORD));
+            final String httpsTsType = (String) (conf.get(Config.DRPC_HTTPS_TRUSTSTORE_TYPE));
+            final Boolean httpsWantClientAuth = (Boolean) (conf.get(Config.DRPC_HTTPS_WANT_CLIENT_AUTH));
+            final Boolean httpsNeedClientAuth = (Boolean) (conf.get(Config.DRPC_HTTPS_NEED_CLIENT_AUTH));
+
+            UIHelpers.stormRunJetty(drpcHttpPort, new IConfigurator() {
+                @Override
+                public void execute(Server s) {
+                    UIHelpers.configSsl(s, httpsPort, httpsKsPath, httpsKsPassword, httpsKsType, httpsKeyPassword, httpsTsPath, httpsTsPassword, httpsTsType,
+                            httpsNeedClientAuth, httpsWantClientAuth);
+                    UIHelpers.configFilter(s, httpServlet, filterConfigurations);
+                }
+            });
+        }
+
+        // To be replaced by Common.StartMetricsReporters
+        List<PreparableReporter> reporters = MetricsUtils.getPreparableReporters(conf);
+        for (PreparableReporter reporter : reporters) {
+            reporter.prepare(new MetricRegistry(), conf);
+            reporter.start();
+            LOG.info("Started statistics report plugin...");
+        }
+        if (handlerServer != null)
+            handlerServer.serve();
+    }
+
+    private void initClearThread() {
+        clearThread = Utils.asyncLoop(new Callable() {
+
+            @Override
+            public Object call() throws Exception {
+                for (Map.Entry<String, Integer> e : idtoStart.entrySet()) {
+
+                    if (Time.deltaSecs(e.getValue()) > Utils.getInt(conf.get(Config.DRPC_REQUEST_TIMEOUT_SECS), 0)) {
+                        String id = e.getKey();
+                        Semaphore sem = idtoSem.get(id);
+                        if (sem != null) {
+                            String func = idtoFunction.get(id);
+                            acquireQueue(func).remove(idtoRequest.get(id));
+                            LOG.warn("Timeout DRPC request id: {} start at {}", id, e.getValue());
+                            sem.release();
+                        }
+                        cleanup(id);
+                        LOG.info("Clear request " + id);
+                    }
+                }
+                return getTimeoutCheckSecs();
+            }
+        });
+    }
+
+    public Long getTimeoutCheckSecs() {
+        return timeoutCheckSecs;
+    }
+
+    public void launchServer(boolean isLocal, Map conf) throws Exception {
+
+        LOG.info("Starting drpc server for storm version {}", VersionInfo.getVersion());
+        this.conf = conf;
+        authorizer = mkAuthorizationHandler((String) (conf.get(Config.DRPC_AUTHORIZER)), conf);
+
+        initClearThread();
+        if (!isLocal)
+            initServer();
+    }
+
+    @Override
+    public void shutdown() {
+        meterShutdownCalls.mark();
+        clearThread.interrupt();
+    }
+
+    public void cleanup(String id) {
+        idtoSem.remove(id);
+        idtoResult.remove(id);
+        idtoStart.remove(id);
+        idtoFunction.remove(id);
+        idtoRequest.remove(id);
+    }
+
+    @Override
+    public String execute(String functionName, String funcArgs) throws DRPCExecutionException, AuthorizationException, org.apache.thrift.TException {
+        meterExecuteCalls.mark();
+        LOG.debug("Received DRPC request for {} {} at {} ", functionName, funcArgs, System.currentTimeMillis());
+        Map<String, String> map = new HashMap<>();
+        map.put(DRPCAuthorizerBase.FUNCTION_NAME, functionName);
+        checkAuthorization(authorizer, map, "execute");
+
+        int idinc = this.ctr.incrementAndGet();
+        int maxvalue = 1000000000;
+        int newid = idinc % maxvalue;
+        if (idinc != newid) {
+            this.ctr.compareAndSet(idinc, newid);
+        }
+
+        String strid = String.valueOf(newid);
+        Semaphore sem = new Semaphore(0);
+
+        DRPCRequest req = new DRPCRequest(funcArgs, strid);
+        this.idtoStart.put(strid, Time.currentTimeSecs());
+        this.idtoSem.put(strid, sem);
+        this.idtoFunction.put(strid, functionName);
+        this.idtoRequest.put(strid, req);
+        ConcurrentLinkedQueue<DRPCRequest> queue = acquireQueue(functionName);
+        queue.add(req);
+        LOG.debug("Waiting for DRPC request for {} {} at {}", functionName, funcArgs, System.currentTimeMillis());
+        try {
+            sem.acquire();
+        } catch (InterruptedException e) {
+            LOG.error("acquire fail ", e);
+        }
+        LOG.debug("Acquired for DRPC request for {} {} at {}", functionName, funcArgs, System.currentTimeMillis());
+
+        Object result = this.idtoResult.get(strid);
+
+        LOG.info("Returning for DRPC request for " + functionName + " " + funcArgs + " at " + (System.currentTimeMillis()));
+
+        this.cleanup(strid);
+
+        if (result instanceof DRPCExecutionException) {
+            throw (DRPCExecutionException) result;
+        }
+        if (result == null) {
+            throw new DRPCExecutionException("Request timed out");
+        }
+        return String.valueOf(result);
+    }
+
+    @Override
+    public void result(String id, String result) throws AuthorizationException, TException {
+        meterResultCalls.mark();
+        String func = this.idtoFunction.get(id);
+        if (func != null) {
+            Map<String, String> map = new HashMap<>();
+            map.put(DRPCAuthorizerBase.FUNCTION_NAME, func);
+            checkAuthorization(authorizer, map, "result");
+            Semaphore sem = this.idtoSem.get(id);
+            LOG.debug("Received result {} for {} at {}", result, id, System.currentTimeMillis());
+            if (sem != null) {
+                this.idtoResult.put(id, result);
+                sem.release();
+            }
+        }
+    }
+
+    @Override
+    public DRPCRequest fetchRequest(String functionName) throws AuthorizationException, TException {
+        meterFetchRequestCalls.mark();
+        Map<String, String> map = new HashMap<>();
+        map.put(DRPCAuthorizerBase.FUNCTION_NAME, functionName);
+        checkAuthorization(authorizer, map, "fetchRequest");
+        ConcurrentLinkedQueue<DRPCRequest> queue = acquireQueue(functionName);
+        DRPCRequest req = queue.poll();
+        if (req != null) {
+            LOG.debug("Fetched request for {} at {}", functionName, System.currentTimeMillis());
+            return req;
+        } else {
+            return new DRPCRequest("", "");
+        }
+    }
+
+    @Override
+    public void failRequest(String id) throws AuthorizationException, TException {
+        meterFailRequestCalls.mark();
+        String func = this.idtoFunction.get(id);
+        if (func != null) {
+            Map<String, String> map = new HashMap<>();
+            map.put(DRPCAuthorizerBase.FUNCTION_NAME, func);
+            checkAuthorization(authorizer, map, "failRequest");
+            Semaphore sem = this.idtoSem.get(id);
+            if (sem != null) {
+                this.idtoResult.put(id, new DRPCExecutionException("Request failed"));
+                sem.release();
+            }
+        }
+    }
+
+    protected ConcurrentLinkedQueue<DRPCRequest> acquireQueue(String function) {
+        ConcurrentLinkedQueue<DRPCRequest> reqQueue = requestQueues.get(function);
+        if (reqQueue == null) {
+            reqQueue = new ConcurrentLinkedQueue<DRPCRequest>();
+            requestQueues.put(function, reqQueue);
+        }
+        return reqQueue;
+    }
+
+    private void checkAuthorization(IAuthorizer aclHandler, Map mapping, String operation, ReqContext reqContext) throws AuthorizationException {
+        if (reqContext != null) {
+            ThriftAccessLogger.logAccess(reqContext.requestID(), reqContext.remoteAddress(), reqContext.principal(), operation);
+        }
+        if (aclHandler != null) {
+            if (reqContext == null)
+                reqContext = ReqContext.context();
+            if (!aclHandler.permit(reqContext, operation, mapping)) {
+                Principal principal = reqContext.principal();
+                String user = (principal != null) ? principal.getName() : "unknown";
+                throw new AuthorizationException("DRPC request '" + operation + "' for '" + user + "' user is not authorized");
+            }
+        }
+    }
+
+    private void checkAuthorization(IAuthorizer aclHandler, Map mapping, String operation) throws AuthorizationException {
+        checkAuthorization(aclHandler, mapping, operation, ReqContext.context());
+    }
+
+    // TO be replaced by Common.mkAuthorizationHandler
+    private IAuthorizer mkAuthorizationHandler(String klassname, Map conf) {
+        IAuthorizer authorizer = null;
+        Class aznClass = null;
+        if (StringUtils.isNotBlank(klassname)) {
+            try {
+                aznClass = Class.forName(klassname);
+                authorizer = (IAuthorizer) aznClass.newInstance();
+                if (authorizer != null) {
+                    authorizer.prepare(conf);
+                }
+            } catch (Exception e) {
+                LOG.error("mkAuthorizationHandler failed!", e);
+            }
+        }
+        LOG.debug("authorization class name: {} class: {} handler: {}", klassname, aznClass, authorizer);
+        return authorizer;
+    }
+
+    public Map getConf() {
+        return conf;
+    }
+
+    public static void main(String[] args) throws Exception {
+
+        Utils.setupDefaultUncaughtExceptionHandler();
+        final DrpcServer service = new DrpcServer();
+        service.launchServer(false, ConfigUtils.readStormConfig());
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/802d28e6/storm-core/test/clj/org/apache/storm/drpc_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/drpc_test.clj b/storm-core/test/clj/org/apache/storm/drpc_test.clj
index 6024674..4879d0d 100644
--- a/storm-core/test/clj/org/apache/storm/drpc_test.clj
+++ b/storm-core/test/clj/org/apache/storm/drpc_test.clj
@@ -22,11 +22,14 @@
   (:import [org.apache.storm.coordination CoordinatedBolt$FinishedCallback])
   (:import [org.apache.storm LocalDRPC LocalCluster])
   (:import [org.apache.storm.tuple Fields])
+  (:import [org.mockito Mockito])
+  (:import [org.mockito.exceptions.base MockitoAssertionError])
   (:import [org.apache.storm.utils ConfigUtils]
            [org.apache.storm.utils.staticmocking ConfigUtilsInstaller])
   (:import [org.apache.storm.generated DRPCExecutionException])
   (:import [java.util.concurrent ConcurrentLinkedQueue])
   (:import [org.apache.storm Thrift])
+  (:import [org.apache.storm.daemon DrpcServer])
   (:use [org.apache.storm config testing])
   (:use [org.apache.storm.internal clojure])
   (:use [org.apache.storm.daemon common drpc])
@@ -231,24 +234,26 @@
         delay-seconds 2
         conf {DRPC-REQUEST-TIMEOUT-SECS delay-seconds}
         mock-cu (proxy [ConfigUtils] []
-                  (readStormConfigImpl [] conf))]
+                  (readStormConfigImpl [] conf))
+        drpc-handler (proxy [DrpcServer] []
+                       (acquireQueue [function] queue))]
     (with-open [_ (ConfigUtilsInstaller. mock-cu)]
-      (stubbing [acquire-queue queue]
-        (let [drpc-handler (service-handler conf)]
-          (is (thrown? DRPCExecutionException
+      (.launchServer drpc-handler true conf)
+      (is (thrown? DRPCExecutionException
             (.execute drpc-handler "ArbitraryDRPCFunctionName" "")))
-          (is (= 0 (.size queue))))))))
+      (is (= 0 (.size queue))))))
 
-(deftest test-drpc-timeout-cleanup 
+(deftest test-drpc-timeout-cleanup
   (let [queue (ConcurrentLinkedQueue.)
         delay-seconds 1
         conf {DRPC-REQUEST-TIMEOUT-SECS delay-seconds}
         mock-cu (proxy [ConfigUtils] []
-                  (readStormConfigImpl [] conf))]
+                  (readStormConfigImpl [] conf))
+        drpc-handler (proxy [DrpcServer] []
+          (acquireQueue [function] queue)
+          (getTimeoutCheckSecs [] delay-seconds))]
     (with-open [_ (ConfigUtilsInstaller. mock-cu)]
-          (stubbing [acquire-queue queue
-                     timeout-check-secs delay-seconds]
-              (let [drpc-handler (service-handler conf)]
-                (is (thrown? DRPCExecutionException 
-                             (.execute drpc-handler "ArbitraryDRPCFunctionName" "no-args"))))))))
+      (.launchServer drpc-handler true conf)
+      (is (thrown? DRPCExecutionException
+            (.execute drpc-handler "ArbitraryDRPCFunctionName" "no-args"))))))
 

http://git-wip-us.apache.org/repos/asf/storm/blob/802d28e6/storm-core/test/clj/org/apache/storm/security/auth/auth_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/security/auth/auth_test.clj b/storm-core/test/clj/org/apache/storm/security/auth/auth_test.clj
index 27f5816..a366efa 100644
--- a/storm-core/test/clj/org/apache/storm/security/auth/auth_test.clj
+++ b/storm-core/test/clj/org/apache/storm/security/auth/auth_test.clj
@@ -27,6 +27,8 @@
   (:import [javax.security.auth Subject])
   (:import [java.net InetAddress])
   (:import [org.apache.storm Config])
+  (:import [org.mockito Mockito])
+  (:import [org.mockito.exceptions.base MockitoAssertionError])
   (:import [org.apache.storm.generated AuthorizationException])
   (:import [org.apache.storm.utils NimbusClient ConfigUtils])
   (:import [org.apache.storm.security.auth.authorizer SimpleWhitelistAuthorizer SimpleACLAuthorizer])

http://git-wip-us.apache.org/repos/asf/storm/blob/802d28e6/storm-core/test/clj/org/apache/storm/security/auth/drpc_auth_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/security/auth/drpc_auth_test.clj b/storm-core/test/clj/org/apache/storm/security/auth/drpc_auth_test.clj
index 3250054..3eef31b 100644
--- a/storm-core/test/clj/org/apache/storm/security/auth/drpc_auth_test.clj
+++ b/storm-core/test/clj/org/apache/storm/security/auth/drpc_auth_test.clj
@@ -18,7 +18,8 @@
   (:require [org.apache.storm.daemon [drpc :as drpc]])
   (:import [org.apache.storm.generated AuthorizationException
             DRPCExecutionException DistributedRPC$Processor
-            DistributedRPCInvocations$Processor])
+            DistributedRPCInvocations$Processor]
+           [org.apache.storm.daemon DrpcServer])
   (:import [org.apache.storm Config])
   (:import [org.apache.storm.security.auth ReqContext SingleUserPrincipal ThriftServer ThriftConnectionType])
   (:import [org.apache.storm.utils DRPCClient ConfigUtils])
@@ -37,7 +38,7 @@
         conf (if login-cfg (assoc conf "java.security.auth.login.config" login-cfg) conf)
         conf (assoc conf DRPC-PORT client-port)
         conf (assoc conf DRPC-INVOCATIONS-PORT invocations-port)
-        service-handler (drpc/service-handler conf)
+        service-handler (let [drpc-service (DrpcServer.)] (.launchServer drpc-service true conf) drpc-service)
         handler-server (ThriftServer. conf
                                       (DistributedRPC$Processor. service-handler)
                                       ThriftConnectionType/DRPC)