You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2016/12/19 00:54:14 UTC

[4/8] storm git commit: refactored to move to new modules

refactored to move to new modules


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f0b18d94
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f0b18d94
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f0b18d94

Branch: refs/heads/master
Commit: f0b18d9493a58ae6179b8011e64330377948c8c4
Parents: ea44062
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Tue Dec 6 17:12:48 2016 -0600
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Tue Dec 6 17:12:48 2016 -0600

----------------------------------------------------------------------
 pom.xml                                         |   1 +
 storm-core/pom.xml                              |  12 --
 .../apache/storm/daemon/drpc/DRPCServer.java    | 168 ---------------
 .../webapp/AuthorizationExceptionMapper.java    |  39 ----
 .../daemon/drpc/webapp/DRPCApplication.java     |  47 ----
 .../daemon/drpc/webapp/DRPCExceptionMapper.java |  61 ------
 .../storm/daemon/drpc/webapp/DRPCResource.java  |  63 ------
 .../daemon/drpc/webapp/ReqContextFilter.java    |  69 ------
 .../src/jvm/org/apache/storm/ui/UIHelpers.java  |   7 -
 .../storm/daemon/drpc/DRPCServerTest.java       | 214 -------------------
 storm-drpc-server/pom.xml                       | 162 ++++++++++++++
 .../apache/storm/daemon/drpc/DRPCServer.java    | 180 ++++++++++++++++
 .../webapp/AuthorizationExceptionMapper.java    |  39 ++++
 .../daemon/drpc/webapp/DRPCApplication.java     |  47 ++++
 .../daemon/drpc/webapp/DRPCExceptionMapper.java |  61 ++++++
 .../storm/daemon/drpc/webapp/DRPCResource.java  |  63 ++++++
 .../daemon/drpc/webapp/ReqContextFilter.java    |  69 ++++++
 .../storm/daemon/drpc/DRPCServerTest.java       | 214 +++++++++++++++++++
 18 files changed, 836 insertions(+), 680 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/f0b18d94/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 6f1ba86..4cc637f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -308,6 +308,7 @@
         <module>storm-buildtools/maven-shade-clojure-transformer</module>
         <module>storm-buildtools/storm-maven-plugins</module>
         <module>storm-core</module>
+        <module>storm-drpc-server</module>
         <module>storm-rename-hack</module>
         <module>storm-clojure</module>
 

http://git-wip-us.apache.org/repos/asf/storm/blob/f0b18d94/storm-core/pom.xml
----------------------------------------------------------------------
diff --git a/storm-core/pom.xml b/storm-core/pom.xml
index 9c47914..b6424d7 100644
--- a/storm-core/pom.xml
+++ b/storm-core/pom.xml
@@ -372,18 +372,6 @@
                 </exclusion>
             </exclusions>
         </dependency>
-        <dependency>
-           <groupId>org.glassfish.jersey.core</groupId>
-            <artifactId>jersey-server</artifactId>
-        </dependency> 
-        <dependency>
-            <groupId>org.glassfish.jersey.containers</groupId> 
-            <artifactId>jersey-container-servlet-core</artifactId>
-        </dependency> 
-        <dependency>
-            <groupId>org.glassfish.jersey.containers</groupId> 
-            <artifactId>jersey-container-jetty-http</artifactId>
-        </dependency>
     </dependencies>
     <build>
         <sourceDirectory>src/jvm</sourceDirectory>

http://git-wip-us.apache.org/repos/asf/storm/blob/f0b18d94/storm-core/src/jvm/org/apache/storm/daemon/drpc/DRPCServer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/drpc/DRPCServer.java b/storm-core/src/jvm/org/apache/storm/daemon/drpc/DRPCServer.java
deleted file mode 100644
index 34dba76..0000000
--- a/storm-core/src/jvm/org/apache/storm/daemon/drpc/DRPCServer.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.daemon.drpc;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.storm.Config;
-import org.apache.storm.daemon.drpc.webapp.DRPCApplication;
-import org.apache.storm.generated.DistributedRPC;
-import org.apache.storm.generated.DistributedRPCInvocations;
-import org.apache.storm.metric.StormMetricsRegistry;
-import org.apache.storm.security.auth.ThriftConnectionType;
-import org.apache.storm.security.auth.ThriftServer;
-import org.apache.storm.ui.FilterConfiguration;
-import org.apache.storm.ui.UIHelpers;
-import org.apache.storm.utils.ConfigUtils;
-import org.apache.storm.utils.Utils;
-import org.eclipse.jetty.server.Server;
-import org.eclipse.jetty.servlet.ServletContextHandler;
-import org.eclipse.jetty.servlet.ServletHolder;
-import org.glassfish.jersey.servlet.ServletContainer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.codahale.metrics.Meter;
-import com.google.common.annotations.VisibleForTesting;
-
-public class DRPCServer implements AutoCloseable {
-    private static final Logger LOG = LoggerFactory.getLogger(DRPCServer.class);
-    private final static Meter meterShutdownCalls = StormMetricsRegistry.registerMeter("drpc:num-shutdown-calls");
-    
-    private static ThriftServer mkHandlerServer(final DistributedRPC.Iface service, Integer port, Map<String, Object> conf) {
-        ThriftServer ret = null;
-        if (port != null && port > 0) {
-            ret = new ThriftServer(conf, new DistributedRPC.Processor<>(service),
-                    ThriftConnectionType.DRPC);
-        }
-        return ret;
-    }
-
-    private static ThriftServer mkInvokeServer(final DistributedRPCInvocations.Iface service, int port, Map<String, Object> conf) {
-        return new ThriftServer(conf, new DistributedRPCInvocations.Processor<>(service),
-                ThriftConnectionType.DRPC_INVOCATIONS);
-    }
-    
-    private static Server mkHttpServer(Map<String, Object> conf, DRPC drpc) {
-        Integer drpcHttpPort = (Integer) conf.get(Config.DRPC_HTTP_PORT);
-        Server ret = null;
-        if (drpcHttpPort != null && drpcHttpPort > 0) {
-            LOG.info("Starting RPC HTTP servers...");
-            String filterClass = (String) (conf.get(Config.DRPC_HTTP_FILTER));
-            @SuppressWarnings("unchecked")
-            Map<String, String> filterParams = (Map<String, String>) (conf.get(Config.DRPC_HTTP_FILTER_PARAMS));
-            FilterConfiguration filterConfiguration = new FilterConfiguration(filterClass, filterParams);
-            final List<FilterConfiguration> filterConfigurations = Arrays.asList(filterConfiguration);
-            final Integer httpsPort = Utils.getInt(conf.get(Config.DRPC_HTTPS_PORT), 0);
-            final String httpsKsPath = (String) (conf.get(Config.DRPC_HTTPS_KEYSTORE_PATH));
-            final String httpsKsPassword = (String) (conf.get(Config.DRPC_HTTPS_KEYSTORE_PASSWORD));
-            final String httpsKsType = (String) (conf.get(Config.DRPC_HTTPS_KEYSTORE_TYPE));
-            final String httpsKeyPassword = (String) (conf.get(Config.DRPC_HTTPS_KEY_PASSWORD));
-            final String httpsTsPath = (String) (conf.get(Config.DRPC_HTTPS_TRUSTSTORE_PATH));
-            final String httpsTsPassword = (String) (conf.get(Config.DRPC_HTTPS_TRUSTSTORE_PASSWORD));
-            final String httpsTsType = (String) (conf.get(Config.DRPC_HTTPS_TRUSTSTORE_TYPE));
-            final Boolean httpsWantClientAuth = (Boolean) (conf.get(Config.DRPC_HTTPS_WANT_CLIENT_AUTH));
-            final Boolean httpsNeedClientAuth = (Boolean) (conf.get(Config.DRPC_HTTPS_NEED_CLIENT_AUTH));
-
-            //TODO a better way to do this would be great.
-            DRPCApplication.setup(drpc);
-            ret = UIHelpers.jettyCreateServer(drpcHttpPort, null, httpsPort);
-            
-            UIHelpers.configSsl(ret, httpsPort, httpsKsPath, httpsKsPassword, httpsKsType, httpsKeyPassword, httpsTsPath, httpsTsPassword, httpsTsType,
-                    httpsNeedClientAuth, httpsWantClientAuth);
-            
-            ServletContextHandler context = new ServletContextHandler(ServletContextHandler.NO_SESSIONS);
-            context.setContextPath("/");
-            ret.setHandler(context);
-
-            ServletHolder jerseyServlet = context.addServlet(ServletContainer.class, "/*");
-            jerseyServlet.setInitOrder(1);
-            jerseyServlet.setInitParameter("javax.ws.rs.Application", DRPCApplication.class.getName());
-            
-            UIHelpers.configFilters(context, filterConfigurations);
-            UIHelpers.addRequestContextFilter(context, Config.DRPC_HTTP_CREDS_PLUGIN, conf);
-        }
-        return ret;
-    }
-    
-    private final DRPC _drpc;
-    private final ThriftServer _handlerServer;
-    private final ThriftServer _invokeServer;
-    private final Server _httpServer;
-    private boolean _closed = false;
-
-    public DRPCServer(Map<String, Object> conf) {
-        _drpc = new DRPC(conf);
-        DRPCThrift thrift = new DRPCThrift(_drpc);
-        _handlerServer = mkHandlerServer(thrift, Utils.getInt(conf.get(Config.DRPC_PORT), null), conf);
-        _invokeServer = mkInvokeServer(thrift, Utils.getInt(conf.get(Config.DRPC_INVOCATIONS_PORT), 3773), conf);
-        _httpServer = mkHttpServer(conf, _drpc);
-    }
-
-    @VisibleForTesting
-    void start() throws Exception {
-        LOG.info("Starting Distributed RPC servers...");
-        new Thread(() -> _invokeServer.serve()).start();
-
-        if (_httpServer != null) {
-            _httpServer.start();
-        }
-        
-        if (_handlerServer != null) {
-            _handlerServer.serve();
-        } else {
-            _httpServer.join();
-        }
-    }
-
-    @Override
-    public synchronized void close() {
-        if (!_closed) {
-            //This is kind of useless...
-            meterShutdownCalls.mark();
-
-            if (_handlerServer != null) {
-                _handlerServer.stop();
-            }
-
-            if (_invokeServer != null) {
-                _invokeServer.stop();
-            }
-
-            //TODO this is causing issues...
-            //if (_httpServer != null) {
-            //    _httpServer.destroy();
-            //}
-            
-            _drpc.close();
-            _closed  = true;
-        }
-    }
-    
-    public static void main(String [] args) throws Exception {
-        Utils.setupDefaultUncaughtExceptionHandler();
-        Map<String, Object> conf = ConfigUtils.readStormConfig();
-        try (DRPCServer server = new DRPCServer(conf)) {
-            Utils.addShutdownHookWithForceKillIn1Sec(() -> server.close());
-            StormMetricsRegistry.startMetricsReporters(conf);
-            server.start();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/f0b18d94/storm-core/src/jvm/org/apache/storm/daemon/drpc/webapp/AuthorizationExceptionMapper.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/drpc/webapp/AuthorizationExceptionMapper.java b/storm-core/src/jvm/org/apache/storm/daemon/drpc/webapp/AuthorizationExceptionMapper.java
deleted file mode 100644
index 75c3100..0000000
--- a/storm-core/src/jvm/org/apache/storm/daemon/drpc/webapp/AuthorizationExceptionMapper.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.daemon.drpc.webapp;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import javax.ws.rs.core.Response;
-import javax.ws.rs.ext.ExceptionMapper;
-import javax.ws.rs.ext.Provider;
-
-import org.apache.storm.generated.AuthorizationException;
-import org.json.simple.JSONValue;
-
-@Provider
-public class AuthorizationExceptionMapper implements ExceptionMapper<AuthorizationException> {
-    @Override
-    public Response toResponse(AuthorizationException ex) {
-        Map<String, String> body = new HashMap<>();
-        body.put("error", "Not Authorized");
-        body.put("errorMessage", ex.get_msg());
-        return Response.status(403).entity(JSONValue.toJSONString(body)).type("application/json").build();
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/f0b18d94/storm-core/src/jvm/org/apache/storm/daemon/drpc/webapp/DRPCApplication.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/drpc/webapp/DRPCApplication.java b/storm-core/src/jvm/org/apache/storm/daemon/drpc/webapp/DRPCApplication.java
deleted file mode 100644
index 4a0ce8c..0000000
--- a/storm-core/src/jvm/org/apache/storm/daemon/drpc/webapp/DRPCApplication.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.daemon.drpc.webapp;
-
-import java.util.HashSet;
-import java.util.Set;
-
-import javax.ws.rs.ApplicationPath;
-import javax.ws.rs.core.Application;
-
-import org.apache.storm.daemon.drpc.DRPC;
-
-@ApplicationPath("")
-public class DRPCApplication extends Application {
-    private static DRPC _drpc;
-    private final Set<Object> singletons = new HashSet<Object>();
-    
-    public DRPCApplication() {
-        singletons.add(new DRPCResource(_drpc));
-        singletons.add(new DRPCExceptionMapper());
-        singletons.add(new AuthorizationExceptionMapper());
-    }
-    
-    @Override
-    public Set<Object> getSingletons() {
-        return singletons;
-    }
-
-    public static void setup(DRPC drpc) {
-        _drpc = drpc;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/f0b18d94/storm-core/src/jvm/org/apache/storm/daemon/drpc/webapp/DRPCExceptionMapper.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/drpc/webapp/DRPCExceptionMapper.java b/storm-core/src/jvm/org/apache/storm/daemon/drpc/webapp/DRPCExceptionMapper.java
deleted file mode 100644
index 60cfc93..0000000
--- a/storm-core/src/jvm/org/apache/storm/daemon/drpc/webapp/DRPCExceptionMapper.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.daemon.drpc.webapp;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import javax.ws.rs.core.Response;
-import javax.ws.rs.core.Response.ResponseBuilder;
-import javax.ws.rs.ext.ExceptionMapper;
-import javax.ws.rs.ext.Provider;
-
-import org.apache.storm.generated.DRPCExecutionException;
-import org.json.simple.JSONValue;
-
-@Provider
-public class DRPCExceptionMapper implements ExceptionMapper<DRPCExecutionException> {
-
-    @Override
-    public Response toResponse(DRPCExecutionException ex) {
-        ResponseBuilder builder = Response.status(500);
-        switch (ex.get_type()) {
-            case FAILED_REQUEST:
-                builder.status(400);
-                break;
-            case SERVER_SHUTDOWN:
-                builder.status(503); //Not available
-                break;
-            case SERVER_TIMEOUT:
-                builder.status(504); //proxy timeout
-                break;
-            case INTERNAL_ERROR:
-                //fall throw on purpose
-            default:
-                //Empty (Still 500)
-                break;
-            
-        }
-        Map<String, String> body = new HashMap<>();
-        //TODO I would love to standardize this...
-        body.put("error", ex.is_set_type() ? ex.get_type().toString() : "Internal Error");
-        body.put("errorMessage", ex.get_msg());
-        return builder.entity(JSONValue.toJSONString(body)).type("application/json").build();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/f0b18d94/storm-core/src/jvm/org/apache/storm/daemon/drpc/webapp/DRPCResource.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/drpc/webapp/DRPCResource.java b/storm-core/src/jvm/org/apache/storm/daemon/drpc/webapp/DRPCResource.java
deleted file mode 100644
index d6490af..0000000
--- a/storm-core/src/jvm/org/apache/storm/daemon/drpc/webapp/DRPCResource.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.daemon.drpc.webapp;
-
-import javax.servlet.http.HttpServletRequest;
-import javax.ws.rs.GET;
-import javax.ws.rs.POST;
-import javax.ws.rs.Path;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.core.Context;
-
-import org.apache.storm.daemon.drpc.DRPC;
-import org.apache.storm.metric.StormMetricsRegistry;
-import org.apache.thrift.TException;
-
-import com.codahale.metrics.Meter;
-
-@Path("/drpc/")
-public class DRPCResource {
-    private static final Meter meterHttpRequests = StormMetricsRegistry.registerMeter("drpc:num-execute-http-requests");
-    private final DRPC _drpc;
-    public DRPCResource(DRPC drpc) {
-        _drpc = drpc;
-    }
-    
-    //TODO put in some better exception mapping...
-    //TODO move populateContext to a filter...
-    @POST
-    @Path("/{func}") 
-    public String post(@PathParam("func") String func, String args, @Context HttpServletRequest request) throws TException {
-        meterHttpRequests.mark();
-        return _drpc.executeBlocking(func, args);
-    }
-    
-    @GET
-    @Path("/{func}/{args}") 
-    public String get(@PathParam("func") String func, @PathParam("args") String args, @Context HttpServletRequest request) throws TException {
-        meterHttpRequests.mark();
-        return _drpc.executeBlocking(func, args);
-    }
-    
-    @GET
-    @Path("/{func}") 
-    public String get(@PathParam("func") String func, @Context HttpServletRequest request) throws TException {
-        meterHttpRequests.mark();
-        return _drpc.executeBlocking(func, "");
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/f0b18d94/storm-core/src/jvm/org/apache/storm/daemon/drpc/webapp/ReqContextFilter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/drpc/webapp/ReqContextFilter.java b/storm-core/src/jvm/org/apache/storm/daemon/drpc/webapp/ReqContextFilter.java
deleted file mode 100644
index 521901a..0000000
--- a/storm-core/src/jvm/org/apache/storm/daemon/drpc/webapp/ReqContextFilter.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.daemon.drpc.webapp;
-import java.io.IOException;
-
-import javax.servlet.Filter;
-import javax.servlet.FilterChain;
-import javax.servlet.FilterConfig;
-import javax.servlet.ServletException;
-import javax.servlet.ServletRequest;
-import javax.servlet.ServletResponse;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-
-import org.apache.storm.security.auth.IHttpCredentialsPlugin;
-import org.apache.storm.security.auth.ReqContext;
-
-public class ReqContextFilter implements Filter {
-    private final IHttpCredentialsPlugin _httpCredsHandler;
-
-    public ReqContextFilter(IHttpCredentialsPlugin httpCredsHandler) {
-        _httpCredsHandler = httpCredsHandler;
-    }
-    
-    /**
-     * Populate the Storm RequestContext from an servlet request. This should be called in each handler
-     * @param request the request to populate
-     */
-    public void populateContext(HttpServletRequest request) {
-        if (_httpCredsHandler != null) {
-            _httpCredsHandler.populateContext(ReqContext.context(), request);
-        }
-    }
-    
-    public void init(FilterConfig config) throws ServletException {
-        //NOOP
-        //We could add in configs through the web.xml if we wanted something stand alone here...
-    }
-
-    public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException {
-        handle((HttpServletRequest)request, (HttpServletResponse)response, chain);
-    }
-
-    public void handle(HttpServletRequest request, HttpServletResponse response, FilterChain chain) throws IOException, ServletException{
-        if (request != null) {
-            populateContext(request);
-        }
-        chain.doFilter(request, response);
-    }
-
-    public void destroy() {
-        //NOOP
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/f0b18d94/storm-core/src/jvm/org/apache/storm/ui/UIHelpers.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/ui/UIHelpers.java b/storm-core/src/jvm/org/apache/storm/ui/UIHelpers.java
index e97d52d..2f0cce1 100644
--- a/storm-core/src/jvm/org/apache/storm/ui/UIHelpers.java
+++ b/storm-core/src/jvm/org/apache/storm/ui/UIHelpers.java
@@ -21,7 +21,6 @@ import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableMap;
 
 import org.apache.storm.Config;
-import org.apache.storm.daemon.drpc.webapp.ReqContextFilter;
 import org.apache.storm.generated.ExecutorInfo;
 import org.apache.storm.logging.filters.AccessLoggingFilter;
 import org.apache.storm.security.auth.AuthUtils;
@@ -208,12 +207,6 @@ public class UIHelpers {
         context.addFilter(mkAccessLoggingFilterHandle(), "/*", EnumSet.allOf(DispatcherType.class));
     }
     
-    public static void addRequestContextFilter(ServletContextHandler context, String configName, Map<String, Object> conf) {
-        IHttpCredentialsPlugin auth = AuthUtils.GetHttpCredentialsPlugin(conf, (String)conf.get(configName));
-        ReqContextFilter filter = new ReqContextFilter(auth);
-        context.addFilter(new FilterHolder(filter), "/*", FilterMapping.ALL);
-    }
-
     private static Server removeNonSslConnector(Server server) {
         for (Connector c : server.getConnectors()) {
             if (c != null && !(c instanceof SslSocketConnector)) {

http://git-wip-us.apache.org/repos/asf/storm/blob/f0b18d94/storm-core/test/jvm/org/apache/storm/daemon/drpc/DRPCServerTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/daemon/drpc/DRPCServerTest.java b/storm-core/test/jvm/org/apache/storm/daemon/drpc/DRPCServerTest.java
deleted file mode 100644
index b2df441..0000000
--- a/storm-core/test/jvm/org/apache/storm/daemon/drpc/DRPCServerTest.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.daemon.drpc;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.fail;
-
-import java.io.InputStream;
-import java.net.URL;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.storm.Config;
-import org.apache.storm.daemon.drpc.DRPCServer;
-import org.apache.storm.drpc.DRPCInvocationsClient;
-import org.apache.storm.generated.DRPCExecutionException;
-import org.apache.storm.generated.DRPCRequest;
-import org.apache.storm.security.auth.SimpleTransportPlugin;
-import org.apache.storm.utils.DRPCClient;
-import org.apache.storm.utils.Utils;
-import org.junit.AfterClass;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-public class DRPCServerTest {
-    private static final Logger LOG = LoggerFactory.getLogger(DRPCServerTest.class);
-    private static final ExecutorService exec = Executors.newCachedThreadPool();
-    
-    @AfterClass
-    public static void close() {
-        exec.shutdownNow();
-    }
-    
-    private static DRPCRequest getNextAvailableRequest(DRPCInvocationsClient invoke, String func) throws Exception {
-        DRPCRequest request = null;
-        long timedout = System.currentTimeMillis() + 5_000;
-        while (System.currentTimeMillis() < timedout) {
-            request = invoke.getClient().fetchRequest(func);
-            if (request != null && request.get_request_id() != null && !request.get_request_id().isEmpty()) {
-                return request;
-            }
-            Thread.sleep(1);
-        }
-        fail("Test timed out waiting for a request on " + func);
-        return request;
-    }
-    
-    private Map<String, Object> getConf(int drpcPort, int invocationsPort, Integer httpPort) {
-        Map<String, Object> conf = new HashMap<>();
-        conf.put(Config.DRPC_PORT, drpcPort);
-        conf.put(Config.DRPC_INVOCATIONS_PORT, invocationsPort);
-        conf.put(Config.STORM_THRIFT_TRANSPORT_PLUGIN, SimpleTransportPlugin.class.getName());
-        conf.put(Config.DRPC_WORKER_THREADS, 5);
-        conf.put(Config.DRPC_INVOCATIONS_THREADS, 5);
-        conf.put(Config.DRPC_MAX_BUFFER_SIZE, 1048576);
-        conf.put(Config.STORM_NIMBUS_RETRY_TIMES, 2);
-        conf.put(Config.STORM_NIMBUS_RETRY_INTERVAL, 10);
-        conf.put(Config.STORM_NIMBUS_RETRY_INTERVAL_CEILING, 100);
-        if (httpPort != null) {
-            conf.put(Config.DRPC_HTTP_PORT, httpPort);
-        }
-        return conf;
-    }
-    
-    @Test
-    public void testGoodThrift() throws Exception {
-        int drpcPort = Utils.getAvailablePort();
-        int invocationsPort = Utils.getAvailablePort(drpcPort + 1);
-        Map<String, Object> conf = getConf(drpcPort, invocationsPort, null);
-        try (DRPCServer server = new DRPCServer(conf)) {
-            exec.submit(() -> {
-                server.start();
-                return null;
-            });
-            try (DRPCClient client = new DRPCClient(conf, "localhost", drpcPort);
-                 DRPCInvocationsClient invoke = new DRPCInvocationsClient(conf, "localhost", invocationsPort)) {
-                Future<String> found = exec.submit(() -> client.getClient().execute("testing", "test"));
-                DRPCRequest request = getNextAvailableRequest(invoke, "testing");
-                assertNotNull(request);
-                assertEquals("test", request.get_func_args());
-                assertNotNull(request.get_request_id());
-                invoke.result(request.get_request_id(), "tested");
-                String result = found.get(1000, TimeUnit.MILLISECONDS);
-                assertEquals("tested", result);
-            }
-        }
-    }
-    
-    @Test
-    public void testFailedThrift() throws Exception {
-        int drpcPort = Utils.getAvailablePort();
-        int invocationsPort = Utils.getAvailablePort(drpcPort + 1);
-        Map<String, Object> conf = getConf(drpcPort, invocationsPort, null);
-        try (DRPCServer server = new DRPCServer(conf)) {
-            exec.submit(() -> {
-                server.start();
-                return null;
-            });
-            try (DRPCClient client = new DRPCClient(conf, "localhost", drpcPort);
-                    DRPCInvocationsClient invoke = new DRPCInvocationsClient(conf, "localhost", invocationsPort)) {
-                Future<String> found = exec.submit(() -> client.getClient().execute("testing", "test"));
-                DRPCRequest request = getNextAvailableRequest(invoke, "testing");
-                assertNotNull(request);
-                assertEquals("test", request.get_func_args());
-                assertNotNull(request.get_request_id());
-                invoke.failRequest(request.get_request_id());
-                try {
-                    found.get(1000, TimeUnit.MILLISECONDS);
-                    fail("exec did not throw an exception");
-                } catch (ExecutionException e) {
-                    Throwable t = e.getCause();
-                    assertEquals(t.getClass(), DRPCExecutionException.class);
-                    //Don't know a better way to validate that it failed.
-                    assertEquals("Request failed", ((DRPCExecutionException)t).get_msg());
-                }
-            }
-        }
-    }
-    
-    public static String GET(int port, String func, String args) {
-        try {
-            URL url = new URL("http://localhost:"+port+"/drpc/"+func+"/"+args);
-            InputStream in = url.openStream();
-            byte[] buffer = new byte[1024];
-            int read = in.read(buffer);
-            return new String(buffer, 0, read);
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-    
-    @Test
-    public void testGoodHttpGet() throws Exception {
-        LOG.info("STARTING HTTP GET TEST...");
-        int drpcPort = Utils.getAvailablePort();
-        int invocationsPort = Utils.getAvailablePort(drpcPort + 1);
-        int httpPort = Utils.getAvailablePort(invocationsPort + 1);
-        Map<String, Object> conf = getConf(drpcPort, invocationsPort, httpPort);
-        try (DRPCServer server = new DRPCServer(conf)) {
-            exec.submit(() -> {
-                server.start();
-                return null;
-            });
-            //TODO need a better way to do this
-            Thread.sleep(2000);
-            try (DRPCInvocationsClient invoke = new DRPCInvocationsClient(conf, "localhost", invocationsPort)) {
-                Future<String> found = exec.submit(() -> GET(httpPort, "testing", "test"));
-                DRPCRequest request = getNextAvailableRequest(invoke, "testing");
-                assertNotNull(request);
-                assertEquals("test", request.get_func_args());
-                assertNotNull(request.get_request_id());
-                invoke.result(request.get_request_id(), "tested");
-                String result = found.get(1000, TimeUnit.MILLISECONDS);
-                assertEquals("tested", result);
-            }
-        }
-    }
-    
-    @Test
-    public void testFailedHttpGet() throws Exception {
-        LOG.info("STARTING HTTP GET (FAIL) TEST...");
-        int drpcPort = Utils.getAvailablePort();
-        int invocationsPort = Utils.getAvailablePort(drpcPort + 1);
-        int httpPort = Utils.getAvailablePort(invocationsPort + 1);
-        Map<String, Object> conf = getConf(drpcPort, invocationsPort, httpPort);
-        try (DRPCServer server = new DRPCServer(conf)) {
-            exec.submit(() -> {
-                server.start();
-                return null;
-            });
-            //TODO need a better way to do this
-            Thread.sleep(2000);
-            try (DRPCInvocationsClient invoke = new DRPCInvocationsClient(conf, "localhost", invocationsPort)) {
-                Future<String> found = exec.submit(() -> GET(httpPort, "testing", "test"));
-                DRPCRequest request = getNextAvailableRequest(invoke, "testing");
-                assertNotNull(request);
-                assertEquals("test", request.get_func_args());
-                assertNotNull(request.get_request_id());
-                invoke.getClient().failRequest(request.get_request_id());
-                try {
-                    found.get(1000, TimeUnit.MILLISECONDS);
-                    fail("exec did not throw an exception");
-                } catch (ExecutionException e) {
-                    LOG.warn("Got Expected Exception", e);
-                    //Getting the exact response code is a bit more complex.
-                    //TODO should use a better client
-                }
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/f0b18d94/storm-drpc-server/pom.xml
----------------------------------------------------------------------
diff --git a/storm-drpc-server/pom.xml b/storm-drpc-server/pom.xml
new file mode 100644
index 0000000..d028b0a
--- /dev/null
+++ b/storm-drpc-server/pom.xml
@@ -0,0 +1,162 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <artifactId>storm</artifactId>
+        <groupId>org.apache.storm</groupId>
+        <version>2.0.0-SNAPSHOT</version>
+        <relativePath>..</relativePath>
+    </parent>
+
+    <artifactId>storm-drpc-server</artifactId>
+    <packaging>jar</packaging>
+    <name>Storm DRPC Server</name>
+    <description>DRPC Server for Apache Storm</description>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-core</artifactId>
+            <version>2.0.0-SNAPSHOT</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.googlecode.json-simple</groupId>
+            <artifactId>json-simple</artifactId>
+            <scope>compile</scope>
+        </dependency> 
+        <dependency>
+            <groupId>log4j</groupId>
+            <artifactId>log4j</artifactId>
+            <version>1.2.17</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-slf4j-impl</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.dropwizard.metrics</groupId>
+            <artifactId>metrics-core</artifactId>
+        </dependency>
+        <!-- hamcrest-core is shaded inside mockito-all, while junit depends on a newer version of hamcrest-core.
+        To give classes from the newer hamcrest-core higher precedence, junit is declared before mockito-all.
+         -->
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>javax.servlet</groupId>
+            <artifactId>servlet-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
+        <dependency>
+           <groupId>org.glassfish.jersey.core</groupId>
+            <artifactId>jersey-server</artifactId>
+        </dependency> 
+        <dependency>
+            <groupId>org.glassfish.jersey.containers</groupId> 
+            <artifactId>jersey-container-servlet-core</artifactId>
+        </dependency> 
+        <dependency>
+            <groupId>org.glassfish.jersey.containers</groupId> 
+            <artifactId>jersey-container-jetty-http</artifactId>
+        </dependency>
+    </dependencies>
+    <build>
+        <plugins>
+           <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-report-plugin</artifactId>
+                <configuration>
+                    <reportsDirectories>
+                        <file>${project.build.directory}/test-reports</file>
+                    </reportsDirectories>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-failsafe-plugin</artifactId>
+                <configuration>
+                    <reportsDirectory>${project.build.directory}/test-reports</reportsDirectory>
+                </configuration>
+            </plugin>
+           <plugin>
+                <artifactId>maven-dependency-plugin</artifactId>
+                <version>2.8</version>
+                <executions>
+                    <execution>
+                        <id>copy-dependencies</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>copy-dependencies</goal>
+                        </goals>
+                        <configuration>
+                            <overWriteReleases>false</overWriteReleases>
+                            <overWriteSnapshots>false</overWriteSnapshots>
+                            <overWriteIfNewer>true</overWriteIfNewer>
+                            <includeScope>runtime</includeScope>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-source-plugin</artifactId>
+                <version>2.2.1</version>
+                <executions>
+                    <execution>
+                        <id>attach-sources</id>
+                        <goals>
+                            <!-- avoid warning about recursion -->
+                            <goal>jar-no-fork</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <version>2.6</version>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>test-jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>

http://git-wip-us.apache.org/repos/asf/storm/blob/f0b18d94/storm-drpc-server/src/main/java/org/apache/storm/daemon/drpc/DRPCServer.java
----------------------------------------------------------------------
diff --git a/storm-drpc-server/src/main/java/org/apache/storm/daemon/drpc/DRPCServer.java b/storm-drpc-server/src/main/java/org/apache/storm/daemon/drpc/DRPCServer.java
new file mode 100644
index 0000000..77862b9
--- /dev/null
+++ b/storm-drpc-server/src/main/java/org/apache/storm/daemon/drpc/DRPCServer.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.drpc;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.storm.Config;
+import org.apache.storm.daemon.drpc.webapp.DRPCApplication;
+import org.apache.storm.daemon.drpc.webapp.ReqContextFilter;
+import org.apache.storm.generated.DistributedRPC;
+import org.apache.storm.generated.DistributedRPCInvocations;
+import org.apache.storm.metric.StormMetricsRegistry;
+import org.apache.storm.security.auth.AuthUtils;
+import org.apache.storm.security.auth.IHttpCredentialsPlugin;
+import org.apache.storm.security.auth.ThriftConnectionType;
+import org.apache.storm.security.auth.ThriftServer;
+import org.apache.storm.ui.FilterConfiguration;
+import org.apache.storm.ui.UIHelpers;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.shade.org.eclipse.jetty.server.Server;
+import org.apache.storm.shade.org.eclipse.jetty.servlet.FilterHolder;
+import org.apache.storm.shade.org.eclipse.jetty.servlet.FilterMapping;
+import org.apache.storm.shade.org.eclipse.jetty.servlet.ServletContextHandler;
+import org.apache.storm.shade.org.eclipse.jetty.servlet.ServletHolder;
+import org.glassfish.jersey.servlet.ServletContainer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.storm.shade.com.codahale.metrics.Meter;
+import com.google.common.annotations.VisibleForTesting;
+
+public class DRPCServer implements AutoCloseable {
+    private static final Logger LOG = LoggerFactory.getLogger(DRPCServer.class);
+    private final static Meter meterShutdownCalls = StormMetricsRegistry.registerMeter("drpc:num-shutdown-calls");
+   
+    //TODO in the future this might be better in a common webapp location
+    public static void addRequestContextFilter(ServletContextHandler context, String configName, Map<String, Object> conf) {
+        IHttpCredentialsPlugin auth = AuthUtils.GetHttpCredentialsPlugin(conf, (String)conf.get(configName));
+        ReqContextFilter filter = new ReqContextFilter(auth);
+        context.addFilter(new FilterHolder(filter), "/*", FilterMapping.ALL);
+    }
+ 
+    private static ThriftServer mkHandlerServer(final DistributedRPC.Iface service, Integer port, Map<String, Object> conf) {
+        ThriftServer ret = null;
+        if (port != null && port > 0) {
+            ret = new ThriftServer(conf, new DistributedRPC.Processor<>(service),
+                    ThriftConnectionType.DRPC);
+        }
+        return ret;
+    }
+
+    private static ThriftServer mkInvokeServer(final DistributedRPCInvocations.Iface service, int port, Map<String, Object> conf) {
+        return new ThriftServer(conf, new DistributedRPCInvocations.Processor<>(service),
+                ThriftConnectionType.DRPC_INVOCATIONS);
+    }
+    
+    private static Server mkHttpServer(Map<String, Object> conf, DRPC drpc) {
+        Integer drpcHttpPort = (Integer) conf.get(Config.DRPC_HTTP_PORT);
+        Server ret = null;
+        if (drpcHttpPort != null && drpcHttpPort > 0) {
+            LOG.info("Starting RPC HTTP servers...");
+            String filterClass = (String) (conf.get(Config.DRPC_HTTP_FILTER));
+            @SuppressWarnings("unchecked")
+            Map<String, String> filterParams = (Map<String, String>) (conf.get(Config.DRPC_HTTP_FILTER_PARAMS));
+            FilterConfiguration filterConfiguration = new FilterConfiguration(filterClass, filterParams);
+            final List<FilterConfiguration> filterConfigurations = Arrays.asList(filterConfiguration);
+            final Integer httpsPort = Utils.getInt(conf.get(Config.DRPC_HTTPS_PORT), 0);
+            final String httpsKsPath = (String) (conf.get(Config.DRPC_HTTPS_KEYSTORE_PATH));
+            final String httpsKsPassword = (String) (conf.get(Config.DRPC_HTTPS_KEYSTORE_PASSWORD));
+            final String httpsKsType = (String) (conf.get(Config.DRPC_HTTPS_KEYSTORE_TYPE));
+            final String httpsKeyPassword = (String) (conf.get(Config.DRPC_HTTPS_KEY_PASSWORD));
+            final String httpsTsPath = (String) (conf.get(Config.DRPC_HTTPS_TRUSTSTORE_PATH));
+            final String httpsTsPassword = (String) (conf.get(Config.DRPC_HTTPS_TRUSTSTORE_PASSWORD));
+            final String httpsTsType = (String) (conf.get(Config.DRPC_HTTPS_TRUSTSTORE_TYPE));
+            final Boolean httpsWantClientAuth = (Boolean) (conf.get(Config.DRPC_HTTPS_WANT_CLIENT_AUTH));
+            final Boolean httpsNeedClientAuth = (Boolean) (conf.get(Config.DRPC_HTTPS_NEED_CLIENT_AUTH));
+
+            //TODO a better way to do this would be great.
+            DRPCApplication.setup(drpc);
+            ret = UIHelpers.jettyCreateServer(drpcHttpPort, null, httpsPort);
+            
+            UIHelpers.configSsl(ret, httpsPort, httpsKsPath, httpsKsPassword, httpsKsType, httpsKeyPassword, httpsTsPath, httpsTsPassword, httpsTsType,
+                    httpsNeedClientAuth, httpsWantClientAuth);
+            
+            ServletContextHandler context = new ServletContextHandler(ServletContextHandler.NO_SESSIONS);
+            context.setContextPath("/");
+            ret.setHandler(context);
+
+            ServletHolder jerseyServlet = context.addServlet(ServletContainer.class, "/*");
+            jerseyServlet.setInitOrder(1);
+            jerseyServlet.setInitParameter("javax.ws.rs.Application", DRPCApplication.class.getName());
+            
+            UIHelpers.configFilters(context, filterConfigurations);
+            addRequestContextFilter(context, Config.DRPC_HTTP_CREDS_PLUGIN, conf);
+        }
+        return ret;
+    }
+    
+    private final DRPC _drpc;
+    private final ThriftServer _handlerServer;
+    private final ThriftServer _invokeServer;
+    private final Server _httpServer;
+    private boolean _closed = false;
+
+    public DRPCServer(Map<String, Object> conf) {
+        _drpc = new DRPC(conf);
+        DRPCThrift thrift = new DRPCThrift(_drpc);
+        _handlerServer = mkHandlerServer(thrift, Utils.getInt(conf.get(Config.DRPC_PORT), null), conf);
+        _invokeServer = mkInvokeServer(thrift, Utils.getInt(conf.get(Config.DRPC_INVOCATIONS_PORT), 3773), conf);
+        _httpServer = mkHttpServer(conf, _drpc);
+    }
+
+    @VisibleForTesting
+    void start() throws Exception {
+        LOG.info("Starting Distributed RPC servers...");
+        new Thread(() -> _invokeServer.serve()).start();
+
+        if (_httpServer != null) {
+            _httpServer.start();
+        }
+        
+        if (_handlerServer != null) {
+            _handlerServer.serve();
+        } else {
+            _httpServer.join();
+        }
+    }
+
+    @Override
+    public synchronized void close() {
+        if (!_closed) {
+            //This is kind of useless...
+            meterShutdownCalls.mark();
+
+            if (_handlerServer != null) {
+                _handlerServer.stop();
+            }
+
+            if (_invokeServer != null) {
+                _invokeServer.stop();
+            }
+
+            //TODO this is causing issues...
+            //if (_httpServer != null) {
+            //    _httpServer.destroy();
+            //}
+            
+            _drpc.close();
+            _closed  = true;
+        }
+    }
+    
+    public static void main(String [] args) throws Exception {
+        Utils.setupDefaultUncaughtExceptionHandler();
+        Map<String, Object> conf = ConfigUtils.readStormConfig();
+        try (DRPCServer server = new DRPCServer(conf)) {
+            Utils.addShutdownHookWithForceKillIn1Sec(() -> server.close());
+            StormMetricsRegistry.startMetricsReporters(conf);
+            server.start();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/f0b18d94/storm-drpc-server/src/main/java/org/apache/storm/daemon/drpc/webapp/AuthorizationExceptionMapper.java
----------------------------------------------------------------------
diff --git a/storm-drpc-server/src/main/java/org/apache/storm/daemon/drpc/webapp/AuthorizationExceptionMapper.java b/storm-drpc-server/src/main/java/org/apache/storm/daemon/drpc/webapp/AuthorizationExceptionMapper.java
new file mode 100644
index 0000000..75c3100
--- /dev/null
+++ b/storm-drpc-server/src/main/java/org/apache/storm/daemon/drpc/webapp/AuthorizationExceptionMapper.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.drpc.webapp;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.ws.rs.core.Response;
+import javax.ws.rs.ext.ExceptionMapper;
+import javax.ws.rs.ext.Provider;
+
+import org.apache.storm.generated.AuthorizationException;
+import org.json.simple.JSONValue;
+
+@Provider
+public class AuthorizationExceptionMapper implements ExceptionMapper<AuthorizationException> {
+    @Override
+    public Response toResponse(AuthorizationException ex) {
+        Map<String, String> body = new HashMap<>();
+        body.put("error", "Not Authorized");
+        body.put("errorMessage", ex.get_msg());
+        return Response.status(403).entity(JSONValue.toJSONString(body)).type("application/json").build();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/f0b18d94/storm-drpc-server/src/main/java/org/apache/storm/daemon/drpc/webapp/DRPCApplication.java
----------------------------------------------------------------------
diff --git a/storm-drpc-server/src/main/java/org/apache/storm/daemon/drpc/webapp/DRPCApplication.java b/storm-drpc-server/src/main/java/org/apache/storm/daemon/drpc/webapp/DRPCApplication.java
new file mode 100644
index 0000000..4a0ce8c
--- /dev/null
+++ b/storm-drpc-server/src/main/java/org/apache/storm/daemon/drpc/webapp/DRPCApplication.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.drpc.webapp;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import javax.ws.rs.ApplicationPath;
+import javax.ws.rs.core.Application;
+
+import org.apache.storm.daemon.drpc.DRPC;
+
+@ApplicationPath("")
+public class DRPCApplication extends Application {
+    private static DRPC _drpc;
+    private final Set<Object> singletons = new HashSet<Object>();
+    
+    public DRPCApplication() {
+        singletons.add(new DRPCResource(_drpc));
+        singletons.add(new DRPCExceptionMapper());
+        singletons.add(new AuthorizationExceptionMapper());
+    }
+    
+    @Override
+    public Set<Object> getSingletons() {
+        return singletons;
+    }
+
+    public static void setup(DRPC drpc) {
+        _drpc = drpc;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/f0b18d94/storm-drpc-server/src/main/java/org/apache/storm/daemon/drpc/webapp/DRPCExceptionMapper.java
----------------------------------------------------------------------
diff --git a/storm-drpc-server/src/main/java/org/apache/storm/daemon/drpc/webapp/DRPCExceptionMapper.java b/storm-drpc-server/src/main/java/org/apache/storm/daemon/drpc/webapp/DRPCExceptionMapper.java
new file mode 100644
index 0000000..60cfc93
--- /dev/null
+++ b/storm-drpc-server/src/main/java/org/apache/storm/daemon/drpc/webapp/DRPCExceptionMapper.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.drpc.webapp;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.ResponseBuilder;
+import javax.ws.rs.ext.ExceptionMapper;
+import javax.ws.rs.ext.Provider;
+
+import org.apache.storm.generated.DRPCExecutionException;
+import org.json.simple.JSONValue;
+
+/**
+ * Maps a {@link DRPCExecutionException} to a JSON HTTP error response.
+ *
+ * <p>Status mapping: {@code FAILED_REQUEST} gives 400, {@code SERVER_SHUTDOWN} gives 503,
+ * {@code SERVER_TIMEOUT} gives 504; {@code INTERNAL_ERROR}, any other type and an unset type
+ * all give 500.
+ */
+@Provider
+public class DRPCExceptionMapper implements ExceptionMapper<DRPCExecutionException> {
+
+    /**
+     * Builds the error response for the given exception.
+     *
+     * @param ex the exception to translate; its type may be unset
+     * @return a response of type {@code application/json} whose body is a JSON object with the
+     *     keys {@code error} (the type name, or "Internal Error" when the type is unset) and
+     *     {@code errorMessage} (the exception's message)
+     */
+    @Override
+    public Response toResponse(DRPCExecutionException ex) {
+        ResponseBuilder builder = Response.status(500);
+        //A switch on a null enum value throws a NullPointerException, so the type is only
+        //inspected when it is actually set. An unset type keeps the default 500.
+        if (ex.is_set_type()) {
+            switch (ex.get_type()) {
+                case FAILED_REQUEST:
+                    builder.status(400);
+                    break;
+                case SERVER_SHUTDOWN:
+                    builder.status(503); //Not available
+                    break;
+                case SERVER_TIMEOUT:
+                    builder.status(504); //proxy timeout
+                    break;
+                case INTERNAL_ERROR:
+                    //fall through on purpose
+                default:
+                    //Empty (Still 500)
+                    break;
+            }
+        }
+        Map<String, String> body = new HashMap<>();
+        //TODO I would love to standardize this...
+        body.put("error", ex.is_set_type() ? ex.get_type().toString() : "Internal Error");
+        body.put("errorMessage", ex.get_msg());
+        return builder.entity(JSONValue.toJSONString(body)).type("application/json").build();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/f0b18d94/storm-drpc-server/src/main/java/org/apache/storm/daemon/drpc/webapp/DRPCResource.java
----------------------------------------------------------------------
diff --git a/storm-drpc-server/src/main/java/org/apache/storm/daemon/drpc/webapp/DRPCResource.java b/storm-drpc-server/src/main/java/org/apache/storm/daemon/drpc/webapp/DRPCResource.java
new file mode 100644
index 0000000..cdd4817
--- /dev/null
+++ b/storm-drpc-server/src/main/java/org/apache/storm/daemon/drpc/webapp/DRPCResource.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.drpc.webapp;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.core.Context;
+
+import org.apache.storm.daemon.drpc.DRPC;
+import org.apache.storm.metric.StormMetricsRegistry;
+import org.apache.storm.thrift.TException;
+
+import org.apache.storm.shade.com.codahale.metrics.Meter;
+
+/**
+ * HTTP front end for DRPC, rooted at {@code /drpc/}. Every handler marks the
+ * {@code drpc:num-execute-http-requests} meter and then runs a blocking DRPC call, returning
+ * its result as the response body.
+ */
+@Path("/drpc/")
+public class DRPCResource {
+    private static final Meter meterHttpRequests = StormMetricsRegistry.registerMeter("drpc:num-execute-http-requests");
+    private final DRPC _drpc;
+
+    /**
+     * @param drpc the DRPC implementation every request is delegated to
+     */
+    public DRPCResource(DRPC drpc) {
+        this._drpc = drpc;
+    }
+
+    /**
+     * Shared request path: count the HTTP request, then execute the function and wait for
+     * its result.
+     */
+    private String execute(String function, String arguments) throws TException {
+        meterHttpRequests.mark();
+        return _drpc.executeBlocking(function, arguments);
+    }
+
+    //TODO put in some better exception mapping...
+    //TODO move populateContext to a filter...
+    /**
+     * POST {@code /drpc/{func}}: executes {@code func} with the request body as its arguments.
+     */
+    @POST
+    @Path("/{func}")
+    public String post(@PathParam("func") String func, String args, @Context HttpServletRequest request) throws TException {
+        return execute(func, args);
+    }
+
+    /**
+     * GET {@code /drpc/{func}/{args}}: executes {@code func} with the arguments taken from the path.
+     */
+    @GET
+    @Path("/{func}/{args}")
+    public String get(@PathParam("func") String func, @PathParam("args") String args, @Context HttpServletRequest request) throws TException {
+        return execute(func, args);
+    }
+
+    /**
+     * GET {@code /drpc/{func}}: executes {@code func} with an empty argument string.
+     */
+    @GET
+    @Path("/{func}")
+    public String get(@PathParam("func") String func, @Context HttpServletRequest request) throws TException {
+        return execute(func, "");
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/f0b18d94/storm-drpc-server/src/main/java/org/apache/storm/daemon/drpc/webapp/ReqContextFilter.java
----------------------------------------------------------------------
diff --git a/storm-drpc-server/src/main/java/org/apache/storm/daemon/drpc/webapp/ReqContextFilter.java b/storm-drpc-server/src/main/java/org/apache/storm/daemon/drpc/webapp/ReqContextFilter.java
new file mode 100644
index 0000000..521901a
--- /dev/null
+++ b/storm-drpc-server/src/main/java/org/apache/storm/daemon/drpc/webapp/ReqContextFilter.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.drpc.webapp;
+import java.io.IOException;
+
+import javax.servlet.Filter;
+import javax.servlet.FilterChain;
+import javax.servlet.FilterConfig;
+import javax.servlet.ServletException;
+import javax.servlet.ServletRequest;
+import javax.servlet.ServletResponse;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.storm.security.auth.IHttpCredentialsPlugin;
+import org.apache.storm.security.auth.ReqContext;
+
+/**
+ * Servlet {@link Filter} that populates the Storm {@link ReqContext} from each incoming HTTP
+ * request before handing the request on to the rest of the filter chain.
+ */
+public class ReqContextFilter implements Filter {
+    private final IHttpCredentialsPlugin _httpCredsHandler;
+
+    /**
+     * @param httpCredsHandler plugin used to populate the request context; may be {@code null},
+     *     in which case the context is left untouched
+     */
+    public ReqContextFilter(IHttpCredentialsPlugin httpCredsHandler) {
+        _httpCredsHandler = httpCredsHandler;
+    }
+
+    /**
+     * Populate the Storm request context from a servlet request. Does nothing when no
+     * credentials plugin was supplied.
+     * @param request the request to read the credentials from
+     */
+    public void populateContext(HttpServletRequest request) {
+        if (_httpCredsHandler != null) {
+            _httpCredsHandler.populateContext(ReqContext.context(), request);
+        }
+    }
+
+    @Override
+    public void init(FilterConfig config) throws ServletException {
+        //NOOP
+        //We could add in configs through the web.xml if we wanted something stand alone here...
+    }
+
+    /**
+     * Casts the request and response to their HTTP variants and delegates to
+     * {@link #handle(HttpServletRequest, HttpServletResponse, FilterChain)}. A non-HTTP
+     * request or response therefore fails with a {@link ClassCastException}.
+     */
+    @Override
+    public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException {
+        handle((HttpServletRequest)request, (HttpServletResponse)response, chain);
+    }
+
+    /**
+     * Populates the request context (skipped for a {@code null} request) and then continues
+     * the filter chain.
+     * @param request the incoming HTTP request
+     * @param response the HTTP response being produced
+     * @param chain the remaining filter chain to invoke
+     */
+    public void handle(HttpServletRequest request, HttpServletResponse response, FilterChain chain) throws IOException, ServletException{
+        if (request != null) {
+            populateContext(request);
+        }
+        chain.doFilter(request, response);
+    }
+
+    @Override
+    public void destroy() {
+        //NOOP
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/f0b18d94/storm-drpc-server/src/test/java/org/apache/storm/daemon/drpc/DRPCServerTest.java
----------------------------------------------------------------------
diff --git a/storm-drpc-server/src/test/java/org/apache/storm/daemon/drpc/DRPCServerTest.java b/storm-drpc-server/src/test/java/org/apache/storm/daemon/drpc/DRPCServerTest.java
new file mode 100644
index 0000000..b2df441
--- /dev/null
+++ b/storm-drpc-server/src/test/java/org/apache/storm/daemon/drpc/DRPCServerTest.java
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.drpc;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.storm.Config;
+import org.apache.storm.daemon.drpc.DRPCServer;
+import org.apache.storm.drpc.DRPCInvocationsClient;
+import org.apache.storm.generated.DRPCExecutionException;
+import org.apache.storm.generated.DRPCRequest;
+import org.apache.storm.security.auth.SimpleTransportPlugin;
+import org.apache.storm.utils.DRPCClient;
+import org.apache.storm.utils.Utils;
+import org.junit.AfterClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Tests that start a {@link DRPCServer} on locally available ports and drive it through the
+ * Thrift clients and through the HTTP GET endpoint.
+ */
+public class DRPCServerTest {
+    private static final Logger LOG = LoggerFactory.getLogger(DRPCServerTest.class);
+    private static final ExecutorService exec = Executors.newCachedThreadPool();
+
+    @AfterClass
+    public static void close() {
+        exec.shutdownNow();
+    }
+
+    /**
+     * Polls the invocations client until a request with a non-empty id shows up for
+     * {@code func}, failing the test if none arrives within five seconds.
+     */
+    private static DRPCRequest getNextAvailableRequest(DRPCInvocationsClient invoke, String func) throws Exception {
+        DRPCRequest request = null;
+        long timedout = System.currentTimeMillis() + 5_000;
+        while (System.currentTimeMillis() < timedout) {
+            request = invoke.getClient().fetchRequest(func);
+            if (request != null && request.get_request_id() != null && !request.get_request_id().isEmpty()) {
+                return request;
+            }
+            Thread.sleep(1);
+        }
+        fail("Test timed out waiting for a request on " + func);
+        return request;
+    }
+
+    /**
+     * Builds the server/client configuration used by the tests.
+     *
+     * @param drpcPort port for the DRPC Thrift server
+     * @param invocationsPort port for the DRPC invocations Thrift server
+     * @param httpPort port for the HTTP server, or null to leave the HTTP port unconfigured
+     */
+    private Map<String, Object> getConf(int drpcPort, int invocationsPort, Integer httpPort) {
+        Map<String, Object> conf = new HashMap<>();
+        conf.put(Config.DRPC_PORT, drpcPort);
+        conf.put(Config.DRPC_INVOCATIONS_PORT, invocationsPort);
+        conf.put(Config.STORM_THRIFT_TRANSPORT_PLUGIN, SimpleTransportPlugin.class.getName());
+        conf.put(Config.DRPC_WORKER_THREADS, 5);
+        conf.put(Config.DRPC_INVOCATIONS_THREADS, 5);
+        conf.put(Config.DRPC_MAX_BUFFER_SIZE, 1048576);
+        conf.put(Config.STORM_NIMBUS_RETRY_TIMES, 2);
+        conf.put(Config.STORM_NIMBUS_RETRY_INTERVAL, 10);
+        conf.put(Config.STORM_NIMBUS_RETRY_INTERVAL_CEILING, 100);
+        if (httpPort != null) {
+            conf.put(Config.DRPC_HTTP_PORT, httpPort);
+        }
+        return conf;
+    }
+
+    /** A Thrift execute call returns the result that the invocations client reports. */
+    @Test
+    public void testGoodThrift() throws Exception {
+        int drpcPort = Utils.getAvailablePort();
+        int invocationsPort = Utils.getAvailablePort(drpcPort + 1);
+        Map<String, Object> conf = getConf(drpcPort, invocationsPort, null);
+        try (DRPCServer server = new DRPCServer(conf)) {
+            exec.submit(() -> {
+                server.start();
+                return null;
+            });
+            try (DRPCClient client = new DRPCClient(conf, "localhost", drpcPort);
+                 DRPCInvocationsClient invoke = new DRPCInvocationsClient(conf, "localhost", invocationsPort)) {
+                Future<String> found = exec.submit(() -> client.getClient().execute("testing", "test"));
+                DRPCRequest request = getNextAvailableRequest(invoke, "testing");
+                assertNotNull(request);
+                assertEquals("test", request.get_func_args());
+                assertNotNull(request.get_request_id());
+                invoke.result(request.get_request_id(), "tested");
+                String result = found.get(1000, TimeUnit.MILLISECONDS);
+                assertEquals("tested", result);
+            }
+        }
+    }
+
+    /** A Thrift execute call fails with a DRPCExecutionException when the request is failed. */
+    @Test
+    public void testFailedThrift() throws Exception {
+        int drpcPort = Utils.getAvailablePort();
+        int invocationsPort = Utils.getAvailablePort(drpcPort + 1);
+        Map<String, Object> conf = getConf(drpcPort, invocationsPort, null);
+        try (DRPCServer server = new DRPCServer(conf)) {
+            exec.submit(() -> {
+                server.start();
+                return null;
+            });
+            try (DRPCClient client = new DRPCClient(conf, "localhost", drpcPort);
+                 DRPCInvocationsClient invoke = new DRPCInvocationsClient(conf, "localhost", invocationsPort)) {
+                Future<String> found = exec.submit(() -> client.getClient().execute("testing", "test"));
+                DRPCRequest request = getNextAvailableRequest(invoke, "testing");
+                assertNotNull(request);
+                assertEquals("test", request.get_func_args());
+                assertNotNull(request.get_request_id());
+                invoke.failRequest(request.get_request_id());
+                try {
+                    found.get(1000, TimeUnit.MILLISECONDS);
+                    fail("exec did not throw an exception");
+                } catch (ExecutionException e) {
+                    Throwable t = e.getCause();
+                    //assertEquals takes (expected, actual); this order keeps failure messages accurate.
+                    assertEquals(DRPCExecutionException.class, t.getClass());
+                    //Don't know a better way to validate that it failed.
+                    assertEquals("Request failed", ((DRPCExecutionException)t).get_msg());
+                }
+            }
+        }
+    }
+
+    /**
+     * Issues an HTTP GET against {@code /drpc/<func>/<args>} on localhost and returns the
+     * whole response body decoded as UTF-8.
+     *
+     * @param port the HTTP port of the DRPC server
+     * @param func the DRPC function name
+     * @param args the function arguments, placed in the URL path as-is
+     * @return the response body; an empty string if the body is empty
+     * @throws RuntimeException wrapping any failure while connecting or reading
+     */
+    public static String GET(int port, String func, String args) {
+        try {
+            URL url = new URL("http://localhost:"+port+"/drpc/"+func+"/"+args);
+            //try-with-resources so the connection's stream is always released
+            try (InputStream in = url.openStream()) {
+                ByteArrayOutputStream body = new ByteArrayOutputStream();
+                byte[] buffer = new byte[1024];
+                int read;
+                //A single read() may return only part of the body (or -1 for an empty one),
+                //so keep reading until end of stream.
+                while ((read = in.read(buffer)) != -1) {
+                    body.write(buffer, 0, read);
+                }
+                return new String(body.toByteArray(), StandardCharsets.UTF_8);
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /** An HTTP GET returns the result that the invocations client reports. */
+    @Test
+    public void testGoodHttpGet() throws Exception {
+        LOG.info("STARTING HTTP GET TEST...");
+        int drpcPort = Utils.getAvailablePort();
+        int invocationsPort = Utils.getAvailablePort(drpcPort + 1);
+        int httpPort = Utils.getAvailablePort(invocationsPort + 1);
+        Map<String, Object> conf = getConf(drpcPort, invocationsPort, httpPort);
+        try (DRPCServer server = new DRPCServer(conf)) {
+            exec.submit(() -> {
+                server.start();
+                return null;
+            });
+            //TODO need a better way to do this
+            Thread.sleep(2000);
+            try (DRPCInvocationsClient invoke = new DRPCInvocationsClient(conf, "localhost", invocationsPort)) {
+                Future<String> found = exec.submit(() -> GET(httpPort, "testing", "test"));
+                DRPCRequest request = getNextAvailableRequest(invoke, "testing");
+                assertNotNull(request);
+                assertEquals("test", request.get_func_args());
+                assertNotNull(request.get_request_id());
+                invoke.result(request.get_request_id(), "tested");
+                String result = found.get(1000, TimeUnit.MILLISECONDS);
+                assertEquals("tested", result);
+            }
+        }
+    }
+
+    /** An HTTP GET surfaces an exception to the caller when the request is failed. */
+    @Test
+    public void testFailedHttpGet() throws Exception {
+        LOG.info("STARTING HTTP GET (FAIL) TEST...");
+        int drpcPort = Utils.getAvailablePort();
+        int invocationsPort = Utils.getAvailablePort(drpcPort + 1);
+        int httpPort = Utils.getAvailablePort(invocationsPort + 1);
+        Map<String, Object> conf = getConf(drpcPort, invocationsPort, httpPort);
+        try (DRPCServer server = new DRPCServer(conf)) {
+            exec.submit(() -> {
+                server.start();
+                return null;
+            });
+            //TODO need a better way to do this
+            Thread.sleep(2000);
+            try (DRPCInvocationsClient invoke = new DRPCInvocationsClient(conf, "localhost", invocationsPort)) {
+                Future<String> found = exec.submit(() -> GET(httpPort, "testing", "test"));
+                DRPCRequest request = getNextAvailableRequest(invoke, "testing");
+                assertNotNull(request);
+                assertEquals("test", request.get_func_args());
+                assertNotNull(request.get_request_id());
+                invoke.getClient().failRequest(request.get_request_id());
+                try {
+                    found.get(1000, TimeUnit.MILLISECONDS);
+                    fail("exec did not throw an exception");
+                } catch (ExecutionException e) {
+                    LOG.warn("Got Expected Exception", e);
+                    //Getting the exact response code is a bit more complex.
+                    //TODO should use a better client
+                }
+            }
+        }
+    }
+}