You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2009/05/20 23:15:33 UTC
svn commit: r776856 - in /qpid/trunk/qpid/ruby: ext/sasl/sasl.c
lib/qpid/connection.rb lib/qpid/delegates.rb lib/qpid/framer.rb
lib/qpid/qmf.rb lib/qpid/session.rb tests/qmf.rb
Author: tross
Date: Wed May 20 21:15:32 2009
New Revision: 776856
URL: http://svn.apache.org/viewvc?rev=776856&view=rev
Log:
Numerous fixes in the qmf console for Ruby
Added proper connection close if the SASL security context expires.
Added :timeout and :async features to qmf console.
Modified:
qpid/trunk/qpid/ruby/ext/sasl/sasl.c
qpid/trunk/qpid/ruby/lib/qpid/connection.rb
qpid/trunk/qpid/ruby/lib/qpid/delegates.rb
qpid/trunk/qpid/ruby/lib/qpid/framer.rb
qpid/trunk/qpid/ruby/lib/qpid/qmf.rb
qpid/trunk/qpid/ruby/lib/qpid/session.rb
qpid/trunk/qpid/ruby/tests/qmf.rb
Modified: qpid/trunk/qpid/ruby/ext/sasl/sasl.c
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/ruby/ext/sasl/sasl.c?rev=776856&r1=776855&r2=776856&view=diff
==============================================================================
--- qpid/trunk/qpid/ruby/ext/sasl/sasl.c (original)
+++ qpid/trunk/qpid/ruby/ext/sasl/sasl.c Wed May 20 21:15:32 2009
@@ -41,6 +41,7 @@
sasl_callback_t callbacks[8];
char* userName;
char* password;
+ char* operUserName;
unsigned int minSsf;
unsigned int maxSsf;
char mechanism[MECH_SIZE];
@@ -280,6 +281,8 @@
free(context->userName);
if (context->password)
free(context->password);
+ if (context->operUserName)
+ free(context->operUserName);
free(context);
return Qnil;
@@ -294,10 +297,12 @@
char* mechList;
char* mechToUse;
int result;
+ int propResult;
const char* response;
unsigned int len;
sasl_interact_t* interact = 0;
const char* chosen;
+ const char* operName;
if (argc == 2) {
context = (context_t*) argv[0];
@@ -322,6 +327,14 @@
rb_raise(rb_eRuntimeError, "sasl_client_start failed: %d - %s",
result, sasl_errdetail(context->conn));
+ if (result == SASL_OK) {
+ propResult = sasl_getprop(context->conn, SASL_USERNAME, (const void**) &operName);
+ if (propResult == SASL_OK) {
+ context->operUserName = (char*) malloc(strlen(operName) + 1);
+ strcpy(context->operUserName, operName);
+ }
+ }
+
return rb_ary_new3(3, INT2NUM(result), rb_str_new(response, len), rb_str_new2(chosen));
}
@@ -333,7 +346,9 @@
context_t* context;
VALUE challenge;
int result;
+ int propResult;
const char* response;
+ const char* operName;
unsigned int len;
sasl_interact_t* interact = 0;
@@ -356,9 +371,33 @@
if (result != SASL_OK && result != SASL_CONTINUE)
return QSASL_FAILED;
+ if (result == SASL_OK) {
+ propResult = sasl_getprop(context->conn, SASL_USERNAME, (const void**) &operName);
+ if (propResult == SASL_OK) {
+ context->operUserName = (char*) malloc(strlen(operName) + 1);
+ strcpy(context->operUserName, operName);
+ }
+ }
+
return rb_ary_new3(2, INT2NUM(result), rb_str_new(response, len));
}
+static VALUE qsasl_user_id(int argc, VALUE *argv, VALUE obj)
+{
+ context_t* context;
+
+ if (argc == 1) {
+ context = (context_t*) argv[0];
+ } else {
+ rb_raise(rb_eRuntimeError, "Wrong Number of Arguments");
+ }
+
+ if (context->operUserName)
+ return rb_str_new2(context->operUserName);
+
+ return Qnil;
+}
+
//
// Encode transport data for the security layer.
//
@@ -427,6 +466,7 @@
rb_define_module_function(mSasl, "free", qsasl_free, -1);
rb_define_module_function(mSasl, "client_start", qsasl_client_start, -1);
rb_define_module_function(mSasl, "client_step", qsasl_client_step, -1);
+ rb_define_module_function(mSasl, "user_id", qsasl_user_id, -1);
rb_define_module_function(mSasl, "encode", qsasl_encode, -1);
rb_define_module_function(mSasl, "decode", qsasl_decode, -1);
}
Modified: qpid/trunk/qpid/ruby/lib/qpid/connection.rb
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/ruby/lib/qpid/connection.rb?rev=776856&r1=776855&r2=776856&view=diff
==============================================================================
--- qpid/trunk/qpid/ruby/lib/qpid/connection.rb (original)
+++ qpid/trunk/qpid/ruby/lib/qpid/connection.rb Wed May 20 21:15:32 2009
@@ -36,7 +36,7 @@
include MonitorMixin
attr_reader :spec, :attached, :sessions, :thread
- attr_accessor :opened, :failed, :close_code
+ attr_accessor :opened, :failed, :close_code, :user_id
def initialize(sock, args={})
super(sock)
@@ -58,6 +58,7 @@
@thread = nil
@channel_max = 65535
+ @user_id = nil
@delegate = delegate.call(self, args)
end
Modified: qpid/trunk/qpid/ruby/lib/qpid/delegates.rb
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/ruby/lib/qpid/delegates.rb?rev=776856&r1=776855&r2=776856&view=diff
==============================================================================
--- qpid/trunk/qpid/ruby/lib/qpid/delegates.rb (original)
+++ qpid/trunk/qpid/ruby/lib/qpid/delegates.rb Wed May 20 21:15:32 2009
@@ -202,6 +202,7 @@
end
begin
resp = Sasl.client_start(@saslConn, mech_list)
+ @connection.user_id = Sasl.user_id(@saslConn)
ch.connection_start_ok(:client_properties => PROPERTIES,
:mechanism => resp[2],
:response => resp[1])
@@ -214,6 +215,7 @@
def connection_secure(ch, secure)
resp = Sasl.client_step(@saslConn, secure.challenge)
+ @connection.user_id = Sasl.user_id(@saslConn)
ch.connection_secure_ok(:response => resp[1])
end
Modified: qpid/trunk/qpid/ruby/lib/qpid/framer.rb
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/ruby/lib/qpid/framer.rb?rev=776856&r1=776855&r2=776856&view=diff
==============================================================================
--- qpid/trunk/qpid/ruby/lib/qpid/framer.rb (original)
+++ qpid/trunk/qpid/ruby/lib/qpid/framer.rb Wed May 20 21:15:32 2009
@@ -137,6 +137,8 @@
@tx_buf = ""
frm.debug("FLUSHED") if frm
end
+ rescue
+ @sock.close unless @sock.closed?
end
def _write(buf)
Modified: qpid/trunk/qpid/ruby/lib/qpid/qmf.rb
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/ruby/lib/qpid/qmf.rb?rev=776856&r1=776855&r2=776856&view=diff
==============================================================================
--- qpid/trunk/qpid/ruby/lib/qpid/qmf.rb (original)
+++ qpid/trunk/qpid/ruby/lib/qpid/qmf.rb Wed May 20 21:15:32 2009
@@ -252,7 +252,7 @@
args = { :exchange => "qpid.management",
:queue => broker.topic_name,
:binding_key => "console.obj.*.*.#{package_name}.#" }
- broker.amqpSession.exchange_bind(args)
+ broker.amqp_session.exchange_bind(args)
end
end
@@ -264,7 +264,7 @@
args = { :exchange => "qpid.management",
:queue => broker.topic_name,
:binding_key=> "console.obj.*.*.#{package_name}.#{class_name}.#" }
- broker.amqpSession.exchange_bind(args)
+ broker.amqp_session.exchange_bind(args)
end
end
@@ -277,7 +277,7 @@
args = { :exchange => "qpid.management",
:queue => broker.topic_name,
:binding_key => "console.obj.*.*.#{pname}.#{cname}.#" }
- broker.amqpSession.exchange_bind(args)
+ broker.amqp_session.exchange_bind(args)
end
end
@@ -319,7 +319,7 @@
# The default timeout for this synchronous operation is 60 seconds. To change the timeout,
# use the following argument:
#
- # :_timeout = <time in seconds>
+ # :timeout = <time in seconds>
#
# If additional arguments are supplied, they are used as property
# selectors, as long as their keys are strings. For example, if
@@ -340,19 +340,21 @@
unless broker_list.include?(agent.broker)
raise ArgumentError, "Supplied agent is not accessible through the supplied broker"
end
- agent_list << agent
+ agent_list << agent if agent.broker.connected?
else
if kwargs.include?(:object_id)
oid = kwargs[:object_id]
broker_list.each { |broker|
broker.agents.each { |agent|
if oid.broker_bank == agent.broker_bank && oid.agent_bank == agent.agent_bank
- agent_list << agent
+ agent_list << agent if agent.broker.connected?
end
}
}
else
- broker_list.each { |broker| agent_list += broker.agents }
+ broker_list.each { |broker|
+ agent_list += broker.agents if broker.connected?
+ }
end
end
@@ -400,8 +402,8 @@
end
timeout = false
- if kwargs.include?(:_timeout)
- wait_time = kwargs[:_timeout]
+ if kwargs.include?(:timeout)
+ wait_time = kwargs[:timeout]
else
wait_time = DEFAULT_GET_WAIT_TIME
end
@@ -616,8 +618,8 @@
def handle_broker_disconnect(broker); end
def handle_error(error)
- @error = error
synchronize do
+ @error = error if @sync_sequence_list.length > 0
@sync_sequence_list = []
@cv.signal
end
@@ -953,6 +955,8 @@
class Object
+ DEFAULT_METHOD_WAIT_TIME = 60
+
attr_reader :object_id, :schema, :properties, :statistics,
:current_time, :create_time, :delete_time, :broker
@@ -1110,12 +1114,14 @@
timeout = nil
if kwargs.class == Hash
- if kwargs.include?(:_timeout)
- timeout = kwargs[:_timeout]
+ if kwargs.include?(:timeout)
+ timeout = kwargs[:timeout]
+ else
+ timeout = DEFAULT_METHOD_WAIT_TIME
end
- if kwargs.include?(:_async)
- sync = !kwargs[:_async]
+ if kwargs.include?(:async)
+ sync = !kwargs[:async]
end
args.pop
end
@@ -1212,7 +1218,7 @@
end
end
- rescue Qpid::Session::Closed, Qpid::Session::Detached, SystemCallError
+ rescue
delay *= DELAY_FACTOR if delay < DELAY_MAX
end
@@ -1247,6 +1253,7 @@
class Broker
SYNC_TIME = 60
+ @@next_seq = 1
include MonitorMixin
@@ -1267,20 +1274,17 @@
@port = port
@auth_name = auth_name
@auth_pass = auth_pass
+ @user_id = nil
@auth_mechanism = kwargs[:mechanism]
@auth_service = kwargs[:service]
@broker_bank = 1
- @agents = {}
- @agents["1.0"] = Agent.new(self, 0, "BrokerAgent")
@topic_bound = false
@cv = new_cond
- @sync_in_flight = false
- @sync_request = 0
- @sync_result = nil
- @reqs_outstanding = 1
- @error = nil
- @broker_id = nil
+ @error = nil
+ @broker_id = nil
@is_connected = false
+ @amqp_session_id = "%s.%d.%d" % [Socket.gethostname, Process::pid, @@next_seq]
+ @@next_seq += 1
@conn = nil
if @session.managedConnections?
@thread = ManagedConnection.new(self)
@@ -1326,6 +1330,7 @@
def wait_for_stable
synchronize do
+ return unless connected?
return if @reqs_outstanding == 0
@sync_in_flight = true
unless @cv.wait_for(SYNC_TIME) { @reqs_outstanding == 0 }
@@ -1350,6 +1355,7 @@
mp = @amqp_session.message_properties
mp.content_type = "x-application/qmf"
mp.reply_to = amqp_session.reply_to("amq.direct", @reply_name)
+ #mp.user_id = @user_id if @user_id
return Qpid::Message.new(dp, mp, body)
end
@@ -1422,8 +1428,14 @@
end
def try_to_connect
- #begin
- @amqp_session_id = "%s.%d" % [Socket.gethostname, Process::pid]
+ @agents = {}
+ @agents["1.0"] = Agent.new(self, 0, "BrokerAgent")
+ @topic_bound = false
+ @sync_in_flight = false
+ @sync_request = 0
+ @sync_result = nil
+ @reqs_outstanding = 1
+
# FIXME: Need sth for Qpid::Util::connect
@conn = Qpid::Connection.new(TCPSocket.new(@host, @port),
@@ -1433,6 +1445,7 @@
:host => @host,
:service => @auth_service)
@conn.start
+ @user_id = @conn.user_id
@reply_name = "reply-%s" % amqp_session_id
@amqp_session = @conn.session(@amqp_session_id)
@amqp_session.auto_sync = true
Modified: qpid/trunk/qpid/ruby/lib/qpid/session.rb
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/ruby/lib/qpid/session.rb?rev=776856&r1=776855&r2=776856&view=diff
==============================================================================
--- qpid/trunk/qpid/ruby/lib/qpid/session.rb (original)
+++ qpid/trunk/qpid/ruby/lib/qpid/session.rb Wed May 20 21:15:32 2009
@@ -114,7 +114,7 @@
end
end
if error?
- raise Qpid::Session::Exception, exceptions
+ raise Qpid::Session::Exception, @exceptions
end
end
@@ -436,7 +436,7 @@
end
def execution_exception(ex)
- @session.exceptions.append(ex)
+ @session.exceptions << ex
end
end
Modified: qpid/trunk/qpid/ruby/tests/qmf.rb
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/ruby/tests/qmf.rb?rev=776856&r1=776855&r2=776856&view=diff
==============================================================================
--- qpid/trunk/qpid/ruby/tests/qmf.rb (original)
+++ qpid/trunk/qpid/ruby/tests/qmf.rb Wed May 20 21:15:32 2009
@@ -44,7 +44,7 @@
@count = count
for idx in 0...count
synchronize do
- seq = broker.echo(idx, "Echo Message", :_async => true)
+ seq = broker.echo(idx, "Echo Message", :async => true)
@xmt_list[seq] = idx
end
end
@@ -109,7 +109,7 @@
start_qmf
body = "Echo Message Body"
for seq in 1..10
- res = @broker.echo(seq, body, :_timeout => 10)
+ res = @broker.echo(seq, body, :timeout => 10)
assert_equal(0, res.status)
assert_equal("OK", res.text)
assert_equal(seq, res.sequence)
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org