You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2016/12/19 00:54:12 UTC

[2/8] storm git commit: STORM-2217: Make DRPC pure java with Jersey and prepare for separating classpaths.

http://git-wip-us.apache.org/repos/asf/storm/blob/ea44062f/storm-core/src/jvm/org/apache/storm/generated/DistributedRPCInvocations.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/generated/DistributedRPCInvocations.java b/storm-core/src/jvm/org/apache/storm/generated/DistributedRPCInvocations.java
index d51175d..7d1c1ab 100644
--- a/storm-core/src/jvm/org/apache/storm/generated/DistributedRPCInvocations.java
+++ b/storm-core/src/jvm/org/apache/storm/generated/DistributedRPCInvocations.java
@@ -62,6 +62,8 @@ public class DistributedRPCInvocations {
 
     public void failRequest(String id) throws AuthorizationException, org.apache.thrift.TException;
 
+    public void failRequestV2(String id, DRPCExecutionException e) throws AuthorizationException, org.apache.thrift.TException;
+
   }
 
   public interface AsyncIface {
@@ -72,6 +74,8 @@ public class DistributedRPCInvocations {
 
     public void failRequest(String id, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException;
 
+    public void failRequestV2(String id, DRPCExecutionException e, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException;
+
   }
 
   public static class Client extends org.apache.thrift.TServiceClient implements Iface {
@@ -167,6 +171,30 @@ public class DistributedRPCInvocations {
       return;
     }
 
+    public void failRequestV2(String id, DRPCExecutionException e) throws AuthorizationException, org.apache.thrift.TException
+    {
+      send_failRequestV2(id, e);
+      recv_failRequestV2();
+    }
+
+    public void send_failRequestV2(String id, DRPCExecutionException e) throws org.apache.thrift.TException
+    {
+      failRequestV2_args args = new failRequestV2_args();
+      args.set_id(id);
+      args.set_e(e);
+      sendBase("failRequestV2", args);
+    }
+
+    public void recv_failRequestV2() throws AuthorizationException, org.apache.thrift.TException
+    {
+      failRequestV2_result result = new failRequestV2_result();
+      receiveBase(result, "failRequestV2");
+      if (result.aze != null) {
+        throw result.aze;
+      }
+      return;
+    }
+
   }
   public static class AsyncClient extends org.apache.thrift.async.TAsyncClient implements AsyncIface {
     public static class Factory implements org.apache.thrift.async.TAsyncClientFactory<AsyncClient> {
@@ -284,6 +312,41 @@ public class DistributedRPCInvocations {
       }
     }
 
+    public void failRequestV2(String id, DRPCExecutionException e, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException {
+      checkReady();
+      failRequestV2_call method_call = new failRequestV2_call(id, e, resultHandler, this, ___protocolFactory, ___transport);
+      this.___currentMethod = method_call;
+      ___manager.call(method_call);
+    }
+
+    public static class failRequestV2_call extends org.apache.thrift.async.TAsyncMethodCall {
+      private String id;
+      private DRPCExecutionException e;
+      public failRequestV2_call(String id, DRPCExecutionException e, org.apache.thrift.async.AsyncMethodCallback resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException {
+        super(client, protocolFactory, transport, resultHandler, false);
+        this.id = id;
+        this.e = e;
+      }
+
+      public void write_args(org.apache.thrift.protocol.TProtocol prot) throws org.apache.thrift.TException {
+        prot.writeMessageBegin(new org.apache.thrift.protocol.TMessage("failRequestV2", org.apache.thrift.protocol.TMessageType.CALL, 0));
+        failRequestV2_args args = new failRequestV2_args();
+        args.set_id(id);
+        args.set_e(e);
+        args.write(prot);
+        prot.writeMessageEnd();
+      }
+
+      public void getResult() throws AuthorizationException, org.apache.thrift.TException {
+        if (getState() != org.apache.thrift.async.TAsyncMethodCall.State.RESPONSE_READ) {
+          throw new IllegalStateException("Method call not finished!");
+        }
+        org.apache.thrift.transport.TMemoryInputTransport memoryTransport = new org.apache.thrift.transport.TMemoryInputTransport(getFrameBuffer().array());
+        org.apache.thrift.protocol.TProtocol prot = client.getProtocolFactory().getProtocol(memoryTransport);
+        (new Client(prot)).recv_failRequestV2();
+      }
+    }
+
   }
 
   public static class Processor<I extends Iface> extends org.apache.thrift.TBaseProcessor<I> implements org.apache.thrift.TProcessor {
@@ -300,6 +363,7 @@ public class DistributedRPCInvocations {
       processMap.put("result", new result());
       processMap.put("fetchRequest", new fetchRequest());
       processMap.put("failRequest", new failRequest());
+      processMap.put("failRequestV2", new failRequestV2());
       return processMap;
     }
 
@@ -375,6 +439,30 @@ public class DistributedRPCInvocations {
       }
     }
 
+    public static class failRequestV2<I extends Iface> extends org.apache.thrift.ProcessFunction<I, failRequestV2_args> {
+      public failRequestV2() {
+        super("failRequestV2");
+      }
+
+      public failRequestV2_args getEmptyArgsInstance() {
+        return new failRequestV2_args();
+      }
+
+      protected boolean isOneway() {
+        return false;
+      }
+
+      public failRequestV2_result getResult(I iface, failRequestV2_args args) throws org.apache.thrift.TException {
+        failRequestV2_result result = new failRequestV2_result();
+        try {
+          iface.failRequestV2(args.id, args.e);
+        } catch (AuthorizationException aze) {
+          result.aze = aze;
+        }
+        return result;
+      }
+    }
+
   }
 
   public static class AsyncProcessor<I extends AsyncIface> extends org.apache.thrift.TBaseAsyncProcessor<I> {
@@ -391,6 +479,7 @@ public class DistributedRPCInvocations {
       processMap.put("result", new result());
       processMap.put("fetchRequest", new fetchRequest());
       processMap.put("failRequest", new failRequest());
+      processMap.put("failRequestV2", new failRequestV2());
       return processMap;
     }
 
@@ -563,6 +652,62 @@ public class DistributedRPCInvocations {
       }
     }
 
+    public static class failRequestV2<I extends AsyncIface> extends org.apache.thrift.AsyncProcessFunction<I, failRequestV2_args, Void> {
+      public failRequestV2() {
+        super("failRequestV2");
+      }
+
+      public failRequestV2_args getEmptyArgsInstance() {
+        return new failRequestV2_args();
+      }
+
+      public AsyncMethodCallback<Void> getResultHandler(final AsyncFrameBuffer fb, final int seqid) {
+        final org.apache.thrift.AsyncProcessFunction fcall = this;
+        return new AsyncMethodCallback<Void>() { 
+          public void onComplete(Void o) {
+            failRequestV2_result result = new failRequestV2_result();
+            try {
+              fcall.sendResponse(fb,result, org.apache.thrift.protocol.TMessageType.REPLY,seqid);
+              return;
+            } catch (Exception e) {
+              LOGGER.error("Exception writing to internal frame buffer", e);
+            }
+            fb.close();
+          }
+          public void onError(Exception e) {
+            byte msgType = org.apache.thrift.protocol.TMessageType.REPLY;
+            org.apache.thrift.TBase msg;
+            failRequestV2_result result = new failRequestV2_result();
+            if (e instanceof AuthorizationException) {
+                        result.aze = (AuthorizationException) e;
+                        result.set_aze_isSet(true);
+                        msg = result;
+            }
+             else 
+            {
+              msgType = org.apache.thrift.protocol.TMessageType.EXCEPTION;
+              msg = (org.apache.thrift.TBase)new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.INTERNAL_ERROR, e.getMessage());
+            }
+            try {
+              fcall.sendResponse(fb,msg,msgType,seqid);
+              return;
+            } catch (Exception ex) {
+              LOGGER.error("Exception writing to internal frame buffer", ex);
+            }
+            fb.close();
+          }
+        };
+      }
+
+      protected boolean isOneway() {
+        return false;
+      }
+
+      public void start(I iface, failRequestV2_args args, org.apache.thrift.async.AsyncMethodCallback<Void> resultHandler) throws TException {
+        iface.failRequestV2(args.id, args.e,resultHandler);
+      }
+    }
+
   }
 
   public static class result_args implements org.apache.thrift.TBase<result_args, result_args._Fields>, java.io.Serializable, Cloneable, Comparable<result_args>   {
@@ -2932,4 +3077,828 @@ public class DistributedRPCInvocations {
 
   }
 
+  public static class failRequestV2_args implements org.apache.thrift.TBase<failRequestV2_args, failRequestV2_args._Fields>, java.io.Serializable, Cloneable, Comparable<failRequestV2_args>   {
+    private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("failRequestV2_args");
+
+    private static final org.apache.thrift.protocol.TField ID_FIELD_DESC = new org.apache.thrift.protocol.TField("id", org.apache.thrift.protocol.TType.STRING, (short)1);
+    private static final org.apache.thrift.protocol.TField E_FIELD_DESC = new org.apache.thrift.protocol.TField("e", org.apache.thrift.protocol.TType.STRUCT, (short)2);
+
+    private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
+    static {
+      schemes.put(StandardScheme.class, new failRequestV2_argsStandardSchemeFactory());
+      schemes.put(TupleScheme.class, new failRequestV2_argsTupleSchemeFactory());
+    }
+
+    private String id; // required
+    private DRPCExecutionException e; // required
+
+    /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+    public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+      ID((short)1, "id"),
+      E((short)2, "e");
+
+      private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+      static {
+        for (_Fields field : EnumSet.allOf(_Fields.class)) {
+          byName.put(field.getFieldName(), field);
+        }
+      }
+
+      /**
+       * Find the _Fields constant that matches fieldId, or null if it's not found.
+       */
+      public static _Fields findByThriftId(int fieldId) {
+        switch(fieldId) {
+          case 1: // ID
+            return ID;
+          case 2: // E
+            return E;
+          default:
+            return null;
+        }
+      }
+
+      /**
+       * Find the _Fields constant that matches fieldId, throwing an exception
+       * if it is not found.
+       */
+      public static _Fields findByThriftIdOrThrow(int fieldId) {
+        _Fields fields = findByThriftId(fieldId);
+        if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+        return fields;
+      }
+
+      /**
+       * Find the _Fields constant that matches name, or null if it's not found.
+       */
+      public static _Fields findByName(String name) {
+        return byName.get(name);
+      }
+
+      private final short _thriftId;
+      private final String _fieldName;
+
+      _Fields(short thriftId, String fieldName) {
+        _thriftId = thriftId;
+        _fieldName = fieldName;
+      }
+
+      public short getThriftFieldId() {
+        return _thriftId;
+      }
+
+      public String getFieldName() {
+        return _fieldName;
+      }
+    }
+
+    // isset id assignments
+    public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+    static {
+      Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+      tmpMap.put(_Fields.ID, new org.apache.thrift.meta_data.FieldMetaData("id", org.apache.thrift.TFieldRequirementType.DEFAULT, 
+          new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+      tmpMap.put(_Fields.E, new org.apache.thrift.meta_data.FieldMetaData("e", org.apache.thrift.TFieldRequirementType.DEFAULT, 
+          new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRUCT)));
+      metaDataMap = Collections.unmodifiableMap(tmpMap);
+      org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(failRequestV2_args.class, metaDataMap);
+    }
+
+    public failRequestV2_args() {
+    }
+
+    public failRequestV2_args(
+      String id,
+      DRPCExecutionException e)
+    {
+      this();
+      this.id = id;
+      this.e = e;
+    }
+
+    /**
+     * Performs a deep copy on <i>other</i>.
+     */
+    public failRequestV2_args(failRequestV2_args other) {
+      if (other.is_set_id()) {
+        this.id = other.id;
+      }
+      if (other.is_set_e()) {
+        this.e = new DRPCExecutionException(other.e);
+      }
+    }
+
+    public failRequestV2_args deepCopy() {
+      return new failRequestV2_args(this);
+    }
+
+    @Override
+    public void clear() {
+      this.id = null;
+      this.e = null;
+    }
+
+    public String get_id() {
+      return this.id;
+    }
+
+    public void set_id(String id) {
+      this.id = id;
+    }
+
+    public void unset_id() {
+      this.id = null;
+    }
+
+    /** Returns true if field id is set (has been assigned a value) and false otherwise */
+    public boolean is_set_id() {
+      return this.id != null;
+    }
+
+    public void set_id_isSet(boolean value) {
+      if (!value) {
+        this.id = null;
+      }
+    }
+
+    public DRPCExecutionException get_e() {
+      return this.e;
+    }
+
+    public void set_e(DRPCExecutionException e) {
+      this.e = e;
+    }
+
+    public void unset_e() {
+      this.e = null;
+    }
+
+    /** Returns true if field e is set (has been assigned a value) and false otherwise */
+    public boolean is_set_e() {
+      return this.e != null;
+    }
+
+    public void set_e_isSet(boolean value) {
+      if (!value) {
+        this.e = null;
+      }
+    }
+
+    public void setFieldValue(_Fields field, Object value) {
+      switch (field) {
+      case ID:
+        if (value == null) {
+          unset_id();
+        } else {
+          set_id((String)value);
+        }
+        break;
+
+      case E:
+        if (value == null) {
+          unset_e();
+        } else {
+          set_e((DRPCExecutionException)value);
+        }
+        break;
+
+      }
+    }
+
+    public Object getFieldValue(_Fields field) {
+      switch (field) {
+      case ID:
+        return get_id();
+
+      case E:
+        return get_e();
+
+      }
+      throw new IllegalStateException();
+    }
+
+    /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+    public boolean isSet(_Fields field) {
+      if (field == null) {
+        throw new IllegalArgumentException();
+      }
+
+      switch (field) {
+      case ID:
+        return is_set_id();
+      case E:
+        return is_set_e();
+      }
+      throw new IllegalStateException();
+    }
+
+    @Override
+    public boolean equals(Object that) {
+      if (that == null)
+        return false;
+      if (that instanceof failRequestV2_args)
+        return this.equals((failRequestV2_args)that);
+      return false;
+    }
+
+    public boolean equals(failRequestV2_args that) {
+      if (that == null)
+        return false;
+
+      boolean this_present_id = true && this.is_set_id();
+      boolean that_present_id = true && that.is_set_id();
+      if (this_present_id || that_present_id) {
+        if (!(this_present_id && that_present_id))
+          return false;
+        if (!this.id.equals(that.id))
+          return false;
+      }
+
+      boolean this_present_e = true && this.is_set_e();
+      boolean that_present_e = true && that.is_set_e();
+      if (this_present_e || that_present_e) {
+        if (!(this_present_e && that_present_e))
+          return false;
+        if (!this.e.equals(that.e))
+          return false;
+      }
+
+      return true;
+    }
+
+    @Override
+    public int hashCode() {
+      List<Object> list = new ArrayList<Object>();
+
+      boolean present_id = true && (is_set_id());
+      list.add(present_id);
+      if (present_id)
+        list.add(id);
+
+      boolean present_e = true && (is_set_e());
+      list.add(present_e);
+      if (present_e)
+        list.add(e);
+
+      return list.hashCode();
+    }
+
+    @Override
+    public int compareTo(failRequestV2_args other) {
+      if (!getClass().equals(other.getClass())) {
+        return getClass().getName().compareTo(other.getClass().getName());
+      }
+
+      int lastComparison = 0;
+
+      lastComparison = Boolean.valueOf(is_set_id()).compareTo(other.is_set_id());
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+      if (is_set_id()) {
+        lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.id, other.id);
+        if (lastComparison != 0) {
+          return lastComparison;
+        }
+      }
+      lastComparison = Boolean.valueOf(is_set_e()).compareTo(other.is_set_e());
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+      if (is_set_e()) {
+        lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.e, other.e);
+        if (lastComparison != 0) {
+          return lastComparison;
+        }
+      }
+      return 0;
+    }
+
+    public _Fields fieldForId(int fieldId) {
+      return _Fields.findByThriftId(fieldId);
+    }
+
+    public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+      schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
+    }
+
+    public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+      schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder("failRequestV2_args(");
+      boolean first = true;
+
+      sb.append("id:");
+      if (this.id == null) {
+        sb.append("null");
+      } else {
+        sb.append(this.id);
+      }
+      first = false;
+      if (!first) sb.append(", ");
+      sb.append("e:");
+      if (this.e == null) {
+        sb.append("null");
+      } else {
+        sb.append(this.e);
+      }
+      first = false;
+      sb.append(")");
+      return sb.toString();
+    }
+
+    public void validate() throws org.apache.thrift.TException {
+      // check for required fields
+      // check for sub-struct validity
+    }
+
+    private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+      try {
+        write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+      } catch (org.apache.thrift.TException te) {
+        throw new java.io.IOException(te);
+      }
+    }
+
+    private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
+      try {
+        read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+      } catch (org.apache.thrift.TException te) {
+        throw new java.io.IOException(te);
+      }
+    }
+
+    private static class failRequestV2_argsStandardSchemeFactory implements SchemeFactory {
+      public failRequestV2_argsStandardScheme getScheme() {
+        return new failRequestV2_argsStandardScheme();
+      }
+    }
+
+    private static class failRequestV2_argsStandardScheme extends StandardScheme<failRequestV2_args> {
+
+      public void read(org.apache.thrift.protocol.TProtocol iprot, failRequestV2_args struct) throws org.apache.thrift.TException {
+        org.apache.thrift.protocol.TField schemeField;
+        iprot.readStructBegin();
+        while (true)
+        {
+          schemeField = iprot.readFieldBegin();
+          if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { 
+            break;
+          }
+          switch (schemeField.id) {
+            case 1: // ID
+              if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+                struct.id = iprot.readString();
+                struct.set_id_isSet(true);
+              } else { 
+                org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+              }
+              break;
+            case 2: // E
+              if (schemeField.type == org.apache.thrift.protocol.TType.STRUCT) {
+                struct.e = new DRPCExecutionException();
+                struct.e.read(iprot);
+                struct.set_e_isSet(true);
+              } else { 
+                org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+              }
+              break;
+            default:
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+          }
+          iprot.readFieldEnd();
+        }
+        iprot.readStructEnd();
+        struct.validate();
+      }
+
+      public void write(org.apache.thrift.protocol.TProtocol oprot, failRequestV2_args struct) throws org.apache.thrift.TException {
+        struct.validate();
+
+        oprot.writeStructBegin(STRUCT_DESC);
+        if (struct.id != null) {
+          oprot.writeFieldBegin(ID_FIELD_DESC);
+          oprot.writeString(struct.id);
+          oprot.writeFieldEnd();
+        }
+        if (struct.e != null) {
+          oprot.writeFieldBegin(E_FIELD_DESC);
+          struct.e.write(oprot);
+          oprot.writeFieldEnd();
+        }
+        oprot.writeFieldStop();
+        oprot.writeStructEnd();
+      }
+
+    }
+
+    private static class failRequestV2_argsTupleSchemeFactory implements SchemeFactory {
+      public failRequestV2_argsTupleScheme getScheme() {
+        return new failRequestV2_argsTupleScheme();
+      }
+    }
+
+    private static class failRequestV2_argsTupleScheme extends TupleScheme<failRequestV2_args> {
+
+      @Override
+      public void write(org.apache.thrift.protocol.TProtocol prot, failRequestV2_args struct) throws org.apache.thrift.TException {
+        TTupleProtocol oprot = (TTupleProtocol) prot;
+        BitSet optionals = new BitSet();
+        if (struct.is_set_id()) {
+          optionals.set(0);
+        }
+        if (struct.is_set_e()) {
+          optionals.set(1);
+        }
+        oprot.writeBitSet(optionals, 2);
+        if (struct.is_set_id()) {
+          oprot.writeString(struct.id);
+        }
+        if (struct.is_set_e()) {
+          struct.e.write(oprot);
+        }
+      }
+
+      @Override
+      public void read(org.apache.thrift.protocol.TProtocol prot, failRequestV2_args struct) throws org.apache.thrift.TException {
+        TTupleProtocol iprot = (TTupleProtocol) prot;
+        BitSet incoming = iprot.readBitSet(2);
+        if (incoming.get(0)) {
+          struct.id = iprot.readString();
+          struct.set_id_isSet(true);
+        }
+        if (incoming.get(1)) {
+          struct.e = new DRPCExecutionException();
+          struct.e.read(iprot);
+          struct.set_e_isSet(true);
+        }
+      }
+    }
+
+  }
+
+  public static class failRequestV2_result implements org.apache.thrift.TBase<failRequestV2_result, failRequestV2_result._Fields>, java.io.Serializable, Cloneable, Comparable<failRequestV2_result>   {
+    private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("failRequestV2_result");
+
+    private static final org.apache.thrift.protocol.TField AZE_FIELD_DESC = new org.apache.thrift.protocol.TField("aze", org.apache.thrift.protocol.TType.STRUCT, (short)1);
+
+    private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
+    static {
+      schemes.put(StandardScheme.class, new failRequestV2_resultStandardSchemeFactory());
+      schemes.put(TupleScheme.class, new failRequestV2_resultTupleSchemeFactory());
+    }
+
+    private AuthorizationException aze; // required
+
+    /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+    public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+      AZE((short)1, "aze");
+
+      private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+      static {
+        for (_Fields field : EnumSet.allOf(_Fields.class)) {
+          byName.put(field.getFieldName(), field);
+        }
+      }
+
+      /**
+       * Find the _Fields constant that matches fieldId, or null if it's not found.
+       */
+      public static _Fields findByThriftId(int fieldId) {
+        switch(fieldId) {
+          case 1: // AZE
+            return AZE;
+          default:
+            return null;
+        }
+      }
+
+      /**
+       * Find the _Fields constant that matches fieldId, throwing an exception
+       * if it is not found.
+       */
+      public static _Fields findByThriftIdOrThrow(int fieldId) {
+        _Fields fields = findByThriftId(fieldId);
+        if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+        return fields;
+      }
+
+      /**
+       * Find the _Fields constant that matches name, or null if it's not found.
+       */
+      public static _Fields findByName(String name) {
+        return byName.get(name);
+      }
+
+      private final short _thriftId;
+      private final String _fieldName;
+
+      _Fields(short thriftId, String fieldName) {
+        _thriftId = thriftId;
+        _fieldName = fieldName;
+      }
+
+      public short getThriftFieldId() {
+        return _thriftId;
+      }
+
+      public String getFieldName() {
+        return _fieldName;
+      }
+    }
+
+    // isset id assignments
+    public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+    static {
+      Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+      tmpMap.put(_Fields.AZE, new org.apache.thrift.meta_data.FieldMetaData("aze", org.apache.thrift.TFieldRequirementType.DEFAULT, 
+          new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRUCT)));
+      metaDataMap = Collections.unmodifiableMap(tmpMap);
+      org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(failRequestV2_result.class, metaDataMap);
+    }
+
+    public failRequestV2_result() {
+    }
+
+    public failRequestV2_result(
+      AuthorizationException aze)
+    {
+      this();
+      this.aze = aze;
+    }
+
+    /**
+     * Performs a deep copy on <i>other</i>.
+     */
+    public failRequestV2_result(failRequestV2_result other) {
+      if (other.is_set_aze()) {
+        this.aze = new AuthorizationException(other.aze);
+      }
+    }
+
+    public failRequestV2_result deepCopy() {
+      return new failRequestV2_result(this);
+    }
+
+    @Override
+    public void clear() {
+      this.aze = null;
+    }
+
+    public AuthorizationException get_aze() {
+      return this.aze;
+    }
+
+    public void set_aze(AuthorizationException aze) {
+      this.aze = aze;
+    }
+
+    public void unset_aze() {
+      this.aze = null;
+    }
+
+    /** Returns true if field aze is set (has been assigned a value) and false otherwise */
+    public boolean is_set_aze() {
+      return this.aze != null;
+    }
+
+    public void set_aze_isSet(boolean value) {
+      if (!value) {
+        this.aze = null;
+      }
+    }
+
+    public void setFieldValue(_Fields field, Object value) {
+      switch (field) {
+      case AZE:
+        if (value == null) {
+          unset_aze();
+        } else {
+          set_aze((AuthorizationException)value);
+        }
+        break;
+
+      }
+    }
+
+    public Object getFieldValue(_Fields field) {
+      switch (field) {
+      case AZE:
+        return get_aze();
+
+      }
+      throw new IllegalStateException();
+    }
+
+    /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+    public boolean isSet(_Fields field) {
+      if (field == null) {
+        throw new IllegalArgumentException();
+      }
+
+      switch (field) {
+      case AZE:
+        return is_set_aze();
+      }
+      throw new IllegalStateException();
+    }
+
+    @Override
+    public boolean equals(Object that) {
+      if (that == null)
+        return false;
+      if (that instanceof failRequestV2_result)
+        return this.equals((failRequestV2_result)that);
+      return false;
+    }
+
+    public boolean equals(failRequestV2_result that) {
+      if (that == null)
+        return false;
+
+      boolean this_present_aze = true && this.is_set_aze();
+      boolean that_present_aze = true && that.is_set_aze();
+      if (this_present_aze || that_present_aze) {
+        if (!(this_present_aze && that_present_aze))
+          return false;
+        if (!this.aze.equals(that.aze))
+          return false;
+      }
+
+      return true;
+    }
+
+    @Override
+    public int hashCode() {
+      List<Object> list = new ArrayList<Object>();
+
+      boolean present_aze = true && (is_set_aze());
+      list.add(present_aze);
+      if (present_aze)
+        list.add(aze);
+
+      return list.hashCode();
+    }
+
+    @Override
+    public int compareTo(failRequestV2_result other) {
+      if (!getClass().equals(other.getClass())) {
+        return getClass().getName().compareTo(other.getClass().getName());
+      }
+
+      int lastComparison = 0;
+
+      lastComparison = Boolean.valueOf(is_set_aze()).compareTo(other.is_set_aze());
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+      if (is_set_aze()) {
+        lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.aze, other.aze);
+        if (lastComparison != 0) {
+          return lastComparison;
+        }
+      }
+      return 0;
+    }
+
+    public _Fields fieldForId(int fieldId) {
+      return _Fields.findByThriftId(fieldId);
+    }
+
+    public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+      schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
+    }
+
+    public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+      schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
+      }
+
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder("failRequestV2_result(");
+      boolean first = true;
+
+      sb.append("aze:");
+      if (this.aze == null) {
+        sb.append("null");
+      } else {
+        sb.append(this.aze);
+      }
+      first = false;
+      sb.append(")");
+      return sb.toString();
+    }
+
+    public void validate() throws org.apache.thrift.TException {
+      // check for required fields
+      // check for sub-struct validity
+    }
+
+    private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+      try {
+        write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+      } catch (org.apache.thrift.TException te) {
+        throw new java.io.IOException(te);
+      }
+    }
+
+    private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
+      try {
+        read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+      } catch (org.apache.thrift.TException te) {
+        throw new java.io.IOException(te);
+      }
+    }
+
+    private static class failRequestV2_resultStandardSchemeFactory implements SchemeFactory {
+      public failRequestV2_resultStandardScheme getScheme() {
+        return new failRequestV2_resultStandardScheme();
+      }
+    }
+
+    private static class failRequestV2_resultStandardScheme extends StandardScheme<failRequestV2_result> {
+
+      public void read(org.apache.thrift.protocol.TProtocol iprot, failRequestV2_result struct) throws org.apache.thrift.TException {
+        org.apache.thrift.protocol.TField schemeField;
+        iprot.readStructBegin();
+        while (true)
+        {
+          schemeField = iprot.readFieldBegin();
+          if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { 
+            break;
+          }
+          switch (schemeField.id) {
+            case 1: // AZE
+              if (schemeField.type == org.apache.thrift.protocol.TType.STRUCT) {
+                struct.aze = new AuthorizationException();
+                struct.aze.read(iprot);
+                struct.set_aze_isSet(true);
+              } else { 
+                org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+              }
+              break;
+            default:
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+          }
+          iprot.readFieldEnd();
+        }
+        iprot.readStructEnd();
+        struct.validate();
+      }
+
+      public void write(org.apache.thrift.protocol.TProtocol oprot, failRequestV2_result struct) throws org.apache.thrift.TException {
+        struct.validate();
+
+        oprot.writeStructBegin(STRUCT_DESC);
+        if (struct.aze != null) {
+          oprot.writeFieldBegin(AZE_FIELD_DESC);
+          struct.aze.write(oprot);
+          oprot.writeFieldEnd();
+        }
+        oprot.writeFieldStop();
+        oprot.writeStructEnd();
+      }
+
+    }
+
+    private static class failRequestV2_resultTupleSchemeFactory implements SchemeFactory {
+      public failRequestV2_resultTupleScheme getScheme() {
+        return new failRequestV2_resultTupleScheme();
+      }
+    }
+
+    private static class failRequestV2_resultTupleScheme extends TupleScheme<failRequestV2_result> {
+
+      @Override
+      public void write(org.apache.thrift.protocol.TProtocol prot, failRequestV2_result struct) throws org.apache.thrift.TException {
+        TTupleProtocol oprot = (TTupleProtocol) prot;
+        BitSet optionals = new BitSet();
+        if (struct.is_set_aze()) {
+          optionals.set(0);
+        }
+        oprot.writeBitSet(optionals, 1);
+        if (struct.is_set_aze()) {
+          struct.aze.write(oprot);
+        }
+      }
+
+      @Override
+      public void read(org.apache.thrift.protocol.TProtocol prot, failRequestV2_result struct) throws org.apache.thrift.TException {
+        TTupleProtocol iprot = (TTupleProtocol) prot;
+        BitSet incoming = iprot.readBitSet(1);
+        if (incoming.get(0)) {
+          struct.aze = new AuthorizationException();
+          struct.aze.read(iprot);
+          struct.set_aze_isSet(true);
+        }
+      }
+    }
+
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/ea44062f/storm-core/src/jvm/org/apache/storm/security/auth/AuthUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/security/auth/AuthUtils.java b/storm-core/src/jvm/org/apache/storm/security/auth/AuthUtils.java
index 353f679..e38a575 100644
--- a/storm-core/src/jvm/org/apache/storm/security/auth/AuthUtils.java
+++ b/storm-core/src/jvm/org/apache/storm/security/auth/AuthUtils.java
@@ -30,6 +30,7 @@ import java.security.URIParameter;
 import java.security.MessageDigest;
 
 import org.apache.commons.codec.binary.Hex;
+import org.apache.commons.lang.StringUtils;
 import org.apache.storm.security.INimbusCredentialPlugin;
 import org.apache.storm.utils.Utils;
 import org.slf4j.Logger;
@@ -178,7 +179,7 @@ public class AuthUtils {
      * @param conf daemon configuration
      * @return the plugin
      */
-    public static IGroupMappingServiceProvider GetGroupMappingServiceProviderPlugin(Map conf) {
+    public static IGroupMappingServiceProvider GetGroupMappingServiceProviderPlugin(Map<String, Object> conf) {
         IGroupMappingServiceProvider gmsp = null;
         try {
             String gmsp_klassName = (String) conf.get(Config.STORM_GROUP_MAPPING_SERVICE_PROVIDER_PLUGIN);
@@ -318,11 +319,21 @@ public class AuthUtils {
         }
     }
 
-    private static IHttpCredentialsPlugin GetHttpCredentialsPlugin(Map conf,
+    /**
+     * Construct and prepare an HttpCredentials plugin.
+     * @param conf the configuration passed to the plugin's prepare method
+     * @param klassName the class name of the plugin to instantiate
+     * @return the prepared plugin, or null when klassName is null or blank
+     * @throws RuntimeException wrapping any exception raised while creating or preparing the plugin
+     */
+    public static IHttpCredentialsPlugin GetHttpCredentialsPlugin(Map<String, Object> conf,
             String klassName) {
         try {
-            IHttpCredentialsPlugin plugin = Utils.newInstance(klassName);
-            plugin.prepare(conf);
+            IHttpCredentialsPlugin plugin = null;
+            if (StringUtils.isNotBlank(klassName)) {
+                plugin = Utils.newInstance(klassName);
+                plugin.prepare(conf);
+            }
             return plugin;
         } catch (Exception e) {
             throw new RuntimeException(e);
@@ -348,7 +359,7 @@ public class AuthUtils {
      */
     public static IHttpCredentialsPlugin GetDrpcHttpCredentialsPlugin(Map conf) {
         String klassName = (String)conf.get(Config.DRPC_HTTP_CREDS_PLUGIN);
-        return AuthUtils.GetHttpCredentialsPlugin(conf, klassName);
+        return klassName == null ? null : AuthUtils.GetHttpCredentialsPlugin(conf, klassName);
     }
 
     private static final String USERNAME = "username";

http://git-wip-us.apache.org/repos/asf/storm/blob/ea44062f/storm-core/src/jvm/org/apache/storm/security/auth/ReqContext.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/security/auth/ReqContext.java b/storm-core/src/jvm/org/apache/storm/security/auth/ReqContext.java
index abc83b4..6d92fd9 100644
--- a/storm-core/src/jvm/org/apache/storm/security/auth/ReqContext.java
+++ b/storm-core/src/jvm/org/apache/storm/security/auth/ReqContext.java
@@ -17,17 +17,17 @@
  */
 package org.apache.storm.security.auth;
 
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.net.InetAddress;
-import com.google.common.annotations.VisibleForTesting;
-
 import java.security.AccessControlContext;
 import java.security.AccessController;
 import java.security.Principal;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
 import javax.security.auth.Subject;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * context request context includes info about:
  *
@@ -40,7 +40,6 @@ public class ReqContext {
     private Subject _subject;
     private InetAddress _remoteAddr;
     private Integer _reqID;
-    private Map _storm_conf;
     private Principal realPrincipal;
 
     @Override
@@ -53,7 +52,7 @@ public class ReqContext {
                 ", ThreadId=" + Thread.currentThread().toString() +
                 '}';
     }
-
+    
     /**
      * @return a request context associated with current thread
      */
@@ -91,7 +90,6 @@ public class ReqContext {
         _reqID = uniqueId.incrementAndGet();
     }
 
-
     /**
      * client address
      */

http://git-wip-us.apache.org/repos/asf/storm/blob/ea44062f/storm-core/src/jvm/org/apache/storm/security/auth/authorizer/DRPCSimpleACLAuthorizer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/security/auth/authorizer/DRPCSimpleACLAuthorizer.java b/storm-core/src/jvm/org/apache/storm/security/auth/authorizer/DRPCSimpleACLAuthorizer.java
index bcff708..8aa1fd3 100644
--- a/storm-core/src/jvm/org/apache/storm/security/auth/authorizer/DRPCSimpleACLAuthorizer.java
+++ b/storm-core/src/jvm/org/apache/storm/security/auth/authorizer/DRPCSimpleACLAuthorizer.java
@@ -35,6 +35,8 @@ import org.apache.storm.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
+
 public class DRPCSimpleACLAuthorizer extends DRPCAuthorizerBase {
     public static final Logger LOG =
         LoggerFactory.getLogger(DRPCSimpleACLAuthorizer.class);
@@ -47,7 +49,8 @@ public class DRPCSimpleACLAuthorizer extends DRPCAuthorizerBase {
     protected IPrincipalToLocal _ptol;
     protected boolean _permitWhenMissingFunctionEntry = false;
 
-    protected static class AclFunctionEntry {
+    @VisibleForTesting
+    public static class AclFunctionEntry {
         final public Set<String> clientUsers;
         final public String invocationUser;
         public AclFunctionEntry(Collection<String> clientUsers,

http://git-wip-us.apache.org/repos/asf/storm/blob/ea44062f/storm-core/src/jvm/org/apache/storm/ui/UIHelpers.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/ui/UIHelpers.java b/storm-core/src/jvm/org/apache/storm/ui/UIHelpers.java
index e046061..e97d52d 100644
--- a/storm-core/src/jvm/org/apache/storm/ui/UIHelpers.java
+++ b/storm-core/src/jvm/org/apache/storm/ui/UIHelpers.java
@@ -19,8 +19,13 @@ package org.apache.storm.ui;
 
 import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableMap;
+
+import org.apache.storm.Config;
+import org.apache.storm.daemon.drpc.webapp.ReqContextFilter;
 import org.apache.storm.generated.ExecutorInfo;
 import org.apache.storm.logging.filters.AccessLoggingFilter;
+import org.apache.storm.security.auth.AuthUtils;
+import org.apache.storm.security.auth.IHttpCredentialsPlugin;
 import org.apache.storm.utils.Utils;
 import org.eclipse.jetty.server.Connector;
 import org.eclipse.jetty.server.DispatcherType;
@@ -40,6 +45,7 @@ import java.io.PrintWriter;
 import java.io.StringWriter;
 import java.net.URLEncoder;
 import java.util.*;
+import java.util.Map.Entry;
 
 public class UIHelpers {
 
@@ -160,34 +166,72 @@ public class UIHelpers {
     }
 
     public static void configFilter(Server server, Servlet servlet, List<FilterConfiguration> filtersConfs) {
+        configFilter(server, servlet, filtersConfs, null);
+    }
+
+    /**
+     * Serve the servlet at "/" from a new ServletContextHandler, install the filter chain
+     * via configFilters, and make that context the server's handler.
+     * Nothing is configured when filtersConfs is null.
+     * @param server the jetty server the new context handler is attached to
+     * @param servlet the servlet to wrap in a ServletHolder with init order 0
+     * @param filtersConfs the configured filters to install, or null to skip all setup
+     * @param params init parameters for the servlet holder, or null for none
+     */
+    public static void configFilter(Server server, Servlet servlet, List<FilterConfiguration> filtersConfs, Map<String, String> params) {
         if (filtersConfs != null) {
             ServletHolder servletHolder = new ServletHolder(servlet);
+            servletHolder.setInitOrder(0);
+            if (params != null) {
+                servletHolder.setInitParameters(params);
+            }
             ServletContextHandler context = new ServletContextHandler(server, "/");
             context.addServlet(servletHolder, "/");
-            context.addFilter(corsFilterHandle(), "/*", EnumSet.allOf(DispatcherType.class));
-            for (FilterConfiguration filterConf : filtersConfs) {
-                String filterName = filterConf.getFilterName();
-                String filterClass = filterConf.getFilterClass();
-                Map filterParams = filterConf.getFilterParams();
-                if (filterClass != null) {
-                    FilterHolder filterHolder = new FilterHolder();
-                    filterHolder.setClassName(filterClass);
-                    if (filterName != null) {
-                        filterHolder.setName(filterName);
-                    } else {
-                        filterHolder.setName(filterClass);
-                    }
-                    if (filterParams != null) {
-                        filterHolder.setInitParameters(filterParams);
-                    } else {
-                        filterHolder.setInitParameters(new HashMap<String, String>());
-                    }
-                    context.addFilter(filterHolder, "/*", FilterMapping.ALL);
+            configFilters(context, filtersConfs);
+            server.setHandler(context);
+        }
+    }
+
+    /**
+     * Add the filter from corsFilterHandle(), then every configured filter that names a
+     * class, then the filter from mkAccessLoggingFilterHandle(), all mapped to all paths.
+     * @param context the context handler the filters are added to
+     * @param filtersConfs the configured filters to install; must not be null
+     */
+    public static void configFilters(ServletContextHandler context, List<FilterConfiguration> filtersConfs) {
+        context.addFilter(corsFilterHandle(), "/*", EnumSet.allOf(DispatcherType.class));
+        for (FilterConfiguration filterConf : filtersConfs) {
+            String filterName = filterConf.getFilterName();
+            String filterClass = filterConf.getFilterClass();
+            Map filterParams = filterConf.getFilterParams();
+            if (filterClass != null) {
+                FilterHolder filterHolder = new FilterHolder();
+                filterHolder.setClassName(filterClass);
+                if (filterName != null) {
+                    filterHolder.setName(filterName);
+                } else {
+                    filterHolder.setName(filterClass);
+                }
+                if (filterParams != null) {
+                    filterHolder.setInitParameters(filterParams);
+                } else {
+                    filterHolder.setInitParameters(new HashMap<String, String>());
                 }
+                context.addFilter(filterHolder, "/*", FilterMapping.ALL);
             }
-            context.addFilter(mkAccessLoggingFilterHandle(), "/*", EnumSet.allOf(DispatcherType.class));
-            server.setHandler(context);
         }
+        context.addFilter(mkAccessLoggingFilterHandle(), "/*", EnumSet.allOf(DispatcherType.class));
+    }
+
+    /**
+     * Add a ReqContextFilter for all paths, built around the http credentials plugin
+     * returned by AuthUtils.GetHttpCredentialsPlugin.
+     * @param context the context handler the filter is added to
+     * @param configName the config key whose value is the credentials plugin class name
+     * @param conf the configuration that is read for configName and handed to AuthUtils
+     */
+    public static void addRequestContextFilter(ServletContextHandler context, String configName, Map<String, Object> conf) {
+        IHttpCredentialsPlugin auth = AuthUtils.GetHttpCredentialsPlugin(conf, (String)conf.get(configName));
+        ReqContextFilter filter = new ReqContextFilter(auth);
+        context.addFilter(new FilterHolder(filter), "/*", FilterMapping.ALL);
     }
 
     private static Server removeNonSslConnector(Server server) {
@@ -202,7 +246,7 @@ public class UIHelpers {
     /**
      * Construct a Jetty Server instance.
      */
-    private static Server jettyCreateServer(Integer port, String host, Integer httpsPort) {
+    public static Server jettyCreateServer(Integer port, String host, Integer httpsPort) {
         SelectChannelConnector connector = new SelectChannelConnector();
         connector.setPort(Utils.getInt(port, 80));
         connector.setHost(host);

http://git-wip-us.apache.org/repos/asf/storm/blob/ea44062f/storm-core/src/jvm/org/apache/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/Utils.java b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
index a9856c4..cabfcc6 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/Utils.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
@@ -1950,6 +1950,7 @@ public class Utils {
             public void run() {
                 try {
                     Time.sleepSecs(1);
+                    LOG.warn("Forcing Halt...");
                     Runtime.getRuntime().halt(20);
                 } catch (Exception e) {
                     LOG.warn("Exception in the ShutDownHook", e);

http://git-wip-us.apache.org/repos/asf/storm/blob/ea44062f/storm-core/src/py/storm/DistributedRPCInvocations-remote
----------------------------------------------------------------------
diff --git a/storm-core/src/py/storm/DistributedRPCInvocations-remote b/storm-core/src/py/storm/DistributedRPCInvocations-remote
index 01435b6..d1f100e 100644
--- a/storm-core/src/py/storm/DistributedRPCInvocations-remote
+++ b/storm-core/src/py/storm/DistributedRPCInvocations-remote
@@ -45,6 +45,7 @@ if len(sys.argv) <= 1 or sys.argv[1] == '--help':
   print('  void result(string id, string result)')
   print('  DRPCRequest fetchRequest(string functionName)')
   print('  void failRequest(string id)')
+  print('  void failRequestV2(string id, DRPCExecutionException e)')
   print('')
   sys.exit(0)
 
@@ -119,6 +120,12 @@ elif cmd == 'failRequest':
     sys.exit(1)
   pp.pprint(client.failRequest(args[0],))
 
+elif cmd == 'failRequestV2':
+  if len(args) != 2:
+    print('failRequestV2 requires 2 args')
+    sys.exit(1)
+  pp.pprint(client.failRequestV2(args[0],eval(args[1]),))
+
 else:
   print('Unrecognized method %s' % cmd)
   sys.exit(1)

http://git-wip-us.apache.org/repos/asf/storm/blob/ea44062f/storm-core/src/py/storm/DistributedRPCInvocations.py
----------------------------------------------------------------------
diff --git a/storm-core/src/py/storm/DistributedRPCInvocations.py b/storm-core/src/py/storm/DistributedRPCInvocations.py
index 207fa9d..df873fe 100644
--- a/storm-core/src/py/storm/DistributedRPCInvocations.py
+++ b/storm-core/src/py/storm/DistributedRPCInvocations.py
@@ -59,6 +59,14 @@ class Iface:
     """
     pass
 
+  def failRequestV2(self, id, e):
+    """
+    Parameters:
+     - id
+     - e
+    """
+    pass
+
 
 class Client(Iface):
   def __init__(self, iprot, oprot=None):
@@ -164,6 +172,39 @@ class Client(Iface):
       raise result.aze
     return
 
+  def failRequestV2(self, id, e):
+    """
+    Parameters:
+     - id
+     - e
+    """
+    self.send_failRequestV2(id, e)
+    self.recv_failRequestV2()
+
+  def send_failRequestV2(self, id, e):
+    self._oprot.writeMessageBegin('failRequestV2', TMessageType.CALL, self._seqid)
+    args = failRequestV2_args()
+    args.id = id
+    args.e = e
+    args.write(self._oprot)
+    self._oprot.writeMessageEnd()
+    self._oprot.trans.flush()
+
+  def recv_failRequestV2(self):
+    iprot = self._iprot
+    (fname, mtype, rseqid) = iprot.readMessageBegin()
+    if mtype == TMessageType.EXCEPTION:
+      x = TApplicationException()
+      x.read(iprot)
+      iprot.readMessageEnd()
+      raise x
+    result = failRequestV2_result()
+    result.read(iprot)
+    iprot.readMessageEnd()
+    if result.aze is not None:
+      raise result.aze
+    return
+
 
 class Processor(Iface, TProcessor):
   def __init__(self, handler):
@@ -172,6 +213,7 @@ class Processor(Iface, TProcessor):
     self._processMap["result"] = Processor.process_result
     self._processMap["fetchRequest"] = Processor.process_fetchRequest
     self._processMap["failRequest"] = Processor.process_failRequest
+    self._processMap["failRequestV2"] = Processor.process_failRequestV2
 
   def process(self, iprot, oprot):
     (name, type, seqid) = iprot.readMessageBegin()
@@ -254,6 +296,28 @@ class Processor(Iface, TProcessor):
     oprot.writeMessageEnd()
     oprot.trans.flush()
 
+  def process_failRequestV2(self, seqid, iprot, oprot):
+    args = failRequestV2_args()
+    args.read(iprot)
+    iprot.readMessageEnd()
+    result = failRequestV2_result()
+    try:
+      self._handler.failRequestV2(args.id, args.e)
+      msg_type = TMessageType.REPLY
+    except (TTransport.TTransportException, KeyboardInterrupt, SystemExit):
+      raise
+    except AuthorizationException as aze:
+      msg_type = TMessageType.REPLY
+      result.aze = aze
+    except Exception as ex:
+      msg_type = TMessageType.EXCEPTION
+      logging.exception(ex)
+      result = TApplicationException(TApplicationException.INTERNAL_ERROR, 'Internal error')
+    oprot.writeMessageBegin("failRequestV2", msg_type, seqid)
+    result.write(oprot)
+    oprot.writeMessageEnd()
+    oprot.trans.flush()
+
 
 # HELPER FUNCTIONS AND STRUCTURES
 
@@ -675,3 +739,148 @@ class failRequest_result:
 
   def __ne__(self, other):
     return not (self == other)
+
+class failRequestV2_args:
+  """
+  Attributes:
+   - id
+   - e
+  """
+
+  thrift_spec = (
+    None, # 0
+    (1, TType.STRING, 'id', None, None, ), # 1
+    (2, TType.STRUCT, 'e', (DRPCExecutionException, DRPCExecutionException.thrift_spec), None, ), # 2
+  )
+
+  def __init__(self, id=None, e=None,):
+    self.id = id
+    self.e = e
+
+  def read(self, iprot):
+    if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+      fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+      return
+    iprot.readStructBegin()
+    while True:
+      (fname, ftype, fid) = iprot.readFieldBegin()
+      if ftype == TType.STOP:
+        break
+      if fid == 1:
+        if ftype == TType.STRING:
+          self.id = iprot.readString().decode('utf-8')
+        else:
+          iprot.skip(ftype)
+      elif fid == 2:
+        if ftype == TType.STRUCT:
+          self.e = DRPCExecutionException()
+          self.e.read(iprot)
+        else:
+          iprot.skip(ftype)
+      else:
+        iprot.skip(ftype)
+      iprot.readFieldEnd()
+    iprot.readStructEnd()
+
+  def write(self, oprot):
+    if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+      oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+      return
+    oprot.writeStructBegin('failRequestV2_args')
+    if self.id is not None:
+      oprot.writeFieldBegin('id', TType.STRING, 1)
+      oprot.writeString(self.id.encode('utf-8'))
+      oprot.writeFieldEnd()
+    if self.e is not None:
+      oprot.writeFieldBegin('e', TType.STRUCT, 2)
+      self.e.write(oprot)
+      oprot.writeFieldEnd()
+    oprot.writeFieldStop()
+    oprot.writeStructEnd()
+
+  def validate(self):
+    return
+
+
+  def __hash__(self):
+    value = 17
+    value = (value * 31) ^ hash(self.id)
+    value = (value * 31) ^ hash(self.e)
+    return value
+
+  def __repr__(self):
+    L = ['%s=%r' % (key, value)
+      for key, value in self.__dict__.iteritems()]
+    return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+  def __eq__(self, other):
+    return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+  def __ne__(self, other):
+    return not (self == other)
+
+class failRequestV2_result:
+  """
+  Attributes:
+   - aze
+  """
+
+  thrift_spec = (
+    None, # 0
+    (1, TType.STRUCT, 'aze', (AuthorizationException, AuthorizationException.thrift_spec), None, ), # 1
+  )
+
+  def __init__(self, aze=None,):
+    self.aze = aze
+
+  def read(self, iprot):
+    if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+      fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+      return
+    iprot.readStructBegin()
+    while True:
+      (fname, ftype, fid) = iprot.readFieldBegin()
+      if ftype == TType.STOP:
+        break
+      if fid == 1:
+        if ftype == TType.STRUCT:
+          self.aze = AuthorizationException()
+          self.aze.read(iprot)
+        else:
+          iprot.skip(ftype)
+      else:
+        iprot.skip(ftype)
+      iprot.readFieldEnd()
+    iprot.readStructEnd()
+
+  def write(self, oprot):
+    if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+      oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+      return
+    oprot.writeStructBegin('failRequestV2_result')
+    if self.aze is not None:
+      oprot.writeFieldBegin('aze', TType.STRUCT, 1)
+      self.aze.write(oprot)
+      oprot.writeFieldEnd()
+    oprot.writeFieldStop()
+    oprot.writeStructEnd()
+
+  def validate(self):
+    return
+
+
+  def __hash__(self):
+    value = 17
+    value = (value * 31) ^ hash(self.aze)
+    return value
+
+  def __repr__(self):
+    L = ['%s=%r' % (key, value)
+      for key, value in self.__dict__.iteritems()]
+    return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+  def __eq__(self, other):
+    return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+  def __ne__(self, other):
+    return not (self == other)

http://git-wip-us.apache.org/repos/asf/storm/blob/ea44062f/storm-core/src/py/storm/ttypes.py
----------------------------------------------------------------------
diff --git a/storm-core/src/py/storm/ttypes.py b/storm-core/src/py/storm/ttypes.py
index 4fac146..e9085e6 100644
--- a/storm-core/src/py/storm/ttypes.py
+++ b/storm-core/src/py/storm/ttypes.py
@@ -156,6 +156,26 @@ class LogLevelAction:
     "REMOVE": 3,
   }
 
+class DRPCExceptionType:
+  INTERNAL_ERROR = 0
+  SERVER_SHUTDOWN = 1
+  SERVER_TIMEOUT = 2
+  FAILED_REQUEST = 3
+
+  _VALUES_TO_NAMES = {
+    0: "INTERNAL_ERROR",
+    1: "SERVER_SHUTDOWN",
+    2: "SERVER_TIMEOUT",
+    3: "FAILED_REQUEST",
+  }
+
+  _NAMES_TO_VALUES = {
+    "INTERNAL_ERROR": 0,
+    "SERVER_SHUTDOWN": 1,
+    "SERVER_TIMEOUT": 2,
+    "FAILED_REQUEST": 3,
+  }
+
 class HBServerMessageType:
   CREATE_PATH = 0
   CREATE_PATH_RESPONSE = 1
@@ -11002,15 +11022,18 @@ class DRPCExecutionException(TException):
   """
   Attributes:
    - msg
+   - type
   """
 
   thrift_spec = (
     None, # 0
     (1, TType.STRING, 'msg', None, None, ), # 1
+    (2, TType.I32, 'type', None, None, ), # 2
   )
 
-  def __init__(self, msg=None,):
+  def __init__(self, msg=None, type=None,):
     self.msg = msg
+    self.type = type
 
   def read(self, iprot):
     if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
@@ -11026,6 +11049,11 @@ class DRPCExecutionException(TException):
           self.msg = iprot.readString().decode('utf-8')
         else:
           iprot.skip(ftype)
+      elif fid == 2:
+        if ftype == TType.I32:
+          self.type = iprot.readI32()
+        else:
+          iprot.skip(ftype)
       else:
         iprot.skip(ftype)
       iprot.readFieldEnd()
@@ -11040,6 +11068,10 @@ class DRPCExecutionException(TException):
       oprot.writeFieldBegin('msg', TType.STRING, 1)
       oprot.writeString(self.msg.encode('utf-8'))
       oprot.writeFieldEnd()
+    if self.type is not None:
+      oprot.writeFieldBegin('type', TType.I32, 2)
+      oprot.writeI32(self.type)
+      oprot.writeFieldEnd()
     oprot.writeFieldStop()
     oprot.writeStructEnd()
 
@@ -11055,6 +11087,7 @@ class DRPCExecutionException(TException):
   def __hash__(self):
     value = 17
     value = (value * 31) ^ hash(self.msg)
+    value = (value * 31) ^ hash(self.type)
     return value
 
   def __repr__(self):

http://git-wip-us.apache.org/repos/asf/storm/blob/ea44062f/storm-core/src/storm.thrift
----------------------------------------------------------------------
diff --git a/storm-core/src/storm.thrift b/storm-core/src/storm.thrift
index fc018ee..8b27df7 100644
--- a/storm-core/src/storm.thrift
+++ b/storm-core/src/storm.thrift
@@ -690,8 +690,16 @@ struct DRPCRequest {
   2: required string request_id;
 }
 
+enum DRPCExceptionType {
+  INTERNAL_ERROR = 0,
+  SERVER_SHUTDOWN = 1,
+  SERVER_TIMEOUT = 2,
+  FAILED_REQUEST = 3
+}
+
 exception DRPCExecutionException {
   1: required string msg;
+  2: optional DRPCExceptionType type;
 }
 
 service DistributedRPC {
@@ -702,6 +710,7 @@ service DistributedRPCInvocations {
   void result(1: string id, 2: string result) throws (1: AuthorizationException aze);
   DRPCRequest fetchRequest(1: string functionName) throws (1: AuthorizationException aze);
   void failRequest(1: string id) throws (1: AuthorizationException aze);  
+  void failRequestV2(1: string id, 2: DRPCExecutionException e) throws (1: AuthorizationException aze);  
 }
 
 enum HBServerMessageType {

http://git-wip-us.apache.org/repos/asf/storm/blob/ea44062f/storm-core/test/clj/org/apache/storm/drpc_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/drpc_test.clj b/storm-core/test/clj/org/apache/storm/drpc_test.clj
index 7974e49..f485e6f 100644
--- a/storm-core/test/clj/org/apache/storm/drpc_test.clj
+++ b/storm-core/test/clj/org/apache/storm/drpc_test.clj
@@ -29,11 +29,9 @@
   (:import [org.apache.storm.generated DRPCExecutionException DRPCRequest])
   (:import [java.util.concurrent ConcurrentLinkedQueue])
   (:import [org.apache.storm Thrift])
-  (:import [org.apache.storm.daemon DrpcServer])
   (:import [org.mockito ArgumentCaptor Mockito Matchers])
   (:use [org.apache.storm config])
   (:use [org.apache.storm.internal clojure])
-  (:use [org.apache.storm.daemon drpc])
   (:use [conjure core]))
 
 (defbolt exclamation-bolt ["result" "return-info"] [tuple collector]
@@ -230,32 +228,6 @@
     (.shutdown drpc)
     ))
 
-(deftest test-dequeue-req-after-timeout
-  (let [queue (ConcurrentLinkedQueue.)
-        delay-seconds 2
-        conf {DRPC-REQUEST-TIMEOUT-SECS delay-seconds}
-        mock-cu (proxy [ConfigUtils] []
-                  (readStormConfigImpl [] conf))
-        drpc-handler (proxy [DrpcServer] [conf]
-                       (acquireQueue [function] queue))]
-    (with-open [_ (ConfigUtilsInstaller. mock-cu)]
-      (is (thrown? DRPCExecutionException
-            (.execute drpc-handler "ArbitraryDRPCFunctionName" "")))
-      (is (= 0 (.size queue))))))
-
-(deftest test-drpc-timeout-cleanup
-  (let [queue (ConcurrentLinkedQueue.)
-        delay-seconds 1
-        conf {DRPC-REQUEST-TIMEOUT-SECS delay-seconds}
-        mock-cu (proxy [ConfigUtils] []
-                  (readStormConfigImpl [] conf))
-        drpc-handler (proxy [DrpcServer] [conf]
-          (acquireQueue [function] queue)
-          (getTimeoutCheckSecs [] delay-seconds))]
-    (with-open [_ (ConfigUtilsInstaller. mock-cu)]
-      (is (thrown? DRPCExecutionException
-            (.execute drpc-handler "ArbitraryDRPCFunctionName" "no-args"))))))
-
 (deftest test-drpc-attempts-two-reconnects-in-fail-request
   (let [handler (Mockito/mock DRPCInvocationsClient 
                               (.extraInterfaces (Mockito/withSettings) (into-array Class [ILocalDRPC])))

http://git-wip-us.apache.org/repos/asf/storm/blob/ea44062f/storm-core/test/clj/org/apache/storm/security/auth/drpc_auth_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/security/auth/drpc_auth_test.clj b/storm-core/test/clj/org/apache/storm/security/auth/drpc_auth_test.clj
deleted file mode 100644
index 0c2ee2e..0000000
--- a/storm-core/test/clj/org/apache/storm/security/auth/drpc_auth_test.clj
+++ /dev/null
@@ -1,320 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns org.apache.storm.security.auth.drpc-auth-test
-  (:use [clojure test])
-  (:require [org.apache.storm.daemon [drpc :as drpc]])
-  (:import [org.apache.storm.generated AuthorizationException
-            DRPCExecutionException DistributedRPC$Processor
-            DistributedRPCInvocations$Processor]
-           [org.apache.storm.daemon DrpcServer])
-  (:import [org.apache.storm Config])
-  (:import [org.apache.storm Testing Testing$Condition])
-  (:import [org.apache.storm.security.auth ReqContext SingleUserPrincipal ThriftServer ThriftConnectionType])
-  (:import [org.apache.storm.utils DRPCClient ConfigUtils Time])
-  (:import [org.apache.storm.drpc DRPCInvocationsClient])
-  (:import [java.util.concurrent TimeUnit])
-  (:import [javax.security.auth Subject])
-  (:use [org.apache.storm util config log])
-  (:import [org.apache.storm.utils Utils]))
-
-(defmacro with-timeout
-  [millis unit & body]
-  `(let [f# (future ~@body)]
-     (try
-       (.get f# ~millis ~unit)
-       (finally (future-cancel f#)))))
-
-(def DRPC-TIMEOUT-SEC (* (/ Testing/TEST_TIMEOUT_MS 1000) 2))
-
-(defn launch-server [conf drpcAznClass transportPluginClass login-cfg client-port invocations-port]
-  (let [conf (if drpcAznClass (assoc conf DRPC-AUTHORIZER drpcAznClass) conf)
-        conf (if transportPluginClass (assoc conf STORM-THRIFT-TRANSPORT-PLUGIN transportPluginClass) conf)
-        conf (if login-cfg (assoc conf "java.security.auth.login.config" login-cfg) conf)
-        conf (assoc conf DRPC-PORT client-port)
-        conf (assoc conf DRPC-INVOCATIONS-PORT invocations-port)
-        service-handler (DrpcServer. conf)
-        handler-server (ThriftServer. conf
-                                      (DistributedRPC$Processor. service-handler)
-                                      ThriftConnectionType/DRPC)
-        invoke-server (ThriftServer. conf
-                                     (DistributedRPCInvocations$Processor. service-handler)
-                                     ThriftConnectionType/DRPC_INVOCATIONS)]
-    (.addShutdownHook (Runtime/getRuntime) (Thread. (fn [] (.stop handler-server) (.stop invoke-server))))
-    (log-message "storm conf:" conf)
-    (log-message "Starting DRPC invocation server ... " invocations-port)
-    (.start (Thread. #(.serve invoke-server)))
-    (Testing/whileTimeout (reify Testing$Condition (exec [this] (not (.isServing invoke-server)))) (fn [] (Time/sleep 100)))
-    (log-message "Starting DRPC handler server ... " client-port)
-    (.start (Thread. #(.serve handler-server)))
-    (Testing/whileTimeout (reify Testing$Condition (exec [this] (not (.isServing handler-server)))) (fn [] (Time/sleep 100)))
-    [handler-server invoke-server]))
-
-(defmacro with-server [args & body]
-  `(let [[handler-server# invoke-server#] (launch-server ~@args)]
-      ~@body
-      (log-message "Stopping DRPC servers ...")
-      (.stop handler-server#)
-      (.stop invoke-server#)
-      ))
-
-(deftest deny-drpc-test
-  (let [client-port (Utils/getAvailablePort)
-        invocations-port (Utils/getAvailablePort (int(inc client-port)))
-        storm-conf (clojurify-structure (ConfigUtils/readStormConfig))]
-    (with-server [storm-conf "org.apache.storm.security.auth.authorizer.DenyAuthorizer"
-                  nil nil client-port invocations-port]
-      (let [drpc (DRPCClient. storm-conf "localhost" client-port)
-            drpc_client (.getClient drpc)
-            invocations (DRPCInvocationsClient. storm-conf "localhost" invocations-port)
-            invocations_client (.getClient invocations)]
-        (is (thrown? AuthorizationException (.execute drpc_client "func-foo" "args-bar")))
-        (is (thrown? AuthorizationException (.fetchRequest invocations_client nil)))
-        (.close drpc)
-        (.close invocations)))))
-
-(deftest deny-drpc-digest-test
-  (let [client-port (Utils/getAvailablePort)
-        invocations-port (Utils/getAvailablePort (int (inc client-port)))
-        storm-conf (clojurify-structure (ConfigUtils/readStormConfig))]
-    (with-server [storm-conf "org.apache.storm.security.auth.authorizer.DenyAuthorizer"
-                  "org.apache.storm.security.auth.digest.DigestSaslTransportPlugin"
-                  "test/clj/org/apache/storm/security/auth/jaas_digest.conf"
-                  client-port invocations-port]
-      (let [conf (merge storm-conf {STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.digest.DigestSaslTransportPlugin"
-                             "java.security.auth.login.config" "test/clj/org/apache/storm/security/auth/jaas_digest.conf"})
-            drpc (DRPCClient. conf "localhost" client-port)
-            drpc_client (.getClient drpc)
-            invocations (DRPCInvocationsClient. conf "localhost" invocations-port)
-            invocations_client (.getClient invocations)]
-        (is (thrown? AuthorizationException (.execute drpc_client "func-foo" "args-bar")))
-        (is (thrown? AuthorizationException (.fetchRequest invocations_client nil)))
-        (.close drpc)
-        (.close invocations)))))
-
-(defmacro with-simple-drpc-test-scenario
-  [[strict? alice-client bob-client charlie-client alice-invok charlie-invok] & body]
-  `(let [client-port# (Utils/getAvailablePort)
-         invocations-port# (Utils/getAvailablePort (int (inc client-port#)))
-         storm-conf# (merge (clojurify-structure (ConfigUtils/readStormConfig))
-                          {DRPC-AUTHORIZER-ACL-STRICT ~strict?
-                           DRPC-AUTHORIZER-ACL-FILENAME "drpc-simple-acl-test-scenario.yaml"
-                           STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.digest.DigestSaslTransportPlugin"})]
-    (with-server [storm-conf#
-                   "org.apache.storm.security.auth.authorizer.DRPCSimpleACLAuthorizer"
-                   "org.apache.storm.security.auth.digest.DigestSaslTransportPlugin"
-                   "test/clj/org/apache/storm/security/auth/drpc-auth-server.jaas"
-                   client-port# invocations-port#]
-       (let [~alice-client (DRPCClient.
-                           (merge storm-conf# {"java.security.auth.login.config"
-                                              "test/clj/org/apache/storm/security/auth/drpc-auth-alice.jaas"})
-                           "localhost"
-                           client-port#)
-             ~bob-client (DRPCClient.
-                         (merge storm-conf# {"java.security.auth.login.config"
-                                            "test/clj/org/apache/storm/security/auth/drpc-auth-bob.jaas"})
-                         "localhost"
-                         client-port#)
-             ~charlie-client (DRPCClient.
-                               (merge storm-conf# {"java.security.auth.login.config"
-                                                  "test/clj/org/apache/storm/security/auth/drpc-auth-charlie.jaas"})
-                               "localhost"
-                               client-port#)
-             ~alice-invok (DRPCInvocationsClient.
-                            (merge storm-conf# {"java.security.auth.login.config"
-                                               "test/clj/org/apache/storm/security/auth/drpc-auth-alice.jaas"})
-                            "localhost"
-                            invocations-port#)
-             ~charlie-invok (DRPCInvocationsClient.
-                             (merge storm-conf# {"java.security.auth.login.config"
-                                                "test/clj/org/apache/storm/security/auth/drpc-auth-charlie.jaas"})
-                             "localhost"
-                             invocations-port#)]
-         (try
-           ~@body
-           (finally
-             (.close ~alice-client)
-             (.close ~bob-client)
-             (.close ~charlie-client)
-             (.close ~alice-invok)
-             (.close ~charlie-invok)))))))
-
-(deftest drpc-per-function-auth-strict-test
-  (with-simple-drpc-test-scenario [true alice-client bob-client charlie-client alice-invok charlie-invok]
-    (let [drpc-timeout-seconds DRPC-TIMEOUT-SEC]
-      (testing "Permitted user can execute a function in the ACL"
-        (let [func "jump"
-              exec-ftr (future (.execute alice-client func "some args"))
-              id (atom "")
-              expected "Authorized DRPC"]
-          (with-timeout drpc-timeout-seconds TimeUnit/SECONDS
-            (while (empty? @id)
-              (reset! id
-                      (-> charlie-invok (.fetchRequest func) .get_request_id)))
-          (.result charlie-invok @id expected)
-          (is (= expected (.get exec-ftr drpc-timeout-seconds TimeUnit/SECONDS))))))
-
-      (testing "execute fails when function is not in ACL"
-        (is (thrown-cause? AuthorizationException
-          (.execute alice-client "jog" "some args"))))
-
-      (testing "fetchRequest fails when function is not in ACL"
-        (is (thrown-cause? AuthorizationException
-          (.fetchRequest charlie-invok "jog"))))
-
-      (testing "authorized user can fail a request"
-        (let [func "jump"
-              exec-ftr (future (.execute alice-client func "some args"))
-              id (atom "")]
-          (with-timeout drpc-timeout-seconds TimeUnit/SECONDS
-            (while (empty? @id)
-              (reset! id
-                      (-> charlie-invok (.fetchRequest func) .get_request_id)))
-          (.failRequest charlie-invok @id)
-          (is (thrown-cause? DRPCExecutionException
-                             (.get exec-ftr drpc-timeout-seconds TimeUnit/SECONDS))))))
-
-      (testing "unauthorized invocation user is denied returning a result"
-        (let [func "jump"
-              exec-ftr (future (.execute bob-client func "some args"))
-              id (atom "")
-              expected "Only Authorized User can populate the result"]
-          (with-timeout drpc-timeout-seconds TimeUnit/SECONDS
-            (while (empty? @id)
-              (reset! id
-                      (-> charlie-invok (.fetchRequest func) .get_request_id)))
-          (is (thrown-cause? AuthorizationException
-            (.result alice-invok @id "not the expected result")))
-          (.result charlie-invok @id expected)
-          (is (= expected (.get exec-ftr drpc-timeout-seconds TimeUnit/SECONDS))))))
-
-      (testing "unauthorized invocation user is denied failing a request"
-        (let [func "jump"
-              exec-ftr (future (.execute alice-client func "some args"))
-              id (atom "")]
-          (with-timeout drpc-timeout-seconds TimeUnit/SECONDS
-            (while (empty? @id)
-              (reset! id
-                      (-> charlie-invok (.fetchRequest func) .get_request_id)))
-          (is (thrown-cause? AuthorizationException
-            (.failRequest alice-invok @id)))
-          (.failRequest charlie-invok @id))))
-
-      (testing "unauthorized invocation user is denied fetching a request"
-        (let [func "jump"
-              exec-ftr (future (.execute bob-client func "some args"))
-              id (atom "")
-              expected "Only authorized users can fetchRequest"]
-          (Thread/sleep 1000)
-          (is (thrown-cause? AuthorizationException
-            (-> alice-invok (.fetchRequest func) .get_request_id)))
-          (with-timeout drpc-timeout-seconds TimeUnit/SECONDS
-            (while (empty? @id)
-              (reset! id
-                      (-> charlie-invok (.fetchRequest func) .get_request_id)))
-          (.result charlie-invok @id expected)
-          (is (= expected (.get exec-ftr drpc-timeout-seconds TimeUnit/SECONDS)))))))))
-
-(deftest drpc-per-function-auth-non-strict-test
-  (with-simple-drpc-test-scenario [false alice-client bob-client charlie-client alice-invok charlie-invok]
-    (let [drpc-timeout-seconds DRPC-TIMEOUT-SEC]
-      (testing "Permitted user can execute a function in the ACL"
-        (let [func "jump"
-              exec-ftr (future (.execute alice-client func "some args"))
-              id (atom "")
-              expected "Authorized DRPC"]
-          (with-timeout drpc-timeout-seconds TimeUnit/SECONDS
-            (while (empty? @id)
-              (reset! id
-                      (-> charlie-invok (.fetchRequest func) .get_request_id)))
-          (.result charlie-invok @id expected)
-          (is (= expected (.get exec-ftr drpc-timeout-seconds TimeUnit/SECONDS))))))
-
-      (testing "DRPC succeeds for anyone when function is not in ACL"
-        (let [func "jog"
-              exec-ftr (future (.execute charlie-client func "some args"))
-              id (atom "")
-              expected "Permissive/No ACL Entry"]
-          (with-timeout drpc-timeout-seconds TimeUnit/SECONDS
-            (while (empty? @id)
-              (reset! id
-                      (-> alice-invok (.fetchRequest func) .get_request_id)))
-          (.result alice-invok @id expected)
-          (is (= expected (.get exec-ftr drpc-timeout-seconds TimeUnit/SECONDS))))))
-
-      (testing "failure of a request is allowed when function is not in ACL"
-        (let [func "jog"
-              exec-ftr (future (.execute charlie-client func "some args"))
-              id (atom "")]
-          (with-timeout drpc-timeout-seconds TimeUnit/SECONDS
-            (while (empty? @id)
-              (reset! id
-                      (-> alice-invok (.fetchRequest func) .get_request_id)))
-          (.failRequest alice-invok @id)
-          (is (thrown-cause? DRPCExecutionException
-                             (.get exec-ftr drpc-timeout-seconds TimeUnit/SECONDS))))))
-
-      (testing "authorized user can fail a request"
-        (let [func "jump"
-              exec-ftr (future (.execute alice-client func "some args"))
-              id (atom "")]
-          (with-timeout drpc-timeout-seconds TimeUnit/SECONDS
-            (while (empty? @id)
-              (reset! id
-                      (-> charlie-invok (.fetchRequest func) .get_request_id)))
-          (.failRequest charlie-invok @id)
-          (is (thrown-cause? DRPCExecutionException
-                             (.get exec-ftr drpc-timeout-seconds TimeUnit/SECONDS))))))
-
-      (testing "unauthorized invocation user is denied returning a result"
-        (let [func "jump"
-              exec-ftr (future (.execute bob-client func "some args"))
-              id (atom "")
-              expected "Only Authorized User can populate the result"]
-          (with-timeout drpc-timeout-seconds TimeUnit/SECONDS
-            (while (empty? @id)
-              (reset! id
-                      (-> charlie-invok (.fetchRequest func) .get_request_id)))
-          (is (thrown-cause? AuthorizationException
-            (.result alice-invok @id "not the expected result")))
-          (.result charlie-invok @id expected)
-          (is (= expected (.get exec-ftr drpc-timeout-seconds TimeUnit/SECONDS))))))
-
-      (testing "unauthorized invocation user is denied failing a request"
-        (let [func "jump"
-              exec-ftr (future (.execute alice-client func "some args"))
-              id (atom "")]
-          (with-timeout drpc-timeout-seconds TimeUnit/SECONDS
-            (while (empty? @id)
-              (reset! id
-                      (-> charlie-invok (.fetchRequest func) .get_request_id)))
-          (is (thrown-cause? AuthorizationException
-            (.failRequest alice-invok @id)))
-          (.failRequest charlie-invok @id))))
-
-      (testing "unauthorized invocation user is denied fetching a request"
-        (let [func "jump"
-              exec-ftr (future (.execute bob-client func "some args"))
-              id (atom "")
-              expected "Only authorized users can fetchRequest"]
-          (Thread/sleep 1000)
-          (is (thrown-cause? AuthorizationException
-            (-> alice-invok (.fetchRequest func) .get_request_id)))
-          (with-timeout drpc-timeout-seconds TimeUnit/SECONDS
-            (while (empty? @id)
-              (reset! id
-                      (-> charlie-invok (.fetchRequest func) .get_request_id)))
-          (.result charlie-invok @id expected)
-          (is (= expected (.get exec-ftr drpc-timeout-seconds TimeUnit/SECONDS)))))))))

http://git-wip-us.apache.org/repos/asf/storm/blob/ea44062f/storm-core/test/jvm/org/apache/storm/daemon/drpc/DRPCServerTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/daemon/drpc/DRPCServerTest.java b/storm-core/test/jvm/org/apache/storm/daemon/drpc/DRPCServerTest.java
new file mode 100644
index 0000000..b2df441
--- /dev/null
+++ b/storm-core/test/jvm/org/apache/storm/daemon/drpc/DRPCServerTest.java
@@ -0,0 +1,258 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.drpc;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.storm.Config;
+import org.apache.storm.daemon.drpc.DRPCServer;
+import org.apache.storm.drpc.DRPCInvocationsClient;
+import org.apache.storm.generated.DRPCExecutionException;
+import org.apache.storm.generated.DRPCRequest;
+import org.apache.storm.security.auth.SimpleTransportPlugin;
+import org.apache.storm.utils.DRPCClient;
+import org.apache.storm.utils.Utils;
+import org.junit.AfterClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * End-to-end tests that start a {@link DRPCServer} and drive it through its Thrift and HTTP front ends.
+ */
+public class DRPCServerTest {
+    private static final Logger LOG = LoggerFactory.getLogger(DRPCServerTest.class);
+    private static final ExecutorService exec = Executors.newCachedThreadPool();
+
+    /** Shuts down the shared executor after the tests in this class have run. */
+    @AfterClass
+    public static void close() {
+        exec.shutdownNow();
+    }
+
+    /**
+     * Runs {@code server.start()} on the shared executor rather than on the calling test thread.
+     *
+     * @param server the server to start
+     * @return the future of the submitted start task
+     */
+    private static Future<?> startInBackground(DRPCServer server) {
+        return exec.submit(() -> {
+            server.start();
+            return null;
+        });
+    }
+
+    /**
+     * Polls the invocations server until a request for {@code func} is available.
+     * A null request, or one whose id is null or empty, means nothing is available yet.
+     * Fails the test if no request arrives within 5 seconds.
+     *
+     * @param invoke client used to fetch requests
+     * @param func name of the DRPC function to fetch a request for
+     * @return the first request fetched with a non-empty id
+     * @throws Exception if fetching fails or the sleep between polls is interrupted
+     */
+    private static DRPCRequest getNextAvailableRequest(DRPCInvocationsClient invoke, String func) throws Exception {
+        DRPCRequest request = null;
+        long timedout = System.currentTimeMillis() + 5_000;
+        while (System.currentTimeMillis() < timedout) {
+            request = invoke.getClient().fetchRequest(func);
+            if (request != null && request.get_request_id() != null && !request.get_request_id().isEmpty()) {
+                return request;
+            }
+            Thread.sleep(1);
+        }
+        fail("Test timed out waiting for a request on " + func);
+        //fail() always throws, the return is only here to satisfy the compiler.
+        return request;
+    }
+
+    /**
+     * Builds the configuration shared by the server and the clients in a test.
+     *
+     * @param drpcPort value for {@link Config#DRPC_PORT}
+     * @param invocationsPort value for {@link Config#DRPC_INVOCATIONS_PORT}
+     * @param httpPort value for {@link Config#DRPC_HTTP_PORT}, or null to leave it unset
+     * @return a new configuration map
+     */
+    private Map<String, Object> getConf(int drpcPort, int invocationsPort, Integer httpPort) {
+        Map<String, Object> conf = new HashMap<>();
+        conf.put(Config.DRPC_PORT, drpcPort);
+        conf.put(Config.DRPC_INVOCATIONS_PORT, invocationsPort);
+        conf.put(Config.STORM_THRIFT_TRANSPORT_PLUGIN, SimpleTransportPlugin.class.getName());
+        conf.put(Config.DRPC_WORKER_THREADS, 5);
+        conf.put(Config.DRPC_INVOCATIONS_THREADS, 5);
+        conf.put(Config.DRPC_MAX_BUFFER_SIZE, 1048576);
+        conf.put(Config.STORM_NIMBUS_RETRY_TIMES, 2);
+        conf.put(Config.STORM_NIMBUS_RETRY_INTERVAL, 10);
+        conf.put(Config.STORM_NIMBUS_RETRY_INTERVAL_CEILING, 100);
+        if (httpPort != null) {
+            conf.put(Config.DRPC_HTTP_PORT, httpPort);
+        }
+        return conf;
+    }
+
+    /** Executes a request over Thrift and expects the result supplied through the invocations client. */
+    @Test
+    public void testGoodThrift() throws Exception {
+        int drpcPort = Utils.getAvailablePort();
+        int invocationsPort = Utils.getAvailablePort(drpcPort + 1);
+        Map<String, Object> conf = getConf(drpcPort, invocationsPort, null);
+        try (DRPCServer server = new DRPCServer(conf)) {
+            startInBackground(server);
+            try (DRPCClient client = new DRPCClient(conf, "localhost", drpcPort);
+                 DRPCInvocationsClient invoke = new DRPCInvocationsClient(conf, "localhost", invocationsPort)) {
+                Future<String> found = exec.submit(() -> client.getClient().execute("testing", "test"));
+                DRPCRequest request = getNextAvailableRequest(invoke, "testing");
+                assertNotNull(request);
+                assertEquals("test", request.get_func_args());
+                assertNotNull(request.get_request_id());
+                invoke.result(request.get_request_id(), "tested");
+                String result = found.get(1000, TimeUnit.MILLISECONDS);
+                assertEquals("tested", result);
+            }
+        }
+    }
+
+    /** Fails a request over Thrift and expects the caller to see a {@link DRPCExecutionException}. */
+    @Test
+    public void testFailedThrift() throws Exception {
+        int drpcPort = Utils.getAvailablePort();
+        int invocationsPort = Utils.getAvailablePort(drpcPort + 1);
+        Map<String, Object> conf = getConf(drpcPort, invocationsPort, null);
+        try (DRPCServer server = new DRPCServer(conf)) {
+            startInBackground(server);
+            try (DRPCClient client = new DRPCClient(conf, "localhost", drpcPort);
+                 DRPCInvocationsClient invoke = new DRPCInvocationsClient(conf, "localhost", invocationsPort)) {
+                Future<String> found = exec.submit(() -> client.getClient().execute("testing", "test"));
+                DRPCRequest request = getNextAvailableRequest(invoke, "testing");
+                assertNotNull(request);
+                assertEquals("test", request.get_func_args());
+                assertNotNull(request.get_request_id());
+                invoke.failRequest(request.get_request_id());
+                try {
+                    found.get(1000, TimeUnit.MILLISECONDS);
+                    fail("exec did not throw an exception");
+                } catch (ExecutionException e) {
+                    Throwable t = e.getCause();
+                    assertEquals(DRPCExecutionException.class, t.getClass());
+                    //Don't know a better way to validate that it failed.
+                    assertEquals("Request failed", ((DRPCExecutionException)t).get_msg());
+                }
+            }
+        }
+    }
+
+    /**
+     * Issues an HTTP GET for {@code /drpc/func/args} on localhost and returns the whole response body.
+     *
+     * @param port HTTP port to connect to
+     * @param func DRPC function name, placed in the URL as-is
+     * @param args function arguments, placed in the URL as-is
+     * @return the response body decoded as UTF-8, empty if the body is empty
+     * @throws RuntimeException wrapping any failure to build the URL or read the response
+     */
+    public static String GET(int port, String func, String args) {
+        try {
+            URL url = new URL("http://localhost:"+port+"/drpc/"+func+"/"+args);
+            try (InputStream in = url.openStream()) {
+                //A single read() may return only part of the body, or -1 for an empty one, so drain the stream.
+                ByteArrayOutputStream body = new ByteArrayOutputStream();
+                byte[] buffer = new byte[1024];
+                int read;
+                while ((read = in.read(buffer)) != -1) {
+                    body.write(buffer, 0, read);
+                }
+                return new String(body.toByteArray(), StandardCharsets.UTF_8);
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /** Executes a request over HTTP GET and expects the result supplied through the invocations client. */
+    @Test
+    public void testGoodHttpGet() throws Exception {
+        LOG.info("STARTING HTTP GET TEST...");
+        int drpcPort = Utils.getAvailablePort();
+        int invocationsPort = Utils.getAvailablePort(drpcPort + 1);
+        int httpPort = Utils.getAvailablePort(invocationsPort + 1);
+        Map<String, Object> conf = getConf(drpcPort, invocationsPort, httpPort);
+        try (DRPCServer server = new DRPCServer(conf)) {
+            startInBackground(server);
+            //TODO need a better way to wait for the server to be ready than a fixed sleep
+            Thread.sleep(2000);
+            try (DRPCInvocationsClient invoke = new DRPCInvocationsClient(conf, "localhost", invocationsPort)) {
+                Future<String> found = exec.submit(() -> GET(httpPort, "testing", "test"));
+                DRPCRequest request = getNextAvailableRequest(invoke, "testing");
+                assertNotNull(request);
+                assertEquals("test", request.get_func_args());
+                assertNotNull(request.get_request_id());
+                invoke.result(request.get_request_id(), "tested");
+                String result = found.get(1000, TimeUnit.MILLISECONDS);
+                assertEquals("tested", result);
+            }
+        }
+    }
+
+    /** Fails a request issued over HTTP GET and expects the GET itself to fail. */
+    @Test
+    public void testFailedHttpGet() throws Exception {
+        LOG.info("STARTING HTTP GET (FAIL) TEST...");
+        int drpcPort = Utils.getAvailablePort();
+        int invocationsPort = Utils.getAvailablePort(drpcPort + 1);
+        int httpPort = Utils.getAvailablePort(invocationsPort + 1);
+        Map<String, Object> conf = getConf(drpcPort, invocationsPort, httpPort);
+        try (DRPCServer server = new DRPCServer(conf)) {
+            startInBackground(server);
+            //TODO need a better way to wait for the server to be ready than a fixed sleep
+            Thread.sleep(2000);
+            try (DRPCInvocationsClient invoke = new DRPCInvocationsClient(conf, "localhost", invocationsPort)) {
+                Future<String> found = exec.submit(() -> GET(httpPort, "testing", "test"));
+                DRPCRequest request = getNextAvailableRequest(invoke, "testing");
+                assertNotNull(request);
+                assertEquals("test", request.get_func_args());
+                assertNotNull(request.get_request_id());
+                invoke.getClient().failRequest(request.get_request_id());
+                try {
+                    found.get(1000, TimeUnit.MILLISECONDS);
+                    fail("exec did not throw an exception");
+                } catch (ExecutionException e) {
+                    LOG.warn("Got Expected Exception", e);
+                    //Getting the exact response code is a bit more complex.
+                    //TODO should use a better client
+                }
+            }
+        }
+    }
+}