You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2009/05/20 23:15:33 UTC

svn commit: r776856 - in /qpid/trunk/qpid/ruby: ext/sasl/sasl.c lib/qpid/connection.rb lib/qpid/delegates.rb lib/qpid/framer.rb lib/qpid/qmf.rb lib/qpid/session.rb tests/qmf.rb

Author: tross
Date: Wed May 20 21:15:32 2009
New Revision: 776856

URL: http://svn.apache.org/viewvc?rev=776856&view=rev
Log:
Numerous fixes in the qmf console for Ruby
Added proper connection close if the SASL security context expires.
Added :timeout and :async features to qmf console.

Modified:
    qpid/trunk/qpid/ruby/ext/sasl/sasl.c
    qpid/trunk/qpid/ruby/lib/qpid/connection.rb
    qpid/trunk/qpid/ruby/lib/qpid/delegates.rb
    qpid/trunk/qpid/ruby/lib/qpid/framer.rb
    qpid/trunk/qpid/ruby/lib/qpid/qmf.rb
    qpid/trunk/qpid/ruby/lib/qpid/session.rb
    qpid/trunk/qpid/ruby/tests/qmf.rb

Modified: qpid/trunk/qpid/ruby/ext/sasl/sasl.c
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/ruby/ext/sasl/sasl.c?rev=776856&r1=776855&r2=776856&view=diff
==============================================================================
--- qpid/trunk/qpid/ruby/ext/sasl/sasl.c (original)
+++ qpid/trunk/qpid/ruby/ext/sasl/sasl.c Wed May 20 21:15:32 2009
@@ -41,6 +41,7 @@
     sasl_callback_t callbacks[8];
     char* userName;
     char* password;
+    char* operUserName;
     unsigned int minSsf;
     unsigned int maxSsf;
     char mechanism[MECH_SIZE];
@@ -280,6 +281,8 @@
         free(context->userName);
     if (context->password)
         free(context->password);
+    if (context->operUserName)
+        free(context->operUserName);
     free(context);
 
     return Qnil;
@@ -294,10 +297,12 @@
     char* mechList;
     char* mechToUse;
     int result;
+    int propResult;
     const char* response;
     unsigned int len;
     sasl_interact_t* interact = 0;
     const char* chosen;
+    const char* operName;
 
     if (argc == 2) {
         context = (context_t*) argv[0];
@@ -322,6 +327,14 @@
         rb_raise(rb_eRuntimeError, "sasl_client_start failed: %d - %s",
                  result, sasl_errdetail(context->conn));
 
+    if (result == SASL_OK) {
+        propResult = sasl_getprop(context->conn, SASL_USERNAME, (const void**) &operName);
+        if (propResult == SASL_OK) {
+            context->operUserName = (char*) malloc(strlen(operName) + 1);
+            strcpy(context->operUserName, operName);
+        }
+    }
+
     return rb_ary_new3(3, INT2NUM(result), rb_str_new(response, len), rb_str_new2(chosen));
 }
 
@@ -333,7 +346,9 @@
     context_t* context;
     VALUE challenge;
     int result;
+    int propResult;
     const char* response;
+    const char* operName;
     unsigned int len;
     sasl_interact_t* interact = 0;
 
@@ -356,9 +371,33 @@
     if (result != SASL_OK && result != SASL_CONTINUE)
         return QSASL_FAILED;
 
+    if (result == SASL_OK) {
+        propResult = sasl_getprop(context->conn, SASL_USERNAME, (const void**) &operName);
+        if (propResult == SASL_OK) {
+            context->operUserName = (char*) malloc(strlen(operName) + 1);
+            strcpy(context->operUserName, operName);
+        }
+    }
+
     return rb_ary_new3(2, INT2NUM(result), rb_str_new(response, len));
 }
 
+static VALUE qsasl_user_id(int argc, VALUE *argv, VALUE obj)
+{
+    context_t* context;
+
+    if (argc == 1) {
+        context = (context_t*) argv[0];
+    } else {
+        rb_raise(rb_eRuntimeError, "Wrong Number of Arguments");
+    }
+
+    if (context->operUserName)
+        return rb_str_new2(context->operUserName);
+
+    return Qnil;
+}
+
 //
 // Encode transport data for the security layer.
 //
@@ -427,6 +466,7 @@
     rb_define_module_function(mSasl, "free", qsasl_free, -1);
     rb_define_module_function(mSasl, "client_start", qsasl_client_start, -1);
     rb_define_module_function(mSasl, "client_step", qsasl_client_step, -1);
+    rb_define_module_function(mSasl, "user_id", qsasl_user_id, -1);
     rb_define_module_function(mSasl, "encode", qsasl_encode, -1);
     rb_define_module_function(mSasl, "decode", qsasl_decode, -1);
 }

Modified: qpid/trunk/qpid/ruby/lib/qpid/connection.rb
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/ruby/lib/qpid/connection.rb?rev=776856&r1=776855&r2=776856&view=diff
==============================================================================
--- qpid/trunk/qpid/ruby/lib/qpid/connection.rb (original)
+++ qpid/trunk/qpid/ruby/lib/qpid/connection.rb Wed May 20 21:15:32 2009
@@ -36,7 +36,7 @@
     include MonitorMixin
 
     attr_reader :spec, :attached, :sessions, :thread
-    attr_accessor :opened, :failed, :close_code
+    attr_accessor :opened, :failed, :close_code, :user_id
 
     def initialize(sock, args={})
       super(sock)
@@ -58,6 +58,7 @@
       @thread = nil
 
       @channel_max = 65535
+      @user_id = nil
 
       @delegate = delegate.call(self, args)
     end

Modified: qpid/trunk/qpid/ruby/lib/qpid/delegates.rb
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/ruby/lib/qpid/delegates.rb?rev=776856&r1=776855&r2=776856&view=diff
==============================================================================
--- qpid/trunk/qpid/ruby/lib/qpid/delegates.rb (original)
+++ qpid/trunk/qpid/ruby/lib/qpid/delegates.rb Wed May 20 21:15:32 2009
@@ -202,6 +202,7 @@
         end
         begin
           resp = Sasl.client_start(@saslConn, mech_list)
+          @connection.user_id = Sasl.user_id(@saslConn)
           ch.connection_start_ok(:client_properties => PROPERTIES,
                                  :mechanism => resp[2],
                                  :response => resp[1])
@@ -214,6 +215,7 @@
 
       def connection_secure(ch, secure)
         resp = Sasl.client_step(@saslConn, secure.challenge)
+        @connection.user_id = Sasl.user_id(@saslConn)
         ch.connection_secure_ok(:response => resp[1])
       end
 

Modified: qpid/trunk/qpid/ruby/lib/qpid/framer.rb
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/ruby/lib/qpid/framer.rb?rev=776856&r1=776855&r2=776856&view=diff
==============================================================================
--- qpid/trunk/qpid/ruby/lib/qpid/framer.rb (original)
+++ qpid/trunk/qpid/ruby/lib/qpid/framer.rb Wed May 20 21:15:32 2009
@@ -137,6 +137,8 @@
         @tx_buf = ""
         frm.debug("FLUSHED") if frm
       end
+    rescue
+      @sock.close unless @sock.closed?
     end
 
     def _write(buf)

Modified: qpid/trunk/qpid/ruby/lib/qpid/qmf.rb
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/ruby/lib/qpid/qmf.rb?rev=776856&r1=776855&r2=776856&view=diff
==============================================================================
--- qpid/trunk/qpid/ruby/lib/qpid/qmf.rb (original)
+++ qpid/trunk/qpid/ruby/lib/qpid/qmf.rb Wed May 20 21:15:32 2009
@@ -252,7 +252,7 @@
         args = { :exchange => "qpid.management",
           :queue => broker.topic_name,
           :binding_key => "console.obj.*.*.#{package_name}.#" }
-        broker.amqpSession.exchange_bind(args)
+        broker.amqp_session.exchange_bind(args)
       end
     end
 
@@ -264,7 +264,7 @@
         args = { :exchange => "qpid.management",
           :queue => broker.topic_name,
           :binding_key=> "console.obj.*.*.#{package_name}.#{class_name}.#" }
-        broker.amqpSession.exchange_bind(args)
+        broker.amqp_session.exchange_bind(args)
       end
     end
 
@@ -277,7 +277,7 @@
         args = { :exchange => "qpid.management",
           :queue => broker.topic_name,
           :binding_key => "console.obj.*.*.#{pname}.#{cname}.#" }
-        broker.amqpSession.exchange_bind(args)
+        broker.amqp_session.exchange_bind(args)
       end
     end
 
@@ -319,7 +319,7 @@
     # The default timeout for this synchronous operation is 60 seconds.  To change the timeout,
     # use the following argument:
     #
-    # :_timeout = <time in seconds>
+    # :timeout = <time in seconds>
     #
     # If additional arguments are supplied, they are used as property
     # selectors, as long as their keys are strings.  For example, if
@@ -340,19 +340,21 @@
         unless broker_list.include?(agent.broker)
           raise ArgumentError, "Supplied agent is not accessible through the supplied broker"
         end
-        agent_list << agent
+        agent_list << agent if agent.broker.connected?
       else
         if kwargs.include?(:object_id)
           oid = kwargs[:object_id]
           broker_list.each { |broker|
             broker.agents.each { |agent|
               if oid.broker_bank == agent.broker_bank && oid.agent_bank == agent.agent_bank
-                agent_list << agent
+                agent_list << agent if agent.broker.connected?
               end
             }
           }
         else
-          broker_list.each { |broker| agent_list += broker.agents }
+          broker_list.each { |broker|
+            agent_list += broker.agents if broker.connected?
+          }
         end
       end
 
@@ -400,8 +402,8 @@
       end
 
       timeout = false
-      if kwargs.include?(:_timeout)
-        wait_time = kwargs[:_timeout]
+      if kwargs.include?(:timeout)
+        wait_time = kwargs[:timeout]
       else
         wait_time = DEFAULT_GET_WAIT_TIME
       end
@@ -616,8 +618,8 @@
     def handle_broker_disconnect(broker); end
 
     def handle_error(error)
-      @error = error
       synchronize do
+        @error = error if @sync_sequence_list.length > 0
         @sync_sequence_list = []
         @cv.signal
       end
@@ -953,6 +955,8 @@
 
   class Object
 
+    DEFAULT_METHOD_WAIT_TIME = 60
+
     attr_reader :object_id, :schema, :properties, :statistics,
     :current_time, :create_time, :delete_time, :broker
 
@@ -1110,12 +1114,14 @@
       timeout = nil
 
       if kwargs.class == Hash
-        if kwargs.include?(:_timeout)
-          timeout = kwargs[:_timeout]
+        if kwargs.include?(:timeout)
+          timeout = kwargs[:timeout]
+        else
+          timeout = DEFAULT_METHOD_WAIT_TIME
         end
 
-        if kwargs.include?(:_async)
-          sync = !kwargs[:_async]
+        if kwargs.include?(:async)
+          sync = !kwargs[:async]
         end
         args.pop
       end
@@ -1212,7 +1218,7 @@
               end
             end
 
-          rescue Qpid::Session::Closed, Qpid::Session::Detached, SystemCallError
+          rescue
             delay *= DELAY_FACTOR if delay < DELAY_MAX
           end
 
@@ -1247,6 +1253,7 @@
   class Broker
 
     SYNC_TIME = 60
+    @@next_seq = 1
 
     include MonitorMixin
 
@@ -1267,20 +1274,17 @@
       @port     = port
       @auth_name = auth_name
       @auth_pass = auth_pass
+      @user_id = nil
       @auth_mechanism = kwargs[:mechanism]
       @auth_service = kwargs[:service]
       @broker_bank = 1
-      @agents   = {}
-      @agents["1.0"] = Agent.new(self, 0, "BrokerAgent")
       @topic_bound = false
       @cv = new_cond
-      @sync_in_flight = false
-      @sync_request = 0
-      @sync_result = nil
-      @reqs_outstanding = 1
-      @error     = nil
-      @broker_id  = nil
+      @error = nil
+      @broker_id = nil
       @is_connected = false
+      @amqp_session_id = "%s.%d.%d" % [Socket.gethostname, Process::pid, @@next_seq]
+      @@next_seq += 1
       @conn = nil
       if @session.managedConnections?
         @thread = ManagedConnection.new(self)
@@ -1326,6 +1330,7 @@
 
     def wait_for_stable
       synchronize do
+        return unless connected?
         return if @reqs_outstanding == 0
         @sync_in_flight = true
         unless @cv.wait_for(SYNC_TIME) { @reqs_outstanding == 0 }
@@ -1350,6 +1355,7 @@
       mp = @amqp_session.message_properties
       mp.content_type = "x-application/qmf"
       mp.reply_to = amqp_session.reply_to("amq.direct", @reply_name)
+      #mp.user_id = @user_id if @user_id
       return Qpid::Message.new(dp, mp, body)
     end
 
@@ -1422,8 +1428,14 @@
     end
 
     def try_to_connect
-      #begin
-      @amqp_session_id = "%s.%d" % [Socket.gethostname, Process::pid]
+      @agents = {}
+      @agents["1.0"] = Agent.new(self, 0, "BrokerAgent")
+      @topic_bound = false
+      @sync_in_flight = false
+      @sync_request = 0
+      @sync_result = nil
+      @reqs_outstanding = 1
+
       # FIXME: Need sth for Qpid::Util::connect
 
       @conn = Qpid::Connection.new(TCPSocket.new(@host, @port),
@@ -1433,6 +1445,7 @@
                                    :host => @host,
                                    :service => @auth_service)
       @conn.start
+      @user_id = @conn.user_id
       @reply_name = "reply-%s" % amqp_session_id
       @amqp_session = @conn.session(@amqp_session_id)
       @amqp_session.auto_sync = true

Modified: qpid/trunk/qpid/ruby/lib/qpid/session.rb
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/ruby/lib/qpid/session.rb?rev=776856&r1=776855&r2=776856&view=diff
==============================================================================
--- qpid/trunk/qpid/ruby/lib/qpid/session.rb (original)
+++ qpid/trunk/qpid/ruby/lib/qpid/session.rb Wed May 20 21:15:32 2009
@@ -114,7 +114,7 @@
         end
       end
       if error?
-        raise Qpid::Session::Exception, exceptions
+        raise Qpid::Session::Exception, @exceptions
       end
     end
 
@@ -436,7 +436,7 @@
       end
 
       def execution_exception(ex)
-        @session.exceptions.append(ex)
+        @session.exceptions << ex
       end
     end
 

Modified: qpid/trunk/qpid/ruby/tests/qmf.rb
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/ruby/tests/qmf.rb?rev=776856&r1=776855&r2=776856&view=diff
==============================================================================
--- qpid/trunk/qpid/ruby/tests/qmf.rb (original)
+++ qpid/trunk/qpid/ruby/tests/qmf.rb Wed May 20 21:15:32 2009
@@ -44,7 +44,7 @@
       @count = count
       for idx in 0...count
         synchronize do
-          seq = broker.echo(idx, "Echo Message", :_async => true)
+          seq = broker.echo(idx, "Echo Message", :async => true)
           @xmt_list[seq] = idx
         end
       end
@@ -109,7 +109,7 @@
     start_qmf
     body = "Echo Message Body"
     for seq in 1..10
-      res = @broker.echo(seq, body, :_timeout => 10)
+      res = @broker.echo(seq, body, :timeout => 10)
       assert_equal(0, res.status)
       assert_equal("OK", res.text)
       assert_equal(seq, res.sequence)



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org