You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2016/12/19 00:54:12 UTC
[2/8] storm git commit: STORM-2217: Make DRPC pure java with Jersy
and prepare for separating classpaths.
http://git-wip-us.apache.org/repos/asf/storm/blob/ea44062f/storm-core/src/jvm/org/apache/storm/generated/DistributedRPCInvocations.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/generated/DistributedRPCInvocations.java b/storm-core/src/jvm/org/apache/storm/generated/DistributedRPCInvocations.java
index d51175d..7d1c1ab 100644
--- a/storm-core/src/jvm/org/apache/storm/generated/DistributedRPCInvocations.java
+++ b/storm-core/src/jvm/org/apache/storm/generated/DistributedRPCInvocations.java
@@ -62,6 +62,8 @@ public class DistributedRPCInvocations {
public void failRequest(String id) throws AuthorizationException, org.apache.thrift.TException;
+ public void failRequestV2(String id, DRPCExecutionException e) throws AuthorizationException, org.apache.thrift.TException;
+
}
public interface AsyncIface {
@@ -72,6 +74,8 @@ public class DistributedRPCInvocations {
public void failRequest(String id, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException;
+ public void failRequestV2(String id, DRPCExecutionException e, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException;
+
}
public static class Client extends org.apache.thrift.TServiceClient implements Iface {
@@ -167,6 +171,30 @@ public class DistributedRPCInvocations {
return;
}
+ public void failRequestV2(String id, DRPCExecutionException e) throws AuthorizationException, org.apache.thrift.TException
+ {
+ send_failRequestV2(id, e);
+ recv_failRequestV2();
+ }
+
+ public void send_failRequestV2(String id, DRPCExecutionException e) throws org.apache.thrift.TException
+ {
+ failRequestV2_args args = new failRequestV2_args();
+ args.set_id(id);
+ args.set_e(e);
+ sendBase("failRequestV2", args);
+ }
+
+ public void recv_failRequestV2() throws AuthorizationException, org.apache.thrift.TException
+ {
+ failRequestV2_result result = new failRequestV2_result();
+ receiveBase(result, "failRequestV2");
+ if (result.aze != null) {
+ throw result.aze;
+ }
+ return;
+ }
+
}
public static class AsyncClient extends org.apache.thrift.async.TAsyncClient implements AsyncIface {
public static class Factory implements org.apache.thrift.async.TAsyncClientFactory<AsyncClient> {
@@ -284,6 +312,41 @@ public class DistributedRPCInvocations {
}
}
+ public void failRequestV2(String id, DRPCExecutionException e, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException {
+ checkReady();
+ failRequestV2_call method_call = new failRequestV2_call(id, e, resultHandler, this, ___protocolFactory, ___transport);
+ this.___currentMethod = method_call;
+ ___manager.call(method_call);
+ }
+
+ public static class failRequestV2_call extends org.apache.thrift.async.TAsyncMethodCall {
+ private String id;
+ private DRPCExecutionException e;
+ public failRequestV2_call(String id, DRPCExecutionException e, org.apache.thrift.async.AsyncMethodCallback resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException {
+ super(client, protocolFactory, transport, resultHandler, false);
+ this.id = id;
+ this.e = e;
+ }
+
+ public void write_args(org.apache.thrift.protocol.TProtocol prot) throws org.apache.thrift.TException {
+ prot.writeMessageBegin(new org.apache.thrift.protocol.TMessage("failRequestV2", org.apache.thrift.protocol.TMessageType.CALL, 0));
+ failRequestV2_args args = new failRequestV2_args();
+ args.set_id(id);
+ args.set_e(e);
+ args.write(prot);
+ prot.writeMessageEnd();
+ }
+
+ public void getResult() throws AuthorizationException, org.apache.thrift.TException {
+ if (getState() != org.apache.thrift.async.TAsyncMethodCall.State.RESPONSE_READ) {
+ throw new IllegalStateException("Method call not finished!");
+ }
+ org.apache.thrift.transport.TMemoryInputTransport memoryTransport = new org.apache.thrift.transport.TMemoryInputTransport(getFrameBuffer().array());
+ org.apache.thrift.protocol.TProtocol prot = client.getProtocolFactory().getProtocol(memoryTransport);
+ (new Client(prot)).recv_failRequestV2();
+ }
+ }
+
}
public static class Processor<I extends Iface> extends org.apache.thrift.TBaseProcessor<I> implements org.apache.thrift.TProcessor {
@@ -300,6 +363,7 @@ public class DistributedRPCInvocations {
processMap.put("result", new result());
processMap.put("fetchRequest", new fetchRequest());
processMap.put("failRequest", new failRequest());
+ processMap.put("failRequestV2", new failRequestV2());
return processMap;
}
@@ -375,6 +439,30 @@ public class DistributedRPCInvocations {
}
}
+ public static class failRequestV2<I extends Iface> extends org.apache.thrift.ProcessFunction<I, failRequestV2_args> {
+ public failRequestV2() {
+ super("failRequestV2");
+ }
+
+ public failRequestV2_args getEmptyArgsInstance() {
+ return new failRequestV2_args();
+ }
+
+ protected boolean isOneway() {
+ return false;
+ }
+
+ public failRequestV2_result getResult(I iface, failRequestV2_args args) throws org.apache.thrift.TException {
+ failRequestV2_result result = new failRequestV2_result();
+ try {
+ iface.failRequestV2(args.id, args.e);
+ } catch (AuthorizationException aze) {
+ result.aze = aze;
+ }
+ return result;
+ }
+ }
+
}
public static class AsyncProcessor<I extends AsyncIface> extends org.apache.thrift.TBaseAsyncProcessor<I> {
@@ -391,6 +479,7 @@ public class DistributedRPCInvocations {
processMap.put("result", new result());
processMap.put("fetchRequest", new fetchRequest());
processMap.put("failRequest", new failRequest());
+ processMap.put("failRequestV2", new failRequestV2());
return processMap;
}
@@ -563,6 +652,62 @@ public class DistributedRPCInvocations {
}
}
+ public static class failRequestV2<I extends AsyncIface> extends org.apache.thrift.AsyncProcessFunction<I, failRequestV2_args, Void> {
+ public failRequestV2() {
+ super("failRequestV2");
+ }
+
+ public failRequestV2_args getEmptyArgsInstance() {
+ return new failRequestV2_args();
+ }
+
+ public AsyncMethodCallback<Void> getResultHandler(final AsyncFrameBuffer fb, final int seqid) {
+ final org.apache.thrift.AsyncProcessFunction fcall = this;
+ return new AsyncMethodCallback<Void>() {
+ public void onComplete(Void o) {
+ failRequestV2_result result = new failRequestV2_result();
+ try {
+ fcall.sendResponse(fb,result, org.apache.thrift.protocol.TMessageType.REPLY,seqid);
+ return;
+ } catch (Exception e) {
+ LOGGER.error("Exception writing to internal frame buffer", e);
+ }
+ fb.close();
+ }
+ public void onError(Exception e) {
+ byte msgType = org.apache.thrift.protocol.TMessageType.REPLY;
+ org.apache.thrift.TBase msg;
+ failRequestV2_result result = new failRequestV2_result();
+ if (e instanceof AuthorizationException) {
+ result.aze = (AuthorizationException) e;
+ result.set_aze_isSet(true);
+ msg = result;
+ }
+ else
+ {
+ msgType = org.apache.thrift.protocol.TMessageType.EXCEPTION;
+ msg = (org.apache.thrift.TBase)new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.INTERNAL_ERROR, e.getMessage());
+ }
+ try {
+ fcall.sendResponse(fb,msg,msgType,seqid);
+ return;
+ } catch (Exception ex) {
+ LOGGER.error("Exception writing to internal frame buffer", ex);
+ }
+ fb.close();
+ }
+ };
+ }
+
+ protected boolean isOneway() {
+ return false;
+ }
+
+ public void start(I iface, failRequestV2_args args, org.apache.thrift.async.AsyncMethodCallback<Void> resultHandler) throws TException {
+ iface.failRequestV2(args.id, args.e,resultHandler);
+ }
+ }
+
}
public static class result_args implements org.apache.thrift.TBase<result_args, result_args._Fields>, java.io.Serializable, Cloneable, Comparable<result_args> {
@@ -2932,4 +3077,828 @@ public class DistributedRPCInvocations {
}
+ public static class failRequestV2_args implements org.apache.thrift.TBase<failRequestV2_args, failRequestV2_args._Fields>, java.io.Serializable, Cloneable, Comparable<failRequestV2_args> {
+ private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("failRequestV2_args");
+
+ private static final org.apache.thrift.protocol.TField ID_FIELD_DESC = new org.apache.thrift.protocol.TField("id", org.apache.thrift.protocol.TType.STRING, (short)1);
+ private static final org.apache.thrift.protocol.TField E_FIELD_DESC = new org.apache.thrift.protocol.TField("e", org.apache.thrift.protocol.TType.STRUCT, (short)2);
+
+ private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
+ static {
+ schemes.put(StandardScheme.class, new failRequestV2_argsStandardSchemeFactory());
+ schemes.put(TupleScheme.class, new failRequestV2_argsTupleSchemeFactory());
+ }
+
+ private String id; // required
+ private DRPCExecutionException e; // required
+
+ /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+ public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+ ID((short)1, "id"),
+ E((short)2, "e");
+
+ private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+ static {
+ for (_Fields field : EnumSet.allOf(_Fields.class)) {
+ byName.put(field.getFieldName(), field);
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, or null if its not found.
+ */
+ public static _Fields findByThriftId(int fieldId) {
+ switch(fieldId) {
+ case 1: // ID
+ return ID;
+ case 2: // E
+ return E;
+ default:
+ return null;
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, throwing an exception
+ * if it is not found.
+ */
+ public static _Fields findByThriftIdOrThrow(int fieldId) {
+ _Fields fields = findByThriftId(fieldId);
+ if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+ return fields;
+ }
+
+ /**
+ * Find the _Fields constant that matches name, or null if its not found.
+ */
+ public static _Fields findByName(String name) {
+ return byName.get(name);
+ }
+
+ private final short _thriftId;
+ private final String _fieldName;
+
+ _Fields(short thriftId, String fieldName) {
+ _thriftId = thriftId;
+ _fieldName = fieldName;
+ }
+
+ public short getThriftFieldId() {
+ return _thriftId;
+ }
+
+ public String getFieldName() {
+ return _fieldName;
+ }
+ }
+
+ // isset id assignments
+ public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+ static {
+ Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+ tmpMap.put(_Fields.ID, new org.apache.thrift.meta_data.FieldMetaData("id", org.apache.thrift.TFieldRequirementType.DEFAULT,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+ tmpMap.put(_Fields.E, new org.apache.thrift.meta_data.FieldMetaData("e", org.apache.thrift.TFieldRequirementType.DEFAULT,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRUCT)));
+ metaDataMap = Collections.unmodifiableMap(tmpMap);
+ org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(failRequestV2_args.class, metaDataMap);
+ }
+
+ public failRequestV2_args() {
+ }
+
+ public failRequestV2_args(
+ String id,
+ DRPCExecutionException e)
+ {
+ this();
+ this.id = id;
+ this.e = e;
+ }
+
+ /**
+ * Performs a deep copy on <i>other</i>.
+ */
+ public failRequestV2_args(failRequestV2_args other) {
+ if (other.is_set_id()) {
+ this.id = other.id;
+ }
+ if (other.is_set_e()) {
+ this.e = new DRPCExecutionException(other.e);
+ }
+ }
+
+ public failRequestV2_args deepCopy() {
+ return new failRequestV2_args(this);
+ }
+
+ @Override
+ public void clear() {
+ this.id = null;
+ this.e = null;
+ }
+
+ public String get_id() {
+ return this.id;
+ }
+
+ public void set_id(String id) {
+ this.id = id;
+ }
+
+ public void unset_id() {
+ this.id = null;
+ }
+
+ /** Returns true if field id is set (has been assigned a value) and false otherwise */
+ public boolean is_set_id() {
+ return this.id != null;
+ }
+
+ public void set_id_isSet(boolean value) {
+ if (!value) {
+ this.id = null;
+ }
+ }
+
+ public DRPCExecutionException get_e() {
+ return this.e;
+ }
+
+ public void set_e(DRPCExecutionException e) {
+ this.e = e;
+ }
+
+ public void unset_e() {
+ this.e = null;
+ }
+
+ /** Returns true if field e is set (has been assigned a value) and false otherwise */
+ public boolean is_set_e() {
+ return this.e != null;
+ }
+
+ public void set_e_isSet(boolean value) {
+ if (!value) {
+ this.e = null;
+ }
+ }
+
+ public void setFieldValue(_Fields field, Object value) {
+ switch (field) {
+ case ID:
+ if (value == null) {
+ unset_id();
+ } else {
+ set_id((String)value);
+ }
+ break;
+
+ case E:
+ if (value == null) {
+ unset_e();
+ } else {
+ set_e((DRPCExecutionException)value);
+ }
+ break;
+
+ }
+ }
+
+ public Object getFieldValue(_Fields field) {
+ switch (field) {
+ case ID:
+ return get_id();
+
+ case E:
+ return get_e();
+
+ }
+ throw new IllegalStateException();
+ }
+
+ /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+ public boolean isSet(_Fields field) {
+ if (field == null) {
+ throw new IllegalArgumentException();
+ }
+
+ switch (field) {
+ case ID:
+ return is_set_id();
+ case E:
+ return is_set_e();
+ }
+ throw new IllegalStateException();
+ }
+
+ @Override
+ public boolean equals(Object that) {
+ if (that == null)
+ return false;
+ if (that instanceof failRequestV2_args)
+ return this.equals((failRequestV2_args)that);
+ return false;
+ }
+
+ public boolean equals(failRequestV2_args that) {
+ if (that == null)
+ return false;
+
+ boolean this_present_id = true && this.is_set_id();
+ boolean that_present_id = true && that.is_set_id();
+ if (this_present_id || that_present_id) {
+ if (!(this_present_id && that_present_id))
+ return false;
+ if (!this.id.equals(that.id))
+ return false;
+ }
+
+ boolean this_present_e = true && this.is_set_e();
+ boolean that_present_e = true && that.is_set_e();
+ if (this_present_e || that_present_e) {
+ if (!(this_present_e && that_present_e))
+ return false;
+ if (!this.e.equals(that.e))
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ List<Object> list = new ArrayList<Object>();
+
+ boolean present_id = true && (is_set_id());
+ list.add(present_id);
+ if (present_id)
+ list.add(id);
+
+ boolean present_e = true && (is_set_e());
+ list.add(present_e);
+ if (present_e)
+ list.add(e);
+
+ return list.hashCode();
+ }
+
+ @Override
+ public int compareTo(failRequestV2_args other) {
+ if (!getClass().equals(other.getClass())) {
+ return getClass().getName().compareTo(other.getClass().getName());
+ }
+
+ int lastComparison = 0;
+
+ lastComparison = Boolean.valueOf(is_set_id()).compareTo(other.is_set_id());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (is_set_id()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.id, other.id);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ lastComparison = Boolean.valueOf(is_set_e()).compareTo(other.is_set_e());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (is_set_e()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.e, other.e);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ return 0;
+ }
+
+ public _Fields fieldForId(int fieldId) {
+ return _Fields.findByThriftId(fieldId);
+ }
+
+ public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+ schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
+ }
+
+ public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+ schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("failRequestV2_args(");
+ boolean first = true;
+
+ sb.append("id:");
+ if (this.id == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.id);
+ }
+ first = false;
+ if (!first) sb.append(", ");
+ sb.append("e:");
+ if (this.e == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.e);
+ }
+ first = false;
+ sb.append(")");
+ return sb.toString();
+ }
+
+ public void validate() throws org.apache.thrift.TException {
+ // check for required fields
+ // check for sub-struct validity
+ }
+
+ private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+ try {
+ write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te);
+ }
+ }
+
+ private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
+ try {
+ read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te);
+ }
+ }
+
+ private static class failRequestV2_argsStandardSchemeFactory implements SchemeFactory {
+ public failRequestV2_argsStandardScheme getScheme() {
+ return new failRequestV2_argsStandardScheme();
+ }
+ }
+
+ private static class failRequestV2_argsStandardScheme extends StandardScheme<failRequestV2_args> {
+
+ public void read(org.apache.thrift.protocol.TProtocol iprot, failRequestV2_args struct) throws org.apache.thrift.TException {
+ org.apache.thrift.protocol.TField schemeField;
+ iprot.readStructBegin();
+ while (true)
+ {
+ schemeField = iprot.readFieldBegin();
+ if (schemeField.type == org.apache.thrift.protocol.TType.STOP) {
+ break;
+ }
+ switch (schemeField.id) {
+ case 1: // ID
+ if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+ struct.id = iprot.readString();
+ struct.set_id_isSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
+ case 2: // E
+ if (schemeField.type == org.apache.thrift.protocol.TType.STRUCT) {
+ struct.e = new DRPCExecutionException();
+ struct.e.read(iprot);
+ struct.set_e_isSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
+ default:
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ iprot.readFieldEnd();
+ }
+ iprot.readStructEnd();
+ struct.validate();
+ }
+
+ public void write(org.apache.thrift.protocol.TProtocol oprot, failRequestV2_args struct) throws org.apache.thrift.TException {
+ struct.validate();
+
+ oprot.writeStructBegin(STRUCT_DESC);
+ if (struct.id != null) {
+ oprot.writeFieldBegin(ID_FIELD_DESC);
+ oprot.writeString(struct.id);
+ oprot.writeFieldEnd();
+ }
+ if (struct.e != null) {
+ oprot.writeFieldBegin(E_FIELD_DESC);
+ struct.e.write(oprot);
+ oprot.writeFieldEnd();
+ }
+ oprot.writeFieldStop();
+ oprot.writeStructEnd();
+ }
+
+ }
+
+ private static class failRequestV2_argsTupleSchemeFactory implements SchemeFactory {
+ public failRequestV2_argsTupleScheme getScheme() {
+ return new failRequestV2_argsTupleScheme();
+ }
+ }
+
+ private static class failRequestV2_argsTupleScheme extends TupleScheme<failRequestV2_args> {
+
+ @Override
+ public void write(org.apache.thrift.protocol.TProtocol prot, failRequestV2_args struct) throws org.apache.thrift.TException {
+ TTupleProtocol oprot = (TTupleProtocol) prot;
+ BitSet optionals = new BitSet();
+ if (struct.is_set_id()) {
+ optionals.set(0);
+ }
+ if (struct.is_set_e()) {
+ optionals.set(1);
+ }
+ oprot.writeBitSet(optionals, 2);
+ if (struct.is_set_id()) {
+ oprot.writeString(struct.id);
+ }
+ if (struct.is_set_e()) {
+ struct.e.write(oprot);
+ }
+ }
+
+ @Override
+ public void read(org.apache.thrift.protocol.TProtocol prot, failRequestV2_args struct) throws org.apache.thrift.TException {
+ TTupleProtocol iprot = (TTupleProtocol) prot;
+ BitSet incoming = iprot.readBitSet(2);
+ if (incoming.get(0)) {
+ struct.id = iprot.readString();
+ struct.set_id_isSet(true);
+ }
+ if (incoming.get(1)) {
+ struct.e = new DRPCExecutionException();
+ struct.e.read(iprot);
+ struct.set_e_isSet(true);
+ }
+ }
+ }
+
+ }
+
+ public static class failRequestV2_result implements org.apache.thrift.TBase<failRequestV2_result, failRequestV2_result._Fields>, java.io.Serializable, Cloneable, Comparable<failRequestV2_result> {
+ private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("failRequestV2_result");
+
+ private static final org.apache.thrift.protocol.TField AZE_FIELD_DESC = new org.apache.thrift.protocol.TField("aze", org.apache.thrift.protocol.TType.STRUCT, (short)1);
+
+ private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
+ static {
+ schemes.put(StandardScheme.class, new failRequestV2_resultStandardSchemeFactory());
+ schemes.put(TupleScheme.class, new failRequestV2_resultTupleSchemeFactory());
+ }
+
+ private AuthorizationException aze; // required
+
+ /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+ public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+ AZE((short)1, "aze");
+
+ private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+ static {
+ for (_Fields field : EnumSet.allOf(_Fields.class)) {
+ byName.put(field.getFieldName(), field);
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, or null if its not found.
+ */
+ public static _Fields findByThriftId(int fieldId) {
+ switch(fieldId) {
+ case 1: // AZE
+ return AZE;
+ default:
+ return null;
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, throwing an exception
+ * if it is not found.
+ */
+ public static _Fields findByThriftIdOrThrow(int fieldId) {
+ _Fields fields = findByThriftId(fieldId);
+ if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+ return fields;
+ }
+
+ /**
+ * Find the _Fields constant that matches name, or null if its not found.
+ */
+ public static _Fields findByName(String name) {
+ return byName.get(name);
+ }
+
+ private final short _thriftId;
+ private final String _fieldName;
+
+ _Fields(short thriftId, String fieldName) {
+ _thriftId = thriftId;
+ _fieldName = fieldName;
+ }
+
+ public short getThriftFieldId() {
+ return _thriftId;
+ }
+
+ public String getFieldName() {
+ return _fieldName;
+ }
+ }
+
+ // isset id assignments
+ public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+ static {
+ Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+ tmpMap.put(_Fields.AZE, new org.apache.thrift.meta_data.FieldMetaData("aze", org.apache.thrift.TFieldRequirementType.DEFAULT,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRUCT)));
+ metaDataMap = Collections.unmodifiableMap(tmpMap);
+ org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(failRequestV2_result.class, metaDataMap);
+ }
+
+ public failRequestV2_result() {
+ }
+
+ public failRequestV2_result(
+ AuthorizationException aze)
+ {
+ this();
+ this.aze = aze;
+ }
+
+ /**
+ * Performs a deep copy on <i>other</i>.
+ */
+ public failRequestV2_result(failRequestV2_result other) {
+ if (other.is_set_aze()) {
+ this.aze = new AuthorizationException(other.aze);
+ }
+ }
+
+ public failRequestV2_result deepCopy() {
+ return new failRequestV2_result(this);
+ }
+
+ @Override
+ public void clear() {
+ this.aze = null;
+ }
+
+ public AuthorizationException get_aze() {
+ return this.aze;
+ }
+
+ public void set_aze(AuthorizationException aze) {
+ this.aze = aze;
+ }
+
+ public void unset_aze() {
+ this.aze = null;
+ }
+
+ /** Returns true if field aze is set (has been assigned a value) and false otherwise */
+ public boolean is_set_aze() {
+ return this.aze != null;
+ }
+
+ public void set_aze_isSet(boolean value) {
+ if (!value) {
+ this.aze = null;
+ }
+ }
+
+ public void setFieldValue(_Fields field, Object value) {
+ switch (field) {
+ case AZE:
+ if (value == null) {
+ unset_aze();
+ } else {
+ set_aze((AuthorizationException)value);
+ }
+ break;
+
+ }
+ }
+
+ public Object getFieldValue(_Fields field) {
+ switch (field) {
+ case AZE:
+ return get_aze();
+
+ }
+ throw new IllegalStateException();
+ }
+
+ /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+ public boolean isSet(_Fields field) {
+ if (field == null) {
+ throw new IllegalArgumentException();
+ }
+
+ switch (field) {
+ case AZE:
+ return is_set_aze();
+ }
+ throw new IllegalStateException();
+ }
+
+ @Override
+ public boolean equals(Object that) {
+ if (that == null)
+ return false;
+ if (that instanceof failRequestV2_result)
+ return this.equals((failRequestV2_result)that);
+ return false;
+ }
+
+ public boolean equals(failRequestV2_result that) {
+ if (that == null)
+ return false;
+
+ boolean this_present_aze = true && this.is_set_aze();
+ boolean that_present_aze = true && that.is_set_aze();
+ if (this_present_aze || that_present_aze) {
+ if (!(this_present_aze && that_present_aze))
+ return false;
+ if (!this.aze.equals(that.aze))
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ List<Object> list = new ArrayList<Object>();
+
+ boolean present_aze = true && (is_set_aze());
+ list.add(present_aze);
+ if (present_aze)
+ list.add(aze);
+
+ return list.hashCode();
+ }
+
+ @Override
+ public int compareTo(failRequestV2_result other) {
+ if (!getClass().equals(other.getClass())) {
+ return getClass().getName().compareTo(other.getClass().getName());
+ }
+
+ int lastComparison = 0;
+
+ lastComparison = Boolean.valueOf(is_set_aze()).compareTo(other.is_set_aze());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (is_set_aze()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.aze, other.aze);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ return 0;
+ }
+
+ public _Fields fieldForId(int fieldId) {
+ return _Fields.findByThriftId(fieldId);
+ }
+
+ public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+ schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
+ }
+
+ public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+ schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("failRequestV2_result(");
+ boolean first = true;
+
+ sb.append("aze:");
+ if (this.aze == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.aze);
+ }
+ first = false;
+ sb.append(")");
+ return sb.toString();
+ }
+
+ public void validate() throws org.apache.thrift.TException {
+ // check for required fields
+ // check for sub-struct validity
+ }
+
+ private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+ try {
+ write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te);
+ }
+ }
+
+ private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
+ try {
+ read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te);
+ }
+ }
+
+ private static class failRequestV2_resultStandardSchemeFactory implements SchemeFactory {
+ public failRequestV2_resultStandardScheme getScheme() {
+ return new failRequestV2_resultStandardScheme();
+ }
+ }
+
+ private static class failRequestV2_resultStandardScheme extends StandardScheme<failRequestV2_result> {
+
+ public void read(org.apache.thrift.protocol.TProtocol iprot, failRequestV2_result struct) throws org.apache.thrift.TException {
+ org.apache.thrift.protocol.TField schemeField;
+ iprot.readStructBegin();
+ while (true)
+ {
+ schemeField = iprot.readFieldBegin();
+ if (schemeField.type == org.apache.thrift.protocol.TType.STOP) {
+ break;
+ }
+ switch (schemeField.id) {
+ case 1: // AZE
+ if (schemeField.type == org.apache.thrift.protocol.TType.STRUCT) {
+ struct.aze = new AuthorizationException();
+ struct.aze.read(iprot);
+ struct.set_aze_isSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
+ default:
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ iprot.readFieldEnd();
+ }
+ iprot.readStructEnd();
+ struct.validate();
+ }
+
+ public void write(org.apache.thrift.protocol.TProtocol oprot, failRequestV2_result struct) throws org.apache.thrift.TException {
+ struct.validate();
+
+ oprot.writeStructBegin(STRUCT_DESC);
+ if (struct.aze != null) {
+ oprot.writeFieldBegin(AZE_FIELD_DESC);
+ struct.aze.write(oprot);
+ oprot.writeFieldEnd();
+ }
+ oprot.writeFieldStop();
+ oprot.writeStructEnd();
+ }
+
+ }
+
+ private static class failRequestV2_resultTupleSchemeFactory implements SchemeFactory {
+ public failRequestV2_resultTupleScheme getScheme() {
+ return new failRequestV2_resultTupleScheme();
+ }
+ }
+
+ private static class failRequestV2_resultTupleScheme extends TupleScheme<failRequestV2_result> {
+
+ @Override
+ public void write(org.apache.thrift.protocol.TProtocol prot, failRequestV2_result struct) throws org.apache.thrift.TException {
+ TTupleProtocol oprot = (TTupleProtocol) prot;
+ BitSet optionals = new BitSet();
+ if (struct.is_set_aze()) {
+ optionals.set(0);
+ }
+ oprot.writeBitSet(optionals, 1);
+ if (struct.is_set_aze()) {
+ struct.aze.write(oprot);
+ }
+ }
+
+ @Override
+ public void read(org.apache.thrift.protocol.TProtocol prot, failRequestV2_result struct) throws org.apache.thrift.TException {
+ TTupleProtocol iprot = (TTupleProtocol) prot;
+ BitSet incoming = iprot.readBitSet(1);
+ if (incoming.get(0)) {
+ struct.aze = new AuthorizationException();
+ struct.aze.read(iprot);
+ struct.set_aze_isSet(true);
+ }
+ }
+ }
+
+ }
+
}
http://git-wip-us.apache.org/repos/asf/storm/blob/ea44062f/storm-core/src/jvm/org/apache/storm/security/auth/AuthUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/security/auth/AuthUtils.java b/storm-core/src/jvm/org/apache/storm/security/auth/AuthUtils.java
index 353f679..e38a575 100644
--- a/storm-core/src/jvm/org/apache/storm/security/auth/AuthUtils.java
+++ b/storm-core/src/jvm/org/apache/storm/security/auth/AuthUtils.java
@@ -30,6 +30,7 @@ import java.security.URIParameter;
import java.security.MessageDigest;
import org.apache.commons.codec.binary.Hex;
+import org.apache.commons.lang.StringUtils;
import org.apache.storm.security.INimbusCredentialPlugin;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
@@ -178,7 +179,7 @@ public class AuthUtils {
* @param conf daemon configuration
* @return the plugin
*/
- public static IGroupMappingServiceProvider GetGroupMappingServiceProviderPlugin(Map conf) {
+ public static IGroupMappingServiceProvider GetGroupMappingServiceProviderPlugin(Map<String, Object> conf) {
IGroupMappingServiceProvider gmsp = null;
try {
String gmsp_klassName = (String) conf.get(Config.STORM_GROUP_MAPPING_SERVICE_PROVIDER_PLUGIN);
@@ -318,11 +319,14 @@ public class AuthUtils {
}
}
- private static IHttpCredentialsPlugin GetHttpCredentialsPlugin(Map conf,
+ public static IHttpCredentialsPlugin GetHttpCredentialsPlugin(Map<String, Object> conf,
String klassName) {
try {
- IHttpCredentialsPlugin plugin = Utils.newInstance(klassName);
- plugin.prepare(conf);
+ IHttpCredentialsPlugin plugin = null;
+ if (StringUtils.isNotBlank(klassName)) {
+ plugin = Utils.newInstance(klassName);
+ plugin.prepare(conf);
+ }
return plugin;
} catch (Exception e) {
throw new RuntimeException(e);
@@ -348,7 +352,7 @@ public class AuthUtils {
*/
public static IHttpCredentialsPlugin GetDrpcHttpCredentialsPlugin(Map conf) {
String klassName = (String)conf.get(Config.DRPC_HTTP_CREDS_PLUGIN);
- return AuthUtils.GetHttpCredentialsPlugin(conf, klassName);
+ return klassName == null ? null : AuthUtils.GetHttpCredentialsPlugin(conf, klassName);
}
private static final String USERNAME = "username";
http://git-wip-us.apache.org/repos/asf/storm/blob/ea44062f/storm-core/src/jvm/org/apache/storm/security/auth/ReqContext.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/security/auth/ReqContext.java b/storm-core/src/jvm/org/apache/storm/security/auth/ReqContext.java
index abc83b4..6d92fd9 100644
--- a/storm-core/src/jvm/org/apache/storm/security/auth/ReqContext.java
+++ b/storm-core/src/jvm/org/apache/storm/security/auth/ReqContext.java
@@ -17,17 +17,17 @@
*/
package org.apache.storm.security.auth;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
import java.net.InetAddress;
-import com.google.common.annotations.VisibleForTesting;
-
import java.security.AccessControlContext;
import java.security.AccessController;
import java.security.Principal;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
import javax.security.auth.Subject;
+import com.google.common.annotations.VisibleForTesting;
+
/**
* context request context includes info about:
*
@@ -40,7 +40,6 @@ public class ReqContext {
private Subject _subject;
private InetAddress _remoteAddr;
private Integer _reqID;
- private Map _storm_conf;
private Principal realPrincipal;
@Override
@@ -53,7 +52,7 @@ public class ReqContext {
", ThreadId=" + Thread.currentThread().toString() +
'}';
}
-
+
/**
* @return a request context associated with current thread
*/
@@ -91,7 +90,6 @@ public class ReqContext {
_reqID = uniqueId.incrementAndGet();
}
-
/**
* client address
*/
http://git-wip-us.apache.org/repos/asf/storm/blob/ea44062f/storm-core/src/jvm/org/apache/storm/security/auth/authorizer/DRPCSimpleACLAuthorizer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/security/auth/authorizer/DRPCSimpleACLAuthorizer.java b/storm-core/src/jvm/org/apache/storm/security/auth/authorizer/DRPCSimpleACLAuthorizer.java
index bcff708..8aa1fd3 100644
--- a/storm-core/src/jvm/org/apache/storm/security/auth/authorizer/DRPCSimpleACLAuthorizer.java
+++ b/storm-core/src/jvm/org/apache/storm/security/auth/authorizer/DRPCSimpleACLAuthorizer.java
@@ -35,6 +35,8 @@ import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.annotations.VisibleForTesting;
+
public class DRPCSimpleACLAuthorizer extends DRPCAuthorizerBase {
public static final Logger LOG =
LoggerFactory.getLogger(DRPCSimpleACLAuthorizer.class);
@@ -47,7 +49,8 @@ public class DRPCSimpleACLAuthorizer extends DRPCAuthorizerBase {
protected IPrincipalToLocal _ptol;
protected boolean _permitWhenMissingFunctionEntry = false;
- protected static class AclFunctionEntry {
+ @VisibleForTesting
+ public static class AclFunctionEntry {
final public Set<String> clientUsers;
final public String invocationUser;
public AclFunctionEntry(Collection<String> clientUsers,
http://git-wip-us.apache.org/repos/asf/storm/blob/ea44062f/storm-core/src/jvm/org/apache/storm/ui/UIHelpers.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/ui/UIHelpers.java b/storm-core/src/jvm/org/apache/storm/ui/UIHelpers.java
index e046061..e97d52d 100644
--- a/storm-core/src/jvm/org/apache/storm/ui/UIHelpers.java
+++ b/storm-core/src/jvm/org/apache/storm/ui/UIHelpers.java
@@ -19,8 +19,13 @@ package org.apache.storm.ui;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableMap;
+
+import org.apache.storm.Config;
+import org.apache.storm.daemon.drpc.webapp.ReqContextFilter;
import org.apache.storm.generated.ExecutorInfo;
import org.apache.storm.logging.filters.AccessLoggingFilter;
+import org.apache.storm.security.auth.AuthUtils;
+import org.apache.storm.security.auth.IHttpCredentialsPlugin;
import org.apache.storm.utils.Utils;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.DispatcherType;
@@ -40,6 +45,7 @@ import java.io.PrintWriter;
import java.io.StringWriter;
import java.net.URLEncoder;
import java.util.*;
+import java.util.Map.Entry;
public class UIHelpers {
@@ -160,34 +166,52 @@ public class UIHelpers {
}
public static void configFilter(Server server, Servlet servlet, List<FilterConfiguration> filtersConfs) {
+ configFilter(server, servlet, filtersConfs, null);
+ }
+
+ public static void configFilter(Server server, Servlet servlet, List<FilterConfiguration> filtersConfs, Map<String, String> params) {
if (filtersConfs != null) {
ServletHolder servletHolder = new ServletHolder(servlet);
+ servletHolder.setInitOrder(0);
+ if (params != null) {
+ servletHolder.setInitParameters(params);
+ }
ServletContextHandler context = new ServletContextHandler(server, "/");
context.addServlet(servletHolder, "/");
- context.addFilter(corsFilterHandle(), "/*", EnumSet.allOf(DispatcherType.class));
- for (FilterConfiguration filterConf : filtersConfs) {
- String filterName = filterConf.getFilterName();
- String filterClass = filterConf.getFilterClass();
- Map filterParams = filterConf.getFilterParams();
- if (filterClass != null) {
- FilterHolder filterHolder = new FilterHolder();
- filterHolder.setClassName(filterClass);
- if (filterName != null) {
- filterHolder.setName(filterName);
- } else {
- filterHolder.setName(filterClass);
- }
- if (filterParams != null) {
- filterHolder.setInitParameters(filterParams);
- } else {
- filterHolder.setInitParameters(new HashMap<String, String>());
- }
- context.addFilter(filterHolder, "/*", FilterMapping.ALL);
+ configFilters(context, filtersConfs);
+ server.setHandler(context);
+ }
+ }
+
+ public static void configFilters(ServletContextHandler context, List<FilterConfiguration> filtersConfs) {
+ context.addFilter(corsFilterHandle(), "/*", EnumSet.allOf(DispatcherType.class));
+ for (FilterConfiguration filterConf : filtersConfs) {
+ String filterName = filterConf.getFilterName();
+ String filterClass = filterConf.getFilterClass();
+ Map filterParams = filterConf.getFilterParams();
+ if (filterClass != null) {
+ FilterHolder filterHolder = new FilterHolder();
+ filterHolder.setClassName(filterClass);
+ if (filterName != null) {
+ filterHolder.setName(filterName);
+ } else {
+ filterHolder.setName(filterClass);
+ }
+ if (filterParams != null) {
+ filterHolder.setInitParameters(filterParams);
+ } else {
+ filterHolder.setInitParameters(new HashMap<String, String>());
}
+ context.addFilter(filterHolder, "/*", FilterMapping.ALL);
}
- context.addFilter(mkAccessLoggingFilterHandle(), "/*", EnumSet.allOf(DispatcherType.class));
- server.setHandler(context);
}
+ context.addFilter(mkAccessLoggingFilterHandle(), "/*", EnumSet.allOf(DispatcherType.class));
+ }
+
+ public static void addRequestContextFilter(ServletContextHandler context, String configName, Map<String, Object> conf) {
+ IHttpCredentialsPlugin auth = AuthUtils.GetHttpCredentialsPlugin(conf, (String)conf.get(configName));
+ ReqContextFilter filter = new ReqContextFilter(auth);
+ context.addFilter(new FilterHolder(filter), "/*", FilterMapping.ALL);
}
private static Server removeNonSslConnector(Server server) {
@@ -202,7 +226,7 @@ public class UIHelpers {
/**
* Construct a Jetty Server instance.
*/
- private static Server jettyCreateServer(Integer port, String host, Integer httpsPort) {
+ public static Server jettyCreateServer(Integer port, String host, Integer httpsPort) {
SelectChannelConnector connector = new SelectChannelConnector();
connector.setPort(Utils.getInt(port, 80));
connector.setHost(host);
http://git-wip-us.apache.org/repos/asf/storm/blob/ea44062f/storm-core/src/jvm/org/apache/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/Utils.java b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
index a9856c4..cabfcc6 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/Utils.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
@@ -1950,6 +1950,7 @@ public class Utils {
public void run() {
try {
Time.sleepSecs(1);
+ LOG.warn("Forceing Halt...");
Runtime.getRuntime().halt(20);
} catch (Exception e) {
LOG.warn("Exception in the ShutDownHook", e);
http://git-wip-us.apache.org/repos/asf/storm/blob/ea44062f/storm-core/src/py/storm/DistributedRPCInvocations-remote
----------------------------------------------------------------------
diff --git a/storm-core/src/py/storm/DistributedRPCInvocations-remote b/storm-core/src/py/storm/DistributedRPCInvocations-remote
index 01435b6..d1f100e 100644
--- a/storm-core/src/py/storm/DistributedRPCInvocations-remote
+++ b/storm-core/src/py/storm/DistributedRPCInvocations-remote
@@ -45,6 +45,7 @@ if len(sys.argv) <= 1 or sys.argv[1] == '--help':
print(' void result(string id, string result)')
print(' DRPCRequest fetchRequest(string functionName)')
print(' void failRequest(string id)')
+ print(' void failRequestV2(string id, DRPCExecutionException e)')
print('')
sys.exit(0)
@@ -119,6 +120,12 @@ elif cmd == 'failRequest':
sys.exit(1)
pp.pprint(client.failRequest(args[0],))
+elif cmd == 'failRequestV2':
+ if len(args) != 2:
+ print('failRequestV2 requires 2 args')
+ sys.exit(1)
+ pp.pprint(client.failRequestV2(args[0],eval(args[1]),))
+
else:
print('Unrecognized method %s' % cmd)
sys.exit(1)
http://git-wip-us.apache.org/repos/asf/storm/blob/ea44062f/storm-core/src/py/storm/DistributedRPCInvocations.py
----------------------------------------------------------------------
diff --git a/storm-core/src/py/storm/DistributedRPCInvocations.py b/storm-core/src/py/storm/DistributedRPCInvocations.py
index 207fa9d..df873fe 100644
--- a/storm-core/src/py/storm/DistributedRPCInvocations.py
+++ b/storm-core/src/py/storm/DistributedRPCInvocations.py
@@ -59,6 +59,14 @@ class Iface:
"""
pass
+ def failRequestV2(self, id, e):
+ """
+ Parameters:
+ - id
+ - e
+ """
+ pass
+
class Client(Iface):
def __init__(self, iprot, oprot=None):
@@ -164,6 +172,39 @@ class Client(Iface):
raise result.aze
return
+ def failRequestV2(self, id, e):
+ """
+ Parameters:
+ - id
+ - e
+ """
+ self.send_failRequestV2(id, e)
+ self.recv_failRequestV2()
+
+ def send_failRequestV2(self, id, e):
+ self._oprot.writeMessageBegin('failRequestV2', TMessageType.CALL, self._seqid)
+ args = failRequestV2_args()
+ args.id = id
+ args.e = e
+ args.write(self._oprot)
+ self._oprot.writeMessageEnd()
+ self._oprot.trans.flush()
+
+ def recv_failRequestV2(self):
+ iprot = self._iprot
+ (fname, mtype, rseqid) = iprot.readMessageBegin()
+ if mtype == TMessageType.EXCEPTION:
+ x = TApplicationException()
+ x.read(iprot)
+ iprot.readMessageEnd()
+ raise x
+ result = failRequestV2_result()
+ result.read(iprot)
+ iprot.readMessageEnd()
+ if result.aze is not None:
+ raise result.aze
+ return
+
class Processor(Iface, TProcessor):
def __init__(self, handler):
@@ -172,6 +213,7 @@ class Processor(Iface, TProcessor):
self._processMap["result"] = Processor.process_result
self._processMap["fetchRequest"] = Processor.process_fetchRequest
self._processMap["failRequest"] = Processor.process_failRequest
+ self._processMap["failRequestV2"] = Processor.process_failRequestV2
def process(self, iprot, oprot):
(name, type, seqid) = iprot.readMessageBegin()
@@ -254,6 +296,28 @@ class Processor(Iface, TProcessor):
oprot.writeMessageEnd()
oprot.trans.flush()
+ def process_failRequestV2(self, seqid, iprot, oprot):
+ args = failRequestV2_args()
+ args.read(iprot)
+ iprot.readMessageEnd()
+ result = failRequestV2_result()
+ try:
+ self._handler.failRequestV2(args.id, args.e)
+ msg_type = TMessageType.REPLY
+ except (TTransport.TTransportException, KeyboardInterrupt, SystemExit):
+ raise
+ except AuthorizationException as aze:
+ msg_type = TMessageType.REPLY
+ result.aze = aze
+ except Exception as ex:
+ msg_type = TMessageType.EXCEPTION
+ logging.exception(ex)
+ result = TApplicationException(TApplicationException.INTERNAL_ERROR, 'Internal error')
+ oprot.writeMessageBegin("failRequestV2", msg_type, seqid)
+ result.write(oprot)
+ oprot.writeMessageEnd()
+ oprot.trans.flush()
+
# HELPER FUNCTIONS AND STRUCTURES
@@ -675,3 +739,148 @@ class failRequest_result:
def __ne__(self, other):
return not (self == other)
+
+class failRequestV2_args:
+ """
+ Attributes:
+ - id
+ - e
+ """
+
+ thrift_spec = (
+ None, # 0
+ (1, TType.STRING, 'id', None, None, ), # 1
+ (2, TType.STRUCT, 'e', (DRPCExecutionException, DRPCExecutionException.thrift_spec), None, ), # 2
+ )
+
+ def __init__(self, id=None, e=None,):
+ self.id = id
+ self.e = e
+
+ def read(self, iprot):
+ if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+ fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+ return
+ iprot.readStructBegin()
+ while True:
+ (fname, ftype, fid) = iprot.readFieldBegin()
+ if ftype == TType.STOP:
+ break
+ if fid == 1:
+ if ftype == TType.STRING:
+ self.id = iprot.readString().decode('utf-8')
+ else:
+ iprot.skip(ftype)
+ elif fid == 2:
+ if ftype == TType.STRUCT:
+ self.e = DRPCExecutionException()
+ self.e.read(iprot)
+ else:
+ iprot.skip(ftype)
+ else:
+ iprot.skip(ftype)
+ iprot.readFieldEnd()
+ iprot.readStructEnd()
+
+ def write(self, oprot):
+ if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+ oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+ return
+ oprot.writeStructBegin('failRequestV2_args')
+ if self.id is not None:
+ oprot.writeFieldBegin('id', TType.STRING, 1)
+ oprot.writeString(self.id.encode('utf-8'))
+ oprot.writeFieldEnd()
+ if self.e is not None:
+ oprot.writeFieldBegin('e', TType.STRUCT, 2)
+ self.e.write(oprot)
+ oprot.writeFieldEnd()
+ oprot.writeFieldStop()
+ oprot.writeStructEnd()
+
+ def validate(self):
+ return
+
+
+ def __hash__(self):
+ value = 17
+ value = (value * 31) ^ hash(self.id)
+ value = (value * 31) ^ hash(self.e)
+ return value
+
+ def __repr__(self):
+ L = ['%s=%r' % (key, value)
+ for key, value in self.__dict__.iteritems()]
+ return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+ def __eq__(self, other):
+ return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+ def __ne__(self, other):
+ return not (self == other)
+
+class failRequestV2_result:
+ """
+ Attributes:
+ - aze
+ """
+
+ thrift_spec = (
+ None, # 0
+ (1, TType.STRUCT, 'aze', (AuthorizationException, AuthorizationException.thrift_spec), None, ), # 1
+ )
+
+ def __init__(self, aze=None,):
+ self.aze = aze
+
+ def read(self, iprot):
+ if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+ fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+ return
+ iprot.readStructBegin()
+ while True:
+ (fname, ftype, fid) = iprot.readFieldBegin()
+ if ftype == TType.STOP:
+ break
+ if fid == 1:
+ if ftype == TType.STRUCT:
+ self.aze = AuthorizationException()
+ self.aze.read(iprot)
+ else:
+ iprot.skip(ftype)
+ else:
+ iprot.skip(ftype)
+ iprot.readFieldEnd()
+ iprot.readStructEnd()
+
+ def write(self, oprot):
+ if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+ oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+ return
+ oprot.writeStructBegin('failRequestV2_result')
+ if self.aze is not None:
+ oprot.writeFieldBegin('aze', TType.STRUCT, 1)
+ self.aze.write(oprot)
+ oprot.writeFieldEnd()
+ oprot.writeFieldStop()
+ oprot.writeStructEnd()
+
+ def validate(self):
+ return
+
+
+ def __hash__(self):
+ value = 17
+ value = (value * 31) ^ hash(self.aze)
+ return value
+
+ def __repr__(self):
+ L = ['%s=%r' % (key, value)
+ for key, value in self.__dict__.iteritems()]
+ return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+ def __eq__(self, other):
+ return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+ def __ne__(self, other):
+ return not (self == other)
http://git-wip-us.apache.org/repos/asf/storm/blob/ea44062f/storm-core/src/py/storm/ttypes.py
----------------------------------------------------------------------
diff --git a/storm-core/src/py/storm/ttypes.py b/storm-core/src/py/storm/ttypes.py
index 4fac146..e9085e6 100644
--- a/storm-core/src/py/storm/ttypes.py
+++ b/storm-core/src/py/storm/ttypes.py
@@ -156,6 +156,26 @@ class LogLevelAction:
"REMOVE": 3,
}
+class DRPCExceptionType:
+ INTERNAL_ERROR = 0
+ SERVER_SHUTDOWN = 1
+ SERVER_TIMEOUT = 2
+ FAILED_REQUEST = 3
+
+ _VALUES_TO_NAMES = {
+ 0: "INTERNAL_ERROR",
+ 1: "SERVER_SHUTDOWN",
+ 2: "SERVER_TIMEOUT",
+ 3: "FAILED_REQUEST",
+ }
+
+ _NAMES_TO_VALUES = {
+ "INTERNAL_ERROR": 0,
+ "SERVER_SHUTDOWN": 1,
+ "SERVER_TIMEOUT": 2,
+ "FAILED_REQUEST": 3,
+ }
+
class HBServerMessageType:
CREATE_PATH = 0
CREATE_PATH_RESPONSE = 1
@@ -11002,15 +11022,18 @@ class DRPCExecutionException(TException):
"""
Attributes:
- msg
+ - type
"""
thrift_spec = (
None, # 0
(1, TType.STRING, 'msg', None, None, ), # 1
+ (2, TType.I32, 'type', None, None, ), # 2
)
- def __init__(self, msg=None,):
+ def __init__(self, msg=None, type=None,):
self.msg = msg
+ self.type = type
def read(self, iprot):
if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
@@ -11026,6 +11049,11 @@ class DRPCExecutionException(TException):
self.msg = iprot.readString().decode('utf-8')
else:
iprot.skip(ftype)
+ elif fid == 2:
+ if ftype == TType.I32:
+ self.type = iprot.readI32()
+ else:
+ iprot.skip(ftype)
else:
iprot.skip(ftype)
iprot.readFieldEnd()
@@ -11040,6 +11068,10 @@ class DRPCExecutionException(TException):
oprot.writeFieldBegin('msg', TType.STRING, 1)
oprot.writeString(self.msg.encode('utf-8'))
oprot.writeFieldEnd()
+ if self.type is not None:
+ oprot.writeFieldBegin('type', TType.I32, 2)
+ oprot.writeI32(self.type)
+ oprot.writeFieldEnd()
oprot.writeFieldStop()
oprot.writeStructEnd()
@@ -11055,6 +11087,7 @@ class DRPCExecutionException(TException):
def __hash__(self):
value = 17
value = (value * 31) ^ hash(self.msg)
+ value = (value * 31) ^ hash(self.type)
return value
def __repr__(self):
http://git-wip-us.apache.org/repos/asf/storm/blob/ea44062f/storm-core/src/storm.thrift
----------------------------------------------------------------------
diff --git a/storm-core/src/storm.thrift b/storm-core/src/storm.thrift
index fc018ee..8b27df7 100644
--- a/storm-core/src/storm.thrift
+++ b/storm-core/src/storm.thrift
@@ -690,8 +690,16 @@ struct DRPCRequest {
2: required string request_id;
}
+enum DRPCExceptionType {
+ INTERNAL_ERROR,
+ SERVER_SHUTDOWN,
+ SERVER_TIMEOUT,
+ FAILED_REQUEST
+}
+
exception DRPCExecutionException {
1: required string msg;
+ 2: optional DRPCExceptionType type;
}
service DistributedRPC {
@@ -702,6 +710,7 @@ service DistributedRPCInvocations {
void result(1: string id, 2: string result) throws (1: AuthorizationException aze);
DRPCRequest fetchRequest(1: string functionName) throws (1: AuthorizationException aze);
void failRequest(1: string id) throws (1: AuthorizationException aze);
+ void failRequestV2(1: string id, 2: DRPCExecutionException e) throws (1: AuthorizationException aze);
}
enum HBServerMessageType {
http://git-wip-us.apache.org/repos/asf/storm/blob/ea44062f/storm-core/test/clj/org/apache/storm/drpc_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/drpc_test.clj b/storm-core/test/clj/org/apache/storm/drpc_test.clj
index 7974e49..f485e6f 100644
--- a/storm-core/test/clj/org/apache/storm/drpc_test.clj
+++ b/storm-core/test/clj/org/apache/storm/drpc_test.clj
@@ -29,11 +29,9 @@
(:import [org.apache.storm.generated DRPCExecutionException DRPCRequest])
(:import [java.util.concurrent ConcurrentLinkedQueue])
(:import [org.apache.storm Thrift])
- (:import [org.apache.storm.daemon DrpcServer])
(:import [org.mockito ArgumentCaptor Mockito Matchers])
(:use [org.apache.storm config])
(:use [org.apache.storm.internal clojure])
- (:use [org.apache.storm.daemon drpc])
(:use [conjure core]))
(defbolt exclamation-bolt ["result" "return-info"] [tuple collector]
@@ -230,32 +228,6 @@
(.shutdown drpc)
))
-(deftest test-dequeue-req-after-timeout
- (let [queue (ConcurrentLinkedQueue.)
- delay-seconds 2
- conf {DRPC-REQUEST-TIMEOUT-SECS delay-seconds}
- mock-cu (proxy [ConfigUtils] []
- (readStormConfigImpl [] conf))
- drpc-handler (proxy [DrpcServer] [conf]
- (acquireQueue [function] queue))]
- (with-open [_ (ConfigUtilsInstaller. mock-cu)]
- (is (thrown? DRPCExecutionException
- (.execute drpc-handler "ArbitraryDRPCFunctionName" "")))
- (is (= 0 (.size queue))))))
-
-(deftest test-drpc-timeout-cleanup
- (let [queue (ConcurrentLinkedQueue.)
- delay-seconds 1
- conf {DRPC-REQUEST-TIMEOUT-SECS delay-seconds}
- mock-cu (proxy [ConfigUtils] []
- (readStormConfigImpl [] conf))
- drpc-handler (proxy [DrpcServer] [conf]
- (acquireQueue [function] queue)
- (getTimeoutCheckSecs [] delay-seconds))]
- (with-open [_ (ConfigUtilsInstaller. mock-cu)]
- (is (thrown? DRPCExecutionException
- (.execute drpc-handler "ArbitraryDRPCFunctionName" "no-args"))))))
-
(deftest test-drpc-attempts-two-reconnects-in-fail-request
(let [handler (Mockito/mock DRPCInvocationsClient
(.extraInterfaces (Mockito/withSettings) (into-array Class [ILocalDRPC])))
http://git-wip-us.apache.org/repos/asf/storm/blob/ea44062f/storm-core/test/clj/org/apache/storm/security/auth/drpc_auth_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/security/auth/drpc_auth_test.clj b/storm-core/test/clj/org/apache/storm/security/auth/drpc_auth_test.clj
deleted file mode 100644
index 0c2ee2e..0000000
--- a/storm-core/test/clj/org/apache/storm/security/auth/drpc_auth_test.clj
+++ /dev/null
@@ -1,320 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements. See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership. The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License. You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns org.apache.storm.security.auth.drpc-auth-test
- (:use [clojure test])
- (:require [org.apache.storm.daemon [drpc :as drpc]])
- (:import [org.apache.storm.generated AuthorizationException
- DRPCExecutionException DistributedRPC$Processor
- DistributedRPCInvocations$Processor]
- [org.apache.storm.daemon DrpcServer])
- (:import [org.apache.storm Config])
- (:import [org.apache.storm Testing Testing$Condition])
- (:import [org.apache.storm.security.auth ReqContext SingleUserPrincipal ThriftServer ThriftConnectionType])
- (:import [org.apache.storm.utils DRPCClient ConfigUtils Time])
- (:import [org.apache.storm.drpc DRPCInvocationsClient])
- (:import [java.util.concurrent TimeUnit])
- (:import [javax.security.auth Subject])
- (:use [org.apache.storm util config log])
- (:import [org.apache.storm.utils Utils]))
-
-(defmacro with-timeout
- [millis unit & body]
- `(let [f# (future ~@body)]
- (try
- (.get f# ~millis ~unit)
- (finally (future-cancel f#)))))
-
-(def DRPC-TIMEOUT-SEC (* (/ Testing/TEST_TIMEOUT_MS 1000) 2))
-
-(defn launch-server [conf drpcAznClass transportPluginClass login-cfg client-port invocations-port]
- (let [conf (if drpcAznClass (assoc conf DRPC-AUTHORIZER drpcAznClass) conf)
- conf (if transportPluginClass (assoc conf STORM-THRIFT-TRANSPORT-PLUGIN transportPluginClass) conf)
- conf (if login-cfg (assoc conf "java.security.auth.login.config" login-cfg) conf)
- conf (assoc conf DRPC-PORT client-port)
- conf (assoc conf DRPC-INVOCATIONS-PORT invocations-port)
- service-handler (DrpcServer. conf)
- handler-server (ThriftServer. conf
- (DistributedRPC$Processor. service-handler)
- ThriftConnectionType/DRPC)
- invoke-server (ThriftServer. conf
- (DistributedRPCInvocations$Processor. service-handler)
- ThriftConnectionType/DRPC_INVOCATIONS)]
- (.addShutdownHook (Runtime/getRuntime) (Thread. (fn [] (.stop handler-server) (.stop invoke-server))))
- (log-message "storm conf:" conf)
- (log-message "Starting DRPC invocation server ... " invocations-port)
- (.start (Thread. #(.serve invoke-server)))
- (Testing/whileTimeout (reify Testing$Condition (exec [this] (not (.isServing invoke-server)))) (fn [] (Time/sleep 100)))
- (log-message "Starting DRPC handler server ... " client-port)
- (.start (Thread. #(.serve handler-server)))
- (Testing/whileTimeout (reify Testing$Condition (exec [this] (not (.isServing handler-server)))) (fn [] (Time/sleep 100)))
- [handler-server invoke-server]))
-
-(defmacro with-server [args & body]
- `(let [[handler-server# invoke-server#] (launch-server ~@args)]
- ~@body
- (log-message "Stopping DRPC servers ...")
- (.stop handler-server#)
- (.stop invoke-server#)
- ))
-
-(deftest deny-drpc-test
- (let [client-port (Utils/getAvailablePort)
- invocations-port (Utils/getAvailablePort (int(inc client-port)))
- storm-conf (clojurify-structure (ConfigUtils/readStormConfig))]
- (with-server [storm-conf "org.apache.storm.security.auth.authorizer.DenyAuthorizer"
- nil nil client-port invocations-port]
- (let [drpc (DRPCClient. storm-conf "localhost" client-port)
- drpc_client (.getClient drpc)
- invocations (DRPCInvocationsClient. storm-conf "localhost" invocations-port)
- invocations_client (.getClient invocations)]
- (is (thrown? AuthorizationException (.execute drpc_client "func-foo" "args-bar")))
- (is (thrown? AuthorizationException (.fetchRequest invocations_client nil)))
- (.close drpc)
- (.close invocations)))))
-
-(deftest deny-drpc-digest-test
- (let [client-port (Utils/getAvailablePort)
- invocations-port (Utils/getAvailablePort (int (inc client-port)))
- storm-conf (clojurify-structure (ConfigUtils/readStormConfig))]
- (with-server [storm-conf "org.apache.storm.security.auth.authorizer.DenyAuthorizer"
- "org.apache.storm.security.auth.digest.DigestSaslTransportPlugin"
- "test/clj/org/apache/storm/security/auth/jaas_digest.conf"
- client-port invocations-port]
- (let [conf (merge storm-conf {STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.digest.DigestSaslTransportPlugin"
- "java.security.auth.login.config" "test/clj/org/apache/storm/security/auth/jaas_digest.conf"})
- drpc (DRPCClient. conf "localhost" client-port)
- drpc_client (.getClient drpc)
- invocations (DRPCInvocationsClient. conf "localhost" invocations-port)
- invocations_client (.getClient invocations)]
- (is (thrown? AuthorizationException (.execute drpc_client "func-foo" "args-bar")))
- (is (thrown? AuthorizationException (.fetchRequest invocations_client nil)))
- (.close drpc)
- (.close invocations)))))
-
-(defmacro with-simple-drpc-test-scenario
- [[strict? alice-client bob-client charlie-client alice-invok charlie-invok] & body]
- `(let [client-port# (Utils/getAvailablePort)
- invocations-port# (Utils/getAvailablePort (int (inc client-port#)))
- storm-conf# (merge (clojurify-structure (ConfigUtils/readStormConfig))
- {DRPC-AUTHORIZER-ACL-STRICT ~strict?
- DRPC-AUTHORIZER-ACL-FILENAME "drpc-simple-acl-test-scenario.yaml"
- STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.digest.DigestSaslTransportPlugin"})]
- (with-server [storm-conf#
- "org.apache.storm.security.auth.authorizer.DRPCSimpleACLAuthorizer"
- "org.apache.storm.security.auth.digest.DigestSaslTransportPlugin"
- "test/clj/org/apache/storm/security/auth/drpc-auth-server.jaas"
- client-port# invocations-port#]
- (let [~alice-client (DRPCClient.
- (merge storm-conf# {"java.security.auth.login.config"
- "test/clj/org/apache/storm/security/auth/drpc-auth-alice.jaas"})
- "localhost"
- client-port#)
- ~bob-client (DRPCClient.
- (merge storm-conf# {"java.security.auth.login.config"
- "test/clj/org/apache/storm/security/auth/drpc-auth-bob.jaas"})
- "localhost"
- client-port#)
- ~charlie-client (DRPCClient.
- (merge storm-conf# {"java.security.auth.login.config"
- "test/clj/org/apache/storm/security/auth/drpc-auth-charlie.jaas"})
- "localhost"
- client-port#)
- ~alice-invok (DRPCInvocationsClient.
- (merge storm-conf# {"java.security.auth.login.config"
- "test/clj/org/apache/storm/security/auth/drpc-auth-alice.jaas"})
- "localhost"
- invocations-port#)
- ~charlie-invok (DRPCInvocationsClient.
- (merge storm-conf# {"java.security.auth.login.config"
- "test/clj/org/apache/storm/security/auth/drpc-auth-charlie.jaas"})
- "localhost"
- invocations-port#)]
- (try
- ~@body
- (finally
- (.close ~alice-client)
- (.close ~bob-client)
- (.close ~charlie-client)
- (.close ~alice-invok)
- (.close ~charlie-invok)))))))
-
-(deftest drpc-per-function-auth-strict-test
- (with-simple-drpc-test-scenario [true alice-client bob-client charlie-client alice-invok charlie-invok]
- (let [drpc-timeout-seconds DRPC-TIMEOUT-SEC]
- (testing "Permitted user can execute a function in the ACL"
- (let [func "jump"
- exec-ftr (future (.execute alice-client func "some args"))
- id (atom "")
- expected "Authorized DRPC"]
- (with-timeout drpc-timeout-seconds TimeUnit/SECONDS
- (while (empty? @id)
- (reset! id
- (-> charlie-invok (.fetchRequest func) .get_request_id)))
- (.result charlie-invok @id expected)
- (is (= expected (.get exec-ftr drpc-timeout-seconds TimeUnit/SECONDS))))))
-
- (testing "execute fails when function is not in ACL"
- (is (thrown-cause? AuthorizationException
- (.execute alice-client "jog" "some args"))))
-
- (testing "fetchRequest fails when function is not in ACL"
- (is (thrown-cause? AuthorizationException
- (.fetchRequest charlie-invok "jog"))))
-
- (testing "authorized user can fail a request"
- (let [func "jump"
- exec-ftr (future (.execute alice-client func "some args"))
- id (atom "")]
- (with-timeout drpc-timeout-seconds TimeUnit/SECONDS
- (while (empty? @id)
- (reset! id
- (-> charlie-invok (.fetchRequest func) .get_request_id)))
- (.failRequest charlie-invok @id)
- (is (thrown-cause? DRPCExecutionException
- (.get exec-ftr drpc-timeout-seconds TimeUnit/SECONDS))))))
-
- (testing "unauthorized invocation user is denied returning a result"
- (let [func "jump"
- exec-ftr (future (.execute bob-client func "some args"))
- id (atom "")
- expected "Only Authorized User can populate the result"]
- (with-timeout drpc-timeout-seconds TimeUnit/SECONDS
- (while (empty? @id)
- (reset! id
- (-> charlie-invok (.fetchRequest func) .get_request_id)))
- (is (thrown-cause? AuthorizationException
- (.result alice-invok @id "not the expected result")))
- (.result charlie-invok @id expected)
- (is (= expected (.get exec-ftr drpc-timeout-seconds TimeUnit/SECONDS))))))
-
- (testing "unauthorized invocation user is denied failing a request"
- (let [func "jump"
- exec-ftr (future (.execute alice-client func "some args"))
- id (atom "")]
- (with-timeout drpc-timeout-seconds TimeUnit/SECONDS
- (while (empty? @id)
- (reset! id
- (-> charlie-invok (.fetchRequest func) .get_request_id)))
- (is (thrown-cause? AuthorizationException
- (.failRequest alice-invok @id)))
- (.failRequest charlie-invok @id))))
-
- (testing "unauthorized invocation user is denied fetching a request"
- (let [func "jump"
- exec-ftr (future (.execute bob-client func "some args"))
- id (atom "")
- expected "Only authorized users can fetchRequest"]
- (Thread/sleep 1000)
- (is (thrown-cause? AuthorizationException
- (-> alice-invok (.fetchRequest func) .get_request_id)))
- (with-timeout drpc-timeout-seconds TimeUnit/SECONDS
- (while (empty? @id)
- (reset! id
- (-> charlie-invok (.fetchRequest func) .get_request_id)))
- (.result charlie-invok @id expected)
- (is (= expected (.get exec-ftr drpc-timeout-seconds TimeUnit/SECONDS)))))))))
-
-(deftest drpc-per-function-auth-non-strict-test
- (with-simple-drpc-test-scenario [false alice-client bob-client charlie-client alice-invok charlie-invok]
- (let [drpc-timeout-seconds DRPC-TIMEOUT-SEC]
- (testing "Permitted user can execute a function in the ACL"
- (let [func "jump"
- exec-ftr (future (.execute alice-client func "some args"))
- id (atom "")
- expected "Authorized DRPC"]
- (with-timeout drpc-timeout-seconds TimeUnit/SECONDS
- (while (empty? @id)
- (reset! id
- (-> charlie-invok (.fetchRequest func) .get_request_id)))
- (.result charlie-invok @id expected)
- (is (= expected (.get exec-ftr drpc-timeout-seconds TimeUnit/SECONDS))))))
-
- (testing "DRPC succeeds for anyone when function is not in ACL"
- (let [func "jog"
- exec-ftr (future (.execute charlie-client func "some args"))
- id (atom "")
- expected "Permissive/No ACL Entry"]
- (with-timeout drpc-timeout-seconds TimeUnit/SECONDS
- (while (empty? @id)
- (reset! id
- (-> alice-invok (.fetchRequest func) .get_request_id)))
- (.result alice-invok @id expected)
- (is (= expected (.get exec-ftr drpc-timeout-seconds TimeUnit/SECONDS))))))
-
- (testing "failure of a request is allowed when function is not in ACL"
- (let [func "jog"
- exec-ftr (future (.execute charlie-client func "some args"))
- id (atom "")]
- (with-timeout drpc-timeout-seconds TimeUnit/SECONDS
- (while (empty? @id)
- (reset! id
- (-> alice-invok (.fetchRequest func) .get_request_id)))
- (.failRequest alice-invok @id)
- (is (thrown-cause? DRPCExecutionException
- (.get exec-ftr drpc-timeout-seconds TimeUnit/SECONDS))))))
-
- (testing "authorized user can fail a request"
- (let [func "jump"
- exec-ftr (future (.execute alice-client func "some args"))
- id (atom "")]
- (with-timeout drpc-timeout-seconds TimeUnit/SECONDS
- (while (empty? @id)
- (reset! id
- (-> charlie-invok (.fetchRequest func) .get_request_id)))
- (.failRequest charlie-invok @id)
- (is (thrown-cause? DRPCExecutionException
- (.get exec-ftr drpc-timeout-seconds TimeUnit/SECONDS))))))
-
- (testing "unauthorized invocation user is denied returning a result"
- (let [func "jump"
- exec-ftr (future (.execute bob-client func "some args"))
- id (atom "")
- expected "Only Authorized User can populate the result"]
- (with-timeout drpc-timeout-seconds TimeUnit/SECONDS
- (while (empty? @id)
- (reset! id
- (-> charlie-invok (.fetchRequest func) .get_request_id)))
- (is (thrown-cause? AuthorizationException
- (.result alice-invok @id "not the expected result")))
- (.result charlie-invok @id expected)
- (is (= expected (.get exec-ftr drpc-timeout-seconds TimeUnit/SECONDS))))))
-
- (testing "unauthorized invocation user is denied failing a request"
- (let [func "jump"
- exec-ftr (future (.execute alice-client func "some args"))
- id (atom "")]
- (with-timeout drpc-timeout-seconds TimeUnit/SECONDS
- (while (empty? @id)
- (reset! id
- (-> charlie-invok (.fetchRequest func) .get_request_id)))
- (is (thrown-cause? AuthorizationException
- (.failRequest alice-invok @id)))
- (.failRequest charlie-invok @id))))
-
- (testing "unauthorized invocation user is denied fetching a request"
- (let [func "jump"
- exec-ftr (future (.execute bob-client func "some args"))
- id (atom "")
- expected "Only authorized users can fetchRequest"]
- (Thread/sleep 1000)
- (is (thrown-cause? AuthorizationException
- (-> alice-invok (.fetchRequest func) .get_request_id)))
- (with-timeout drpc-timeout-seconds TimeUnit/SECONDS
- (while (empty? @id)
- (reset! id
- (-> charlie-invok (.fetchRequest func) .get_request_id)))
- (.result charlie-invok @id expected)
- (is (= expected (.get exec-ftr drpc-timeout-seconds TimeUnit/SECONDS)))))))))
http://git-wip-us.apache.org/repos/asf/storm/blob/ea44062f/storm-core/test/jvm/org/apache/storm/daemon/drpc/DRPCServerTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/daemon/drpc/DRPCServerTest.java b/storm-core/test/jvm/org/apache/storm/daemon/drpc/DRPCServerTest.java
new file mode 100644
index 0000000..b2df441
--- /dev/null
+++ b/storm-core/test/jvm/org/apache/storm/daemon/drpc/DRPCServerTest.java
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.drpc;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+import java.io.InputStream;
+import java.net.URL;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.storm.Config;
+import org.apache.storm.daemon.drpc.DRPCServer;
+import org.apache.storm.drpc.DRPCInvocationsClient;
+import org.apache.storm.generated.DRPCExecutionException;
+import org.apache.storm.generated.DRPCRequest;
+import org.apache.storm.security.auth.SimpleTransportPlugin;
+import org.apache.storm.utils.DRPCClient;
+import org.apache.storm.utils.Utils;
+import org.junit.AfterClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class DRPCServerTest {
+ private static final Logger LOG = LoggerFactory.getLogger(DRPCServerTest.class);
+ private static final ExecutorService exec = Executors.newCachedThreadPool();
+
+ @AfterClass
+ public static void close() {
+ exec.shutdownNow();
+ }
+
+ private static DRPCRequest getNextAvailableRequest(DRPCInvocationsClient invoke, String func) throws Exception {
+ DRPCRequest request = null;
+ long timedout = System.currentTimeMillis() + 5_000;
+ while (System.currentTimeMillis() < timedout) {
+ request = invoke.getClient().fetchRequest(func);
+ if (request != null && request.get_request_id() != null && !request.get_request_id().isEmpty()) {
+ return request;
+ }
+ Thread.sleep(1);
+ }
+ fail("Test timed out waiting for a request on " + func);
+ return request;
+ }
+
+ private Map<String, Object> getConf(int drpcPort, int invocationsPort, Integer httpPort) {
+ Map<String, Object> conf = new HashMap<>();
+ conf.put(Config.DRPC_PORT, drpcPort);
+ conf.put(Config.DRPC_INVOCATIONS_PORT, invocationsPort);
+ conf.put(Config.STORM_THRIFT_TRANSPORT_PLUGIN, SimpleTransportPlugin.class.getName());
+ conf.put(Config.DRPC_WORKER_THREADS, 5);
+ conf.put(Config.DRPC_INVOCATIONS_THREADS, 5);
+ conf.put(Config.DRPC_MAX_BUFFER_SIZE, 1048576);
+ conf.put(Config.STORM_NIMBUS_RETRY_TIMES, 2);
+ conf.put(Config.STORM_NIMBUS_RETRY_INTERVAL, 10);
+ conf.put(Config.STORM_NIMBUS_RETRY_INTERVAL_CEILING, 100);
+ if (httpPort != null) {
+ conf.put(Config.DRPC_HTTP_PORT, httpPort);
+ }
+ return conf;
+ }
+
+ @Test
+ public void testGoodThrift() throws Exception {
+ int drpcPort = Utils.getAvailablePort();
+ int invocationsPort = Utils.getAvailablePort(drpcPort + 1);
+ Map<String, Object> conf = getConf(drpcPort, invocationsPort, null);
+ try (DRPCServer server = new DRPCServer(conf)) {
+ exec.submit(() -> {
+ server.start();
+ return null;
+ });
+ try (DRPCClient client = new DRPCClient(conf, "localhost", drpcPort);
+ DRPCInvocationsClient invoke = new DRPCInvocationsClient(conf, "localhost", invocationsPort)) {
+ Future<String> found = exec.submit(() -> client.getClient().execute("testing", "test"));
+ DRPCRequest request = getNextAvailableRequest(invoke, "testing");
+ assertNotNull(request);
+ assertEquals("test", request.get_func_args());
+ assertNotNull(request.get_request_id());
+ invoke.result(request.get_request_id(), "tested");
+ String result = found.get(1000, TimeUnit.MILLISECONDS);
+ assertEquals("tested", result);
+ }
+ }
+ }
+
+ @Test
+ public void testFailedThrift() throws Exception {
+ int drpcPort = Utils.getAvailablePort();
+ int invocationsPort = Utils.getAvailablePort(drpcPort + 1);
+ Map<String, Object> conf = getConf(drpcPort, invocationsPort, null);
+ try (DRPCServer server = new DRPCServer(conf)) {
+ exec.submit(() -> {
+ server.start();
+ return null;
+ });
+ try (DRPCClient client = new DRPCClient(conf, "localhost", drpcPort);
+ DRPCInvocationsClient invoke = new DRPCInvocationsClient(conf, "localhost", invocationsPort)) {
+ Future<String> found = exec.submit(() -> client.getClient().execute("testing", "test"));
+ DRPCRequest request = getNextAvailableRequest(invoke, "testing");
+ assertNotNull(request);
+ assertEquals("test", request.get_func_args());
+ assertNotNull(request.get_request_id());
+ invoke.failRequest(request.get_request_id());
+ try {
+ found.get(1000, TimeUnit.MILLISECONDS);
+ fail("exec did not throw an exception");
+ } catch (ExecutionException e) {
+ Throwable t = e.getCause();
+ assertEquals(t.getClass(), DRPCExecutionException.class);
+ //Don't know a better way to validate that it failed.
+ assertEquals("Request failed", ((DRPCExecutionException)t).get_msg());
+ }
+ }
+ }
+ }
+
+ public static String GET(int port, String func, String args) {
+ try {
+ URL url = new URL("http://localhost:"+port+"/drpc/"+func+"/"+args);
+ InputStream in = url.openStream();
+ byte[] buffer = new byte[1024];
+ int read = in.read(buffer);
+ return new String(buffer, 0, read);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Test
+ public void testGoodHttpGet() throws Exception {
+ LOG.info("STARTING HTTP GET TEST...");
+ int drpcPort = Utils.getAvailablePort();
+ int invocationsPort = Utils.getAvailablePort(drpcPort + 1);
+ int httpPort = Utils.getAvailablePort(invocationsPort + 1);
+ Map<String, Object> conf = getConf(drpcPort, invocationsPort, httpPort);
+ try (DRPCServer server = new DRPCServer(conf)) {
+ exec.submit(() -> {
+ server.start();
+ return null;
+ });
+ //TODO need a better way to do this
+ Thread.sleep(2000);
+ try (DRPCInvocationsClient invoke = new DRPCInvocationsClient(conf, "localhost", invocationsPort)) {
+ Future<String> found = exec.submit(() -> GET(httpPort, "testing", "test"));
+ DRPCRequest request = getNextAvailableRequest(invoke, "testing");
+ assertNotNull(request);
+ assertEquals("test", request.get_func_args());
+ assertNotNull(request.get_request_id());
+ invoke.result(request.get_request_id(), "tested");
+ String result = found.get(1000, TimeUnit.MILLISECONDS);
+ assertEquals("tested", result);
+ }
+ }
+ }
+
+ @Test
+ public void testFailedHttpGet() throws Exception {
+ LOG.info("STARTING HTTP GET (FAIL) TEST...");
+ int drpcPort = Utils.getAvailablePort();
+ int invocationsPort = Utils.getAvailablePort(drpcPort + 1);
+ int httpPort = Utils.getAvailablePort(invocationsPort + 1);
+ Map<String, Object> conf = getConf(drpcPort, invocationsPort, httpPort);
+ try (DRPCServer server = new DRPCServer(conf)) {
+ exec.submit(() -> {
+ server.start();
+ return null;
+ });
+ //TODO need a better way to do this
+ Thread.sleep(2000);
+ try (DRPCInvocationsClient invoke = new DRPCInvocationsClient(conf, "localhost", invocationsPort)) {
+ Future<String> found = exec.submit(() -> GET(httpPort, "testing", "test"));
+ DRPCRequest request = getNextAvailableRequest(invoke, "testing");
+ assertNotNull(request);
+ assertEquals("test", request.get_func_args());
+ assertNotNull(request.get_request_id());
+ invoke.getClient().failRequest(request.get_request_id());
+ try {
+ found.get(1000, TimeUnit.MILLISECONDS);
+ fail("exec did not throw an exception");
+ } catch (ExecutionException e) {
+ LOG.warn("Got Expected Exception", e);
+ //Getting the exact response code is a bit more complex.
+ //TODO should use a better client
+ }
+ }
+ }
+ }
+}