You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2016/12/19 00:54:13 UTC
[3/8] storm git commit: STORM-2217: Make DRPC pure java with Jersy
and prepare for separating classpaths.
STORM-2217: Make DRPC pure java with Jersy and prepare for separating classpaths.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ea44062f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ea44062f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ea44062f
Branch: refs/heads/master
Commit: ea44062fdaa60cbc8faff54a26862848a527b25c
Parents: 607ba8a
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Thu Nov 24 14:43:31 2016 -0600
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Mon Dec 5 08:09:28 2016 -0600
----------------------------------------------------------------------
bin/storm.py | 2 +-
pom.xml | 16 +
storm-core/pom.xml | 12 +
.../src/clj/org/apache/storm/daemon/drpc.clj | 82 --
.../src/jvm/org/apache/storm/LocalDRPC.java | 29 +-
.../jvm/org/apache/storm/daemon/DrpcServer.java | 357 -------
.../org/apache/storm/daemon/StormCommon.java | 5 +-
.../daemon/drpc/BlockingOutstandingRequest.java | 67 ++
.../jvm/org/apache/storm/daemon/drpc/DRPC.java | 215 ++++
.../apache/storm/daemon/drpc/DRPCServer.java | 168 ++++
.../apache/storm/daemon/drpc/DRPCThrift.java | 58 ++
.../storm/daemon/drpc/OutstandingRequest.java | 58 ++
.../storm/daemon/drpc/RequestFactory.java | 24 +
.../webapp/AuthorizationExceptionMapper.java | 39 +
.../daemon/drpc/webapp/DRPCApplication.java | 47 +
.../daemon/drpc/webapp/DRPCExceptionMapper.java | 61 ++
.../storm/daemon/drpc/webapp/DRPCResource.java | 63 ++
.../daemon/drpc/webapp/ReqContextFilter.java | 69 ++
.../storm/drpc/DRPCInvocationsClient.java | 17 +
.../storm/generated/DRPCExceptionType.java | 68 ++
.../storm/generated/DRPCExecutionException.java | 124 ++-
.../generated/DistributedRPCInvocations.java | 969 +++++++++++++++++++
.../apache/storm/security/auth/AuthUtils.java | 14 +-
.../apache/storm/security/auth/ReqContext.java | 14 +-
.../authorizer/DRPCSimpleACLAuthorizer.java | 5 +-
.../src/jvm/org/apache/storm/ui/UIHelpers.java | 68 +-
.../src/jvm/org/apache/storm/utils/Utils.java | 1 +
.../py/storm/DistributedRPCInvocations-remote | 7 +
.../src/py/storm/DistributedRPCInvocations.py | 209 ++++
storm-core/src/py/storm/ttypes.py | 35 +-
storm-core/src/storm.thrift | 9 +
.../test/clj/org/apache/storm/drpc_test.clj | 28 -
.../storm/security/auth/drpc_auth_test.clj | 320 ------
.../storm/daemon/drpc/DRPCServerTest.java | 214 ++++
.../org/apache/storm/daemon/drpc/DRPCTest.java | 252 +++++
35 files changed, 2887 insertions(+), 839 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/ea44062f/bin/storm.py
----------------------------------------------------------------------
diff --git a/bin/storm.py b/bin/storm.py
index c48a2b9..a2dff80 100755
--- a/bin/storm.py
+++ b/bin/storm.py
@@ -759,7 +759,7 @@ def drpc():
"-Dlog4j.configurationFile=" + os.path.join(get_log4j2_conf_dir(), "cluster.xml")
]
exec_storm_class(
- "org.apache.storm.daemon.drpc",
+ "org.apache.storm.daemon.drpc.DRPCServer",
jvmtype="-server",
daemonName="drpc",
jvmopts=jvmopts,
http://git-wip-us.apache.org/repos/asf/storm/blob/ea44062f/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 12f6e91..6f1ba86 100644
--- a/pom.xml
+++ b/pom.xml
@@ -298,6 +298,7 @@
<wagonVersion>1.0</wagonVersion>
<qpid.version>0.32</qpid.version>
<eventhubs.client.version>1.0.1</eventhubs.client.version>
+ <jersey.version>2.24.1</jersey.version>
</properties>
<modules>
@@ -565,6 +566,21 @@
<dependencyManagement>
<dependencies>
<dependency>
+ <groupId>org.glassfish.jersey.core</groupId>
+ <artifactId>jersey-server</artifactId>
+ <version>${jersey.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.glassfish.jersey.containers</groupId>
+ <artifactId>jersey-container-servlet-core</artifactId>
+ <version>${jersey.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.glassfish.jersey.containers</groupId>
+ <artifactId>jersey-container-jetty-http</artifactId>
+ <version>${jersey.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.hdrhistogram</groupId>
<artifactId>HdrHistogram</artifactId>
<version>${hdrhistogram.version}</version>
http://git-wip-us.apache.org/repos/asf/storm/blob/ea44062f/storm-core/pom.xml
----------------------------------------------------------------------
diff --git a/storm-core/pom.xml b/storm-core/pom.xml
index b6424d7..9c47914 100644
--- a/storm-core/pom.xml
+++ b/storm-core/pom.xml
@@ -372,6 +372,18 @@
</exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>org.glassfish.jersey.core</groupId>
+ <artifactId>jersey-server</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.glassfish.jersey.containers</groupId>
+ <artifactId>jersey-container-servlet-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.glassfish.jersey.containers</groupId>
+ <artifactId>jersey-container-jetty-http</artifactId>
+ </dependency>
</dependencies>
<build>
<sourceDirectory>src/jvm</sourceDirectory>
http://git-wip-us.apache.org/repos/asf/storm/blob/ea44062f/storm-core/src/clj/org/apache/storm/daemon/drpc.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/drpc.clj b/storm-core/src/clj/org/apache/storm/daemon/drpc.clj
deleted file mode 100644
index 872407c..0000000
--- a/storm-core/src/clj/org/apache/storm/daemon/drpc.clj
+++ /dev/null
@@ -1,82 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements. See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership. The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License. You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-
-(ns org.apache.storm.daemon.drpc
- (:import [org.apache.storm.security.auth AuthUtils ReqContext]
- [org.apache.storm.daemon DrpcServer]
- [org.apache.storm.metric StormMetricsRegistry])
- (:import [org.apache.storm.utils Utils])
- (:import [org.apache.storm.utils ConfigUtils])
- (:use [org.apache.storm config log util])
- (:use [org.apache.storm.ui helpers])
- (:use compojure.core)
- (:use ring.middleware.reload)
- (:require [compojure.handler :as handler])
- (:gen-class))
-
-(def drpc:num-execute-http-requests (StormMetricsRegistry/registerMeter "drpc:num-execute-http-requests"))
-
-(defn handle-request [handler]
- (fn [request]
- (handler request)))
-
-(defn populate-context!
- "Populate the Storm RequestContext from an servlet-request. This should be called in each handler"
- [http-creds-handler servlet-request]
- (when http-creds-handler
- (.populateContext http-creds-handler (ReqContext/context) servlet-request)))
-
-(defn webapp [handler http-creds-handler]
- (.mark drpc:num-execute-http-requests)
- (->
- (routes
- (POST "/drpc/:func" [:as {:keys [body servlet-request]} func & m]
- (let [args (slurp body)]
- (populate-context! http-creds-handler servlet-request)
- (.execute handler func args)))
- (POST "/drpc/:func/" [:as {:keys [body servlet-request]} func & m]
- (let [args (slurp body)]
- (populate-context! http-creds-handler servlet-request)
- (.execute handler func args)))
- (GET "/drpc/:func/:args" [:as {:keys [servlet-request]} func args & m]
- (populate-context! http-creds-handler servlet-request)
- (.execute handler func args))
- (GET "/drpc/:func/" [:as {:keys [servlet-request]} func & m]
- (populate-context! http-creds-handler servlet-request)
- (.execute handler func ""))
- (GET "/drpc/:func" [:as {:keys [servlet-request]} func & m]
- (populate-context! http-creds-handler servlet-request)
- (.execute handler func "")))
- (wrap-reload '[org.apache.storm.daemon.drpc])
- handle-request))
-
-
-(defn launch-server!
- ([]
- (let [conf (clojurify-structure (ConfigUtils/readStormConfig))
- drpc-http-port (int (conf DRPC-HTTP-PORT))
- drpc-server (DrpcServer. conf)
- http-creds-handler (AuthUtils/GetDrpcHttpCredentialsPlugin conf)]
- (when (> drpc-http-port 0)
- (let [app (-> (webapp drpc-server http-creds-handler)
- requests-middleware)]
- (.setHttpServlet drpc-server (ring.util.servlet/servlet app))))
- (.launchServer drpc-server)))
-)
-
-(defn -main []
- (Utils/setupDefaultUncaughtExceptionHandler)
- (launch-server!))
http://git-wip-us.apache.org/repos/asf/storm/blob/ea44062f/storm-core/src/jvm/org/apache/storm/LocalDRPC.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/LocalDRPC.java b/storm-core/src/jvm/org/apache/storm/LocalDRPC.java
index e3f34d1..685c35f 100644
--- a/storm-core/src/jvm/org/apache/storm/LocalDRPC.java
+++ b/storm-core/src/jvm/org/apache/storm/LocalDRPC.java
@@ -19,7 +19,8 @@ package org.apache.storm;
import java.util.Map;
-import org.apache.storm.daemon.DrpcServer;
+import org.apache.storm.daemon.drpc.DRPC;
+import org.apache.storm.daemon.drpc.DRPCThrift;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.DRPCExecutionException;
import org.apache.storm.generated.DRPCRequest;
@@ -36,13 +37,13 @@ import org.apache.thrift.TException;
*/
public class LocalDRPC implements ILocalDRPC {
- private final DrpcServer handler;
+ private final DRPC drpc;
private final String serviceId;
public LocalDRPC() {
Map<String, Object> conf = ConfigUtils.readStormConfig();
- handler = new DrpcServer(conf);
- serviceId = ServiceRegistry.registerService(handler);
+ drpc = new DRPC(conf);
+ serviceId = ServiceRegistry.registerService(new DRPCThrift(drpc));
}
@Override
@@ -52,32 +53,38 @@ public class LocalDRPC implements ILocalDRPC {
@Override
public void result(String id, String result) throws AuthorizationException, TException {
- handler.result(id, result);
+ drpc.returnResult(id, result);
}
@Override
public String execute(String functionName, String funcArgs) throws DRPCExecutionException, AuthorizationException, TException {
- return handler.execute(functionName, funcArgs);
+ return drpc.executeBlocking(functionName, funcArgs);
}
@Override
public void failRequest(String id) throws AuthorizationException, TException {
- handler.failRequest(id);
+ drpc.failRequest(id, null);
}
+
@Override
- public void shutdown() {
- close();
+ public void failRequestV2(String id, DRPCExecutionException e) throws AuthorizationException, TException {
+ drpc.failRequest(id, e);
}
@Override
public DRPCRequest fetchRequest(String functionName) throws AuthorizationException, TException {
- return handler.fetchRequest(functionName);
+ return drpc.fetchRequest(functionName);
}
@Override
public void close() {
ServiceRegistry.unregisterService(this.serviceId);
- this.handler.close();
+ drpc.close();
+ }
+
+ @Override
+ public void shutdown() {
+ close();
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/ea44062f/storm-core/src/jvm/org/apache/storm/daemon/DrpcServer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/DrpcServer.java b/storm-core/src/jvm/org/apache/storm/daemon/DrpcServer.java
deleted file mode 100644
index 21308fd..0000000
--- a/storm-core/src/jvm/org/apache/storm/daemon/DrpcServer.java
+++ /dev/null
@@ -1,357 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.daemon;
-
-import com.codahale.metrics.Meter;
-import com.google.common.collect.ImmutableMap;
-import org.apache.commons.lang.StringUtils;
-import org.apache.storm.Config;
-import org.apache.storm.generated.*;
-import org.apache.storm.logging.ThriftAccessLogger;
-import org.apache.storm.metric.StormMetricsRegistry;
-import org.apache.storm.security.auth.*;
-import org.apache.storm.security.auth.authorizer.DRPCAuthorizerBase;
-import org.apache.storm.ui.FilterConfiguration;
-import org.apache.storm.ui.IConfigurator;
-import org.apache.storm.ui.UIHelpers;
-import org.apache.storm.utils.Time;
-import org.apache.storm.utils.Utils;
-import org.apache.storm.utils.VersionInfo;
-import org.apache.thrift.TException;
-import org.eclipse.jetty.server.Server;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.servlet.Servlet;
-import java.security.Principal;
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicInteger;
-
-public class DrpcServer implements DistributedRPC.Iface, DistributedRPCInvocations.Iface, AutoCloseable {
-
- private static final Logger LOG = LoggerFactory.getLogger(DrpcServer.class);
- private final Long timeoutCheckSecs = 5L;
-
- private Map conf;
-
- private ThriftServer handlerServer;
- private ThriftServer invokeServer;
- private IHttpCredentialsPlugin httpCredsHandler;
-
- private Thread clearThread;
-
- private IAuthorizer authorizer;
-
- private Servlet httpServlet;
-
- private AtomicInteger ctr = new AtomicInteger(0);
- private ConcurrentHashMap<String, ConcurrentLinkedQueue<DRPCRequest>> requestQueues = new ConcurrentHashMap<String, ConcurrentLinkedQueue<DRPCRequest>>();
-
-
- private static class InternalRequest {
- public final Semaphore sem;
- public final int startTimeSecs;
- public final String function;
- public final DRPCRequest request;
- public volatile Object result;
-
- public InternalRequest(String function, DRPCRequest request) {
- sem = new Semaphore(0);
- startTimeSecs = Time.currentTimeSecs();
- this.function = function;
- this.request = request;
- }
- }
-
- private ConcurrentHashMap<String, InternalRequest> outstandingRequests = new ConcurrentHashMap<>();
-
- private final static Meter meterHttpRequests = StormMetricsRegistry.registerMeter("drpc:num-execute-http-requests");
- private final static Meter meterExecuteCalls = StormMetricsRegistry.registerMeter("drpc:num-execute-calls");
- private final static Meter meterResultCalls = StormMetricsRegistry.registerMeter("drpc:num-result-calls");
- private final static Meter meterFailRequestCalls = StormMetricsRegistry.registerMeter("drpc:num-failRequest-calls");
- private final static Meter meterFetchRequestCalls = StormMetricsRegistry.registerMeter("drpc:num-fetchRequest-calls");
- private final static Meter meterShutdownCalls = StormMetricsRegistry.registerMeter("drpc:num-shutdown-calls");
-
- public DrpcServer(Map conf) {
- this.conf = conf;
- this.authorizer = mkAuthorizationHandler((String) (this.conf.get(Config.DRPC_AUTHORIZER)));
- initClearThread();
- }
-
- public void setHttpServlet(Servlet httpServlet) {
- this.httpServlet = httpServlet;
- }
-
- private ThriftServer initHandlerServer(final DrpcServer service) throws Exception {
- int port = (int) conf.get(Config.DRPC_PORT);
- if (port > 0) {
- handlerServer = new ThriftServer(conf, new DistributedRPC.Processor<DistributedRPC.Iface>(service), ThriftConnectionType.DRPC);
- }
- return handlerServer;
- }
-
- private ThriftServer initInvokeServer(final DrpcServer service) throws Exception {
- invokeServer = new ThriftServer(conf, new DistributedRPCInvocations.Processor<DistributedRPCInvocations.Iface>(service),
- ThriftConnectionType.DRPC_INVOCATIONS);
- return invokeServer;
- }
-
- private void initHttp() throws Exception {
- LOG.info("Starting RPC Http servers...");
- Integer drpcHttpPort = (Integer) conf.get(Config.DRPC_HTTP_PORT);
- if (drpcHttpPort != null && drpcHttpPort > 0) {
- String filterClass = (String) (conf.get(Config.DRPC_HTTP_FILTER));
- Map<String, String> filterParams = (Map<String, String>) (conf.get(Config.DRPC_HTTP_FILTER_PARAMS));
- FilterConfiguration filterConfiguration = new FilterConfiguration(filterClass, filterParams);
- final List<FilterConfiguration> filterConfigurations = Arrays.asList(filterConfiguration);
- final Integer httpsPort = Utils.getInt(conf.get(Config.DRPC_HTTPS_PORT), 0);
- final String httpsKsPath = (String) (conf.get(Config.DRPC_HTTPS_KEYSTORE_PATH));
- final String httpsKsPassword = (String) (conf.get(Config.DRPC_HTTPS_KEYSTORE_PASSWORD));
- final String httpsKsType = (String) (conf.get(Config.DRPC_HTTPS_KEYSTORE_TYPE));
- final String httpsKeyPassword = (String) (conf.get(Config.DRPC_HTTPS_KEY_PASSWORD));
- final String httpsTsPath = (String) (conf.get(Config.DRPC_HTTPS_TRUSTSTORE_PATH));
- final String httpsTsPassword = (String) (conf.get(Config.DRPC_HTTPS_TRUSTSTORE_PASSWORD));
- final String httpsTsType = (String) (conf.get(Config.DRPC_HTTPS_TRUSTSTORE_TYPE));
- final Boolean httpsWantClientAuth = (Boolean) (conf.get(Config.DRPC_HTTPS_WANT_CLIENT_AUTH));
- final Boolean httpsNeedClientAuth = (Boolean) (conf.get(Config.DRPC_HTTPS_NEED_CLIENT_AUTH));
-
- UIHelpers.stormRunJetty(drpcHttpPort, new IConfigurator() {
- @Override
- public void execute(Server s) {
- UIHelpers.configSsl(s, httpsPort, httpsKsPath, httpsKsPassword, httpsKsType, httpsKeyPassword, httpsTsPath, httpsTsPassword, httpsTsType,
- httpsNeedClientAuth, httpsWantClientAuth);
- UIHelpers.configFilter(s, httpServlet, filterConfigurations);
- }
- });
- }
-
- }
-
- private void initThrift() throws Exception {
-
- handlerServer = initHandlerServer(this);
- invokeServer = initInvokeServer(this);
- httpCredsHandler = AuthUtils.GetDrpcHttpCredentialsPlugin(conf);
- Utils.addShutdownHookWithForceKillIn1Sec(new Runnable() {
- @Override
- public void run() {
- if (handlerServer != null)
- handlerServer.stop();
- invokeServer.stop();
- }
- });
- LOG.info("Starting Distributed RPC servers...");
- new Thread(new Runnable() {
-
- @Override
- public void run() {
- invokeServer.serve();
- }
- }).start();
-
- StormMetricsRegistry.startMetricsReporters(conf);
-
- if (handlerServer != null)
- handlerServer.serve();
- }
-
- private void initClearThread() {
- clearThread = Utils.asyncLoop(new Callable() {
-
- @Override
- public Object call() throws Exception {
- for (Map.Entry<String, InternalRequest> e : outstandingRequests.entrySet()) {
- InternalRequest internalRequest = e.getValue();
- if (Time.deltaSecs(internalRequest.startTimeSecs) > Utils.getInt(conf.get(Config.DRPC_REQUEST_TIMEOUT_SECS))) {
- String id = e.getKey();
- Semaphore sem = internalRequest.sem;
- if (sem != null) {
- String func = internalRequest.function;
- acquireQueue(func).remove(internalRequest.request);
- LOG.warn("Timeout DRPC request id: {} start at {}", id, e.getValue());
- sem.release();
- }
- cleanup(id);
- }
- }
- return getTimeoutCheckSecs();
- }
- });
- }
-
- public Long getTimeoutCheckSecs() {
- return timeoutCheckSecs;
- }
-
- public void launchServer() throws Exception {
- LOG.info("Starting drpc server for storm version {}", VersionInfo.getVersion());
- initHttp();
- initThrift();
- }
-
- @Override
- public void close() {
- meterShutdownCalls.mark();
- clearThread.interrupt();
- }
-
- public void cleanup(String id) {
- outstandingRequests.remove(id);
- }
-
- @Override
- public String execute(String functionName, String funcArgs) throws DRPCExecutionException, AuthorizationException, org.apache.thrift.TException {
- meterExecuteCalls.mark();
- LOG.debug("Received DRPC request for {} ({}) at {} ", functionName, funcArgs, System.currentTimeMillis());
- Map<String, String> map = new HashMap<>();
- map.put(DRPCAuthorizerBase.FUNCTION_NAME, functionName);
- checkAuthorization(authorizer, map, "execute", functionName);
-
- int newid = 0;
- int orig = 0;
- do {
- orig = ctr.get();
- newid = (orig + 1) % 1000000000;
- } while (!ctr.compareAndSet(orig, newid));
- String strid = String.valueOf(newid);
-
- DRPCRequest req = new DRPCRequest(funcArgs, strid);
- InternalRequest internalRequest = new InternalRequest(functionName, req);
- this.outstandingRequests.put(strid, internalRequest);
- ConcurrentLinkedQueue<DRPCRequest> queue = acquireQueue(functionName);
- queue.add(req);
- LOG.debug("Waiting for DRPC request for {} {} at {}", functionName, funcArgs, System.currentTimeMillis());
- try {
- internalRequest.sem.acquire();
- } catch (InterruptedException e) {
- LOG.error("acquire fail ", e);
- }
- LOG.debug("Acquired for DRPC request for {} {} at {}", functionName, funcArgs, System.currentTimeMillis());
-
- Object result = internalRequest.result;
-
- LOG.debug("Returning for DRPC request for " + functionName + " " + funcArgs + " at " + (System.currentTimeMillis()));
-
- this.cleanup(strid);
-
- if (result instanceof DRPCExecutionException) {
- throw (DRPCExecutionException) result;
- }
- if (result == null) {
- throw new DRPCExecutionException("Request timed out");
- }
- return (String) result;
- }
-
- @Override
- public void result(String id, String result) throws AuthorizationException, TException {
- meterResultCalls.mark();
- InternalRequest internalRequest = this.outstandingRequests.get(id);
- if (internalRequest != null) {
- Map<String, String> map = ImmutableMap.of(DRPCAuthorizerBase.FUNCTION_NAME, internalRequest.function);
- checkAuthorization(authorizer, map, "result", internalRequest.function);
- Semaphore sem = internalRequest.sem;
- LOG.debug("Received result {} for {} at {}", result, id, System.currentTimeMillis());
- if (sem != null) {
- internalRequest.result = result;
- sem.release();
- }
- }
- }
-
- @Override
- public DRPCRequest fetchRequest(String functionName) throws AuthorizationException, TException {
- meterFetchRequestCalls.mark();
- Map<String, String> map = new HashMap<>();
- map.put(DRPCAuthorizerBase.FUNCTION_NAME, functionName);
- checkAuthorization(authorizer, map, "fetchRequest", functionName);
- ConcurrentLinkedQueue<DRPCRequest> queue = acquireQueue(functionName);
- DRPCRequest req = queue.poll();
- if (req != null) {
- LOG.debug("Fetched request for {} at {}", functionName, System.currentTimeMillis());
- return req;
- } else {
- return new DRPCRequest("", "");
- }
- }
-
- @Override
- public void failRequest(String id) throws AuthorizationException, TException {
- meterFailRequestCalls.mark();
- InternalRequest internalRequest = this.outstandingRequests.get(id);
- if (internalRequest != null) {
- Map<String, String> map = new HashMap<>();
- map.put(DRPCAuthorizerBase.FUNCTION_NAME, internalRequest.function);
- checkAuthorization(authorizer, map, "failRequest", internalRequest.function);
- Semaphore sem = internalRequest.sem;
- if (sem != null) {
- internalRequest.result = new DRPCExecutionException("Request failed");
- sem.release();
- }
- }
- }
-
- protected ConcurrentLinkedQueue<DRPCRequest> acquireQueue(String function) {
- ConcurrentLinkedQueue<DRPCRequest> reqQueue = requestQueues.get(function);
- if (reqQueue == null) {
- reqQueue = new ConcurrentLinkedQueue<>();
- ConcurrentLinkedQueue<DRPCRequest> old = requestQueues.putIfAbsent(function, reqQueue);
- if (old != null) {
- reqQueue = old;
- }
- }
- return reqQueue;
- }
-
- private void checkAuthorization(IAuthorizer aclHandler, Map mapping, String operation, ReqContext reqContext, String function) throws AuthorizationException {
- if (reqContext != null) {
- ThriftAccessLogger.logAccessFunction(reqContext.requestID(), reqContext.remoteAddress(), reqContext.principal(), operation, function);
- }
- if (aclHandler != null) {
- if (reqContext == null)
- reqContext = ReqContext.context();
- if (!aclHandler.permit(reqContext, operation, mapping)) {
- Principal principal = reqContext.principal();
- String user = (principal != null) ? principal.getName() : "unknown";
- throw new AuthorizationException("DRPC request '" + operation + "' for '" + user + "' user is not authorized");
- }
- }
- }
-
- private void checkAuthorization(IAuthorizer aclHandler, Map mapping, String operation, String function) throws AuthorizationException {
- checkAuthorization(aclHandler, mapping, operation, ReqContext.context(), function);
- }
-
- // TO be replaced by Common.mkAuthorizationHandler
- private IAuthorizer mkAuthorizationHandler(String klassname) {
- IAuthorizer authorizer = null;
- Class aznClass = null;
- if (StringUtils.isNotBlank(klassname)) {
- try {
- aznClass = Class.forName(klassname);
- authorizer = (IAuthorizer) aznClass.newInstance();
- authorizer.prepare(conf);
- } catch (Exception e) {
- LOG.error("mkAuthorizationHandler failed!", e);
- }
- }
- LOG.debug("authorization class name: {} class: {} handler: {}", klassname, aznClass, authorizer);
- return authorizer;
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/ea44062f/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java b/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java
index 874b607..90753a7 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java
@@ -17,6 +17,7 @@
*/
package org.apache.storm.daemon;
+import org.apache.commons.lang.StringUtils;
import org.apache.storm.Config;
import org.apache.storm.Constants;
import org.apache.storm.Thrift;
@@ -552,8 +553,8 @@ public class StormCommon {
protected IAuthorizer mkAuthorizationHandlerImpl(String klassName, Map conf)
throws ClassNotFoundException, IllegalAccessException, InstantiationException {
IAuthorizer aznHandler = null;
- if (klassName != null) {
- Class aznClass = Class.forName(klassName);
+ if (StringUtils.isNotBlank(klassName)) {
+ Class<?> aznClass = Class.forName(klassName);
if (aznClass != null) {
aznHandler = (IAuthorizer) aznClass.newInstance();
if (aznHandler != null) {
http://git-wip-us.apache.org/repos/asf/storm/blob/ea44062f/storm-core/src/jvm/org/apache/storm/daemon/drpc/BlockingOutstandingRequest.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/drpc/BlockingOutstandingRequest.java b/storm-core/src/jvm/org/apache/storm/daemon/drpc/BlockingOutstandingRequest.java
new file mode 100644
index 0000000..619af65
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/drpc/BlockingOutstandingRequest.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.drpc;
+
+import java.util.concurrent.Semaphore;
+
+import org.apache.storm.generated.DRPCExceptionType;
+import org.apache.storm.generated.DRPCExecutionException;
+import org.apache.storm.generated.DRPCRequest;
+
+public class BlockingOutstandingRequest extends OutstandingRequest {
+ public static final RequestFactory<BlockingOutstandingRequest> FACTORY =
+ (function, request) -> new BlockingOutstandingRequest(function, request);
+ private Semaphore _sem;
+ private volatile String _result = null;
+ private volatile DRPCExecutionException _e = null;
+
+ public BlockingOutstandingRequest(String function, DRPCRequest req) {
+ super(function, req);
+ _sem = new Semaphore(0);
+ }
+
+ public String getResult() throws DRPCExecutionException {
+ try {
+ _sem.acquire();
+ } catch (InterruptedException e) {
+ //Ignored
+ }
+
+ if (_result != null) {
+ return _result;
+ }
+
+ if (_e == null) {
+ _e = new DRPCExecutionException("Internal Error: No Result and No Exception");
+ _e.set_type(DRPCExceptionType.INTERNAL_ERROR);
+ }
+ throw _e;
+ }
+
+ @Override
+ public void returnResult(String result) {
+ _result = result;
+ _sem.release();
+ }
+
+ @Override
+ public void fail(DRPCExecutionException e) {
+ _e = e;
+ _sem.release();
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/ea44062f/storm-core/src/jvm/org/apache/storm/daemon/drpc/DRPC.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/drpc/DRPC.java b/storm-core/src/jvm/org/apache/storm/daemon/drpc/DRPC.java
new file mode 100644
index 0000000..7ddb6a9
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/drpc/DRPC.java
@@ -0,0 +1,215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.drpc;
+
+import java.security.Principal;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.storm.Config;
+import org.apache.storm.daemon.StormCommon;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.DRPCExceptionType;
+import org.apache.storm.generated.DRPCExecutionException;
+import org.apache.storm.generated.DRPCRequest;
+import org.apache.storm.logging.ThriftAccessLogger;
+import org.apache.storm.metric.StormMetricsRegistry;
+import org.apache.storm.security.auth.IAuthorizer;
+import org.apache.storm.security.auth.ReqContext;
+import org.apache.storm.security.auth.authorizer.DRPCAuthorizerBase;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.codahale.metrics.Meter;
+import com.google.common.annotations.VisibleForTesting;
+
+public class DRPC implements AutoCloseable {
+ private static final Logger LOG = LoggerFactory.getLogger(DRPC.class);
+ private static final DRPCRequest NOTHING_REQUEST = new DRPCRequest("","");
+ private static final DRPCExecutionException TIMED_OUT = new DRPCExecutionException("Timed Out");
+ private static final DRPCExecutionException SHUT_DOWN = new DRPCExecutionException("Server Shutting Down");
+ private static final DRPCExecutionException DEFAULT_FAILED = new DRPCExecutionException("Request failed");
+ static {
+ TIMED_OUT.set_type(DRPCExceptionType.SERVER_TIMEOUT);
+ SHUT_DOWN.set_type(DRPCExceptionType.SERVER_SHUTDOWN);
+ DEFAULT_FAILED.set_type(DRPCExceptionType.FAILED_REQUEST);
+ }
+ private static final Meter meterServerTimedOut = StormMetricsRegistry.registerMeter("drpc:num-server-timedout-requests");
+ private static final Meter meterExecuteCalls = StormMetricsRegistry.registerMeter("drpc:num-execute-calls");
+ private static final Meter meterResultCalls = StormMetricsRegistry.registerMeter("drpc:num-result-calls");
+ private static final Meter meterFailRequestCalls = StormMetricsRegistry.registerMeter("drpc:num-failRequest-calls");
+ private static final Meter meterFetchRequestCalls = StormMetricsRegistry.registerMeter("drpc:num-fetchRequest-calls");
+
+ private static IAuthorizer mkAuthorizationHandler(String klassname, Map<String, Object> conf) {
+ try {
+ return StormCommon.mkAuthorizationHandler(klassname, conf);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @VisibleForTesting
+ static void checkAuthorization(ReqContext reqContext, IAuthorizer auth, String operation, String function) throws AuthorizationException {
+ if (reqContext != null) {
+ ThriftAccessLogger.logAccessFunction(reqContext.requestID(), reqContext.remoteAddress(), reqContext.principal(), operation, function);
+ }
+ if (auth != null) {
+ Map<String, String> map = new HashMap<>();
+ map.put(DRPCAuthorizerBase.FUNCTION_NAME, function);
+ if (!auth.permit(reqContext, operation, map)) {
+ Principal principal = reqContext.principal();
+ String user = (principal != null) ? principal.getName() : "unknown";
+ throw new AuthorizationException("DRPC request '" + operation + "' for '" + user + "' user is not authorized");
+ }
+ }
+ }
+
+ //Waiting to be fetched
+ private final ConcurrentHashMap<String, ConcurrentLinkedQueue<OutstandingRequest>> _queues =
+ new ConcurrentHashMap<>();
+ //Waiting to be returned
+ private final ConcurrentHashMap<String, OutstandingRequest> _requests =
+ new ConcurrentHashMap<>();
+ private final Timer _timer = new Timer();
+ private final AtomicLong _ctr = new AtomicLong(0);
+ private final IAuthorizer _auth;
+
+ public DRPC(Map<String, Object> conf) {
+ this(mkAuthorizationHandler((String)conf.get(Config.DRPC_AUTHORIZER), conf),
+ Utils.getInt(conf.get(Config.DRPC_REQUEST_TIMEOUT_SECS), 600) * 1000);
+ }
+
+ public DRPC(IAuthorizer auth, long timeoutMs) {
+ _auth = auth;
+ _timer.scheduleAtFixedRate(new TimerTask() {
+ @Override
+ public void run() {
+ cleanupAll(timeoutMs, TIMED_OUT);
+ }
+ }, timeoutMs/2, timeoutMs/2);
+ }
+
+
+ private void checkAuthorization(String operation, String function) throws AuthorizationException {
+ checkAuthorization(ReqContext.context(), _auth, operation, function);
+ }
+
+ private void cleanup(String id) {
+ OutstandingRequest req = _requests.remove(id);
+ if (req != null && !req.wasFetched()) {
+ _queues.get(req.getFunction()).remove(req);
+ }
+ }
+
+ private void cleanupAll(long timeoutMs, DRPCExecutionException exp) {
+ for (Entry<String, OutstandingRequest> e : _requests.entrySet()) {
+ OutstandingRequest req = e.getValue();
+ if (req.isTimedOut(timeoutMs)) {
+ req.fail(exp);
+ cleanup(e.getKey());
+ meterServerTimedOut.mark();
+ }
+ }
+ }
+
+ private String nextId() {
+ return String.valueOf(_ctr.incrementAndGet());
+ }
+
+ private ConcurrentLinkedQueue<OutstandingRequest> getQueue(String function) {
+ ConcurrentLinkedQueue<OutstandingRequest> queue = _queues.get(function);
+ if (queue == null) {
+ _queues.putIfAbsent(function, new ConcurrentLinkedQueue<>());
+ queue = _queues.get(function);
+ }
+ return queue;
+ }
+
+ public void returnResult(String id, String result) throws AuthorizationException {
+ meterResultCalls.mark();
+ LOG.debug("Got a result {} {}", id, result);
+ OutstandingRequest req = _requests.get(id);
+ if (req != null) {
+ checkAuthorization("result", req.getFunction());
+ req.returnResult(result);
+ }
+ }
+
+ public DRPCRequest fetchRequest(String functionName) throws AuthorizationException {
+ meterFetchRequestCalls.mark();
+ checkAuthorization("fetchRequest", functionName);
+ ConcurrentLinkedQueue<OutstandingRequest> q = getQueue(functionName);
+ OutstandingRequest req = q.poll();
+ if (req != null) {
+ req.fetched();
+ DRPCRequest ret = req.getRequest();
+ return ret;
+ }
+ return NOTHING_REQUEST;
+ }
+
+ public void failRequest(String id, DRPCExecutionException e) throws AuthorizationException {
+ meterFailRequestCalls.mark();
+ LOG.debug("Got a fail {}", id);
+ OutstandingRequest req = _requests.get(id);
+ if (req != null) {
+ checkAuthorization("failRequest", req.getFunction());
+ if (e == null) {
+ e = DEFAULT_FAILED;
+ }
+ req.fail(e);
+ }
+ }
+
+ public <T extends OutstandingRequest> T execute(String functionName, String funcArgs, RequestFactory<T> factory) throws AuthorizationException {
+ meterExecuteCalls.mark();
+ checkAuthorization("execute", functionName);
+ String id = nextId();
+ LOG.debug("Execute {} {}", functionName, funcArgs);
+ T req = factory.mkRequest(functionName, new DRPCRequest(funcArgs, id));
+ _requests.put(id, req);
+ ConcurrentLinkedQueue<OutstandingRequest> q = getQueue(functionName);
+ q.add(req);
+ return req;
+ }
+
+ public String executeBlocking(String functionName, String funcArgs) throws DRPCExecutionException, AuthorizationException {
+ BlockingOutstandingRequest req = execute(functionName, funcArgs, BlockingOutstandingRequest.FACTORY);
+ try {
+ LOG.debug("Waiting for result {} {}",functionName, funcArgs);
+ return req.getResult();
+ } catch (DRPCExecutionException e) {
+ throw e;
+ } finally {
+ cleanup(req.getRequest().get_request_id());
+ }
+ }
+
+ @Override
+ public void close() {
+ _timer.cancel();
+ cleanupAll(0, SHUT_DOWN);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/ea44062f/storm-core/src/jvm/org/apache/storm/daemon/drpc/DRPCServer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/drpc/DRPCServer.java b/storm-core/src/jvm/org/apache/storm/daemon/drpc/DRPCServer.java
new file mode 100644
index 0000000..34dba76
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/drpc/DRPCServer.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.drpc;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.storm.Config;
+import org.apache.storm.daemon.drpc.webapp.DRPCApplication;
+import org.apache.storm.generated.DistributedRPC;
+import org.apache.storm.generated.DistributedRPCInvocations;
+import org.apache.storm.metric.StormMetricsRegistry;
+import org.apache.storm.security.auth.ThriftConnectionType;
+import org.apache.storm.security.auth.ThriftServer;
+import org.apache.storm.ui.FilterConfiguration;
+import org.apache.storm.ui.UIHelpers;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.Utils;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.servlet.ServletContextHandler;
+import org.eclipse.jetty.servlet.ServletHolder;
+import org.glassfish.jersey.servlet.ServletContainer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.codahale.metrics.Meter;
+import com.google.common.annotations.VisibleForTesting;
+
+public class DRPCServer implements AutoCloseable {
+ private static final Logger LOG = LoggerFactory.getLogger(DRPCServer.class);
+ private final static Meter meterShutdownCalls = StormMetricsRegistry.registerMeter("drpc:num-shutdown-calls");
+
+ private static ThriftServer mkHandlerServer(final DistributedRPC.Iface service, Integer port, Map<String, Object> conf) {
+ ThriftServer ret = null;
+ if (port != null && port > 0) {
+ ret = new ThriftServer(conf, new DistributedRPC.Processor<>(service),
+ ThriftConnectionType.DRPC);
+ }
+ return ret;
+ }
+
+ private static ThriftServer mkInvokeServer(final DistributedRPCInvocations.Iface service, int port, Map<String, Object> conf) {
+ return new ThriftServer(conf, new DistributedRPCInvocations.Processor<>(service),
+ ThriftConnectionType.DRPC_INVOCATIONS);
+ }
+
+ private static Server mkHttpServer(Map<String, Object> conf, DRPC drpc) {
+ Integer drpcHttpPort = (Integer) conf.get(Config.DRPC_HTTP_PORT);
+ Server ret = null;
+ if (drpcHttpPort != null && drpcHttpPort > 0) {
+ LOG.info("Starting RPC HTTP servers...");
+ String filterClass = (String) (conf.get(Config.DRPC_HTTP_FILTER));
+ @SuppressWarnings("unchecked")
+ Map<String, String> filterParams = (Map<String, String>) (conf.get(Config.DRPC_HTTP_FILTER_PARAMS));
+ FilterConfiguration filterConfiguration = new FilterConfiguration(filterClass, filterParams);
+ final List<FilterConfiguration> filterConfigurations = Arrays.asList(filterConfiguration);
+ final Integer httpsPort = Utils.getInt(conf.get(Config.DRPC_HTTPS_PORT), 0);
+ final String httpsKsPath = (String) (conf.get(Config.DRPC_HTTPS_KEYSTORE_PATH));
+ final String httpsKsPassword = (String) (conf.get(Config.DRPC_HTTPS_KEYSTORE_PASSWORD));
+ final String httpsKsType = (String) (conf.get(Config.DRPC_HTTPS_KEYSTORE_TYPE));
+ final String httpsKeyPassword = (String) (conf.get(Config.DRPC_HTTPS_KEY_PASSWORD));
+ final String httpsTsPath = (String) (conf.get(Config.DRPC_HTTPS_TRUSTSTORE_PATH));
+ final String httpsTsPassword = (String) (conf.get(Config.DRPC_HTTPS_TRUSTSTORE_PASSWORD));
+ final String httpsTsType = (String) (conf.get(Config.DRPC_HTTPS_TRUSTSTORE_TYPE));
+ final Boolean httpsWantClientAuth = (Boolean) (conf.get(Config.DRPC_HTTPS_WANT_CLIENT_AUTH));
+ final Boolean httpsNeedClientAuth = (Boolean) (conf.get(Config.DRPC_HTTPS_NEED_CLIENT_AUTH));
+
+ //TODO a better way to do this would be great.
+ DRPCApplication.setup(drpc);
+ ret = UIHelpers.jettyCreateServer(drpcHttpPort, null, httpsPort);
+
+ UIHelpers.configSsl(ret, httpsPort, httpsKsPath, httpsKsPassword, httpsKsType, httpsKeyPassword, httpsTsPath, httpsTsPassword, httpsTsType,
+ httpsNeedClientAuth, httpsWantClientAuth);
+
+ ServletContextHandler context = new ServletContextHandler(ServletContextHandler.NO_SESSIONS);
+ context.setContextPath("/");
+ ret.setHandler(context);
+
+ ServletHolder jerseyServlet = context.addServlet(ServletContainer.class, "/*");
+ jerseyServlet.setInitOrder(1);
+ jerseyServlet.setInitParameter("javax.ws.rs.Application", DRPCApplication.class.getName());
+
+ UIHelpers.configFilters(context, filterConfigurations);
+ UIHelpers.addRequestContextFilter(context, Config.DRPC_HTTP_CREDS_PLUGIN, conf);
+ }
+ return ret;
+ }
+
+ private final DRPC _drpc;
+ private final ThriftServer _handlerServer;
+ private final ThriftServer _invokeServer;
+ private final Server _httpServer;
+ private boolean _closed = false;
+
+ public DRPCServer(Map<String, Object> conf) {
+ _drpc = new DRPC(conf);
+ DRPCThrift thrift = new DRPCThrift(_drpc);
+ _handlerServer = mkHandlerServer(thrift, Utils.getInt(conf.get(Config.DRPC_PORT), null), conf);
+ _invokeServer = mkInvokeServer(thrift, Utils.getInt(conf.get(Config.DRPC_INVOCATIONS_PORT), 3773), conf);
+ _httpServer = mkHttpServer(conf, _drpc);
+ }
+
+ @VisibleForTesting
+ void start() throws Exception {
+ LOG.info("Starting Distributed RPC servers...");
+ new Thread(() -> _invokeServer.serve()).start();
+
+ if (_httpServer != null) {
+ _httpServer.start();
+ }
+
+ if (_handlerServer != null) {
+ _handlerServer.serve();
+ } else {
+ _httpServer.join();
+ }
+ }
+
+ @Override
+ public synchronized void close() {
+ if (!_closed) {
+ //This is kind of useless...
+ meterShutdownCalls.mark();
+
+ if (_handlerServer != null) {
+ _handlerServer.stop();
+ }
+
+ if (_invokeServer != null) {
+ _invokeServer.stop();
+ }
+
+ //TODO this is causing issues...
+ //if (_httpServer != null) {
+ // _httpServer.destroy();
+ //}
+
+ _drpc.close();
+ _closed = true;
+ }
+ }
+
+ public static void main(String [] args) throws Exception {
+ Utils.setupDefaultUncaughtExceptionHandler();
+ Map<String, Object> conf = ConfigUtils.readStormConfig();
+ try (DRPCServer server = new DRPCServer(conf)) {
+ Utils.addShutdownHookWithForceKillIn1Sec(() -> server.close());
+ StormMetricsRegistry.startMetricsReporters(conf);
+ server.start();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/ea44062f/storm-core/src/jvm/org/apache/storm/daemon/drpc/DRPCThrift.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/drpc/DRPCThrift.java b/storm-core/src/jvm/org/apache/storm/daemon/drpc/DRPCThrift.java
new file mode 100644
index 0000000..78e008d
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/drpc/DRPCThrift.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.drpc;
+
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.DRPCExecutionException;
+import org.apache.storm.generated.DRPCRequest;
+import org.apache.storm.generated.DistributedRPC;
+import org.apache.storm.generated.DistributedRPCInvocations;
+
+public class DRPCThrift implements DistributedRPC.Iface, DistributedRPCInvocations.Iface {
+ private final DRPC _drpc;
+
+ public DRPCThrift(DRPC drpc) {
+ _drpc = drpc;
+ }
+
+ @Override
+ public void result(String id, String result) throws AuthorizationException {
+ _drpc.returnResult(id, result);
+ }
+
+ @Override
+ public DRPCRequest fetchRequest(String functionName) throws AuthorizationException {
+ return _drpc.fetchRequest(functionName);
+ }
+
+ @Override
+ public void failRequest(String id) throws AuthorizationException {
+ _drpc.failRequest(id, null);
+ }
+
+ @Override
+ public void failRequestV2(String id, DRPCExecutionException e) throws AuthorizationException {
+ _drpc.failRequest(id, e);
+ }
+
+ @Override
+ public String execute(String functionName, String funcArgs)
+ throws DRPCExecutionException, AuthorizationException {
+ return _drpc.executeBlocking(functionName, funcArgs);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/ea44062f/storm-core/src/jvm/org/apache/storm/daemon/drpc/OutstandingRequest.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/drpc/OutstandingRequest.java b/storm-core/src/jvm/org/apache/storm/daemon/drpc/OutstandingRequest.java
new file mode 100644
index 0000000..6e86936
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/drpc/OutstandingRequest.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.drpc;
+
+import org.apache.storm.generated.DRPCExecutionException;
+import org.apache.storm.generated.DRPCRequest;
+import org.apache.storm.utils.Time;
+
+public abstract class OutstandingRequest {
+ private final long _start;
+ private final String _function;
+ private final DRPCRequest _req;
+ private volatile boolean _fetched = false;
+
+ public OutstandingRequest(String function, DRPCRequest req) {
+ _start = Time.currentTimeMillis();
+ _function = function;
+ _req = req;
+ }
+
+ public DRPCRequest getRequest() {
+ return _req;
+ }
+
+ public void fetched() {
+ _fetched = true;
+ }
+
+ public boolean wasFetched() {
+ return _fetched;
+ }
+
+ public String getFunction() {
+ return _function;
+ }
+
+ public boolean isTimedOut(long timeoutMs) {
+ return (_start + timeoutMs) <= Time.currentTimeMillis();
+ }
+
+ public abstract void returnResult(String result);
+ public abstract void fail(DRPCExecutionException e);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/ea44062f/storm-core/src/jvm/org/apache/storm/daemon/drpc/RequestFactory.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/drpc/RequestFactory.java b/storm-core/src/jvm/org/apache/storm/daemon/drpc/RequestFactory.java
new file mode 100644
index 0000000..139e243
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/drpc/RequestFactory.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.drpc;
+
+import org.apache.storm.generated.DRPCRequest;
+
+public interface RequestFactory<T extends OutstandingRequest> {
+ public T mkRequest(String function, DRPCRequest req);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/ea44062f/storm-core/src/jvm/org/apache/storm/daemon/drpc/webapp/AuthorizationExceptionMapper.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/drpc/webapp/AuthorizationExceptionMapper.java b/storm-core/src/jvm/org/apache/storm/daemon/drpc/webapp/AuthorizationExceptionMapper.java
new file mode 100644
index 0000000..75c3100
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/drpc/webapp/AuthorizationExceptionMapper.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.drpc.webapp;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.ws.rs.core.Response;
+import javax.ws.rs.ext.ExceptionMapper;
+import javax.ws.rs.ext.Provider;
+
+import org.apache.storm.generated.AuthorizationException;
+import org.json.simple.JSONValue;
+
+@Provider
+public class AuthorizationExceptionMapper implements ExceptionMapper<AuthorizationException> {
+ @Override
+ public Response toResponse(AuthorizationException ex) {
+ Map<String, String> body = new HashMap<>();
+ body.put("error", "Not Authorized");
+ body.put("errorMessage", ex.get_msg());
+ return Response.status(403).entity(JSONValue.toJSONString(body)).type("application/json").build();
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/ea44062f/storm-core/src/jvm/org/apache/storm/daemon/drpc/webapp/DRPCApplication.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/drpc/webapp/DRPCApplication.java b/storm-core/src/jvm/org/apache/storm/daemon/drpc/webapp/DRPCApplication.java
new file mode 100644
index 0000000..4a0ce8c
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/drpc/webapp/DRPCApplication.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.drpc.webapp;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import javax.ws.rs.ApplicationPath;
+import javax.ws.rs.core.Application;
+
+import org.apache.storm.daemon.drpc.DRPC;
+
+@ApplicationPath("")
+public class DRPCApplication extends Application {
+ private static DRPC _drpc;
+ private final Set<Object> singletons = new HashSet<Object>();
+
+ public DRPCApplication() {
+ singletons.add(new DRPCResource(_drpc));
+ singletons.add(new DRPCExceptionMapper());
+ singletons.add(new AuthorizationExceptionMapper());
+ }
+
+ @Override
+ public Set<Object> getSingletons() {
+ return singletons;
+ }
+
+ public static void setup(DRPC drpc) {
+ _drpc = drpc;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/ea44062f/storm-core/src/jvm/org/apache/storm/daemon/drpc/webapp/DRPCExceptionMapper.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/drpc/webapp/DRPCExceptionMapper.java b/storm-core/src/jvm/org/apache/storm/daemon/drpc/webapp/DRPCExceptionMapper.java
new file mode 100644
index 0000000..60cfc93
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/drpc/webapp/DRPCExceptionMapper.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.drpc.webapp;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.ResponseBuilder;
+import javax.ws.rs.ext.ExceptionMapper;
+import javax.ws.rs.ext.Provider;
+
+import org.apache.storm.generated.DRPCExecutionException;
+import org.json.simple.JSONValue;
+
+@Provider
+public class DRPCExceptionMapper implements ExceptionMapper<DRPCExecutionException> {
+
+ @Override
+ public Response toResponse(DRPCExecutionException ex) {
+ ResponseBuilder builder = Response.status(500);
+ switch (ex.get_type()) {
+ case FAILED_REQUEST:
+ builder.status(400);
+ break;
+ case SERVER_SHUTDOWN:
+ builder.status(503); //Not available
+ break;
+ case SERVER_TIMEOUT:
+ builder.status(504); //proxy timeout
+ break;
+ case INTERNAL_ERROR:
+ //fall throw on purpose
+ default:
+ //Empty (Still 500)
+ break;
+
+ }
+ Map<String, String> body = new HashMap<>();
+ //TODO I would love to standardize this...
+ body.put("error", ex.is_set_type() ? ex.get_type().toString() : "Internal Error");
+ body.put("errorMessage", ex.get_msg());
+ return builder.entity(JSONValue.toJSONString(body)).type("application/json").build();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/ea44062f/storm-core/src/jvm/org/apache/storm/daemon/drpc/webapp/DRPCResource.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/drpc/webapp/DRPCResource.java b/storm-core/src/jvm/org/apache/storm/daemon/drpc/webapp/DRPCResource.java
new file mode 100644
index 0000000..d6490af
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/drpc/webapp/DRPCResource.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.drpc.webapp;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.core.Context;
+
+import org.apache.storm.daemon.drpc.DRPC;
+import org.apache.storm.metric.StormMetricsRegistry;
+import org.apache.thrift.TException;
+
+import com.codahale.metrics.Meter;
+
+@Path("/drpc/")
+public class DRPCResource {
+ private static final Meter meterHttpRequests = StormMetricsRegistry.registerMeter("drpc:num-execute-http-requests");
+ private final DRPC _drpc;
+ public DRPCResource(DRPC drpc) {
+ _drpc = drpc;
+ }
+
+ //TODO put in some better exception mapping...
+ //TODO move populateContext to a filter...
+ @POST
+ @Path("/{func}")
+ public String post(@PathParam("func") String func, String args, @Context HttpServletRequest request) throws TException {
+ meterHttpRequests.mark();
+ return _drpc.executeBlocking(func, args);
+ }
+
+ @GET
+ @Path("/{func}/{args}")
+ public String get(@PathParam("func") String func, @PathParam("args") String args, @Context HttpServletRequest request) throws TException {
+ meterHttpRequests.mark();
+ return _drpc.executeBlocking(func, args);
+ }
+
+ @GET
+ @Path("/{func}")
+ public String get(@PathParam("func") String func, @Context HttpServletRequest request) throws TException {
+ meterHttpRequests.mark();
+ return _drpc.executeBlocking(func, "");
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/ea44062f/storm-core/src/jvm/org/apache/storm/daemon/drpc/webapp/ReqContextFilter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/drpc/webapp/ReqContextFilter.java b/storm-core/src/jvm/org/apache/storm/daemon/drpc/webapp/ReqContextFilter.java
new file mode 100644
index 0000000..521901a
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/drpc/webapp/ReqContextFilter.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.drpc.webapp;
+import java.io.IOException;
+
+import javax.servlet.Filter;
+import javax.servlet.FilterChain;
+import javax.servlet.FilterConfig;
+import javax.servlet.ServletException;
+import javax.servlet.ServletRequest;
+import javax.servlet.ServletResponse;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.storm.security.auth.IHttpCredentialsPlugin;
+import org.apache.storm.security.auth.ReqContext;
+
+public class ReqContextFilter implements Filter {
+ private final IHttpCredentialsPlugin _httpCredsHandler;
+
+ public ReqContextFilter(IHttpCredentialsPlugin httpCredsHandler) {
+ _httpCredsHandler = httpCredsHandler;
+ }
+
+ /**
+ * Populate the Storm RequestContext from an servlet request. This should be called in each handler
+ * @param request the request to populate
+ */
+ public void populateContext(HttpServletRequest request) {
+ if (_httpCredsHandler != null) {
+ _httpCredsHandler.populateContext(ReqContext.context(), request);
+ }
+ }
+
+ public void init(FilterConfig config) throws ServletException {
+ //NOOP
+ //We could add in configs through the web.xml if we wanted something stand alone here...
+ }
+
+ public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException {
+ handle((HttpServletRequest)request, (HttpServletResponse)response, chain);
+ }
+
+ public void handle(HttpServletRequest request, HttpServletResponse response, FilterChain chain) throws IOException, ServletException{
+ if (request != null) {
+ populateContext(request);
+ }
+ chain.doFilter(request, response);
+ }
+
+ public void destroy() {
+ //NOOP
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/ea44062f/storm-core/src/jvm/org/apache/storm/drpc/DRPCInvocationsClient.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/drpc/DRPCInvocationsClient.java b/storm-core/src/jvm/org/apache/storm/drpc/DRPCInvocationsClient.java
index 9d01e81..956999e 100644
--- a/storm-core/src/jvm/org/apache/storm/drpc/DRPCInvocationsClient.java
+++ b/storm-core/src/jvm/org/apache/storm/drpc/DRPCInvocationsClient.java
@@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.storm.generated.DRPCRequest;
import org.apache.storm.generated.DistributedRPCInvocations;
import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.DRPCExecutionException;
import org.apache.storm.security.auth.ThriftClient;
import org.apache.storm.security.auth.ThriftConnectionType;
import org.apache.thrift.transport.TTransportException;
@@ -110,4 +111,20 @@ public class DRPCInvocationsClient extends ThriftClient implements DistributedRP
public DistributedRPCInvocations.Client getClient() {
return client.get();
}
+
+ @Override
+ public void failRequestV2(String id, DRPCExecutionException ex) throws AuthorizationException, TException {
+ DistributedRPCInvocations.Client c = client.get();
+ try {
+ if (c == null) {
+ throw new TException("Client is not connected...");
+ }
+ c.failRequestV2(id, ex);
+ } catch(AuthorizationException aze) {
+ throw aze;
+ } catch(TException e) {
+ client.compareAndSet(c, null);
+ throw e;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/ea44062f/storm-core/src/jvm/org/apache/storm/generated/DRPCExceptionType.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/generated/DRPCExceptionType.java b/storm-core/src/jvm/org/apache/storm/generated/DRPCExceptionType.java
new file mode 100644
index 0000000..2ccf1b1
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/generated/DRPCExceptionType.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Autogenerated by Thrift Compiler (0.9.3)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ * @generated
+ */
+package org.apache.storm.generated;
+
+
+import java.util.Map;
+import java.util.HashMap;
+import org.apache.thrift.TEnum;
+
+public enum DRPCExceptionType implements org.apache.thrift.TEnum {
+ INTERNAL_ERROR(0),
+ SERVER_SHUTDOWN(1),
+ SERVER_TIMEOUT(2),
+ FAILED_REQUEST(3);
+
+ private final int value;
+
+ private DRPCExceptionType(int value) {
+ this.value = value;
+ }
+
+ /**
+ * Get the integer value of this enum value, as defined in the Thrift IDL.
+ */
+ public int getValue() {
+ return value;
+ }
+
+ /**
+ * Find a the enum type by its integer value, as defined in the Thrift IDL.
+ * @return null if the value is not found.
+ */
+ public static DRPCExceptionType findByValue(int value) {
+ switch (value) {
+ case 0:
+ return INTERNAL_ERROR;
+ case 1:
+ return SERVER_SHUTDOWN;
+ case 2:
+ return SERVER_TIMEOUT;
+ case 3:
+ return FAILED_REQUEST;
+ default:
+ return null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/ea44062f/storm-core/src/jvm/org/apache/storm/generated/DRPCExecutionException.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/generated/DRPCExecutionException.java b/storm-core/src/jvm/org/apache/storm/generated/DRPCExecutionException.java
index c7ba94f..fd701ff 100644
--- a/storm-core/src/jvm/org/apache/storm/generated/DRPCExecutionException.java
+++ b/storm-core/src/jvm/org/apache/storm/generated/DRPCExecutionException.java
@@ -56,6 +56,7 @@ public class DRPCExecutionException extends TException implements org.apache.thr
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("DRPCExecutionException");
private static final org.apache.thrift.protocol.TField MSG_FIELD_DESC = new org.apache.thrift.protocol.TField("msg", org.apache.thrift.protocol.TType.STRING, (short)1);
+ private static final org.apache.thrift.protocol.TField TYPE_FIELD_DESC = new org.apache.thrift.protocol.TField("type", org.apache.thrift.protocol.TType.I32, (short)2);
private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
static {
@@ -64,10 +65,16 @@ public class DRPCExecutionException extends TException implements org.apache.thr
}
private String msg; // required
+ private DRPCExceptionType type; // optional
/** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
public enum _Fields implements org.apache.thrift.TFieldIdEnum {
- MSG((short)1, "msg");
+ MSG((short)1, "msg"),
+ /**
+ *
+ * @see DRPCExceptionType
+ */
+ TYPE((short)2, "type");
private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
@@ -84,6 +91,8 @@ public class DRPCExecutionException extends TException implements org.apache.thr
switch(fieldId) {
case 1: // MSG
return MSG;
+ case 2: // TYPE
+ return TYPE;
default:
return null;
}
@@ -124,11 +133,14 @@ public class DRPCExecutionException extends TException implements org.apache.thr
}
// isset id assignments
+ private static final _Fields optionals[] = {_Fields.TYPE};
public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
static {
Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
tmpMap.put(_Fields.MSG, new org.apache.thrift.meta_data.FieldMetaData("msg", org.apache.thrift.TFieldRequirementType.REQUIRED,
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+ tmpMap.put(_Fields.TYPE, new org.apache.thrift.meta_data.FieldMetaData("type", org.apache.thrift.TFieldRequirementType.OPTIONAL,
+ new org.apache.thrift.meta_data.EnumMetaData(org.apache.thrift.protocol.TType.ENUM, DRPCExceptionType.class)));
metaDataMap = Collections.unmodifiableMap(tmpMap);
org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(DRPCExecutionException.class, metaDataMap);
}
@@ -150,6 +162,9 @@ public class DRPCExecutionException extends TException implements org.apache.thr
if (other.is_set_msg()) {
this.msg = other.msg;
}
+ if (other.is_set_type()) {
+ this.type = other.type;
+ }
}
public DRPCExecutionException deepCopy() {
@@ -159,6 +174,7 @@ public class DRPCExecutionException extends TException implements org.apache.thr
@Override
public void clear() {
this.msg = null;
+ this.type = null;
}
public String get_msg() {
@@ -184,6 +200,37 @@ public class DRPCExecutionException extends TException implements org.apache.thr
}
}
+ /**
+ *
+ * @see DRPCExceptionType
+ */
+ public DRPCExceptionType get_type() {
+ return this.type;
+ }
+
+ /**
+ *
+ * @see DRPCExceptionType
+ */
+ public void set_type(DRPCExceptionType type) {
+ this.type = type;
+ }
+
+ public void unset_type() {
+ this.type = null;
+ }
+
+ /** Returns true if field type is set (has been assigned a value) and false otherwise */
+ public boolean is_set_type() {
+ return this.type != null;
+ }
+
+ public void set_type_isSet(boolean value) {
+ if (!value) {
+ this.type = null;
+ }
+ }
+
public void setFieldValue(_Fields field, Object value) {
switch (field) {
case MSG:
@@ -194,6 +241,14 @@ public class DRPCExecutionException extends TException implements org.apache.thr
}
break;
+ case TYPE:
+ if (value == null) {
+ unset_type();
+ } else {
+ set_type((DRPCExceptionType)value);
+ }
+ break;
+
}
}
@@ -202,6 +257,9 @@ public class DRPCExecutionException extends TException implements org.apache.thr
case MSG:
return get_msg();
+ case TYPE:
+ return get_type();
+
}
throw new IllegalStateException();
}
@@ -215,6 +273,8 @@ public class DRPCExecutionException extends TException implements org.apache.thr
switch (field) {
case MSG:
return is_set_msg();
+ case TYPE:
+ return is_set_type();
}
throw new IllegalStateException();
}
@@ -241,6 +301,15 @@ public class DRPCExecutionException extends TException implements org.apache.thr
return false;
}
+ boolean this_present_type = true && this.is_set_type();
+ boolean that_present_type = true && that.is_set_type();
+ if (this_present_type || that_present_type) {
+ if (!(this_present_type && that_present_type))
+ return false;
+ if (!this.type.equals(that.type))
+ return false;
+ }
+
return true;
}
@@ -253,6 +322,11 @@ public class DRPCExecutionException extends TException implements org.apache.thr
if (present_msg)
list.add(msg);
+ boolean present_type = true && (is_set_type());
+ list.add(present_type);
+ if (present_type)
+ list.add(type.getValue());
+
return list.hashCode();
}
@@ -274,6 +348,16 @@ public class DRPCExecutionException extends TException implements org.apache.thr
return lastComparison;
}
}
+ lastComparison = Boolean.valueOf(is_set_type()).compareTo(other.is_set_type());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (is_set_type()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.type, other.type);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
return 0;
}
@@ -301,6 +385,16 @@ public class DRPCExecutionException extends TException implements org.apache.thr
sb.append(this.msg);
}
first = false;
+ if (is_set_type()) {
+ if (!first) sb.append(", ");
+ sb.append("type:");
+ if (this.type == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.type);
+ }
+ first = false;
+ }
sb.append(")");
return sb.toString();
}
@@ -356,6 +450,14 @@ public class DRPCExecutionException extends TException implements org.apache.thr
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
break;
+ case 2: // TYPE
+ if (schemeField.type == org.apache.thrift.protocol.TType.I32) {
+ struct.type = org.apache.storm.generated.DRPCExceptionType.findByValue(iprot.readI32());
+ struct.set_type_isSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
default:
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
@@ -374,6 +476,13 @@ public class DRPCExecutionException extends TException implements org.apache.thr
oprot.writeString(struct.msg);
oprot.writeFieldEnd();
}
+ if (struct.type != null) {
+ if (struct.is_set_type()) {
+ oprot.writeFieldBegin(TYPE_FIELD_DESC);
+ oprot.writeI32(struct.type.getValue());
+ oprot.writeFieldEnd();
+ }
+ }
oprot.writeFieldStop();
oprot.writeStructEnd();
}
@@ -392,6 +501,14 @@ public class DRPCExecutionException extends TException implements org.apache.thr
public void write(org.apache.thrift.protocol.TProtocol prot, DRPCExecutionException struct) throws org.apache.thrift.TException {
TTupleProtocol oprot = (TTupleProtocol) prot;
oprot.writeString(struct.msg);
+ BitSet optionals = new BitSet();
+ if (struct.is_set_type()) {
+ optionals.set(0);
+ }
+ oprot.writeBitSet(optionals, 1);
+ if (struct.is_set_type()) {
+ oprot.writeI32(struct.type.getValue());
+ }
}
@Override
@@ -399,6 +516,11 @@ public class DRPCExecutionException extends TException implements org.apache.thr
TTupleProtocol iprot = (TTupleProtocol) prot;
struct.msg = iprot.readString();
struct.set_msg_isSet(true);
+ BitSet incoming = iprot.readBitSet(1);
+ if (incoming.get(0)) {
+ struct.type = org.apache.storm.generated.DRPCExceptionType.findByValue(iprot.readI32());
+ struct.set_type_isSet(true);
+ }
}
}