You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2016/12/19 00:54:14 UTC
[4/8] storm git commit: refactored to move to new modules
refactored to move to new modules
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f0b18d94
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f0b18d94
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f0b18d94
Branch: refs/heads/master
Commit: f0b18d9493a58ae6179b8011e64330377948c8c4
Parents: ea44062
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Tue Dec 6 17:12:48 2016 -0600
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Tue Dec 6 17:12:48 2016 -0600
----------------------------------------------------------------------
pom.xml | 1 +
storm-core/pom.xml | 12 --
.../apache/storm/daemon/drpc/DRPCServer.java | 168 ---------------
.../webapp/AuthorizationExceptionMapper.java | 39 ----
.../daemon/drpc/webapp/DRPCApplication.java | 47 ----
.../daemon/drpc/webapp/DRPCExceptionMapper.java | 61 ------
.../storm/daemon/drpc/webapp/DRPCResource.java | 63 ------
.../daemon/drpc/webapp/ReqContextFilter.java | 69 ------
.../src/jvm/org/apache/storm/ui/UIHelpers.java | 7 -
.../storm/daemon/drpc/DRPCServerTest.java | 214 -------------------
storm-drpc-server/pom.xml | 162 ++++++++++++++
.../apache/storm/daemon/drpc/DRPCServer.java | 180 ++++++++++++++++
.../webapp/AuthorizationExceptionMapper.java | 39 ++++
.../daemon/drpc/webapp/DRPCApplication.java | 47 ++++
.../daemon/drpc/webapp/DRPCExceptionMapper.java | 61 ++++++
.../storm/daemon/drpc/webapp/DRPCResource.java | 63 ++++++
.../daemon/drpc/webapp/ReqContextFilter.java | 69 ++++++
.../storm/daemon/drpc/DRPCServerTest.java | 214 +++++++++++++++++++
18 files changed, 836 insertions(+), 680 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/f0b18d94/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 6f1ba86..4cc637f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -308,6 +308,7 @@
<module>storm-buildtools/maven-shade-clojure-transformer</module>
<module>storm-buildtools/storm-maven-plugins</module>
<module>storm-core</module>
+ <module>storm-drpc-server</module>
<module>storm-rename-hack</module>
<module>storm-clojure</module>
http://git-wip-us.apache.org/repos/asf/storm/blob/f0b18d94/storm-core/pom.xml
----------------------------------------------------------------------
diff --git a/storm-core/pom.xml b/storm-core/pom.xml
index 9c47914..b6424d7 100644
--- a/storm-core/pom.xml
+++ b/storm-core/pom.xml
@@ -372,18 +372,6 @@
</exclusion>
</exclusions>
</dependency>
- <dependency>
- <groupId>org.glassfish.jersey.core</groupId>
- <artifactId>jersey-server</artifactId>
- </dependency>
- <dependency>
- <groupId>org.glassfish.jersey.containers</groupId>
- <artifactId>jersey-container-servlet-core</artifactId>
- </dependency>
- <dependency>
- <groupId>org.glassfish.jersey.containers</groupId>
- <artifactId>jersey-container-jetty-http</artifactId>
- </dependency>
</dependencies>
<build>
<sourceDirectory>src/jvm</sourceDirectory>
http://git-wip-us.apache.org/repos/asf/storm/blob/f0b18d94/storm-core/src/jvm/org/apache/storm/daemon/drpc/DRPCServer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/drpc/DRPCServer.java b/storm-core/src/jvm/org/apache/storm/daemon/drpc/DRPCServer.java
deleted file mode 100644
index 34dba76..0000000
--- a/storm-core/src/jvm/org/apache/storm/daemon/drpc/DRPCServer.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.daemon.drpc;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.storm.Config;
-import org.apache.storm.daemon.drpc.webapp.DRPCApplication;
-import org.apache.storm.generated.DistributedRPC;
-import org.apache.storm.generated.DistributedRPCInvocations;
-import org.apache.storm.metric.StormMetricsRegistry;
-import org.apache.storm.security.auth.ThriftConnectionType;
-import org.apache.storm.security.auth.ThriftServer;
-import org.apache.storm.ui.FilterConfiguration;
-import org.apache.storm.ui.UIHelpers;
-import org.apache.storm.utils.ConfigUtils;
-import org.apache.storm.utils.Utils;
-import org.eclipse.jetty.server.Server;
-import org.eclipse.jetty.servlet.ServletContextHandler;
-import org.eclipse.jetty.servlet.ServletHolder;
-import org.glassfish.jersey.servlet.ServletContainer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.codahale.metrics.Meter;
-import com.google.common.annotations.VisibleForTesting;
-
-public class DRPCServer implements AutoCloseable {
- private static final Logger LOG = LoggerFactory.getLogger(DRPCServer.class);
- private final static Meter meterShutdownCalls = StormMetricsRegistry.registerMeter("drpc:num-shutdown-calls");
-
- private static ThriftServer mkHandlerServer(final DistributedRPC.Iface service, Integer port, Map<String, Object> conf) {
- ThriftServer ret = null;
- if (port != null && port > 0) {
- ret = new ThriftServer(conf, new DistributedRPC.Processor<>(service),
- ThriftConnectionType.DRPC);
- }
- return ret;
- }
-
- private static ThriftServer mkInvokeServer(final DistributedRPCInvocations.Iface service, int port, Map<String, Object> conf) {
- return new ThriftServer(conf, new DistributedRPCInvocations.Processor<>(service),
- ThriftConnectionType.DRPC_INVOCATIONS);
- }
-
- private static Server mkHttpServer(Map<String, Object> conf, DRPC drpc) {
- Integer drpcHttpPort = (Integer) conf.get(Config.DRPC_HTTP_PORT);
- Server ret = null;
- if (drpcHttpPort != null && drpcHttpPort > 0) {
- LOG.info("Starting RPC HTTP servers...");
- String filterClass = (String) (conf.get(Config.DRPC_HTTP_FILTER));
- @SuppressWarnings("unchecked")
- Map<String, String> filterParams = (Map<String, String>) (conf.get(Config.DRPC_HTTP_FILTER_PARAMS));
- FilterConfiguration filterConfiguration = new FilterConfiguration(filterClass, filterParams);
- final List<FilterConfiguration> filterConfigurations = Arrays.asList(filterConfiguration);
- final Integer httpsPort = Utils.getInt(conf.get(Config.DRPC_HTTPS_PORT), 0);
- final String httpsKsPath = (String) (conf.get(Config.DRPC_HTTPS_KEYSTORE_PATH));
- final String httpsKsPassword = (String) (conf.get(Config.DRPC_HTTPS_KEYSTORE_PASSWORD));
- final String httpsKsType = (String) (conf.get(Config.DRPC_HTTPS_KEYSTORE_TYPE));
- final String httpsKeyPassword = (String) (conf.get(Config.DRPC_HTTPS_KEY_PASSWORD));
- final String httpsTsPath = (String) (conf.get(Config.DRPC_HTTPS_TRUSTSTORE_PATH));
- final String httpsTsPassword = (String) (conf.get(Config.DRPC_HTTPS_TRUSTSTORE_PASSWORD));
- final String httpsTsType = (String) (conf.get(Config.DRPC_HTTPS_TRUSTSTORE_TYPE));
- final Boolean httpsWantClientAuth = (Boolean) (conf.get(Config.DRPC_HTTPS_WANT_CLIENT_AUTH));
- final Boolean httpsNeedClientAuth = (Boolean) (conf.get(Config.DRPC_HTTPS_NEED_CLIENT_AUTH));
-
- //TODO a better way to do this would be great.
- DRPCApplication.setup(drpc);
- ret = UIHelpers.jettyCreateServer(drpcHttpPort, null, httpsPort);
-
- UIHelpers.configSsl(ret, httpsPort, httpsKsPath, httpsKsPassword, httpsKsType, httpsKeyPassword, httpsTsPath, httpsTsPassword, httpsTsType,
- httpsNeedClientAuth, httpsWantClientAuth);
-
- ServletContextHandler context = new ServletContextHandler(ServletContextHandler.NO_SESSIONS);
- context.setContextPath("/");
- ret.setHandler(context);
-
- ServletHolder jerseyServlet = context.addServlet(ServletContainer.class, "/*");
- jerseyServlet.setInitOrder(1);
- jerseyServlet.setInitParameter("javax.ws.rs.Application", DRPCApplication.class.getName());
-
- UIHelpers.configFilters(context, filterConfigurations);
- UIHelpers.addRequestContextFilter(context, Config.DRPC_HTTP_CREDS_PLUGIN, conf);
- }
- return ret;
- }
-
- private final DRPC _drpc;
- private final ThriftServer _handlerServer;
- private final ThriftServer _invokeServer;
- private final Server _httpServer;
- private boolean _closed = false;
-
- public DRPCServer(Map<String, Object> conf) {
- _drpc = new DRPC(conf);
- DRPCThrift thrift = new DRPCThrift(_drpc);
- _handlerServer = mkHandlerServer(thrift, Utils.getInt(conf.get(Config.DRPC_PORT), null), conf);
- _invokeServer = mkInvokeServer(thrift, Utils.getInt(conf.get(Config.DRPC_INVOCATIONS_PORT), 3773), conf);
- _httpServer = mkHttpServer(conf, _drpc);
- }
-
- @VisibleForTesting
- void start() throws Exception {
- LOG.info("Starting Distributed RPC servers...");
- new Thread(() -> _invokeServer.serve()).start();
-
- if (_httpServer != null) {
- _httpServer.start();
- }
-
- if (_handlerServer != null) {
- _handlerServer.serve();
- } else {
- _httpServer.join();
- }
- }
-
- @Override
- public synchronized void close() {
- if (!_closed) {
- //This is kind of useless...
- meterShutdownCalls.mark();
-
- if (_handlerServer != null) {
- _handlerServer.stop();
- }
-
- if (_invokeServer != null) {
- _invokeServer.stop();
- }
-
- //TODO this is causing issues...
- //if (_httpServer != null) {
- // _httpServer.destroy();
- //}
-
- _drpc.close();
- _closed = true;
- }
- }
-
- public static void main(String [] args) throws Exception {
- Utils.setupDefaultUncaughtExceptionHandler();
- Map<String, Object> conf = ConfigUtils.readStormConfig();
- try (DRPCServer server = new DRPCServer(conf)) {
- Utils.addShutdownHookWithForceKillIn1Sec(() -> server.close());
- StormMetricsRegistry.startMetricsReporters(conf);
- server.start();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/f0b18d94/storm-core/src/jvm/org/apache/storm/daemon/drpc/webapp/AuthorizationExceptionMapper.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/drpc/webapp/AuthorizationExceptionMapper.java b/storm-core/src/jvm/org/apache/storm/daemon/drpc/webapp/AuthorizationExceptionMapper.java
deleted file mode 100644
index 75c3100..0000000
--- a/storm-core/src/jvm/org/apache/storm/daemon/drpc/webapp/AuthorizationExceptionMapper.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.daemon.drpc.webapp;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import javax.ws.rs.core.Response;
-import javax.ws.rs.ext.ExceptionMapper;
-import javax.ws.rs.ext.Provider;
-
-import org.apache.storm.generated.AuthorizationException;
-import org.json.simple.JSONValue;
-
-@Provider
-public class AuthorizationExceptionMapper implements ExceptionMapper<AuthorizationException> {
- @Override
- public Response toResponse(AuthorizationException ex) {
- Map<String, String> body = new HashMap<>();
- body.put("error", "Not Authorized");
- body.put("errorMessage", ex.get_msg());
- return Response.status(403).entity(JSONValue.toJSONString(body)).type("application/json").build();
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/f0b18d94/storm-core/src/jvm/org/apache/storm/daemon/drpc/webapp/DRPCApplication.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/drpc/webapp/DRPCApplication.java b/storm-core/src/jvm/org/apache/storm/daemon/drpc/webapp/DRPCApplication.java
deleted file mode 100644
index 4a0ce8c..0000000
--- a/storm-core/src/jvm/org/apache/storm/daemon/drpc/webapp/DRPCApplication.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.daemon.drpc.webapp;
-
-import java.util.HashSet;
-import java.util.Set;
-
-import javax.ws.rs.ApplicationPath;
-import javax.ws.rs.core.Application;
-
-import org.apache.storm.daemon.drpc.DRPC;
-
-@ApplicationPath("")
-public class DRPCApplication extends Application {
- private static DRPC _drpc;
- private final Set<Object> singletons = new HashSet<Object>();
-
- public DRPCApplication() {
- singletons.add(new DRPCResource(_drpc));
- singletons.add(new DRPCExceptionMapper());
- singletons.add(new AuthorizationExceptionMapper());
- }
-
- @Override
- public Set<Object> getSingletons() {
- return singletons;
- }
-
- public static void setup(DRPC drpc) {
- _drpc = drpc;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/f0b18d94/storm-core/src/jvm/org/apache/storm/daemon/drpc/webapp/DRPCExceptionMapper.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/drpc/webapp/DRPCExceptionMapper.java b/storm-core/src/jvm/org/apache/storm/daemon/drpc/webapp/DRPCExceptionMapper.java
deleted file mode 100644
index 60cfc93..0000000
--- a/storm-core/src/jvm/org/apache/storm/daemon/drpc/webapp/DRPCExceptionMapper.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.daemon.drpc.webapp;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import javax.ws.rs.core.Response;
-import javax.ws.rs.core.Response.ResponseBuilder;
-import javax.ws.rs.ext.ExceptionMapper;
-import javax.ws.rs.ext.Provider;
-
-import org.apache.storm.generated.DRPCExecutionException;
-import org.json.simple.JSONValue;
-
-@Provider
-public class DRPCExceptionMapper implements ExceptionMapper<DRPCExecutionException> {
-
- @Override
- public Response toResponse(DRPCExecutionException ex) {
- ResponseBuilder builder = Response.status(500);
- switch (ex.get_type()) {
- case FAILED_REQUEST:
- builder.status(400);
- break;
- case SERVER_SHUTDOWN:
- builder.status(503); //Not available
- break;
- case SERVER_TIMEOUT:
- builder.status(504); //proxy timeout
- break;
- case INTERNAL_ERROR:
- //fall throw on purpose
- default:
- //Empty (Still 500)
- break;
-
- }
- Map<String, String> body = new HashMap<>();
- //TODO I would love to standardize this...
- body.put("error", ex.is_set_type() ? ex.get_type().toString() : "Internal Error");
- body.put("errorMessage", ex.get_msg());
- return builder.entity(JSONValue.toJSONString(body)).type("application/json").build();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/f0b18d94/storm-core/src/jvm/org/apache/storm/daemon/drpc/webapp/DRPCResource.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/drpc/webapp/DRPCResource.java b/storm-core/src/jvm/org/apache/storm/daemon/drpc/webapp/DRPCResource.java
deleted file mode 100644
index d6490af..0000000
--- a/storm-core/src/jvm/org/apache/storm/daemon/drpc/webapp/DRPCResource.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.daemon.drpc.webapp;
-
-import javax.servlet.http.HttpServletRequest;
-import javax.ws.rs.GET;
-import javax.ws.rs.POST;
-import javax.ws.rs.Path;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.core.Context;
-
-import org.apache.storm.daemon.drpc.DRPC;
-import org.apache.storm.metric.StormMetricsRegistry;
-import org.apache.thrift.TException;
-
-import com.codahale.metrics.Meter;
-
-@Path("/drpc/")
-public class DRPCResource {
- private static final Meter meterHttpRequests = StormMetricsRegistry.registerMeter("drpc:num-execute-http-requests");
- private final DRPC _drpc;
- public DRPCResource(DRPC drpc) {
- _drpc = drpc;
- }
-
- //TODO put in some better exception mapping...
- //TODO move populateContext to a filter...
- @POST
- @Path("/{func}")
- public String post(@PathParam("func") String func, String args, @Context HttpServletRequest request) throws TException {
- meterHttpRequests.mark();
- return _drpc.executeBlocking(func, args);
- }
-
- @GET
- @Path("/{func}/{args}")
- public String get(@PathParam("func") String func, @PathParam("args") String args, @Context HttpServletRequest request) throws TException {
- meterHttpRequests.mark();
- return _drpc.executeBlocking(func, args);
- }
-
- @GET
- @Path("/{func}")
- public String get(@PathParam("func") String func, @Context HttpServletRequest request) throws TException {
- meterHttpRequests.mark();
- return _drpc.executeBlocking(func, "");
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/f0b18d94/storm-core/src/jvm/org/apache/storm/daemon/drpc/webapp/ReqContextFilter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/drpc/webapp/ReqContextFilter.java b/storm-core/src/jvm/org/apache/storm/daemon/drpc/webapp/ReqContextFilter.java
deleted file mode 100644
index 521901a..0000000
--- a/storm-core/src/jvm/org/apache/storm/daemon/drpc/webapp/ReqContextFilter.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.daemon.drpc.webapp;
-import java.io.IOException;
-
-import javax.servlet.Filter;
-import javax.servlet.FilterChain;
-import javax.servlet.FilterConfig;
-import javax.servlet.ServletException;
-import javax.servlet.ServletRequest;
-import javax.servlet.ServletResponse;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-
-import org.apache.storm.security.auth.IHttpCredentialsPlugin;
-import org.apache.storm.security.auth.ReqContext;
-
-public class ReqContextFilter implements Filter {
- private final IHttpCredentialsPlugin _httpCredsHandler;
-
- public ReqContextFilter(IHttpCredentialsPlugin httpCredsHandler) {
- _httpCredsHandler = httpCredsHandler;
- }
-
- /**
- * Populate the Storm RequestContext from an servlet request. This should be called in each handler
- * @param request the request to populate
- */
- public void populateContext(HttpServletRequest request) {
- if (_httpCredsHandler != null) {
- _httpCredsHandler.populateContext(ReqContext.context(), request);
- }
- }
-
- public void init(FilterConfig config) throws ServletException {
- //NOOP
- //We could add in configs through the web.xml if we wanted something stand alone here...
- }
-
- public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException {
- handle((HttpServletRequest)request, (HttpServletResponse)response, chain);
- }
-
- public void handle(HttpServletRequest request, HttpServletResponse response, FilterChain chain) throws IOException, ServletException{
- if (request != null) {
- populateContext(request);
- }
- chain.doFilter(request, response);
- }
-
- public void destroy() {
- //NOOP
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/f0b18d94/storm-core/src/jvm/org/apache/storm/ui/UIHelpers.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/ui/UIHelpers.java b/storm-core/src/jvm/org/apache/storm/ui/UIHelpers.java
index e97d52d..2f0cce1 100644
--- a/storm-core/src/jvm/org/apache/storm/ui/UIHelpers.java
+++ b/storm-core/src/jvm/org/apache/storm/ui/UIHelpers.java
@@ -21,7 +21,6 @@ import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableMap;
import org.apache.storm.Config;
-import org.apache.storm.daemon.drpc.webapp.ReqContextFilter;
import org.apache.storm.generated.ExecutorInfo;
import org.apache.storm.logging.filters.AccessLoggingFilter;
import org.apache.storm.security.auth.AuthUtils;
@@ -208,12 +207,6 @@ public class UIHelpers {
context.addFilter(mkAccessLoggingFilterHandle(), "/*", EnumSet.allOf(DispatcherType.class));
}
- public static void addRequestContextFilter(ServletContextHandler context, String configName, Map<String, Object> conf) {
- IHttpCredentialsPlugin auth = AuthUtils.GetHttpCredentialsPlugin(conf, (String)conf.get(configName));
- ReqContextFilter filter = new ReqContextFilter(auth);
- context.addFilter(new FilterHolder(filter), "/*", FilterMapping.ALL);
- }
-
private static Server removeNonSslConnector(Server server) {
for (Connector c : server.getConnectors()) {
if (c != null && !(c instanceof SslSocketConnector)) {
http://git-wip-us.apache.org/repos/asf/storm/blob/f0b18d94/storm-core/test/jvm/org/apache/storm/daemon/drpc/DRPCServerTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/daemon/drpc/DRPCServerTest.java b/storm-core/test/jvm/org/apache/storm/daemon/drpc/DRPCServerTest.java
deleted file mode 100644
index b2df441..0000000
--- a/storm-core/test/jvm/org/apache/storm/daemon/drpc/DRPCServerTest.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.daemon.drpc;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.fail;
-
-import java.io.InputStream;
-import java.net.URL;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.storm.Config;
-import org.apache.storm.daemon.drpc.DRPCServer;
-import org.apache.storm.drpc.DRPCInvocationsClient;
-import org.apache.storm.generated.DRPCExecutionException;
-import org.apache.storm.generated.DRPCRequest;
-import org.apache.storm.security.auth.SimpleTransportPlugin;
-import org.apache.storm.utils.DRPCClient;
-import org.apache.storm.utils.Utils;
-import org.junit.AfterClass;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-public class DRPCServerTest {
- private static final Logger LOG = LoggerFactory.getLogger(DRPCServerTest.class);
- private static final ExecutorService exec = Executors.newCachedThreadPool();
-
- @AfterClass
- public static void close() {
- exec.shutdownNow();
- }
-
- private static DRPCRequest getNextAvailableRequest(DRPCInvocationsClient invoke, String func) throws Exception {
- DRPCRequest request = null;
- long timedout = System.currentTimeMillis() + 5_000;
- while (System.currentTimeMillis() < timedout) {
- request = invoke.getClient().fetchRequest(func);
- if (request != null && request.get_request_id() != null && !request.get_request_id().isEmpty()) {
- return request;
- }
- Thread.sleep(1);
- }
- fail("Test timed out waiting for a request on " + func);
- return request;
- }
-
- private Map<String, Object> getConf(int drpcPort, int invocationsPort, Integer httpPort) {
- Map<String, Object> conf = new HashMap<>();
- conf.put(Config.DRPC_PORT, drpcPort);
- conf.put(Config.DRPC_INVOCATIONS_PORT, invocationsPort);
- conf.put(Config.STORM_THRIFT_TRANSPORT_PLUGIN, SimpleTransportPlugin.class.getName());
- conf.put(Config.DRPC_WORKER_THREADS, 5);
- conf.put(Config.DRPC_INVOCATIONS_THREADS, 5);
- conf.put(Config.DRPC_MAX_BUFFER_SIZE, 1048576);
- conf.put(Config.STORM_NIMBUS_RETRY_TIMES, 2);
- conf.put(Config.STORM_NIMBUS_RETRY_INTERVAL, 10);
- conf.put(Config.STORM_NIMBUS_RETRY_INTERVAL_CEILING, 100);
- if (httpPort != null) {
- conf.put(Config.DRPC_HTTP_PORT, httpPort);
- }
- return conf;
- }
-
- @Test
- public void testGoodThrift() throws Exception {
- int drpcPort = Utils.getAvailablePort();
- int invocationsPort = Utils.getAvailablePort(drpcPort + 1);
- Map<String, Object> conf = getConf(drpcPort, invocationsPort, null);
- try (DRPCServer server = new DRPCServer(conf)) {
- exec.submit(() -> {
- server.start();
- return null;
- });
- try (DRPCClient client = new DRPCClient(conf, "localhost", drpcPort);
- DRPCInvocationsClient invoke = new DRPCInvocationsClient(conf, "localhost", invocationsPort)) {
- Future<String> found = exec.submit(() -> client.getClient().execute("testing", "test"));
- DRPCRequest request = getNextAvailableRequest(invoke, "testing");
- assertNotNull(request);
- assertEquals("test", request.get_func_args());
- assertNotNull(request.get_request_id());
- invoke.result(request.get_request_id(), "tested");
- String result = found.get(1000, TimeUnit.MILLISECONDS);
- assertEquals("tested", result);
- }
- }
- }
-
- @Test
- public void testFailedThrift() throws Exception {
- int drpcPort = Utils.getAvailablePort();
- int invocationsPort = Utils.getAvailablePort(drpcPort + 1);
- Map<String, Object> conf = getConf(drpcPort, invocationsPort, null);
- try (DRPCServer server = new DRPCServer(conf)) {
- exec.submit(() -> {
- server.start();
- return null;
- });
- try (DRPCClient client = new DRPCClient(conf, "localhost", drpcPort);
- DRPCInvocationsClient invoke = new DRPCInvocationsClient(conf, "localhost", invocationsPort)) {
- Future<String> found = exec.submit(() -> client.getClient().execute("testing", "test"));
- DRPCRequest request = getNextAvailableRequest(invoke, "testing");
- assertNotNull(request);
- assertEquals("test", request.get_func_args());
- assertNotNull(request.get_request_id());
- invoke.failRequest(request.get_request_id());
- try {
- found.get(1000, TimeUnit.MILLISECONDS);
- fail("exec did not throw an exception");
- } catch (ExecutionException e) {
- Throwable t = e.getCause();
- assertEquals(t.getClass(), DRPCExecutionException.class);
- //Don't know a better way to validate that it failed.
- assertEquals("Request failed", ((DRPCExecutionException)t).get_msg());
- }
- }
- }
- }
-
- public static String GET(int port, String func, String args) {
- try {
- URL url = new URL("http://localhost:"+port+"/drpc/"+func+"/"+args);
- InputStream in = url.openStream();
- byte[] buffer = new byte[1024];
- int read = in.read(buffer);
- return new String(buffer, 0, read);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- @Test
- public void testGoodHttpGet() throws Exception {
- LOG.info("STARTING HTTP GET TEST...");
- int drpcPort = Utils.getAvailablePort();
- int invocationsPort = Utils.getAvailablePort(drpcPort + 1);
- int httpPort = Utils.getAvailablePort(invocationsPort + 1);
- Map<String, Object> conf = getConf(drpcPort, invocationsPort, httpPort);
- try (DRPCServer server = new DRPCServer(conf)) {
- exec.submit(() -> {
- server.start();
- return null;
- });
- //TODO need a better way to do this
- Thread.sleep(2000);
- try (DRPCInvocationsClient invoke = new DRPCInvocationsClient(conf, "localhost", invocationsPort)) {
- Future<String> found = exec.submit(() -> GET(httpPort, "testing", "test"));
- DRPCRequest request = getNextAvailableRequest(invoke, "testing");
- assertNotNull(request);
- assertEquals("test", request.get_func_args());
- assertNotNull(request.get_request_id());
- invoke.result(request.get_request_id(), "tested");
- String result = found.get(1000, TimeUnit.MILLISECONDS);
- assertEquals("tested", result);
- }
- }
- }
-
- @Test
- public void testFailedHttpGet() throws Exception {
- LOG.info("STARTING HTTP GET (FAIL) TEST...");
- int drpcPort = Utils.getAvailablePort();
- int invocationsPort = Utils.getAvailablePort(drpcPort + 1);
- int httpPort = Utils.getAvailablePort(invocationsPort + 1);
- Map<String, Object> conf = getConf(drpcPort, invocationsPort, httpPort);
- try (DRPCServer server = new DRPCServer(conf)) {
- exec.submit(() -> {
- server.start();
- return null;
- });
- //TODO need a better way to do this
- Thread.sleep(2000);
- try (DRPCInvocationsClient invoke = new DRPCInvocationsClient(conf, "localhost", invocationsPort)) {
- Future<String> found = exec.submit(() -> GET(httpPort, "testing", "test"));
- DRPCRequest request = getNextAvailableRequest(invoke, "testing");
- assertNotNull(request);
- assertEquals("test", request.get_func_args());
- assertNotNull(request.get_request_id());
- invoke.getClient().failRequest(request.get_request_id());
- try {
- found.get(1000, TimeUnit.MILLISECONDS);
- fail("exec did not throw an exception");
- } catch (ExecutionException e) {
- LOG.warn("Got Expected Exception", e);
- //Getting the exact response code is a bit more complex.
- //TODO should use a better client
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/f0b18d94/storm-drpc-server/pom.xml
----------------------------------------------------------------------
diff --git a/storm-drpc-server/pom.xml b/storm-drpc-server/pom.xml
new file mode 100644
index 0000000..d028b0a
--- /dev/null
+++ b/storm-drpc-server/pom.xml
@@ -0,0 +1,162 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <artifactId>storm</artifactId>
+ <groupId>org.apache.storm</groupId>
+ <version>2.0.0-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>storm-drpc-server</artifactId>
+ <packaging>jar</packaging>
+ <name>Storm DRPC Server</name>
+ <description>DRPC Server for Apache Storm</description>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-core</artifactId>
+ <version>2.0.0-SNAPSHOT</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.googlecode.json-simple</groupId>
+ <artifactId>json-simple</artifactId>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ <version>1.2.17</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-slf4j-impl</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.dropwizard.metrics</groupId>
+ <artifactId>metrics-core</artifactId>
+ </dependency>
+ <!-- hamcrest-core dependency is shaded inside the mockito-all and junit depends on newer version of hamcrest-core.
+ To give higher precedence to classes from newer version of hamcrest-core, Junit has been placed above mockito.
+ -->
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>javax.servlet</groupId>
+ <artifactId>servlet-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.glassfish.jersey.core</groupId>
+ <artifactId>jersey-server</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.glassfish.jersey.containers</groupId>
+ <artifactId>jersey-container-servlet-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.glassfish.jersey.containers</groupId>
+ <artifactId>jersey-container-jetty-http</artifactId>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-report-plugin</artifactId>
+ <configuration>
+ <reportsDirectories>
+ <file>${project.build.directory}/test-reports</file>
+ </reportsDirectories>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-failsafe-plugin</artifactId>
+ <configuration>
+ <reportsDirectory>${project.build.directory}/test-reports</reportsDirectory>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <version>2.8</version>
+ <executions>
+ <execution>
+ <id>copy-dependencies</id>
+ <phase>package</phase>
+ <goals>
+ <goal>copy-dependencies</goal>
+ </goals>
+ <configuration>
+ <overWriteReleases>false</overWriteReleases>
+ <overWriteSnapshots>false</overWriteSnapshots>
+ <overWriteIfNewer>true</overWriteIfNewer>
+ <includeScope>runtime</includeScope>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-source-plugin</artifactId>
+ <version>2.2.1</version>
+ <executions>
+ <execution>
+ <id>attach-sources</id>
+ <goals>
+ <!-- avoid warning about recursion -->
+ <goal>jar-no-fork</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <version>2.6</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/storm/blob/f0b18d94/storm-drpc-server/src/main/java/org/apache/storm/daemon/drpc/DRPCServer.java
----------------------------------------------------------------------
diff --git a/storm-drpc-server/src/main/java/org/apache/storm/daemon/drpc/DRPCServer.java b/storm-drpc-server/src/main/java/org/apache/storm/daemon/drpc/DRPCServer.java
new file mode 100644
index 0000000..77862b9
--- /dev/null
+++ b/storm-drpc-server/src/main/java/org/apache/storm/daemon/drpc/DRPCServer.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.drpc;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.storm.Config;
+import org.apache.storm.daemon.drpc.webapp.DRPCApplication;
+import org.apache.storm.daemon.drpc.webapp.ReqContextFilter;
+import org.apache.storm.generated.DistributedRPC;
+import org.apache.storm.generated.DistributedRPCInvocations;
+import org.apache.storm.metric.StormMetricsRegistry;
+import org.apache.storm.security.auth.AuthUtils;
+import org.apache.storm.security.auth.IHttpCredentialsPlugin;
+import org.apache.storm.security.auth.ThriftConnectionType;
+import org.apache.storm.security.auth.ThriftServer;
+import org.apache.storm.ui.FilterConfiguration;
+import org.apache.storm.ui.UIHelpers;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.shade.org.eclipse.jetty.server.Server;
+import org.apache.storm.shade.org.eclipse.jetty.servlet.FilterHolder;
+import org.apache.storm.shade.org.eclipse.jetty.servlet.FilterMapping;
+import org.apache.storm.shade.org.eclipse.jetty.servlet.ServletContextHandler;
+import org.apache.storm.shade.org.eclipse.jetty.servlet.ServletHolder;
+import org.glassfish.jersey.servlet.ServletContainer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.storm.shade.com.codahale.metrics.Meter;
+import com.google.common.annotations.VisibleForTesting;
+
+public class DRPCServer implements AutoCloseable {
+ private static final Logger LOG = LoggerFactory.getLogger(DRPCServer.class);
+ private final static Meter meterShutdownCalls = StormMetricsRegistry.registerMeter("drpc:num-shutdown-calls");
+
+ //TODO in the future this might be better in a common webapp location
+ public static void addRequestContextFilter(ServletContextHandler context, String configName, Map<String, Object> conf) {
+ IHttpCredentialsPlugin auth = AuthUtils.GetHttpCredentialsPlugin(conf, (String)conf.get(configName));
+ ReqContextFilter filter = new ReqContextFilter(auth);
+ context.addFilter(new FilterHolder(filter), "/*", FilterMapping.ALL);
+ }
+
+ private static ThriftServer mkHandlerServer(final DistributedRPC.Iface service, Integer port, Map<String, Object> conf) {
+ ThriftServer ret = null;
+ if (port != null && port > 0) {
+ ret = new ThriftServer(conf, new DistributedRPC.Processor<>(service),
+ ThriftConnectionType.DRPC);
+ }
+ return ret;
+ }
+
+ private static ThriftServer mkInvokeServer(final DistributedRPCInvocations.Iface service, int port, Map<String, Object> conf) {
+ return new ThriftServer(conf, new DistributedRPCInvocations.Processor<>(service),
+ ThriftConnectionType.DRPC_INVOCATIONS);
+ }
+
+ private static Server mkHttpServer(Map<String, Object> conf, DRPC drpc) {
+ Integer drpcHttpPort = (Integer) conf.get(Config.DRPC_HTTP_PORT);
+ Server ret = null;
+ if (drpcHttpPort != null && drpcHttpPort > 0) {
+ LOG.info("Starting RPC HTTP servers...");
+ String filterClass = (String) (conf.get(Config.DRPC_HTTP_FILTER));
+ @SuppressWarnings("unchecked")
+ Map<String, String> filterParams = (Map<String, String>) (conf.get(Config.DRPC_HTTP_FILTER_PARAMS));
+ FilterConfiguration filterConfiguration = new FilterConfiguration(filterClass, filterParams);
+ final List<FilterConfiguration> filterConfigurations = Arrays.asList(filterConfiguration);
+ final Integer httpsPort = Utils.getInt(conf.get(Config.DRPC_HTTPS_PORT), 0);
+ final String httpsKsPath = (String) (conf.get(Config.DRPC_HTTPS_KEYSTORE_PATH));
+ final String httpsKsPassword = (String) (conf.get(Config.DRPC_HTTPS_KEYSTORE_PASSWORD));
+ final String httpsKsType = (String) (conf.get(Config.DRPC_HTTPS_KEYSTORE_TYPE));
+ final String httpsKeyPassword = (String) (conf.get(Config.DRPC_HTTPS_KEY_PASSWORD));
+ final String httpsTsPath = (String) (conf.get(Config.DRPC_HTTPS_TRUSTSTORE_PATH));
+ final String httpsTsPassword = (String) (conf.get(Config.DRPC_HTTPS_TRUSTSTORE_PASSWORD));
+ final String httpsTsType = (String) (conf.get(Config.DRPC_HTTPS_TRUSTSTORE_TYPE));
+ final Boolean httpsWantClientAuth = (Boolean) (conf.get(Config.DRPC_HTTPS_WANT_CLIENT_AUTH));
+ final Boolean httpsNeedClientAuth = (Boolean) (conf.get(Config.DRPC_HTTPS_NEED_CLIENT_AUTH));
+
+ //TODO a better way to do this would be great.
+ DRPCApplication.setup(drpc);
+ ret = UIHelpers.jettyCreateServer(drpcHttpPort, null, httpsPort);
+
+ UIHelpers.configSsl(ret, httpsPort, httpsKsPath, httpsKsPassword, httpsKsType, httpsKeyPassword, httpsTsPath, httpsTsPassword, httpsTsType,
+ httpsNeedClientAuth, httpsWantClientAuth);
+
+ ServletContextHandler context = new ServletContextHandler(ServletContextHandler.NO_SESSIONS);
+ context.setContextPath("/");
+ ret.setHandler(context);
+
+ ServletHolder jerseyServlet = context.addServlet(ServletContainer.class, "/*");
+ jerseyServlet.setInitOrder(1);
+ jerseyServlet.setInitParameter("javax.ws.rs.Application", DRPCApplication.class.getName());
+
+ UIHelpers.configFilters(context, filterConfigurations);
+ addRequestContextFilter(context, Config.DRPC_HTTP_CREDS_PLUGIN, conf);
+ }
+ return ret;
+ }
+
+ private final DRPC _drpc;
+ private final ThriftServer _handlerServer;
+ private final ThriftServer _invokeServer;
+ private final Server _httpServer;
+ private boolean _closed = false;
+
+ public DRPCServer(Map<String, Object> conf) {
+ _drpc = new DRPC(conf);
+ DRPCThrift thrift = new DRPCThrift(_drpc);
+ _handlerServer = mkHandlerServer(thrift, Utils.getInt(conf.get(Config.DRPC_PORT), null), conf);
+ _invokeServer = mkInvokeServer(thrift, Utils.getInt(conf.get(Config.DRPC_INVOCATIONS_PORT), 3773), conf);
+ _httpServer = mkHttpServer(conf, _drpc);
+ }
+
+ @VisibleForTesting
+ void start() throws Exception {
+ LOG.info("Starting Distributed RPC servers...");
+ new Thread(() -> _invokeServer.serve()).start();
+
+ if (_httpServer != null) {
+ _httpServer.start();
+ }
+
+ if (_handlerServer != null) {
+ _handlerServer.serve();
+ } else {
+ _httpServer.join();
+ }
+ }
+
+ @Override
+ public synchronized void close() {
+ if (!_closed) {
+ //This is kind of useless...
+ meterShutdownCalls.mark();
+
+ if (_handlerServer != null) {
+ _handlerServer.stop();
+ }
+
+ if (_invokeServer != null) {
+ _invokeServer.stop();
+ }
+
+ //TODO this is causing issues...
+ //if (_httpServer != null) {
+ // _httpServer.destroy();
+ //}
+
+ _drpc.close();
+ _closed = true;
+ }
+ }
+
+ public static void main(String [] args) throws Exception {
+ Utils.setupDefaultUncaughtExceptionHandler();
+ Map<String, Object> conf = ConfigUtils.readStormConfig();
+ try (DRPCServer server = new DRPCServer(conf)) {
+ Utils.addShutdownHookWithForceKillIn1Sec(() -> server.close());
+ StormMetricsRegistry.startMetricsReporters(conf);
+ server.start();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/f0b18d94/storm-drpc-server/src/main/java/org/apache/storm/daemon/drpc/webapp/AuthorizationExceptionMapper.java
----------------------------------------------------------------------
diff --git a/storm-drpc-server/src/main/java/org/apache/storm/daemon/drpc/webapp/AuthorizationExceptionMapper.java b/storm-drpc-server/src/main/java/org/apache/storm/daemon/drpc/webapp/AuthorizationExceptionMapper.java
new file mode 100644
index 0000000..75c3100
--- /dev/null
+++ b/storm-drpc-server/src/main/java/org/apache/storm/daemon/drpc/webapp/AuthorizationExceptionMapper.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.drpc.webapp;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.ws.rs.core.Response;
+import javax.ws.rs.ext.ExceptionMapper;
+import javax.ws.rs.ext.Provider;
+
+import org.apache.storm.generated.AuthorizationException;
+import org.json.simple.JSONValue;
+
+@Provider
+public class AuthorizationExceptionMapper implements ExceptionMapper<AuthorizationException> {
+ @Override
+ public Response toResponse(AuthorizationException ex) {
+ Map<String, String> body = new HashMap<>();
+ body.put("error", "Not Authorized");
+ body.put("errorMessage", ex.get_msg());
+ return Response.status(403).entity(JSONValue.toJSONString(body)).type("application/json").build();
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/f0b18d94/storm-drpc-server/src/main/java/org/apache/storm/daemon/drpc/webapp/DRPCApplication.java
----------------------------------------------------------------------
diff --git a/storm-drpc-server/src/main/java/org/apache/storm/daemon/drpc/webapp/DRPCApplication.java b/storm-drpc-server/src/main/java/org/apache/storm/daemon/drpc/webapp/DRPCApplication.java
new file mode 100644
index 0000000..4a0ce8c
--- /dev/null
+++ b/storm-drpc-server/src/main/java/org/apache/storm/daemon/drpc/webapp/DRPCApplication.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.drpc.webapp;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import javax.ws.rs.ApplicationPath;
+import javax.ws.rs.core.Application;
+
+import org.apache.storm.daemon.drpc.DRPC;
+
+@ApplicationPath("")
+public class DRPCApplication extends Application {
+ private static DRPC _drpc;
+ private final Set<Object> singletons = new HashSet<Object>();
+
+ public DRPCApplication() {
+ singletons.add(new DRPCResource(_drpc));
+ singletons.add(new DRPCExceptionMapper());
+ singletons.add(new AuthorizationExceptionMapper());
+ }
+
+ @Override
+ public Set<Object> getSingletons() {
+ return singletons;
+ }
+
+ public static void setup(DRPC drpc) {
+ _drpc = drpc;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/f0b18d94/storm-drpc-server/src/main/java/org/apache/storm/daemon/drpc/webapp/DRPCExceptionMapper.java
----------------------------------------------------------------------
diff --git a/storm-drpc-server/src/main/java/org/apache/storm/daemon/drpc/webapp/DRPCExceptionMapper.java b/storm-drpc-server/src/main/java/org/apache/storm/daemon/drpc/webapp/DRPCExceptionMapper.java
new file mode 100644
index 0000000..60cfc93
--- /dev/null
+++ b/storm-drpc-server/src/main/java/org/apache/storm/daemon/drpc/webapp/DRPCExceptionMapper.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.drpc.webapp;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.ResponseBuilder;
+import javax.ws.rs.ext.ExceptionMapper;
+import javax.ws.rs.ext.Provider;
+
+import org.apache.storm.generated.DRPCExecutionException;
+import org.json.simple.JSONValue;
+
+@Provider
+public class DRPCExceptionMapper implements ExceptionMapper<DRPCExecutionException> {
+
+ @Override
+ public Response toResponse(DRPCExecutionException ex) {
+ ResponseBuilder builder = Response.status(500);
+ switch (ex.get_type()) {
+ case FAILED_REQUEST:
+ builder.status(400);
+ break;
+ case SERVER_SHUTDOWN:
+ builder.status(503); //Not available
+ break;
+ case SERVER_TIMEOUT:
+ builder.status(504); //proxy timeout
+ break;
+ case INTERNAL_ERROR:
+ //fall throw on purpose
+ default:
+ //Empty (Still 500)
+ break;
+
+ }
+ Map<String, String> body = new HashMap<>();
+ //TODO I would love to standardize this...
+ body.put("error", ex.is_set_type() ? ex.get_type().toString() : "Internal Error");
+ body.put("errorMessage", ex.get_msg());
+ return builder.entity(JSONValue.toJSONString(body)).type("application/json").build();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/f0b18d94/storm-drpc-server/src/main/java/org/apache/storm/daemon/drpc/webapp/DRPCResource.java
----------------------------------------------------------------------
diff --git a/storm-drpc-server/src/main/java/org/apache/storm/daemon/drpc/webapp/DRPCResource.java b/storm-drpc-server/src/main/java/org/apache/storm/daemon/drpc/webapp/DRPCResource.java
new file mode 100644
index 0000000..cdd4817
--- /dev/null
+++ b/storm-drpc-server/src/main/java/org/apache/storm/daemon/drpc/webapp/DRPCResource.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.drpc.webapp;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.core.Context;
+
+import org.apache.storm.daemon.drpc.DRPC;
+import org.apache.storm.metric.StormMetricsRegistry;
+import org.apache.storm.thrift.TException;
+
+import org.apache.storm.shade.com.codahale.metrics.Meter;
+
+@Path("/drpc/")
+public class DRPCResource {
+ private static final Meter meterHttpRequests = StormMetricsRegistry.registerMeter("drpc:num-execute-http-requests");
+ private final DRPC _drpc;
+ public DRPCResource(DRPC drpc) {
+ _drpc = drpc;
+ }
+
+ //TODO put in some better exception mapping...
+ //TODO move populateContext to a filter...
+ @POST
+ @Path("/{func}")
+ public String post(@PathParam("func") String func, String args, @Context HttpServletRequest request) throws TException {
+ meterHttpRequests.mark();
+ return _drpc.executeBlocking(func, args);
+ }
+
+ @GET
+ @Path("/{func}/{args}")
+ public String get(@PathParam("func") String func, @PathParam("args") String args, @Context HttpServletRequest request) throws TException {
+ meterHttpRequests.mark();
+ return _drpc.executeBlocking(func, args);
+ }
+
+ @GET
+ @Path("/{func}")
+ public String get(@PathParam("func") String func, @Context HttpServletRequest request) throws TException {
+ meterHttpRequests.mark();
+ return _drpc.executeBlocking(func, "");
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/f0b18d94/storm-drpc-server/src/main/java/org/apache/storm/daemon/drpc/webapp/ReqContextFilter.java
----------------------------------------------------------------------
diff --git a/storm-drpc-server/src/main/java/org/apache/storm/daemon/drpc/webapp/ReqContextFilter.java b/storm-drpc-server/src/main/java/org/apache/storm/daemon/drpc/webapp/ReqContextFilter.java
new file mode 100644
index 0000000..521901a
--- /dev/null
+++ b/storm-drpc-server/src/main/java/org/apache/storm/daemon/drpc/webapp/ReqContextFilter.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.drpc.webapp;
+import java.io.IOException;
+
+import javax.servlet.Filter;
+import javax.servlet.FilterChain;
+import javax.servlet.FilterConfig;
+import javax.servlet.ServletException;
+import javax.servlet.ServletRequest;
+import javax.servlet.ServletResponse;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.storm.security.auth.IHttpCredentialsPlugin;
+import org.apache.storm.security.auth.ReqContext;
+
+public class ReqContextFilter implements Filter {
+ private final IHttpCredentialsPlugin _httpCredsHandler;
+
+ public ReqContextFilter(IHttpCredentialsPlugin httpCredsHandler) {
+ _httpCredsHandler = httpCredsHandler;
+ }
+
+ /**
+ * Populate the Storm RequestContext from an servlet request. This should be called in each handler
+ * @param request the request to populate
+ */
+ public void populateContext(HttpServletRequest request) {
+ if (_httpCredsHandler != null) {
+ _httpCredsHandler.populateContext(ReqContext.context(), request);
+ }
+ }
+
+ public void init(FilterConfig config) throws ServletException {
+ //NOOP
+ //We could add in configs through the web.xml if we wanted something stand alone here...
+ }
+
+ public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException {
+ handle((HttpServletRequest)request, (HttpServletResponse)response, chain);
+ }
+
+ public void handle(HttpServletRequest request, HttpServletResponse response, FilterChain chain) throws IOException, ServletException{
+ if (request != null) {
+ populateContext(request);
+ }
+ chain.doFilter(request, response);
+ }
+
+ public void destroy() {
+ //NOOP
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/f0b18d94/storm-drpc-server/src/test/java/org/apache/storm/daemon/drpc/DRPCServerTest.java
----------------------------------------------------------------------
diff --git a/storm-drpc-server/src/test/java/org/apache/storm/daemon/drpc/DRPCServerTest.java b/storm-drpc-server/src/test/java/org/apache/storm/daemon/drpc/DRPCServerTest.java
new file mode 100644
index 0000000..b2df441
--- /dev/null
+++ b/storm-drpc-server/src/test/java/org/apache/storm/daemon/drpc/DRPCServerTest.java
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.drpc;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+import java.io.InputStream;
+import java.net.URL;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.storm.Config;
+import org.apache.storm.daemon.drpc.DRPCServer;
+import org.apache.storm.drpc.DRPCInvocationsClient;
+import org.apache.storm.generated.DRPCExecutionException;
+import org.apache.storm.generated.DRPCRequest;
+import org.apache.storm.security.auth.SimpleTransportPlugin;
+import org.apache.storm.utils.DRPCClient;
+import org.apache.storm.utils.Utils;
+import org.junit.AfterClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class DRPCServerTest {
+ private static final Logger LOG = LoggerFactory.getLogger(DRPCServerTest.class);
+ private static final ExecutorService exec = Executors.newCachedThreadPool();
+
+ @AfterClass
+ public static void close() {
+ exec.shutdownNow();
+ }
+
+ private static DRPCRequest getNextAvailableRequest(DRPCInvocationsClient invoke, String func) throws Exception {
+ DRPCRequest request = null;
+ long timedout = System.currentTimeMillis() + 5_000;
+ while (System.currentTimeMillis() < timedout) {
+ request = invoke.getClient().fetchRequest(func);
+ if (request != null && request.get_request_id() != null && !request.get_request_id().isEmpty()) {
+ return request;
+ }
+ Thread.sleep(1);
+ }
+ fail("Test timed out waiting for a request on " + func);
+ return request;
+ }
+
+ private Map<String, Object> getConf(int drpcPort, int invocationsPort, Integer httpPort) {
+ Map<String, Object> conf = new HashMap<>();
+ conf.put(Config.DRPC_PORT, drpcPort);
+ conf.put(Config.DRPC_INVOCATIONS_PORT, invocationsPort);
+ conf.put(Config.STORM_THRIFT_TRANSPORT_PLUGIN, SimpleTransportPlugin.class.getName());
+ conf.put(Config.DRPC_WORKER_THREADS, 5);
+ conf.put(Config.DRPC_INVOCATIONS_THREADS, 5);
+ conf.put(Config.DRPC_MAX_BUFFER_SIZE, 1048576);
+ conf.put(Config.STORM_NIMBUS_RETRY_TIMES, 2);
+ conf.put(Config.STORM_NIMBUS_RETRY_INTERVAL, 10);
+ conf.put(Config.STORM_NIMBUS_RETRY_INTERVAL_CEILING, 100);
+ if (httpPort != null) {
+ conf.put(Config.DRPC_HTTP_PORT, httpPort);
+ }
+ return conf;
+ }
+
+ @Test
+ public void testGoodThrift() throws Exception {
+ int drpcPort = Utils.getAvailablePort();
+ int invocationsPort = Utils.getAvailablePort(drpcPort + 1);
+ Map<String, Object> conf = getConf(drpcPort, invocationsPort, null);
+ try (DRPCServer server = new DRPCServer(conf)) {
+ exec.submit(() -> {
+ server.start();
+ return null;
+ });
+ try (DRPCClient client = new DRPCClient(conf, "localhost", drpcPort);
+ DRPCInvocationsClient invoke = new DRPCInvocationsClient(conf, "localhost", invocationsPort)) {
+ Future<String> found = exec.submit(() -> client.getClient().execute("testing", "test"));
+ DRPCRequest request = getNextAvailableRequest(invoke, "testing");
+ assertNotNull(request);
+ assertEquals("test", request.get_func_args());
+ assertNotNull(request.get_request_id());
+ invoke.result(request.get_request_id(), "tested");
+ String result = found.get(1000, TimeUnit.MILLISECONDS);
+ assertEquals("tested", result);
+ }
+ }
+ }
+
+ @Test
+ public void testFailedThrift() throws Exception {
+ int drpcPort = Utils.getAvailablePort();
+ int invocationsPort = Utils.getAvailablePort(drpcPort + 1);
+ Map<String, Object> conf = getConf(drpcPort, invocationsPort, null);
+ try (DRPCServer server = new DRPCServer(conf)) {
+ exec.submit(() -> {
+ server.start();
+ return null;
+ });
+ try (DRPCClient client = new DRPCClient(conf, "localhost", drpcPort);
+ DRPCInvocationsClient invoke = new DRPCInvocationsClient(conf, "localhost", invocationsPort)) {
+ Future<String> found = exec.submit(() -> client.getClient().execute("testing", "test"));
+ DRPCRequest request = getNextAvailableRequest(invoke, "testing");
+ assertNotNull(request);
+ assertEquals("test", request.get_func_args());
+ assertNotNull(request.get_request_id());
+ invoke.failRequest(request.get_request_id());
+ try {
+ found.get(1000, TimeUnit.MILLISECONDS);
+ fail("exec did not throw an exception");
+ } catch (ExecutionException e) {
+ Throwable t = e.getCause();
+ assertEquals(t.getClass(), DRPCExecutionException.class);
+ //Don't know a better way to validate that it failed.
+ assertEquals("Request failed", ((DRPCExecutionException)t).get_msg());
+ }
+ }
+ }
+ }
+
+ public static String GET(int port, String func, String args) {
+ try {
+ URL url = new URL("http://localhost:"+port+"/drpc/"+func+"/"+args);
+ InputStream in = url.openStream();
+ byte[] buffer = new byte[1024];
+ int read = in.read(buffer);
+ return new String(buffer, 0, read);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Test
+ public void testGoodHttpGet() throws Exception {
+ LOG.info("STARTING HTTP GET TEST...");
+ int drpcPort = Utils.getAvailablePort();
+ int invocationsPort = Utils.getAvailablePort(drpcPort + 1);
+ int httpPort = Utils.getAvailablePort(invocationsPort + 1);
+ Map<String, Object> conf = getConf(drpcPort, invocationsPort, httpPort);
+ try (DRPCServer server = new DRPCServer(conf)) {
+ exec.submit(() -> {
+ server.start();
+ return null;
+ });
+ //TODO need a better way to do this
+ Thread.sleep(2000);
+ try (DRPCInvocationsClient invoke = new DRPCInvocationsClient(conf, "localhost", invocationsPort)) {
+ Future<String> found = exec.submit(() -> GET(httpPort, "testing", "test"));
+ DRPCRequest request = getNextAvailableRequest(invoke, "testing");
+ assertNotNull(request);
+ assertEquals("test", request.get_func_args());
+ assertNotNull(request.get_request_id());
+ invoke.result(request.get_request_id(), "tested");
+ String result = found.get(1000, TimeUnit.MILLISECONDS);
+ assertEquals("tested", result);
+ }
+ }
+ }
+
+ @Test
+ public void testFailedHttpGet() throws Exception {
+ LOG.info("STARTING HTTP GET (FAIL) TEST...");
+ int drpcPort = Utils.getAvailablePort();
+ int invocationsPort = Utils.getAvailablePort(drpcPort + 1);
+ int httpPort = Utils.getAvailablePort(invocationsPort + 1);
+ Map<String, Object> conf = getConf(drpcPort, invocationsPort, httpPort);
+ try (DRPCServer server = new DRPCServer(conf)) {
+ exec.submit(() -> {
+ server.start();
+ return null;
+ });
+ //TODO need a better way to do this
+ Thread.sleep(2000);
+ try (DRPCInvocationsClient invoke = new DRPCInvocationsClient(conf, "localhost", invocationsPort)) {
+ Future<String> found = exec.submit(() -> GET(httpPort, "testing", "test"));
+ DRPCRequest request = getNextAvailableRequest(invoke, "testing");
+ assertNotNull(request);
+ assertEquals("test", request.get_func_args());
+ assertNotNull(request.get_request_id());
+ invoke.getClient().failRequest(request.get_request_id());
+ try {
+ found.get(1000, TimeUnit.MILLISECONDS);
+ fail("exec did not throw an exception");
+ } catch (ExecutionException e) {
+ LOG.warn("Got Expected Exception", e);
+ //Getting the exact response code is a bit more complex.
+ //TODO should use a better client
+ }
+ }
+ }
+ }
+}