You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2014/05/21 18:07:06 UTC
[11/14] STORM-216: Added Authentication and Authorization.
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/src/jvm/backtype/storm/ConfigValidation.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/ConfigValidation.java b/storm-core/src/jvm/backtype/storm/ConfigValidation.java
index 3accb82..e990921 100644
--- a/storm-core/src/jvm/backtype/storm/ConfigValidation.java
+++ b/storm-core/src/jvm/backtype/storm/ConfigValidation.java
@@ -18,6 +18,8 @@
package backtype.storm;
import java.util.Map;
+import java.util.Map;
+
/**
* Provides functionality for validating configuration fields.
*/
@@ -35,51 +37,147 @@ public class ConfigValidation {
*/
public void validateField(String name, Object field) throws IllegalArgumentException;
}
+
+ /**
+ * Declares a method for validating configuration values that supports nesting.
+ */
+ public static abstract class NestableFieldValidator implements FieldValidator {
+ @Override
+ public void validateField(String name, Object field) throws IllegalArgumentException {
+ validateField(null, name, field);
+ }
+
+ /**
+ * Validates the given field.
+ * @param pd describes the parent wrapping this validator.
+ * @param name the name of the field.
+ * @param field The field to be validated.
+ * @throws IllegalArgumentException if the field fails validation.
+ */
+ public abstract void validateField(String pd, String name, Object field) throws IllegalArgumentException;
+ }
/**
- * Returns a new FieldValidator for a List of the given Class.
+ * Returns a new NestableFieldValidator for a given class.
+ * @param cls the Class the field should be a type of
+ * @param nullAllowed whether or not a value of null is valid
+ * @return a NestableFieldValidator for that class
+ */
+ public static NestableFieldValidator fv(final Class cls, final boolean nullAllowed) {
+ return new NestableFieldValidator() {
+ @Override
+ public void validateField(String pd, String name, Object field)
+ throws IllegalArgumentException {
+ if (nullAllowed && field == null) {
+ return;
+ }
+ if (! cls.isInstance(field)) {
+ throw new IllegalArgumentException(
+ pd + name + " must be a " + cls.getName() + ". ("+field+")");
+ }
+ }
+ };
+ }
+
+ /**
+ * Returns a new NestableFieldValidator for a List of the given Class.
* @param cls the Class of elements composing the list
- * @return a FieldValidator for a list of the given class
+ * @param nullAllowed whether or not a value of null is valid
+ * @return a NestableFieldValidator for a list of the given class
*/
- static FieldValidator FieldListValidatorFactory(final Class cls) {
- return new FieldValidator() {
+ public static NestableFieldValidator listFv(Class cls, boolean nullAllowed) {
+ return listFv(fv(cls, false), nullAllowed);
+ }
+
+ /**
+ * Returns a new NestableFieldValidator for a List where each item is validated by validator.
+ * @param validator used to validate each item in the list
+ * @param nullAllowed whether or not a value of null is valid
+ * @return a NestableFieldValidator for a list with each item validated by a different validator.
+ */
+ public static NestableFieldValidator listFv(final NestableFieldValidator validator,
+ final boolean nullAllowed) {
+ return new NestableFieldValidator() {
@Override
- public void validateField(String name, Object field)
+ public void validateField(String pd, String name, Object field)
throws IllegalArgumentException {
- if (field == null) {
- // A null value is acceptable.
+ if (nullAllowed && field == null) {
return;
}
if (field instanceof Iterable) {
for (Object e : (Iterable)field) {
- if (! cls.isInstance(e)) {
- throw new IllegalArgumentException(
- "Each element of the list " + name + " must be a " +
- cls.getName() + ".");
- }
+ validator.validateField(pd + "Each element of the list ", name, e);
}
return;
}
throw new IllegalArgumentException(
- "Field " + name + " must be an Iterable of " + cls.getName());
+ "Field " + name + " must be an Iterable but was " +
+ ((field == null) ? "null" : ("a " + field.getClass())));
}
};
}
/**
+ * Returns a new NestableFieldValidator for a Map of key to val.
+ * @param key the Class of keys in the map
+ * @param val the Class of values in the map
+ * @param nullAllowed whether or not a value of null is valid
+ * @return a NestableFieldValidator for a Map of key to val
+ */
+ public static NestableFieldValidator mapFv(Class key, Class val,
+ boolean nullAllowed) {
+ return mapFv(fv(key, false), fv(val, false), nullAllowed);
+ }
+
+ /**
+ * Returns a new NestableFieldValidator for a Map.
+ * @param key a validator for the keys in the map
+ * @param val a validator for the values in the map
+ * @param nullAllowed whether or not a value of null is valid
+ * @return a NestableFieldValidator for a Map
+ */
+ public static NestableFieldValidator mapFv(final NestableFieldValidator key,
+ final NestableFieldValidator val, final boolean nullAllowed) {
+ return new NestableFieldValidator() {
+ @SuppressWarnings("unchecked")
+ @Override
+ public void validateField(String pd, String name, Object field)
+ throws IllegalArgumentException {
+ if (nullAllowed && field == null) {
+ return;
+ }
+ if (field instanceof Map) {
+ for (Map.Entry<Object, Object> entry: ((Map<Object, Object>)field).entrySet()) {
+ key.validateField("Each key of the map ", name, entry.getKey());
+ val.validateField("Each value in the map ", name, entry.getValue());
+ }
+ return;
+ }
+ throw new IllegalArgumentException(
+ "Field " + name + " must be a Map");
+ }
+ };
+ }
+
+ /**
* Validates a list of Numbers.
*/
- public static Object NumbersValidator = FieldListValidatorFactory(Number.class);
+ public static Object NumbersValidator = listFv(Number.class, true);
/**
- * Validates is a list of Strings.
+ * Validates a list of Strings.
+ */
+ public static Object StringsValidator = listFv(String.class, true);
+
+ /**
+ * Validates a map of Strings to Numbers.
*/
- public static Object StringsValidator = FieldListValidatorFactory(String.class);
+ public static Object MapOfStringToNumberValidator = mapFv(String.class, Number.class, true);
/**
* Validates is a list of Maps.
*/
- public static Object MapsValidator = FieldListValidatorFactory(Map.class);
+ public static Object MapsValidator = listFv(Map.class, true);
/**
* Validates a power of 2.
@@ -105,6 +203,28 @@ public class ConfigValidation {
};
/**
+ * Validates a positive integer.
+ */
+ public static Object PositiveIntegerValidator = new FieldValidator() {
+ @Override
+ public void validateField(String name, Object o) throws IllegalArgumentException {
+ if (o == null) {
+ // A null value is acceptable.
+ return;
+ }
+ final long i;
+ if (o instanceof Number &&
+ (i = ((Number)o).longValue()) == ((Number)o).doubleValue())
+ {
+ if (i > 0) {
+ return;
+ }
+ }
+ throw new IllegalArgumentException("Field " + name + " must be a positive integer.");
+ }
+ };
+
+ /**
* Validates Kryo Registration
*/
public static Object KryoRegValidator = new FieldValidator() {
@@ -141,7 +261,7 @@ public class ConfigValidation {
*/
public static Object StringOrStringListValidator = new FieldValidator() {
- private FieldValidator fv = FieldListValidatorFactory(String.class);
+ private FieldValidator fv = listFv(String.class, false);
@Override
public void validateField(String name, Object o) throws IllegalArgumentException {
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/src/jvm/backtype/storm/Constants.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/Constants.java b/storm-core/src/jvm/backtype/storm/Constants.java
index 39d3ffa..35c252f 100644
--- a/storm-core/src/jvm/backtype/storm/Constants.java
+++ b/storm-core/src/jvm/backtype/storm/Constants.java
@@ -31,5 +31,6 @@ public class Constants {
public static final String METRICS_COMPONENT_ID_PREFIX = "__metrics";
public static final String METRICS_STREAM_ID = "__metrics";
public static final String METRICS_TICK_STREAM_ID = "__metrics_tick";
+ public static final String CREDENTIALS_CHANGED_STREAM_ID = "__credentials";
}
-
\ No newline at end of file
+
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/src/jvm/backtype/storm/ICredentialsListener.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/ICredentialsListener.java b/storm-core/src/jvm/backtype/storm/ICredentialsListener.java
new file mode 100644
index 0000000..1a7bc1b
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/ICredentialsListener.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm;
+
+import java.util.Map;
+
+/**
+ * Allows a bolt or a spout to be informed when the credentials of the topology have changed.
+ */
+public interface ICredentialsListener {
+ /**
+ * Called when the credentials of a topology have changed.
+ * @param credentials the new credentials, could be null.
+ */
+ public void setCredentials(Map<String,String> credentials);
+}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/src/jvm/backtype/storm/ILocalCluster.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/ILocalCluster.java b/storm-core/src/jvm/backtype/storm/ILocalCluster.java
index 818dfb0..7d5aa35 100644
--- a/storm-core/src/jvm/backtype/storm/ILocalCluster.java
+++ b/storm-core/src/jvm/backtype/storm/ILocalCluster.java
@@ -26,6 +26,7 @@ import backtype.storm.generated.NotAliveException;
import backtype.storm.generated.RebalanceOptions;
import backtype.storm.generated.StormTopology;
import backtype.storm.generated.TopologyInfo;
+import backtype.storm.generated.Credentials;
import java.util.Map;
@@ -33,6 +34,7 @@ import java.util.Map;
public interface ILocalCluster {
void submitTopology(String topologyName, Map conf, StormTopology topology) throws AlreadyAliveException, InvalidTopologyException;
void submitTopologyWithOpts(String topologyName, Map conf, StormTopology topology, SubmitOptions submitOpts) throws AlreadyAliveException, InvalidTopologyException;
+ void uploadNewCredentials(String topologyName, Credentials creds);
void killTopology(String topologyName) throws NotAliveException;
void killTopologyWithOpts(String name, KillOptions options) throws NotAliveException;
void activate(String topologyName) throws NotAliveException;
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/src/jvm/backtype/storm/StormSubmitter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/StormSubmitter.java b/storm-core/src/jvm/backtype/storm/StormSubmitter.java
index 5dfb34b..d5da103 100644
--- a/storm-core/src/jvm/backtype/storm/StormSubmitter.java
+++ b/storm-core/src/jvm/backtype/storm/StormSubmitter.java
@@ -19,6 +19,8 @@ package backtype.storm;
import java.io.File;
import java.nio.ByteBuffer;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import java.util.HashMap;
import java.util.Map;
@@ -28,13 +30,9 @@ import org.json.simple.JSONValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import backtype.storm.generated.AlreadyAliveException;
-import backtype.storm.generated.ClusterSummary;
-import backtype.storm.generated.InvalidTopologyException;
-import backtype.storm.generated.Nimbus;
-import backtype.storm.generated.StormTopology;
-import backtype.storm.generated.SubmitOptions;
-import backtype.storm.generated.TopologySummary;
+import backtype.storm.security.auth.IAutoCredentials;
+import backtype.storm.security.auth.AuthUtils;
+import backtype.storm.generated.*;
import backtype.storm.utils.BufferFileInputStream;
import backtype.storm.utils.NimbusClient;
import backtype.storm.utils.Utils;
@@ -49,12 +47,99 @@ public class StormSubmitter {
private static final int THRIFT_CHUNK_SIZE_BYTES = 307200;
- private static Nimbus.Iface localNimbus = null;
+ private static ILocalCluster localNimbus = null;
- public static void setLocalNimbus(Nimbus.Iface localNimbusHandler) {
+ public static void setLocalNimbus(ILocalCluster localNimbusHandler) {
StormSubmitter.localNimbus = localNimbusHandler;
}
+ private static String generateZookeeperDigestSecretPayload() {
+ return Utils.secureRandomLong() + ":" + Utils.secureRandomLong();
+ }
+
+ public static final Pattern zkDigestPattern = Pattern.compile("\\S+:\\S+");
+
+ public static boolean validateZKDigestPayload(String payload) {
+ if (payload != null) {
+ Matcher m = zkDigestPattern.matcher(payload);
+ return m.matches();
+ }
+ return false;
+ }
+
+ @SuppressWarnings("unchecked")
+ public static Map prepareZookeeperAuthentication(Map conf) {
+ Map toRet = new HashMap();
+
+ // Is the topology ZooKeeper authentication configuration unset?
+ if (! conf.containsKey(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD) ||
+ conf.get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD) == null ||
+ ! validateZKDigestPayload((String)
+ conf.get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD))) {
+
+ String secretPayload = generateZookeeperDigestSecretPayload();
+ toRet.put(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD, secretPayload);
+ LOG.info("Generated ZooKeeper secret payload for MD5-digest: " + secretPayload);
+ }
+
+ // This should always be set to digest.
+ toRet.put(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_SCHEME, "digest");
+
+ return toRet;
+ }
+
+ private static Map<String,String> populateCredentials(Map conf, Map<String, String> creds) {
+ Map<String,String> ret = new HashMap<String,String>();
+ for (IAutoCredentials autoCred: AuthUtils.GetAutoCredentials(conf)) {
+ LOG.info("Running "+autoCred);
+ autoCred.populateCredentials(ret);
+ }
+ if (creds != null) {
+ ret.putAll(creds);
+ }
+ return ret;
+ }
+
+ /**
+ * Push a new set of credentials to the running topology.
+ * @param name the name of the topology to push credentials to.
+ * @param stormConf the topology-specific configuration, if desired. See {@link Config}.
+ * @param credentials the credentials to push.
+ * @throws AuthorizationException if you are not authorized to push credentials.
+ * @throws NotAliveException if the topology is not alive
+ * @throws InvalidTopologyException if any other error happens
+ */
+ public static void pushCredentials(String name, Map stormConf, Map<String, String> credentials)
+ throws AuthorizationException, NotAliveException, InvalidTopologyException {
+ stormConf = new HashMap(stormConf);
+ stormConf.putAll(Utils.readCommandLineOpts());
+ Map conf = Utils.readStormConfig();
+ conf.putAll(stormConf);
+ Map<String,String> fullCreds = populateCredentials(conf, credentials);
+ if (fullCreds.isEmpty()) {
+ LOG.warn("No credentials were found to push to "+name);
+ return;
+ }
+ try {
+ if(localNimbus!=null) {
+ LOG.info("Pushing Credentials to topology " + name + " in local mode");
+ localNimbus.uploadNewCredentials(name, new Credentials(fullCreds));
+ } else {
+ NimbusClient client = NimbusClient.getConfiguredClient(conf);
+ try {
+ LOG.info("Uploading new credentials to " + name);
+ client.getClient().uploadNewCredentials(name, new Credentials(fullCreds));
+ } finally {
+ client.close();
+ }
+ }
+ LOG.info("Finished submitting topology: " + name);
+ } catch(TException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+
/**
* Submits a topology to run on the cluster. A topology runs forever or until
* explicitly killed.
@@ -65,8 +150,10 @@ public class StormSubmitter {
* @param topology the processing to execute.
* @throws AlreadyAliveException if a topology with this name is already running
* @throws InvalidTopologyException if an invalid topology was submitted
+ * @throws AuthorizationException if authorization fails
*/
- public static void submitTopology(String name, Map stormConf, StormTopology topology) throws AlreadyAliveException, InvalidTopologyException {
+ public static void submitTopology(String name, Map stormConf, StormTopology topology)
+ throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
submitTopology(name, stormConf, topology, null, null);
}
@@ -82,8 +169,11 @@ public class StormSubmitter {
* @param progressListener to track the progress of the jar upload process
* @throws AlreadyAliveException if a topology with this name is already running
* @throws InvalidTopologyException if an invalid topology was submitted
+ * @throws AuthorizationException if authorization fails
*/
- public static void submitTopology(String name, Map stormConf, StormTopology topology, SubmitOptions opts, ProgressListener progressListener) throws AlreadyAliveException, InvalidTopologyException {
+ @SuppressWarnings("unchecked")
+ public static void submitTopology(String name, Map stormConf, StormTopology topology, SubmitOptions opts,
+ ProgressListener progressListener) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
if(!Utils.isValidConf(stormConf)) {
throw new IllegalArgumentException("Storm conf is not valid. Must be json-serializable");
}
@@ -91,24 +181,45 @@ public class StormSubmitter {
stormConf.putAll(Utils.readCommandLineOpts());
Map conf = Utils.readStormConfig();
conf.putAll(stormConf);
+ stormConf.putAll(prepareZookeeperAuthentication(conf));
+
+ Map<String,String> passedCreds = new HashMap<String, String>();
+ if (opts != null) {
+ Credentials tmpCreds = opts.get_creds();
+ if (tmpCreds != null) {
+ passedCreds = tmpCreds.get_creds();
+ }
+ }
+ Map<String,String> fullCreds = populateCredentials(conf, passedCreds);
+ if (!fullCreds.isEmpty()) {
+ if (opts == null) {
+ opts = new SubmitOptions(TopologyInitialStatus.ACTIVE);
+ }
+ opts.set_creds(new Credentials(fullCreds));
+ }
try {
- String serConf = JSONValue.toJSONString(stormConf);
if(localNimbus!=null) {
LOG.info("Submitting topology " + name + " in local mode");
- localNimbus.submitTopology(name, null, serConf, topology);
+ if(opts!=null) {
+ localNimbus.submitTopologyWithOpts(name, stormConf, topology, opts);
+ } else {
+ // this is for backwards compatibility
+ localNimbus.submitTopology(name, stormConf, topology);
+ }
} else {
+ String serConf = JSONValue.toJSONString(stormConf);
NimbusClient client = NimbusClient.getConfiguredClient(conf);
if(topologyNameExists(conf, name)) {
throw new RuntimeException("Topology with name `" + name + "` already exists on cluster");
}
- submitJar(conf, progressListener);
+ String jar = submitJar(conf, progressListener);
try {
LOG.info("Submitting topology " + name + " in distributed mode with conf " + serConf);
if(opts!=null) {
- client.getClient().submitTopologyWithOpts(name, submittedJar, serConf, topology, opts);
+ client.getClient().submitTopologyWithOpts(name, jar, serConf, topology, opts);
} else {
// this is for backwards compatibility
- client.getClient().submitTopology(name, submittedJar, serConf, topology);
+ client.getClient().submitTopology(name, jar, serConf, topology);
}
} catch(InvalidTopologyException e) {
LOG.warn("Topology submission exception: "+e.get_msg());
@@ -136,9 +247,10 @@ public class StormSubmitter {
* @param topology the processing to execute.
* @throws AlreadyAliveException if a topology with this name is already running
* @throws InvalidTopologyException if an invalid topology was submitted
+ * @throws AuthorizationException if authorization fails
*/
- public static void submitTopologyWithProgressBar(String name, Map stormConf, StormTopology topology) throws AlreadyAliveException, InvalidTopologyException {
+ public static void submitTopologyWithProgressBar(String name, Map stormConf, StormTopology topology) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
submitTopologyWithProgressBar(name, stormConf, topology, null);
}
@@ -153,9 +265,10 @@ public class StormSubmitter {
* @param opts to manipulate the starting of the topology
* @throws AlreadyAliveException if a topology with this name is already running
* @throws InvalidTopologyException if an invalid topology was submitted
+ * @throws AuthorizationException if authorization fails
*/
- public static void submitTopologyWithProgressBar(String name, Map stormConf, StormTopology topology, SubmitOptions opts) throws AlreadyAliveException, InvalidTopologyException {
+ public static void submitTopologyWithProgressBar(String name, Map stormConf, StormTopology topology, SubmitOptions opts) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
// show a progress bar so we know we're not stuck (especially on slow connections)
submitTopology(name, stormConf, topology, opts, new StormSubmitter.ProgressListener() {
@Override
@@ -198,16 +311,8 @@ public class StormSubmitter {
}
}
- private static String submittedJar = null;
-
- private static void submitJar(Map conf, ProgressListener listener) {
- if(submittedJar==null) {
- LOG.info("Jar not uploaded to master yet. Submitting jar...");
- String localJar = System.getProperty("storm.jar");
- submittedJar = submitJar(conf, localJar, listener);
- } else {
- LOG.info("Jar already uploaded to master. Not submitting jar.");
- }
+ private static String submitJar(Map conf, ProgressListener listener) {
+ return submitJar(conf, System.getProperty("storm.jar"), listener);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/src/jvm/backtype/storm/drpc/DRPCInvocationsClient.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/drpc/DRPCInvocationsClient.java b/storm-core/src/jvm/backtype/storm/drpc/DRPCInvocationsClient.java
index fadebf6..987cde0 100644
--- a/storm-core/src/jvm/backtype/storm/drpc/DRPCInvocationsClient.java
+++ b/storm-core/src/jvm/backtype/storm/drpc/DRPCInvocationsClient.java
@@ -17,36 +17,33 @@
*/
package backtype.storm.drpc;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
import backtype.storm.generated.DRPCRequest;
import backtype.storm.generated.DistributedRPCInvocations;
+import backtype.storm.generated.AuthorizationException;
+import backtype.storm.security.auth.ThriftClient;
+import backtype.storm.security.auth.ThriftConnectionType;
+import org.apache.thrift.transport.TTransportException;
import org.apache.thrift.TException;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.transport.TFramedTransport;
-import org.apache.thrift.transport.TSocket;
-import org.apache.thrift.transport.TTransport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-public class DRPCInvocationsClient implements DistributedRPCInvocations.Iface {
- private TTransport conn;
- private DistributedRPCInvocations.Client client;
+public class DRPCInvocationsClient extends ThriftClient implements DistributedRPCInvocations.Iface {
+ public static Logger LOG = LoggerFactory.getLogger(DRPCInvocationsClient.class);
+ private final AtomicReference<DistributedRPCInvocations.Client> client =
+ new AtomicReference<DistributedRPCInvocations.Client>();
private String host;
- private int port;
+ private int port;
- public DRPCInvocationsClient(String host, int port) {
- try {
- this.host = host;
- this.port = port;
- connect();
- } catch(TException e) {
- throw new RuntimeException(e);
- }
- }
-
- private void connect() throws TException {
- conn = new TFramedTransport(new TSocket(host, port));
- client = new DistributedRPCInvocations.Client(new TBinaryProtocol(conn));
- conn.open();
+ public DRPCInvocationsClient(Map conf, String host, int port) throws TTransportException {
+ super(conf, ThriftConnectionType.DRPC_INVOCATIONS, host, port, null);
+ this.host = host;
+ this.port = port;
+ client.set(new DistributedRPCInvocations.Client(_protocol));
}
-
+
public String getHost() {
return host;
}
@@ -55,37 +52,57 @@ public class DRPCInvocationsClient implements DistributedRPCInvocations.Iface {
return port;
}
- public void result(String id, String result) throws TException {
+ public void reconnectClient() throws TException {
+ if (client.get() == null) {
+ reconnect();
+ client.set(new DistributedRPCInvocations.Client(_protocol));
+ }
+ }
+
+ public boolean isConnected() {
+ return client.get() != null;
+ }
+
+ public void result(String id, String result) throws TException, AuthorizationException {
+ DistributedRPCInvocations.Client c = client.get();
try {
- if(client==null) connect();
- client.result(id, result);
+ if (c == null) {
+ throw new TException("Client is not connected...");
+ }
+ c.result(id, result);
} catch(TException e) {
- client = null;
+ client.compareAndSet(c, null);
throw e;
}
}
- public DRPCRequest fetchRequest(String func) throws TException {
+ public DRPCRequest fetchRequest(String func) throws TException, AuthorizationException {
+ DistributedRPCInvocations.Client c = client.get();
try {
- if(client==null) connect();
- return client.fetchRequest(func);
+ if (c == null) {
+ throw new TException("Client is not connected...");
+ }
+ return c.fetchRequest(func);
} catch(TException e) {
- client = null;
+ client.compareAndSet(c, null);
throw e;
}
}
- public void failRequest(String id) throws TException {
+ public void failRequest(String id) throws TException, AuthorizationException {
+ DistributedRPCInvocations.Client c = client.get();
try {
- if(client==null) connect();
- client.failRequest(id);
+ if (c == null) {
+ throw new TException("Client is not connected...");
+ }
+ c.failRequest(id);
} catch(TException e) {
- client = null;
+ client.compareAndSet(c, null);
throw e;
}
}
- public void close() {
- conn.close();
+ public DistributedRPCInvocations.Client getClient() {
+ return client.get();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/src/jvm/backtype/storm/drpc/DRPCSpout.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/drpc/DRPCSpout.java b/storm-core/src/jvm/backtype/storm/drpc/DRPCSpout.java
index 918cbc0..82fd6cd 100644
--- a/storm-core/src/jvm/backtype/storm/drpc/DRPCSpout.java
+++ b/storm-core/src/jvm/backtype/storm/drpc/DRPCSpout.java
@@ -21,6 +21,7 @@ import backtype.storm.Config;
import backtype.storm.ILocalDRPC;
import backtype.storm.generated.DRPCRequest;
import backtype.storm.generated.DistributedRPCInvocations;
+import backtype.storm.generated.AuthorizationException;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
@@ -31,18 +32,30 @@ import backtype.storm.utils.ServiceRegistry;
import backtype.storm.utils.Utils;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
+import java.util.LinkedList;
import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.Callable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.thrift.TException;
+import org.apache.thrift.transport.TTransportException;
import org.json.simple.JSONValue;
public class DRPCSpout extends BaseRichSpout {
+ //ANY CHANGE TO THIS CODE MUST BE SERIALIZABLE COMPATIBLE OR THERE WILL BE PROBLEMS
+ static final long serialVersionUID = 2387848310969237877L;
+
public static Logger LOG = LoggerFactory.getLogger(DRPCSpout.class);
SpoutOutputCollector _collector;
List<DRPCInvocationsClient> _clients = new ArrayList<DRPCInvocationsClient>();
+ transient LinkedList<Future<Void>> _futures = null;
+ transient ExecutorService _backround = null;
String _function;
String _local_drpc_id = null;
@@ -65,11 +78,60 @@ public class DRPCSpout extends BaseRichSpout {
_function = function;
_local_drpc_id = drpc.getServiceId();
}
-
+
+ private class Adder implements Callable<Void> {
+ private String server;
+ private int port;
+ private Map conf;
+
+ public Adder(String server, int port, Map conf) {
+ this.server = server;
+ this.port = port;
+ this.conf = conf;
+ }
+
+ @Override
+ public Void call() throws Exception {
+ DRPCInvocationsClient c = new DRPCInvocationsClient(conf, server, port);
+ synchronized (_clients) {
+ _clients.add(c);
+ }
+ return null;
+ }
+ }
+
+ private void reconnect(final DRPCInvocationsClient c) {
+ _futures.add(_backround.submit(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ c.reconnectClient();
+ return null;
+ }
+ }));
+ }
+
+ private void checkFutures() {
+ Iterator<Future<Void>> i = _futures.iterator();
+ while (i.hasNext()) {
+ Future<Void> f = i.next();
+ if (f.isDone()) {
+ i.remove();
+ }
+ try {
+ f.get();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
_collector = collector;
if(_local_drpc_id==null) {
+ _backround = Executors.newCachedThreadPool();
+ _futures = new LinkedList<Future<Void>>();
+
int numTasks = context.getComponentTasks(context.getThisComponentId()).size();
int index = context.getThisTaskIndex();
@@ -78,13 +140,14 @@ public class DRPCSpout extends BaseRichSpout {
if(servers == null || servers.isEmpty()) {
throw new RuntimeException("No DRPC servers configured for topology");
}
- if(numTasks < servers.size()) {
- for(String s: servers) {
- _clients.add(new DRPCInvocationsClient(s, port));
+
+ if (numTasks < servers.size()) {
+ for (String s: servers) {
+ _futures.add(_backround.submit(new Adder(s, port, conf)));
}
- } else {
+ } else {
int i = index % servers.size();
- _clients.add(new DRPCInvocationsClient(servers.get(i), port));
+ _futures.add(_backround.submit(new Adder(servers.get(i), port, conf)));
}
}
@@ -101,8 +164,18 @@ public class DRPCSpout extends BaseRichSpout {
public void nextTuple() {
boolean gotRequest = false;
if(_local_drpc_id==null) {
- for(int i=0; i<_clients.size(); i++) {
- DRPCInvocationsClient client = _clients.get(i);
+ int size = 0;
+ synchronized (_clients) {
+ size = _clients.size(); //This will only ever grow, so no need to worry about falling off the end
+ }
+ for(int i=0; i<size; i++) {
+ DRPCInvocationsClient client;
+ synchronized (_clients) {
+ client = _clients.get(i);
+ }
+ if (!client.isConnected()) {
+ continue;
+ }
try {
DRPCRequest req = client.fetchRequest(_function);
if(req.get_request_id().length() > 0) {
@@ -114,10 +187,17 @@ public class DRPCSpout extends BaseRichSpout {
_collector.emit(new Values(req.get_func_args(), JSONValue.toJSONString(returnInfo)), new DRPCMessageId(req.get_request_id(), i));
break;
}
+ } catch (TException e) {
+ reconnect(client);
+ LOG.error("Failed to fetch DRPC result from DRPC server", e);
+ } catch (AuthorizationException aze) {
+ reconnect(client);
+ LOG.error("Not authorized to fetch DRPC result from DRPC server", aze);
} catch (Exception e) {
LOG.error("Failed to fetch DRPC result from DRPC server", e);
}
}
+ checkFutures();
} else {
DistributedRPCInvocations.Iface drpc = (DistributedRPCInvocations.Iface) ServiceRegistry.getService(_local_drpc_id);
if(drpc!=null) { // can happen during shutdown of drpc while topology is still up
@@ -133,6 +213,8 @@ public class DRPCSpout extends BaseRichSpout {
}
} catch (TException e) {
throw new RuntimeException(e);
+ } catch (AuthorizationException aze) {
+ throw new RuntimeException(aze);
}
}
}
@@ -159,6 +241,8 @@ public class DRPCSpout extends BaseRichSpout {
client.failRequest(did.id);
} catch (TException e) {
LOG.error("Failed to fail request", e);
+ } catch (AuthorizationException aze) {
+ LOG.error("Not authorized to failRequest from DRPC server", aze);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/src/jvm/backtype/storm/drpc/ReturnResults.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/drpc/ReturnResults.java b/storm-core/src/jvm/backtype/storm/drpc/ReturnResults.java
index 34cca98..3d50679 100644
--- a/storm-core/src/jvm/backtype/storm/drpc/ReturnResults.java
+++ b/storm-core/src/jvm/backtype/storm/drpc/ReturnResults.java
@@ -19,6 +19,7 @@ package backtype.storm.drpc;
import backtype.storm.Config;
import backtype.storm.generated.DistributedRPCInvocations;
+import backtype.storm.generated.AuthorizationException;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
@@ -33,18 +34,23 @@ import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.thrift.TException;
+import org.apache.thrift.transport.TTransportException;
import org.json.simple.JSONValue;
public class ReturnResults extends BaseRichBolt {
+ //ANY CHANGE TO THIS CODE MUST BE SERIALIZABLE COMPATIBLE OR THERE WILL BE PROBLEMS
+ static final long serialVersionUID = -774882142710631591L;
+
public static final Logger LOG = LoggerFactory.getLogger(ReturnResults.class);
OutputCollector _collector;
boolean local;
-
+ Map _conf;
Map<List, DRPCInvocationsClient> _clients = new HashMap<List, DRPCInvocationsClient>();
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+ _conf = stormConf;
_collector = collector;
local = stormConf.get(Config.STORM_CLUSTER_MODE).equals("local");
}
@@ -68,17 +74,40 @@ public class ReturnResults extends BaseRichBolt {
}};
if(!_clients.containsKey(server)) {
- _clients.put(server, new DRPCInvocationsClient(host, port));
+ try {
+ _clients.put(server, new DRPCInvocationsClient(_conf, host, port));
+ } catch (TTransportException ex) {
+ throw new RuntimeException(ex);
+ }
}
client = _clients.get(server);
}
-
+
try {
client.result(id, result);
_collector.ack(input);
} catch(TException e) {
LOG.error("Failed to return results to DRPC server", e);
_collector.fail(input);
+ if (client instanceof DRPCInvocationsClient) {
+ try {
+ LOG.info("reconnecting... ");
+ ((DRPCInvocationsClient)client).reconnectClient(); //Blocking call
+ } catch (TException e2) {
+ throw new RuntimeException(e2);
+ }
+ }
+ } catch (AuthorizationException aze) {
+ LOG.error("Not authorized to return results to DRPC server", aze);
+ _collector.fail(input);
+ if (client instanceof DRPCInvocationsClient) {
+ try {
+ LOG.info("reconnecting... ");
+ ((DRPCInvocationsClient)client).reconnectClient(); //Blocking call
+ } catch (TException e2) {
+ throw new RuntimeException(e2);
+ }
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/src/jvm/backtype/storm/generated/AuthorizationException.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/generated/AuthorizationException.java b/storm-core/src/jvm/backtype/storm/generated/AuthorizationException.java
new file mode 100644
index 0000000..9efc9da
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/generated/AuthorizationException.java
@@ -0,0 +1,328 @@
+/**
+ * Autogenerated by Thrift Compiler (0.7.0)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ */
+package backtype.storm.generated;
+
+import org.apache.commons.lang.builder.HashCodeBuilder;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.EnumMap;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.EnumSet;
+import java.util.Collections;
+import java.util.BitSet;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AuthorizationException extends Exception implements org.apache.thrift.TBase<AuthorizationException, AuthorizationException._Fields>, java.io.Serializable, Cloneable {
+ private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("AuthorizationException");
+
+ private static final org.apache.thrift.protocol.TField MSG_FIELD_DESC = new org.apache.thrift.protocol.TField("msg", org.apache.thrift.protocol.TType.STRING, (short)1);
+
+ private String msg; // required
+
+ /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+ public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+ MSG((short)1, "msg");
+
+ private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+ static {
+ for (_Fields field : EnumSet.allOf(_Fields.class)) {
+ byName.put(field.getFieldName(), field);
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, or null if its not found.
+ */
+ public static _Fields findByThriftId(int fieldId) {
+ switch(fieldId) {
+ case 1: // MSG
+ return MSG;
+ default:
+ return null;
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, throwing an exception
+ * if it is not found.
+ */
+ public static _Fields findByThriftIdOrThrow(int fieldId) {
+ _Fields fields = findByThriftId(fieldId);
+ if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+ return fields;
+ }
+
+ /**
+ * Find the _Fields constant that matches name, or null if its not found.
+ */
+ public static _Fields findByName(String name) {
+ return byName.get(name);
+ }
+
+ private final short _thriftId;
+ private final String _fieldName;
+
+ _Fields(short thriftId, String fieldName) {
+ _thriftId = thriftId;
+ _fieldName = fieldName;
+ }
+
+ public short getThriftFieldId() {
+ return _thriftId;
+ }
+
+ public String getFieldName() {
+ return _fieldName;
+ }
+ }
+
+ // isset id assignments
+
+ public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+ static {
+ Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+ tmpMap.put(_Fields.MSG, new org.apache.thrift.meta_data.FieldMetaData("msg", org.apache.thrift.TFieldRequirementType.REQUIRED,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+ metaDataMap = Collections.unmodifiableMap(tmpMap);
+ org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(AuthorizationException.class, metaDataMap);
+ }
+
+ public AuthorizationException() {
+ }
+
+ public AuthorizationException(
+ String msg)
+ {
+ this();
+ this.msg = msg;
+ }
+
+ /**
+ * Performs a deep copy on <i>other</i>.
+ */
+ public AuthorizationException(AuthorizationException other) {
+ if (other.is_set_msg()) {
+ this.msg = other.msg;
+ }
+ }
+
+ public AuthorizationException deepCopy() {
+ return new AuthorizationException(this);
+ }
+
+ @Override
+ public void clear() {
+ this.msg = null;
+ }
+
+ public String get_msg() {
+ return this.msg;
+ }
+
+ public void set_msg(String msg) {
+ this.msg = msg;
+ }
+
+ public void unset_msg() {
+ this.msg = null;
+ }
+
+ /** Returns true if field msg is set (has been assigned a value) and false otherwise */
+ public boolean is_set_msg() {
+ return this.msg != null;
+ }
+
+ public void set_msg_isSet(boolean value) {
+ if (!value) {
+ this.msg = null;
+ }
+ }
+
+ public void setFieldValue(_Fields field, Object value) {
+ switch (field) {
+ case MSG:
+ if (value == null) {
+ unset_msg();
+ } else {
+ set_msg((String)value);
+ }
+ break;
+
+ }
+ }
+
+ public Object getFieldValue(_Fields field) {
+ switch (field) {
+ case MSG:
+ return get_msg();
+
+ }
+ throw new IllegalStateException();
+ }
+
+ /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+ public boolean isSet(_Fields field) {
+ if (field == null) {
+ throw new IllegalArgumentException();
+ }
+
+ switch (field) {
+ case MSG:
+ return is_set_msg();
+ }
+ throw new IllegalStateException();
+ }
+
+ @Override
+ public boolean equals(Object that) {
+ if (that == null)
+ return false;
+ if (that instanceof AuthorizationException)
+ return this.equals((AuthorizationException)that);
+ return false;
+ }
+
+ public boolean equals(AuthorizationException that) {
+ if (that == null)
+ return false;
+
+ boolean this_present_msg = true && this.is_set_msg();
+ boolean that_present_msg = true && that.is_set_msg();
+ if (this_present_msg || that_present_msg) {
+ if (!(this_present_msg && that_present_msg))
+ return false;
+ if (!this.msg.equals(that.msg))
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ HashCodeBuilder builder = new HashCodeBuilder();
+
+ boolean present_msg = true && (is_set_msg());
+ builder.append(present_msg);
+ if (present_msg)
+ builder.append(msg);
+
+ return builder.toHashCode();
+ }
+
+ public int compareTo(AuthorizationException other) {
+ if (!getClass().equals(other.getClass())) {
+ return getClass().getName().compareTo(other.getClass().getName());
+ }
+
+ int lastComparison = 0;
+ AuthorizationException typedOther = (AuthorizationException)other;
+
+ lastComparison = Boolean.valueOf(is_set_msg()).compareTo(typedOther.is_set_msg());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (is_set_msg()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.msg, typedOther.msg);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ return 0;
+ }
+
+ public _Fields fieldForId(int fieldId) {
+ return _Fields.findByThriftId(fieldId);
+ }
+
+ public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+ org.apache.thrift.protocol.TField field;
+ iprot.readStructBegin();
+ while (true)
+ {
+ field = iprot.readFieldBegin();
+ if (field.type == org.apache.thrift.protocol.TType.STOP) {
+ break;
+ }
+ switch (field.id) {
+ case 1: // MSG
+ if (field.type == org.apache.thrift.protocol.TType.STRING) {
+ this.msg = iprot.readString();
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
+ }
+ break;
+ default:
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
+ }
+ iprot.readFieldEnd();
+ }
+ iprot.readStructEnd();
+ validate();
+ }
+
+ public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+ validate();
+
+ oprot.writeStructBegin(STRUCT_DESC);
+ if (this.msg != null) {
+ oprot.writeFieldBegin(MSG_FIELD_DESC);
+ oprot.writeString(this.msg);
+ oprot.writeFieldEnd();
+ }
+ oprot.writeFieldStop();
+ oprot.writeStructEnd();
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("AuthorizationException(");
+ boolean first = true;
+
+ sb.append("msg:");
+ if (this.msg == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.msg);
+ }
+ first = false;
+ sb.append(")");
+ return sb.toString();
+ }
+
+ public void validate() throws org.apache.thrift.TException {
+ // check for required fields
+ if (!is_set_msg()) {
+ throw new org.apache.thrift.protocol.TProtocolException("Required field 'msg' is unset! Struct:" + toString());
+ }
+
+ }
+
+ private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+ try {
+ write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te);
+ }
+ }
+
+ private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
+ try {
+ read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te);
+ }
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/src/jvm/backtype/storm/generated/Credentials.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/generated/Credentials.java b/storm-core/src/jvm/backtype/storm/generated/Credentials.java
new file mode 100644
index 0000000..105cec1
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/generated/Credentials.java
@@ -0,0 +1,373 @@
+/**
+ * Autogenerated by Thrift Compiler (0.7.0)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ */
+package backtype.storm.generated;
+
+import org.apache.commons.lang.builder.HashCodeBuilder;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.EnumMap;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.EnumSet;
+import java.util.Collections;
+import java.util.BitSet;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class Credentials implements org.apache.thrift.TBase<Credentials, Credentials._Fields>, java.io.Serializable, Cloneable {
+ private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("Credentials");
+
+ private static final org.apache.thrift.protocol.TField CREDS_FIELD_DESC = new org.apache.thrift.protocol.TField("creds", org.apache.thrift.protocol.TType.MAP, (short)1);
+
+ private Map<String,String> creds; // required
+
+ /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+ public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+ CREDS((short)1, "creds");
+
+ private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+ static {
+ for (_Fields field : EnumSet.allOf(_Fields.class)) {
+ byName.put(field.getFieldName(), field);
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, or null if its not found.
+ */
+ public static _Fields findByThriftId(int fieldId) {
+ switch(fieldId) {
+ case 1: // CREDS
+ return CREDS;
+ default:
+ return null;
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, throwing an exception
+ * if it is not found.
+ */
+ public static _Fields findByThriftIdOrThrow(int fieldId) {
+ _Fields fields = findByThriftId(fieldId);
+ if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+ return fields;
+ }
+
+ /**
+ * Find the _Fields constant that matches name, or null if its not found.
+ */
+ public static _Fields findByName(String name) {
+ return byName.get(name);
+ }
+
+ private final short _thriftId;
+ private final String _fieldName;
+
+ _Fields(short thriftId, String fieldName) {
+ _thriftId = thriftId;
+ _fieldName = fieldName;
+ }
+
+ public short getThriftFieldId() {
+ return _thriftId;
+ }
+
+ public String getFieldName() {
+ return _fieldName;
+ }
+ }
+
+ // isset id assignments
+
+ public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+ static {
+ Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+ tmpMap.put(_Fields.CREDS, new org.apache.thrift.meta_data.FieldMetaData("creds", org.apache.thrift.TFieldRequirementType.REQUIRED,
+ new org.apache.thrift.meta_data.MapMetaData(org.apache.thrift.protocol.TType.MAP,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING),
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))));
+ metaDataMap = Collections.unmodifiableMap(tmpMap);
+ org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(Credentials.class, metaDataMap);
+ }
+
+ public Credentials() {
+ }
+
+ public Credentials(
+ Map<String,String> creds)
+ {
+ this();
+ this.creds = creds;
+ }
+
+ /**
+ * Performs a deep copy on <i>other</i>.
+ */
+ public Credentials(Credentials other) {
+ if (other.is_set_creds()) {
+ Map<String,String> __this__creds = new HashMap<String,String>();
+ for (Map.Entry<String, String> other_element : other.creds.entrySet()) {
+
+ String other_element_key = other_element.getKey();
+ String other_element_value = other_element.getValue();
+
+ String __this__creds_copy_key = other_element_key;
+
+ String __this__creds_copy_value = other_element_value;
+
+ __this__creds.put(__this__creds_copy_key, __this__creds_copy_value);
+ }
+ this.creds = __this__creds;
+ }
+ }
+
+ public Credentials deepCopy() {
+ return new Credentials(this);
+ }
+
+ @Override
+ public void clear() {
+ this.creds = null;
+ }
+
+ public int get_creds_size() {
+ return (this.creds == null) ? 0 : this.creds.size();
+ }
+
+ public void put_to_creds(String key, String val) {
+ if (this.creds == null) {
+ this.creds = new HashMap<String,String>();
+ }
+ this.creds.put(key, val);
+ }
+
+ public Map<String,String> get_creds() {
+ return this.creds;
+ }
+
+ public void set_creds(Map<String,String> creds) {
+ this.creds = creds;
+ }
+
+ public void unset_creds() {
+ this.creds = null;
+ }
+
+ /** Returns true if field creds is set (has been assigned a value) and false otherwise */
+ public boolean is_set_creds() {
+ return this.creds != null;
+ }
+
+ public void set_creds_isSet(boolean value) {
+ if (!value) {
+ this.creds = null;
+ }
+ }
+
+ public void setFieldValue(_Fields field, Object value) {
+ switch (field) {
+ case CREDS:
+ if (value == null) {
+ unset_creds();
+ } else {
+ set_creds((Map<String,String>)value);
+ }
+ break;
+
+ }
+ }
+
+ public Object getFieldValue(_Fields field) {
+ switch (field) {
+ case CREDS:
+ return get_creds();
+
+ }
+ throw new IllegalStateException();
+ }
+
+ /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+ public boolean isSet(_Fields field) {
+ if (field == null) {
+ throw new IllegalArgumentException();
+ }
+
+ switch (field) {
+ case CREDS:
+ return is_set_creds();
+ }
+ throw new IllegalStateException();
+ }
+
+ @Override
+ public boolean equals(Object that) {
+ if (that == null)
+ return false;
+ if (that instanceof Credentials)
+ return this.equals((Credentials)that);
+ return false;
+ }
+
+ public boolean equals(Credentials that) {
+ if (that == null)
+ return false;
+
+ boolean this_present_creds = true && this.is_set_creds();
+ boolean that_present_creds = true && that.is_set_creds();
+ if (this_present_creds || that_present_creds) {
+ if (!(this_present_creds && that_present_creds))
+ return false;
+ if (!this.creds.equals(that.creds))
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ HashCodeBuilder builder = new HashCodeBuilder();
+
+ boolean present_creds = true && (is_set_creds());
+ builder.append(present_creds);
+ if (present_creds)
+ builder.append(creds);
+
+ return builder.toHashCode();
+ }
+
+ public int compareTo(Credentials other) {
+ if (!getClass().equals(other.getClass())) {
+ return getClass().getName().compareTo(other.getClass().getName());
+ }
+
+ int lastComparison = 0;
+ Credentials typedOther = (Credentials)other;
+
+ lastComparison = Boolean.valueOf(is_set_creds()).compareTo(typedOther.is_set_creds());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (is_set_creds()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.creds, typedOther.creds);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ return 0;
+ }
+
+ public _Fields fieldForId(int fieldId) {
+ return _Fields.findByThriftId(fieldId);
+ }
+
+ public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+ org.apache.thrift.protocol.TField field;
+ iprot.readStructBegin();
+ while (true)
+ {
+ field = iprot.readFieldBegin();
+ if (field.type == org.apache.thrift.protocol.TType.STOP) {
+ break;
+ }
+ switch (field.id) {
+ case 1: // CREDS
+ if (field.type == org.apache.thrift.protocol.TType.MAP) {
+ {
+ org.apache.thrift.protocol.TMap _map163 = iprot.readMapBegin();
+ this.creds = new HashMap<String,String>(2*_map163.size);
+ for (int _i164 = 0; _i164 < _map163.size; ++_i164)
+ {
+ String _key165; // required
+ String _val166; // required
+ _key165 = iprot.readString();
+ _val166 = iprot.readString();
+ this.creds.put(_key165, _val166);
+ }
+ iprot.readMapEnd();
+ }
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
+ }
+ break;
+ default:
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
+ }
+ iprot.readFieldEnd();
+ }
+ iprot.readStructEnd();
+ validate();
+ }
+
+ public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+ validate();
+
+ oprot.writeStructBegin(STRUCT_DESC);
+ if (this.creds != null) {
+ oprot.writeFieldBegin(CREDS_FIELD_DESC);
+ {
+ oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.STRING, this.creds.size()));
+ for (Map.Entry<String, String> _iter167 : this.creds.entrySet())
+ {
+ oprot.writeString(_iter167.getKey());
+ oprot.writeString(_iter167.getValue());
+ }
+ oprot.writeMapEnd();
+ }
+ oprot.writeFieldEnd();
+ }
+ oprot.writeFieldStop();
+ oprot.writeStructEnd();
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("Credentials(");
+ boolean first = true;
+
+ sb.append("creds:");
+ if (this.creds == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.creds);
+ }
+ first = false;
+ sb.append(")");
+ return sb.toString();
+ }
+
+ public void validate() throws org.apache.thrift.TException {
+ // check for required fields
+ if (!is_set_creds()) {
+ throw new org.apache.thrift.protocol.TProtocolException("Required field 'creds' is unset! Struct:" + toString());
+ }
+
+ }
+
+ private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+ try {
+ write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te);
+ }
+ }
+
+ private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
+ try {
+ read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te);
+ }
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/fe5f41aa/storm-core/src/jvm/backtype/storm/generated/DistributedRPC.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/generated/DistributedRPC.java b/storm-core/src/jvm/backtype/storm/generated/DistributedRPC.java
index 7922340..06c4f5c 100644
--- a/storm-core/src/jvm/backtype/storm/generated/DistributedRPC.java
+++ b/storm-core/src/jvm/backtype/storm/generated/DistributedRPC.java
@@ -42,7 +42,7 @@ public class DistributedRPC {
public interface Iface {
- public String execute(String functionName, String funcArgs) throws DRPCExecutionException, org.apache.thrift.TException;
+ public String execute(String functionName, String funcArgs) throws DRPCExecutionException, AuthorizationException, org.apache.thrift.TException;
}
@@ -72,7 +72,7 @@ public class DistributedRPC {
super(iprot, oprot);
}
- public String execute(String functionName, String funcArgs) throws DRPCExecutionException, org.apache.thrift.TException
+ public String execute(String functionName, String funcArgs) throws DRPCExecutionException, AuthorizationException, org.apache.thrift.TException
{
send_execute(functionName, funcArgs);
return recv_execute();
@@ -86,7 +86,7 @@ public class DistributedRPC {
sendBase("execute", args);
}
- public String recv_execute() throws DRPCExecutionException, org.apache.thrift.TException
+ public String recv_execute() throws DRPCExecutionException, AuthorizationException, org.apache.thrift.TException
{
execute_result result = new execute_result();
receiveBase(result, "execute");
@@ -96,6 +96,9 @@ public class DistributedRPC {
if (result.e != null) {
throw result.e;
}
+ if (result.aze != null) {
+ throw result.aze;
+ }
throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT, "execute failed: unknown result");
}
@@ -142,7 +145,7 @@ public class DistributedRPC {
prot.writeMessageEnd();
}
- public String getResult() throws DRPCExecutionException, org.apache.thrift.TException {
+ public String getResult() throws DRPCExecutionException, AuthorizationException, org.apache.thrift.TException {
if (getState() != org.apache.thrift.async.TAsyncMethodCall.State.RESPONSE_READ) {
throw new IllegalStateException("Method call not finished!");
}
@@ -184,6 +187,8 @@ public class DistributedRPC {
result.success = iface.execute(args.functionName, args.funcArgs);
} catch (DRPCExecutionException e) {
result.e = e;
+ } catch (AuthorizationException aze) {
+ result.aze = aze;
}
return result;
}
@@ -590,14 +595,17 @@ public class DistributedRPC {
private static final org.apache.thrift.protocol.TField SUCCESS_FIELD_DESC = new org.apache.thrift.protocol.TField("success", org.apache.thrift.protocol.TType.STRING, (short)0);
private static final org.apache.thrift.protocol.TField E_FIELD_DESC = new org.apache.thrift.protocol.TField("e", org.apache.thrift.protocol.TType.STRUCT, (short)1);
+ private static final org.apache.thrift.protocol.TField AZE_FIELD_DESC = new org.apache.thrift.protocol.TField("aze", org.apache.thrift.protocol.TType.STRUCT, (short)2);
private String success; // required
private DRPCExecutionException e; // required
+ private AuthorizationException aze; // required
/** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
public enum _Fields implements org.apache.thrift.TFieldIdEnum {
SUCCESS((short)0, "success"),
- E((short)1, "e");
+ E((short)1, "e"),
+ AZE((short)2, "aze");
private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
@@ -616,6 +624,8 @@ public class DistributedRPC {
return SUCCESS;
case 1: // E
return E;
+ case 2: // AZE
+ return AZE;
default:
return null;
}
@@ -664,6 +674,8 @@ public class DistributedRPC {
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
tmpMap.put(_Fields.E, new org.apache.thrift.meta_data.FieldMetaData("e", org.apache.thrift.TFieldRequirementType.DEFAULT,
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRUCT)));
+ tmpMap.put(_Fields.AZE, new org.apache.thrift.meta_data.FieldMetaData("aze", org.apache.thrift.TFieldRequirementType.DEFAULT,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRUCT)));
metaDataMap = Collections.unmodifiableMap(tmpMap);
org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(execute_result.class, metaDataMap);
}
@@ -673,11 +685,13 @@ public class DistributedRPC {
public execute_result(
String success,
- DRPCExecutionException e)
+ DRPCExecutionException e,
+ AuthorizationException aze)
{
this();
this.success = success;
this.e = e;
+ this.aze = aze;
}
/**
@@ -690,6 +704,9 @@ public class DistributedRPC {
if (other.is_set_e()) {
this.e = new DRPCExecutionException(other.e);
}
+ if (other.is_set_aze()) {
+ this.aze = new AuthorizationException(other.aze);
+ }
}
public execute_result deepCopy() {
@@ -700,6 +717,7 @@ public class DistributedRPC {
public void clear() {
this.success = null;
this.e = null;
+ this.aze = null;
}
public String get_success() {
@@ -748,6 +766,29 @@ public class DistributedRPC {
}
}
+ public AuthorizationException get_aze() {
+ return this.aze;
+ }
+
+ public void set_aze(AuthorizationException aze) {
+ this.aze = aze;
+ }
+
+ public void unset_aze() {
+ this.aze = null;
+ }
+
+ /** Returns true if field aze is set (has been assigned a value) and false otherwise */
+ public boolean is_set_aze() {
+ return this.aze != null;
+ }
+
+ public void set_aze_isSet(boolean value) {
+ if (!value) {
+ this.aze = null;
+ }
+ }
+
public void setFieldValue(_Fields field, Object value) {
switch (field) {
case SUCCESS:
@@ -766,6 +807,14 @@ public class DistributedRPC {
}
break;
+ case AZE:
+ if (value == null) {
+ unset_aze();
+ } else {
+ set_aze((AuthorizationException)value);
+ }
+ break;
+
}
}
@@ -777,6 +826,9 @@ public class DistributedRPC {
case E:
return get_e();
+ case AZE:
+ return get_aze();
+
}
throw new IllegalStateException();
}
@@ -792,6 +844,8 @@ public class DistributedRPC {
return is_set_success();
case E:
return is_set_e();
+ case AZE:
+ return is_set_aze();
}
throw new IllegalStateException();
}
@@ -827,6 +881,15 @@ public class DistributedRPC {
return false;
}
+ boolean this_present_aze = true && this.is_set_aze();
+ boolean that_present_aze = true && that.is_set_aze();
+ if (this_present_aze || that_present_aze) {
+ if (!(this_present_aze && that_present_aze))
+ return false;
+ if (!this.aze.equals(that.aze))
+ return false;
+ }
+
return true;
}
@@ -844,6 +907,11 @@ public class DistributedRPC {
if (present_e)
builder.append(e);
+ boolean present_aze = true && (is_set_aze());
+ builder.append(present_aze);
+ if (present_aze)
+ builder.append(aze);
+
return builder.toHashCode();
}
@@ -875,6 +943,16 @@ public class DistributedRPC {
return lastComparison;
}
}
+ lastComparison = Boolean.valueOf(is_set_aze()).compareTo(typedOther.is_set_aze());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (is_set_aze()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.aze, typedOther.aze);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
return 0;
}
@@ -907,6 +985,14 @@ public class DistributedRPC {
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
}
break;
+ case 2: // AZE
+ if (field.type == org.apache.thrift.protocol.TType.STRUCT) {
+ this.aze = new AuthorizationException();
+ this.aze.read(iprot);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
+ }
+ break;
default:
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
}
@@ -927,6 +1013,10 @@ public class DistributedRPC {
oprot.writeFieldBegin(E_FIELD_DESC);
this.e.write(oprot);
oprot.writeFieldEnd();
+ } else if (this.is_set_aze()) {
+ oprot.writeFieldBegin(AZE_FIELD_DESC);
+ this.aze.write(oprot);
+ oprot.writeFieldEnd();
}
oprot.writeFieldStop();
oprot.writeStructEnd();
@@ -952,6 +1042,14 @@ public class DistributedRPC {
sb.append(this.e);
}
first = false;
+ if (!first) sb.append(", ");
+ sb.append("aze:");
+ if (this.aze == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.aze);
+ }
+ first = false;
sb.append(")");
return sb.toString();
}