You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2008/11/13 03:45:19 UTC
svn commit: r713616 [2/3] - in /incubator/qpid/trunk/qpid/ruby: ./ examples/
lib/ lib/qpid/ qpid/ tests/ tests_0-8/
Added: incubator/qpid/trunk/qpid/ruby/lib/qpid/qmf.rb
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/ruby/lib/qpid/qmf.rb?rev=713616&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/ruby/lib/qpid/qmf.rb (added)
+++ incubator/qpid/trunk/qpid/ruby/lib/qpid/qmf.rb Wed Nov 12 18:45:18 2008
@@ -0,0 +1,1503 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Console API for Qpid Management Framework
+
+require 'socket'
+require 'monitor'
+require 'uri'
+require 'time'
+
+module Qpid::Qmf
+
+ # To access the asynchronous operations, a class must be derived from
+ # Console with overrides of any combination of the available methods.
+ class Console
+
+ # Invoked when a connection is established to a broker
+ def broker_connected(broker); end
+
+ # Invoked when the connection to a broker is lost
+ def broker_disconnected(broker); end
+
+ # Invoked when a QMF package is discovered
+ def new_package(name); end
+
+ # Invoked when a new class is discovered. Session.getSchema can be
+ # used to obtain details about the class
+ def new_class(kind, klass_key); end
+
+ # Invoked when a QMF agent is discovered
+ def new_agent(agent); end
+
+ # Invoked when a QMF agent disconects
+ def del_agent(agent); end
+
+ # Invoked when an object is updated
+ def object_props(broker, record); end
+
+ # Invoked when an object is updated
+ def objectStats(broker, record); end
+
+ # Invoked when an event is raised
+ def event(broker, event); end
+
+ def heartbeat(agent, timestamp); end
+
+ def broker_info(broker); end
+ end
+
+ class BrokerURL
+ def initialize(text)
+ uri = URI.parse(text)
+
+ @host = uri.host
+ @port = uri.port ? uri.port : 5672
+ @auth_name = uri.user ? uri.user : "guest"
+ @auth_pass = uri.password ? uri.password: "guest"
+ @auth_mech = "PLAIN"
+
+ return uri
+ end
+
+ def name
+ "#{@host}:#{@port}"
+ end
+
+ def match(host, port)
+ # FIXME: Unlcear what the Python code is actually checking for
+ # here, especially since HOST can resolve to multiple IP's
+ @port == port &&
+ (host == @host || ipaddr(host, port) == ipaddr(@host, @port))
+ end
+
+ private
+ def ipaddr(host, port)
+ s = Socket::getaddrinfo(host, port,
+ Socket::AF_INET, Socket::SOCK_STREAM)
+ s[0][2]
+ end
+ end
+
+ # An instance of the Session class represents a console session running
+ # against one or more QMF brokers. A single instance of Session is
+ # needed to interact with the management framework as a console.
+ class Session
+ CONTEXT_SYNC = 1
+ CONTEXT_STARTUP = 2
+ CONTEXT_MULTIGET = 3
+
+ GET_WAIT_TIME = 60
+
+ include MonitorMixin
+
+ attr_reader :binding_key_list, :select, :seq_mgr, :console, :packages
+
+ # Initialize a session. If the console argument is provided, the
+ # more advanced asynchronous features are available. If console is
+ # defaulted, the session will operate in a simpler, synchronous
+ # manner. The rcvObjects, rcvEvents, and rcvHeartbeats arguments
+ # are meaningful only if 'console' is provided. They control
+ # whether object updates, events, and agent-heartbeats are
+ # subscribed to. If the console is not interested in receiving one
+ # or more of the above, setting the argument to False will reduce
+ # tha bandwidth used by the API. If manageConnections is set to
+ # True, the Session object will manage connections to the brokers.
+ # This means that if a broker is unreachable, it will retry until a
+ # connection can be established. If a connection is lost, the
+ # Session will attempt to reconnect.
+ #
+ # If manageConnections is set to False, the user is responsible for
+ # handing failures. In this case, an unreachable broker will cause
+ # addBroker to raise an exception. If userBindings is set to False
+ # (the default) and rcvObjects is True, the console will receive
+ # data for all object classes. If userBindings is set to True, the
+ # user must select which classes the console shall receive by
+ # invoking the bindPackage or bindClass methods. This allows the
+ # console to be configured to receive only information that is
+ # relavant to a particular application. If rcvObjects id False,
+ # userBindings has no meaning.
+ #
+ # Accept a hash of parameters, where keys can be :console,
+ # :rcv_objects, :rcv_events, :rcv_heartbeats, :manage_connections,
+ # and :user_bindings
+ def initialize(kwargs = {})
+ super()
+ @console = kwargs[:console] || nil
+ @brokers = []
+ @packages = {}
+ @seq_mgr = SequenceManager.new
+ @cv = new_cond
+ @sync_sequence_list = []
+ @result = []
+ @select = []
+ @error = nil
+ @rcv_objects = kwargs[:rcv_objects] || true
+ @rcv_events = kwargs[:rcv_events] || true
+ @rcv_heartbeats = kwargs[:rcv_heartbeats] || true
+ @user_bindings = kwargs[:user_bindings] || false
+ unless @console
+ @rcv_objects = false
+ @rcv_events = false
+ @rcv_heartbeats = false
+ end
+ @binding_key_list = binding_keys
+ @manage_connections = kwargs[:manage_connections] || false
+
+ if @user_bindings && ! @rcv_objects
+ raise ArgumentError, "user_bindings can't be set unless rcv_objects is set and a console is provided"
+ end
+
+ #raise NotImplementedError, @manage_connections
+ end
+
+ def to_s
+ "QMF Console Session Manager (brokers connected: #{@brokers.size})"
+ end
+
+ # Connect to a Qpid broker. Returns an object of type Broker
+ def add_broker(target="amqp://localhost")
+ uri = URI.parse(target)
+ broker = Broker.new(self, uri.host, uri.port, "PLAIN", uri.user, uri.password)
+ unless broker.connected? || @manage_connections
+ raise broker.error
+ end
+
+ @brokers << broker
+ objects(:broker => broker, :class => "agent")
+ return broker
+ end
+
+ # Disconnect from a broker. The 'broker' argument is the object
+ # returned from the addBroker call
+ def del_broker(broker)
+ broker.shutdown
+ @brokers.delete(broker)
+ end
+
+ # Get the list of known classes within a QMF package
+ def classes(package_name)
+ @brokers.each { |broker| broker.wait_for_stable }
+ if @packages.include?(package_name)
+ # FIXME What's the actual structure of @packages[package_name]
+ @packages[package_name].inject([]) do |list, cname, hash|
+ list << [ package_name, cname, hash]
+ end
+ end
+ end
+
+ # Get the schema for a QMF class
+ def schema(klass_key)
+ @brokers.each { |broker| broker.wait_for_stable }
+ pname, cname, hash = klass_key
+ if @packages.include?(pname)
+ @packages[pname][ [cname, hash] ]
+ end
+ end
+
+ def bind_package(package_name)
+ unless @user_bindings && @rcv_objects
+ raise "userBindings option not set for Session"
+ end
+ @brokers.each do |broker|
+ args = { :exchange => "qpid.management",
+ :queue => broker.topicName,
+ :binding_key => "console.obj.#{package_name}" }
+ broker.amqpSession.exchange_bind(args)
+ end
+ end
+
+ def bind_class(klass_key)
+ unless @user_bindings && @rcv_objects
+ raise "userBindings option not set for Session"
+ end
+ pname, cname, hash = klass_key
+ @brokers.each do |broker|
+ args = { :exchange => "qpid.management",
+ :queue => broker.topicName,
+ :binding_key => "console.obj.#{pname}.#{cname}" }
+ broker.amqpSession.exchange_bind(args)
+ end
+ end
+
+ # Get a list of currently known agents
+ def agents(broker=nil)
+ broker_list = []
+ if broker.nil?
+ broker_list = @brokers.dup
+ else
+ broker_list << broker
+ end
+ broker_list.each { |b| b.wait_for_stable }
+ agent_list = []
+ broker_list.each { |b| agent_list += b.agents }
+ return agent_list
+ end
+
+ # Get a list of objects from QMF agents.
+ # All arguments are passed by name(keyword).
+ #
+ # The class for queried objects may be specified in one of the
+ # following ways:
+ # :schema => <schema> - supply a schema object returned from getSchema.
+ # :key => <key> - supply a klass_key from the list returned by getClasses.
+ # :class => <name> - supply a class name as a string. If the class name exists
+ # in multiple packages, a _package argument may also be supplied.
+ # :object_id = <id> - get the object referenced by the object-id
+ #
+ # If objects should be obtained from only one agent, use the following argument.
+ # Otherwise, the query will go to all agents.
+ #
+ # :agent = <agent> - supply an agent from the list returned by getAgents.
+ # If the get query is to be restricted to one broker (as opposed to
+ # all connected brokers), add the following argument:
+ #
+ # :broker = <broker> - supply a broker as returned by addBroker.
+ #
+ # If additional arguments are supplied, they are used as property
+ # selectors, as long as their keys are strings. For example, if
+ # the argument "name" => "test" is supplied, only objects whose
+ # "name" property is "test" will be returned in the result.
+ def objects(kwargs)
+ if kwargs.include?(:broker)
+ broker_list = []
+ broker_list << kwargs[:broker]
+ else
+ broker_list = @brokers
+ end
+ broker_list.each { |broker| broker.wait_for_stable }
+
+ agent_list = []
+ if kwargs.include?(:agent)
+ agent = kwargs[:agent]
+ unless broker_list.include?(agent.broker)
+ raise ArgumentError, "Supplied agent is not accessible through the supplied broker"
+ end
+ agent_list << agent
+ else
+ broker_list.each { |broker| agent_list += broker.agents }
+ end
+
+ cname = nil
+ if kwargs.include?(:schema)
+ # FIXME: What kind of object is kwargs[:schema]
+ pname, cname, hash = kwargs[:schema].getKey()
+ elsif kwargs.include?(:key)
+ pname, cname, hash = kwargs[:key]
+ elsif kwargs.include?(:class)
+ pname, cname, hash = [kwargs[:package], kwargs[:class], nil]
+ end
+ if cname.nil? && ! kwargs.include?(:object_id)
+ raise ArgumentError,
+ "No class supplied, use :schema, :key, :class, or :object_id' argument"
+ end
+
+ map = {}
+ @select = []
+ if kwargs.include?(:object_id)
+ map["_objectId"] = kwargs[:object_id].to_str
+ else
+ map["_class"] = cname
+ map["_package"] = pname if pname
+ map["_hash"] = hash if hash
+ kwargs.each do |k,v|
+ @select << [k, v] if k.is_a?(String)
+ end
+ end
+
+ @result = []
+ agent_list.each do |agent|
+ broker = agent.broker
+ send_codec = Qpid::StringCodec.new(broker.conn.spec)
+ seq = nil
+ synchronize do
+ seq = @seq_mgr.reserve(CONTEXT_MULTIGET)
+ @sync_sequence_list << seq
+ end
+ broker.set_header(send_codec, ?G, seq)
+ send_codec.write_map(map)
+ smsg = broker.message(send_codec.encoded, "agent.#{agent.bank}")
+ broker.emit(smsg)
+ end
+
+ timeout = false
+ synchronize do
+ unless @cv.wait_for(GET_WAIT_TIME) {
+ @sync_sequence_list.empty? || @error }
+ @sync_sequence_list.each do |pending_seq|
+ @seq_mgr.release(pending_seq)
+ end
+ @sync_sequence_list = []
+ timeout = true
+ end
+ end
+
+ if @error
+ errorText = @error
+ @error = nil
+ raise errorText
+ end
+
+ if @result.empty? && timeout
+ raise "No agent responded within timeout period"
+ end
+ @result
+ end
+
+ def set_event_filter(kwargs); end
+
+ def handle_broker_connect(broker); end
+
+ def handle_broker_resp(broker, codec, seq)
+ broker.broker_id = codec.read_uuid
+ @console.broker_info(broker) if @console
+
+ # Send a package request
+ # (effectively inc and dec outstanding by not doing anything)
+ send_codec = Qpid::StringCodec.new(broker.conn.spec)
+ seq = @seq_mgr.reserve(CONTEXT_STARTUP)
+ broker.set_header(send_codec, ?P, seq)
+ smsg = broker.message(send_codec.encoded)
+ broker.emit(smsg)
+ end
+
+ def handle_package_ind(broker, codec, seq)
+ pname = codec.read_str8
+ new_package = false
+ synchronize do
+ new_package = ! @packages.include?(pname)
+ @packages[pname] = {} if new_package
+ end
+ @console.new_package(pname) if @console
+
+ # Send a class request
+ broker.inc_outstanding
+ send_codec = Qpid::StringCodec.new(broker.conn.spec)
+ seq = @seq_mgr.reserve(CONTEXT_STARTUP)
+ broker.set_header(send_codec, ?Q, seq)
+ send_codec.write_str8(pname)
+ smsg = broker.message(send_codec.encoded)
+ broker.emit(smsg)
+ end
+
+ def handle_command_complete(broker, codec, seq)
+ code = codec.read_uint32
+ text = codec.read_str8
+ context = @seq_mgr.release(seq)
+ if context == CONTEXT_STARTUP
+ broker.dec_outstanding
+ elsif context == CONTEXT_SYNC && seq == broker.sync_sequence
+ broker.sync_done
+ elsif context == CONTEXT_MULTIGET && @sync_sequence_list.include?(seq)
+ synchronize do
+ @sync_sequence_list.delete(seq)
+ @cv.signal if @sync_sequence_list.empty?
+ end
+ end
+ end
+
+ def handle_class_ind(broker, codec, seq)
+ kind = codec.read_uint8
+ pname = codec.read_str8
+ cname = codec.read_str8
+ hash = codec.read_bin128
+ unknown = false
+
+ synchronize do
+ return unless @packages.include?(pname)
+ unknown = true unless @packages[pname].include?([cname, hash])
+ end
+
+ if unknown
+ # Send a schema request for the unknown class
+ broker.inc_outstanding
+ send_codec = Qpid::StringCodec.new(broker.conn.spec)
+ seq = @seq_mgr.reserve(CONTEXT_STARTUP)
+ broker.set_header(send_codec, ?S, seq)
+ send_codec.write_str8(pname)
+ send_codec.write_str8(cname)
+ send_codec.write_bin128(hash)
+ smsg = broker.message(send_codec.encoded)
+ broker.emit(smsg)
+ end
+ end
+
+ def handle_method_resp(broker, codec, seq)
+ code = codec.read_uint32
+
+ text = codec.read_str16
+ out_args = {}
+ method, synchronous = @seq_mgr.release(seq)
+ if code == 0
+ method.arguments.each do |arg|
+ if arg.dir.index(?O)
+ out_args[arg.name] = decode_value(codec, arg.type)
+ end
+ end
+ end
+ result = MethodResult.new(code, text, out_args)
+ if synchronous:
+ broker.synchronize do
+ broker.sync_result = MethodResult.new(code, text, out_args)
+ broker.sync_done
+ end
+ else
+ @console.method_response(broker, seq, result) if @console
+ end
+ end
+
+ def handle_heartbeat_ind(broker, codec, seq, msg)
+ if @console
+ broker_bank = 1
+ agent_bank = 0
+ dp = msg.get("delivery_properties")
+ if dp
+ key = dp["routing_key"]
+ key_elements = key.split(".")
+ if key_elements.length == 4
+ broker_bank = key_elements[2].to_i
+ agent_bank = key_lements[2].to_i
+ end
+ end
+ agent = broker.agent(broker_bank, agent_bank)
+ timestamp = codec.read_uint64
+ @console.heartbeat(agent, timestamp)
+ end
+ end
+
+ def handle_event_ind(broker, codec, seq)
+ if @console
+ event = Event.new(self, broker, codec)
+ @console.event(broker, event)
+ end
+ end
+
+ def handle_schema_resp(broker, codec, seq)
+ kind = codec.read_uint8
+ pname = codec.read_str8
+ cname = codec.read_str8
+ hash = codec.read_bin128
+ klass_key = [pname, cname, hash]
+ klass = SchemaClass.new(kind, klass_key, codec)
+ synchronize { @packages[pname][ [cname, hash] ] = klass }
+
+ @seq_mgr.release(seq)
+ broker.dec_outstanding
+ @console.new_class(kind, klass_key) if @console
+ end
+
+ def handle_content_ind(broker, codec, seq, prop=false, stat=false)
+ pname = codec.read_str8
+ cname = codec.read_str8
+ hash = codec.read_bin128
+ klass_key = [pname, cname, hash]
+
+ schema = nil
+ synchronize do
+ return unless @packages.include?(pname)
+ return unless @packages[pname].include?([cname, hash])
+ schema = @packages[pname][ [cname, hash] ]
+ end
+
+ object = Qpid::Qmf::Object.new(self, broker, schema, codec, prop, stat)
+ if pname == "org.apache.qpid.broker" && cname == "agent"
+ broker.update_agent(object)
+ end
+
+ synchronize do
+ if @sync_sequence_list.include?(seq)
+ if object.timestamps()[2] == 0 && select_match(object)
+ @result << object
+ end
+ return
+ end
+ end
+
+ @console.object_props(broker, object) if @console && prop
+ @console.object_stats(broker, object) if @console && stat
+ end
+
+ def handle_broker_disconnect(broker); end
+
+ def handle_error(error)
+ @error = error
+ synchronize do
+ @sync_sequence_list = []
+ @cv.signal
+ end
+ end
+
+ # Decode, from the codec, a value based on its typecode
+ def decode_value(codec, typecode)
+ case typecode
+ when 1: data = codec.read_uint8 # U8
+ when 2: data = codec.read_uint16 # U16
+ when 3: data = codec.read_uint32 # U32
+ when 4: data = codec.read_uint64 # U64
+ when 6: data = codec.read_str8 # SSTR
+ when 7: data = codec.read_str16 # LSTR
+ when 8: data = codec.read_int64 # ABSTIME
+ when 9: data = codec.read_uint64 # DELTATIME
+ when 10: data = ObjectId.new(codec) # REF
+ when 11: data = codec.read_uint8 != 0 # BOOL
+ when 12: data = codec.read_float # FLOAT
+ when 13: data = codec.read_double # DOUBLE
+ when 14: data = codec.read_uuid # UUID
+ when 15: data = codec.read_map # FTABLE
+ when 16: data = codec.read_int8 # S8
+ when 17: data = codec.read_int16 # S16
+ when 18: data = codec.read_int32 # S32
+ when 19: data = codec.read_int64 # S64
+ else
+ raise ArgumentError, "Invalid type code: #{typecode} - #{typecode.inspect}"
+ end
+ return data
+ end
+
+ # Encode, into the codec, a value based on its typecode
+ def encode_value(codec, value, typecode)
+ # FIXME: Python does a lot of magic type conversions
+ # We just assume that value has the right type; this is safer
+ # than coercing explicitly, since Array::pack will complain
+ # loudly about various type errors
+ case typecode
+ when 1: codec.write_uint8(value) # U8
+ when 2: codec.write_uint16(value) # U16
+ when 3: codec.write_uint32(value) # U32
+ when 4: codec.write_uint64(value) # U64
+ when 6: codec.write_str8(value) # SSTR
+ when 7: codec.write_str16(value) # LSTR
+ when 8: codec.write_int64(value) # ABSTIME
+ when 9: codec.write_uint64(value) # DELTATIME
+ when 10: value.encode(codec) # REF
+ when 11: codec.write_uint8(value ? 1 : 0) # BOOL
+ when 12: codec.write_float(value) # FLOAT
+ when 13: codec.write_double(value) # DOUBLE
+ when 14: codec.write_uuid(value) # UUID
+ when 15: codec.write_map(value) # FTABLE
+ when 16: codec.write_int8(value) # S8
+ when 17: codec.write_int16(value) # S16
+ when 18: codec.write_int32(value) # S32
+ when 19: codec.write_int64(value) # S64
+ else
+ raise ValueError, "Invalid type code: %d" % typecode
+ end
+ end
+
+ def display_value(value, typecode)
+ case typecode
+ when 1: return value.to_s
+ when 2: return value.to_s
+ when 3: return value.to_s
+ when 4: return value.to_s
+ when 6: return value.to_s
+ when 7: return value.to_s
+ when 8: return strftime("%c", gmtime(value / 1000000000))
+ when 9: return value.to_s
+ when 10: return value.to_s
+ when 11: return value ? 'T' : 'F'
+ when 12: return value.to_s
+ when 13: return value.to_s
+ when 14: return Qpid::UUID::format(hash)
+ when 15: return value.to_s
+ when 16: return value.to_s
+ when 17: return value.to_s
+ when 18: return value.to_s
+ when 19: return value.to_s
+ else
+ raise ValueError, "Invalid type code: %d" % typecode
+ end
+ end
+
+ private
+
+ def binding_keys
+ key_list = []
+ key_list << "schema.#"
+ if @rcv_objects && @rcv_events && @rcv_heartbeats &&
+ ! @user_bindings
+ key_list << "console.#"
+ else
+ if @rcv_objects && ! @user_bindings
+ key_list << "console.obj.#"
+ else
+ key_list << "console.obj.org.apache.qpid.broker.agent"
+ end
+ key_list << "console.event.#" if @rcv_events
+ key_list << "console.heartbeat" if @rcv_heartbeats
+ end
+ return key_list
+ end
+
+ # Check the object against select to check for a match
+ def select_match(object)
+ select.each do |key, value|
+ object.properties.each do |prop, propval|
+ return false if key == prop.name && value != propval
+ end
+ end
+ return true
+ end
+
+ end
+
+ class Package
+ attr_reader :name
+
+ def initialize(name)
+ @name = name
+ end
+ end
+
+ class ClassKey
+ attr_reader :package, :klass_name, :hash
+
+ def initialize(package, klass_name, hash)
+ @package = package
+ @klass_name = klass_name
+ @hash = hash
+ end
+ end
+
+ class SchemaClass
+
+ CLASS_KIND_TABLE = 1
+ CLASS_KIND_EVENT = 2
+
+ attr_reader :klass_key, :properties, :statistics, :methods, :arguments
+
+ def initialize(kind, key, codec)
+ @kind = kind
+ @klass_key = key
+ @properties = []
+ @statistics = []
+ @methods = []
+ @arguments = []
+
+ if @kind == CLASS_KIND_TABLE
+ prop_count = codec.read_uint16
+ stat_count = codec.read_uint16
+ method_count = codec.read_uint16
+ prop_count.times { |idx|
+ @properties << SchemaProperty.new(codec) }
+ stat_count.times { |idx|
+ @statistics << SchemaStatistic.new(codec) }
+ method_count.times { |idx|
+ @methods<< SchemaMethod.new(codec) }
+ elsif @kind == CLASS_KIND_EVENT
+ arg_count = codec.read_uint16
+ arg_count.times { |idx|
+ sa = SchemaArgument.new(codec, false)
+ @arguments << sa
+ }
+ end
+ end
+
+ def to_s
+ pname, cname, hash = @klass_key
+ if @kind == CLASS_KIND_TABLE
+ kind_str = "Table"
+ elsif @kind == CLASS_KIND_EVENT
+ kind_str = "Event"
+ else
+ kind_str = "Unsupported"
+ end
+ result = "%s Class: %s:%s " % [kind_str, pname, cname]
+ result += Qpid::UUID::format(hash)
+ return result
+ end
+ end
+
+ class SchemaProperty
+
+ attr_reader :name, :type, :access, :index, :optional,
+ :unit, :min, :max, :maxlan, :desc
+
+ def initialize(codec)
+ map = codec.read_map
+ @name = map["name"]
+ @type = map["type"]
+ @access = map["access"]
+ @index = map["index"] != 0
+ @optional = map["optional"] != 0
+ @unit = map["unit"]
+ @min = map["min"]
+ @max = map["max"]
+ @maxlan = map["maxlen"]
+ @desc = map["desc"]
+ end
+
+ def to_s
+ @name
+ end
+ end
+
+ class SchemaStatistic
+
+ attr_reader :name, :type, :unit, :desc
+
+ def initialize(codec)
+ map = codec.read_map
+ @name = map["name"]
+ @type = map["type"]
+ @unit = map["unit"]
+ @desc = map["desc"]
+ end
+
+ def to_s
+ @name
+ end
+ end
+
+ class SchemaMethod
+
+ attr_reader :name, :desc, :arguments
+
+ def initialize(codec)
+ map = codec.read_map
+ @name = map["name"]
+ arg_count = map["argCount"]
+ @desc = map["desc"]
+ @arguments = []
+ arg_count.times { |idx|
+ @arguments << SchemaArgument.new(codec, true)
+ }
+ end
+
+ def to_s
+ result = @name + "("
+ first = true
+ result += @arguments.select { |arg| arg.dir.index(?I) }.join(", ")
+ result += ")"
+ return result
+ end
+ end
+
+ class SchemaArgument
+
+ attr_reader :name, :type, :dir, :unit, :min, :max, :maxlen
+ attr_reader :desc, :default
+
+ def initialize(codec, method_arg)
+ map = codec.read_map
+ @name = map["name"]
+ @type = map["type"]
+ @dir = map["dir"].upcase if method_arg
+ @unit = map["unit"]
+ @min = map["min"]
+ @max = map["max"]
+ @maxlen = map["maxlen"]
+ @desc = map["desc"]
+ @default = map["default"]
+ end
+ end
+
+ # Object that represents QMF object identifiers
+ class ObjectId
+
+ include Comparable
+
+ attr_reader :first, :second
+
+ def initialize(codec, first=0, second=0)
+ if codec
+ @first = codec.read_uint64
+ @second = codec.read_uint64
+ else
+ @first = first
+ @second = second
+ end
+ end
+
+ def <=>(other)
+ return 1 unless other.is_a?(ObjectId)
+ return -1 if first < other.first
+ return 1 if first > other.first
+ return second <=> other.second
+ end
+
+ def to_s
+ "%d-%d-%d-%d-%d" % [flags, sequence, broker_bank, agent_bank, object]
+ end
+
+ def index
+ [first, second]
+ end
+
+ def flags
+ (first & 0xF000000000000000) >> 60
+ end
+
+ def sequence
+ (first & 0x0FFF000000000000) >> 48
+ end
+
+ def broker_bank
+ (first & 0x0000FFFFF0000000) >> 28
+ end
+
+ def agent_bank
+ first & 0x000000000FFFFFFF
+ end
+
+ def object
+ second
+ end
+
+ def durable?
+ sequence == 0
+ end
+
+ def encode(codec)
+ codec.write_uint64(first)
+ codec.write_uint64(second)
+ end
+ end
+
+ class Object
+
+ attr_reader :object_id, :schema, :properties, :statistics,
+ :current_time, :create_time, :delete_time, :broker
+
+ def initialize(session, broker, schema, codec, prop, stat)
+ @session = session
+ @broker = broker
+ @schema = schema
+ @current_time = codec.read_uint64
+ @create_time = codec.read_uint64
+ @delete_time = codec.read_uint64
+ @object_id = ObjectId.new(codec)
+ @properties = []
+ @statistics = []
+ if prop
+ missing = parse_presence_masks(codec, schema)
+ schema.properties.each do |property|
+ v = nil
+ unless missing.include?(property.name)
+ v = @session.decode_value(codec, property.type)
+ end
+ @properties << [property, v]
+ end
+ end
+
+ if stat
+ schema.statistics.each do |statistic|
+ s = @session.decode_value(codec, statistic.type)
+ @statistics << [statistic, s]
+ end
+ end
+ end
+
+ def klass_key
+ @schema.klass_key
+ end
+
+
+ def methods
+ @schema.methods
+ end
+
+ # Return the current, creation, and deletion times for this object
+ def timestamps
+ return [@current_time, @create_time, @delete_time]
+ end
+
+ # Return a string describing this object's primary key
+ def index
+ @properties.select { |property, value|
+ property.index
+ }.collect { |property,value|
+ value.to_s }.join(":")
+ end
+
+ # Replace properties and/or statistics with a newly received update
+ def merge_update(newer)
+ unless object_id == newer.object_id
+ raise "Objects with different object-ids"
+ end
+ @properties = newer.getProperties unless newer.properties.empty?
+ @statistics = newer.getStatistics unless newer.statistics.empty?
+ end
+
+ def to_s
+ index
+ end
+
+ # This must be defined because ruby has this (deprecated) method built in.
+ def id
+ method_missing(:id)
+ end
+
+ # Same here..
+ def type
+ method_missing(:type)
+ end
+
+ def name
+ method_missing(:name)
+ end
+
+ def method_missing(name, *args)
+ name = name.to_s
+
+ if method = @schema.methods.find { |method| name == method.name }
+ return invoke(method, name, args)
+ end
+
+ @properties.each do |property, value|
+ return value if name == property.name
+ if name == "_#{property.name}_" && property.type == 10
+ # Dereference references
+ deref = @session.objects(:object_id => value, :broker => @broker)
+ return nil unless deref.size == 1
+ return deref[0]
+ end
+ end
+ @statistics.each do |statistic, value|
+ if name == statistic.name
+ return value
+ end
+ end
+ raise "Type Object has no attribute '#{name}'"
+ end
+
+ private
+
+ def send_method_request(method, name, args, synchronous = false)
+ @schema.methods.each do |schema_method|
+ if name == schema_method.name
+ send_codec = Qpid::StringCodec.new(@broker.conn.spec)
+ seq = @session.seq_mgr.reserve([schema_method, synchronous])
+ @broker.set_header(send_codec, ?M, seq)
+ @object_id.encode(send_codec)
+ pname, cname, hash = @schema.klass_key
+ send_codec.write_str8(pname)
+ send_codec.write_str8(cname)
+ send_codec.write_bin128(hash)
+ send_codec.write_str8(name)
+
+ formals = method.arguments.select { |arg| arg.dir.index(?I) }
+ count = method.arguments.select { |arg| arg.dir.index(?I) }.size
+ unless formals.size == args.size
+ raise "Incorrect number of arguments: expected #{formals.size}, got #{args.size}"
+ end
+
+ formals.zip(args).each do |formal, actual|
+ @session.encode_value(send_codec, actual, formal.type)
+ end
+
+ smsg = @broker.message(send_codec.encoded,
+ "agent.#{object_id.broker_bank}.#{object_id.agent_bank}")
+
+ @broker.sync_start if synchronous
+ @broker.emit(smsg)
+
+ return seq
+ end
+ end
+ end
+
+ def invoke(method, name, args)
+ if send_method_request(method, name, args, synchronous = true)
+ unless @broker.wait_for_sync_done
+ @session.seq_mgr.release(seq)
+ raise "Timed out waiting for method to respond"
+ end
+
+ if @broker.error
+ error_text = @broker.error
+ @broker.error = nil
+ raise error_text
+ end
+
+ return @broker.sync_result
+ end
+ raise "Invalid Method (software defect) [#{name}]"
+ end
+
+ def parse_presence_masks(codec, schema)
+ exclude_list = []
+ bit = 0
+ schema.properties.each do |property|
+ if property.optional
+ if bit == 0
+ mask = codec.read_uint8
+ bit = 1
+ end
+ if (mask & bit) == 0
+ exclude_list << property.name
+ end
+ bit *= 2
+ bit = 0 if bit == 256
+ end
+ end
+ return exclude_list
+ end
+ end
+
+ class MethodResult
+
+ attr_reader :status, :text
+
+ def initialize(status, text, out_args)
+ @status = status
+ @text = text
+ @out_args = out_args
+ end
+
+ def method_missing(name)
+ name = name.to_s()
+ if @out_args.include?(name)
+ return @out_args[name]
+ else
+ raise "Unknown method result arg #{name}"
+ end
+ end
+
+ def to_s
+ "#{text} (#{status}) - #{out_args.inspect}"
+ end
+ end
+
+ class Broker
+
+ SYNC_TIME = 60
+
+ include MonitorMixin
+
+ attr_accessor :error
+
+ attr_reader :amqp_session_id, :amqp_session, :conn
+
+ attr_accessor :broker_id, :sync_result
+
+ def initialize(session, host, port, auth_mech, auth_user, auth_pass)
+ super()
+
+ # For debugging..
+ Thread.abort_on_exception = true
+
+ @session = session
+ @host = host
+ @port = port
+ @auth_user = auth_user
+ @auth_pass = auth_pass
+ @agents = {}
+ @agents["1.0"] = Agent.new(self, "1.0", "BrokerAgent")
+ @topic_bound = false
+ @cv = new_cond
+ @sync_in_flight = false
+ @sync_request = 0
+ @sync_result = nil
+ @reqs_outstanding = 1
+ @error = nil
+ @broker_id = nil
+ @is_connected = false
+ @conn = nil
+ try_to_connect
+ end
+
+ def connected?
+ @is_connected
+ end
+
+ def agent(broker_bank, agent_bank)
+ bank_key = "%d.%d" % [broker_bank, agent_bank]
+ return @agents[bank_key]
+ end
+
+ # Get the list of agents reachable via this broker
+ def agents
+ @agents.values
+ end
+
+ def url
+ "#{@host}:#{@port}"
+ end
+
+ def to_s
+ if connected?
+ "Broker connected at: #{url}"
+ else
+ "Disconnected Broker"
+ end
+ end
+
+ def wait_for_sync_done
+ synchronize do
+ return @cv.wait_for(SYNC_TIME) { ! @sync_in_flight || @error }
+ end
+ end
+
+ def wait_for_stable
+ synchronize do
+ return if @reqs_outstanding == 0
+ @sync_in_flight = true
+ unless @cv.wait_for(SYNC_TIME) { @reqs_outstanding == 0 }
+ raise "Timed out waiting for broker to synchronize"
+ end
+ end
+ end
+
+ # Compose the header of a management message
+ def set_header(codec, opcode, seq=0)
+ codec.write_uint8(?A)
+ codec.write_uint8(?M)
+ codec.write_uint8(?2)
+ codec.write_uint8(opcode)
+ codec.write_uint32(seq)
+ end
+
+ def message(body, routing_key="broker")
+ dp = @amqp_session.delivery_properties
+ dp.routing_key = routing_key
+ mp = @amqp_session.message_properties
+ mp.content_type = "x-application/qmf"
+ mp.reply_to = amqp_session.reply_to("amq.direct", @reply_name)
+ return Qpid::Message.new(dp, mp, body)
+ end
+
+ def emit(msg, dest="qpid.management")
+ @amqp_session.message_transfer(:destination => dest,
+ :message => msg)
+ end
+
+ def inc_outstanding
+ synchronize { @reqs_outstanding += 1 }
+ end
+
+ def dec_outstanding
+ synchronize do
+ @reqs_outstanding -= 1
+ if @reqs_outstanding == 0 && ! @topic_bound
+ @topic_bound = true
+ @session.binding_key_list.each do |key|
+ args = {
+ :exchange => "qpid.management",
+ :queue => @topic_name,
+ :binding_key => key }
+ @amqp_session.exchange_bind(args)
+ end
+ end
+ if @reqs_outstanding == 0 && @sync_in_flight
+ sync_done
+ end
+ end
+ end
+
+ def sync_start
+ synchronize { @sync_in_flight = true }
+ end
+
+ def sync_done
+ synchronize do
+ @sync_in_flight = false
+ @cv.signal
+ end
+ end
+
+ def update_agent(obj)
+ bank_key = "%d.%d" % [obj.brokerBank, obj.agentBank]
+ if obj.delete_time == 0
+ unless @agents.include?(bank_key)
+ agent = Agent.new(self, bank_key, obj.label)
+ @agents[bank_key] = agent
+ @session.console.new_agent(agent) if @session.console
+ end
+ else
+ agent = @agents.delete(bank_key)
+ @session.console.del_agent(agent) if agent && @session.console
+ end
+ end
+
+ def shutdown
+ if connected?
+ @amqp_session.incoming("rdest").stop
+ if @session.console
+ @amqp_session.incoming("tdest").stop
+ end
+ @amqp_session.close
+ @is_connected = false
+ else
+ raise "Broker already disconnected"
+ end
+ end
+
+ private
+
+ def try_to_connect
+ #begin
+ @amqp_session_id = "%s.%d" % [Socket.gethostname, Process::pid]
+ # FIXME: Need sth for Qpid::Util::connect
+
+ @conn = Qpid::Connection.new(TCPSocket.new(@host, @port),
+ :username => @auth_user,
+ :password => @auth_pass)
+ @conn.start
+ @reply_name = "reply-%s" % amqp_session_id
+ @amqp_session = @conn.session(@amqp_session_id)
+ @amqp_session.auto_sync = true
+
+ @amqp_session.queue_declare(:queue => @reply_name,
+ :exclusive => true,
+ :auto_delete => true)
+
+ @amqp_session.exchange_bind(:exchange => "amq.direct",
+ :queue => @reply_name,
+ :binding_key => @reply_name)
+ @amqp_session.message_subscribe(:queue => @reply_name,
+ :destination => "rdest",
+ :accept_mode => @amqp_session.message_accept_mode.none,
+ :acquire_mode => @amqp_session.message_acquire_mode.pre_acquired)
+ q = @amqp_session.incoming("rdest")
+ q.exc_listen(& method(:exception_cb))
+ q.listen(& method(:reply_cb))
+ @amqp_session.message_set_flow_mode(:destination => "rdest",
+ :flow_mode => 1)
+ @amqp_session.message_flow(:destination => "rdest",
+ :unit => 0,
+ :value => 0xFFFFFFFF)
+ @amqp_session.message_flow(:destination => "rdest",
+ :unit => 1,
+ :value => 0xFFFFFFFF)
+
+ @topic_name = "topic-#{@amqp_session_id}"
+ @amqp_session.queue_declare(:queue => @topic_name,
+ :exclusive => true,
+ :auto_delete => true)
+ @amqp_session.message_subscribe(:queue => @topic_name,
+ :destination => "tdest",
+ :accept_mode => @amqp_session.message_accept_mode.none,
+ :acquire_mode => @amqp_session.message_acquire_mode.pre_acquired)
+ @amqp_session.incoming("tdest").listen(& method(:reply_cb))
+ @amqp_session.message_set_flow_mode(:destination => "tdest",
+ :flow_mode => 1)
+ @amqp_session.message_flow(:destination => "tdest",
+ :unit => 0,
+ :value => 0xFFFFFFFF)
+ @amqp_session.message_flow(:destination => "tdest",
+ :unit => 1,
+ :value => 0xFFFFFFFF)
+
+ @is_connected = true
+ @session.handle_broker_connect(self)
+
+ codec = Qpid::StringCodec.new(@conn.spec)
+ set_header(codec, ?B)
+ msg = message(codec.encoded)
+ emit(msg)
+ # FIXME: These exceptions are bogus here
+ #rescue socket.error => e
+ # @error = "Socket Error %s - %s" % [e[0], e[1]]
+ #rescue Closed => e
+ # @error = "Connect Failed %d - %s" % [e[0], e[1]]
+ #rescue ConnectionFailed => e
+ # @error = "Connect Failed %d - %s" % [e[0], e[1]]
+ #end
+ end
+
+ # Check the header of a management message and extract the opcode and
+ # class
+ def check_header(codec)
+ begin
+ return [nil, nil] unless codec.read_uint8 == ?A
+ return [nil, nil] unless codec.read_uint8 == ?M
+ return [nil, nil] unless codec.read_uint8 == ?2
+ opcode = codec.read_uint8
+ seq = codec.read_uint32
+ return [opcode, seq]
+ rescue
+ return [nil, nil]
+ end
+ end
+
+ def reply_cb(msg)
+ codec = Qpid::StringCodec.new(@conn.spec, msg.body)
+ loop do
+ opcode, seq = check_header(codec)
+ return unless opcode
+ case opcode
+ when ?b: @session.handle_broker_resp(self, codec, seq)
+ when ?p: @session.handle_package_ind(self, codec, seq)
+ when ?z: @session.handle_command_complete(self, codec, seq)
+ when ?q: @session.handle_class_ind(self, codec, seq)
+ when ?m: @session.handle_method_resp(self, codec, seq)
+ when ?h: @session.handle_heartbeat_ind(self, codec, seq, msg)
+ when ?e: @session.handle_event_ind(self, codec, seq)
+ when ?s: @session.handle_schema_resp(self, codec, seq)
+ when ?c: @session.handle_content_ind(self, codec, seq, true, false)
+ when ?i: @session.handle_content_ind(self, codec, seq, false, true)
+ when ?g: @session.handle_content_ind(self, codec, seq, true, true)
+ else
+ raise "Unexpected opcode #{opcode.inspect}"
+ end
+ end
+ end
+
+ def exception_cb(data)
+ @is_connected = false
+ @error = data
+ synchronize { @cv.signal if @sync_in_flight }
+ @session.handle_error(@error)
+ @session.handle_broker_disconnect(self)
+ end
+ end
+
+ class Agent
+ attr_reader :broker, :bank
+
+ def initialize(broker, bank, label)
+ @broker = broker
+ @bank = bank
+ @label = label
+ end
+
+ def to_s
+ "Agent at bank %s (%s)" % [@bank, @label]
+ end
+ end
+
+ class Event
+
+ attr_reader :klass_key, :arguments, :timestamp, :name, :schema
+
+ def initialize(session, broker, codec)
+ @session = session
+ @broker = broker
+ pname = codec.read_str8
+ cname = codec.read_str8
+ hash = codec.read_bin128
+ @klass_key = [pname, cname, hash]
+ @timestamp = codec.read_int64
+ @severity = codec.read_uint8
+ @schema = nil
+ session.packages.keys.each do |pname|
+ k = [cname, hash]
+ if session.packages[pname].include?(k)
+ @schema = session.packages[pname][k]
+ @arguments = {}
+ @schema.arguments.each do |arg|
+ v = session.decode_value(codec, arg.type)
+ @arguments[arg.name] = v
+ end
+ end
+ end
+ end
+
+ def to_s
+ return "<uninterpretable>" unless @schema
+ t = Time.at(self.timestamp / 1000000000)
+ out = t.strftime("%c")
+ out += " " + sev_name + " " + @klass_key[0] + ":" + klass_key[1]
+ out += " broker=" + @broker.url
+ @schema.arguments.each do |arg|
+ out += " " + arg.name + "=" + @session.display_value(@arguments[arg.name], arg.type)
+ end
+ return out
+ end
+
+ def sev_name
+ case @severity
+ when 0 : return "EMER "
+ when 1 : return "ALERT"
+ when 2 : return "CRIT "
+ when 3 : return "ERROR"
+ when 4 : return "WARN "
+ when 5 : return "NOTIC"
+ when 6 : return "INFO "
+ when 7 : return "DEBUG"
+ else
+ return "INV-%d" % @severity
+ end
+ end
+
+ end
+
+ # Manage sequence numbers for asynchronous method calls
+ class SequenceManager
+ include MonitorMixin
+
+ def initialize
+ super()
+ @sequence = 0
+ @pending = {}
+ end
+
+ # Reserve a unique sequence number
+ def reserve (data)
+ synchronize do
+ result = @sequence
+ @sequence += 1
+ @pending[result] = data
+ return result
+ end
+ end
+
+ # Release a reserved sequence number
+ def release (seq)
+ synchronize { @pending.delete(seq) }
+ end
+ end
+
+ class DebugConsole < Console
+
+ def broker_connected(broker)
+ puts "brokerConnected #{broker}"
+ end
+
+ def broker_disconnected(broker)
+ puts "brokerDisconnected #{broker}"
+ end
+
+ def new_package(name)
+ puts "newPackage #{name}"
+ end
+
+ def new_class(kind, klass_key)
+ puts "newClass #{kind} #{klass_key}"
+ end
+
+ def new_agent(agent)
+ puts "new_agent #{agent}"
+ end
+
+ def del_agent(agent)
+ puts "delAgent #{agent}"
+ end
+
+ def object_props(broker, record)
+ puts "objectProps #{record.klass_key}"
+ end
+
+ def object_stats(broker, record)
+ puts "objectStats #{record.klass_key}"
+ end
+
+ def event(broker, event)
+ puts "event #{event}"
+ end
+
+ def heartbeat(agent, timestamp)
+ puts "heartbeat #{agent}"
+ end
+
+ def broker_info(broker)
+ puts "brokerInfo #{broker}"
+ end
+ end
+end
Added: incubator/qpid/trunk/qpid/ruby/lib/qpid/queue.rb
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/ruby/lib/qpid/queue.rb?rev=713616&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/ruby/lib/qpid/queue.rb (added)
+++ incubator/qpid/trunk/qpid/ruby/lib/qpid/queue.rb Wed Nov 12 18:45:18 2008
@@ -0,0 +1,101 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Augment the standard python multithreaded Queue implementation to add a
+# close() method so that threads blocking on the content of a queue can be
+# notified if the queue is no longer in use.
+
+require 'thread'
+
+# Python nominally uses a bounded queue, but the code never establishes
+# a maximum size; we therefore use Ruby's unbounded queue
+class Qpid::Queue < ::Queue
+
+ DONE = Object.new
+ STOP = Object.new
+
+ def initialize
+ super
+ @error = nil
+ @listener = nil
+ @exc_listener = nil
+ @exc_listener_lock = Monitor.new
+ @thread = nil
+ end
+
+ def close(error = nil)
+ @error = error
+ put(DONE)
+ unless @thread.nil?
+ @thread.join()
+ @thread = nil
+ end
+ end
+
+ def get(block = true, timeout = nil)
+ unless timeout.nil?
+ raise NotImplementedError
+ end
+ result = pop(! block)
+ if result == DONE
+ # this guarantees that any other waiting threads or any future
+ # calls to get will also result in a Qpid::Closed exception
+ put(DONE)
+ raise Qpid::Closed.new(@error)
+ else
+ return result
+ end
+ end
+
+ alias :put :push
+
+ def exc_listen(&block)
+ @exc_listener_lock.synchronize do
+ @exc_listener = block
+ end
+ end
+
+ def listen(&block)
+ if ! block_given? && @thread
+ put(STOP)
+ @thread.join()
+ @thread = nil
+ end
+
+ # FIXME: There is a potential race since we could be changing one
+ # non-nil listener to another
+ @listener = block
+
+ if block_given? && @thread.nil?
+ @thread = Thread.new do
+ loop do
+ begin
+ o = get()
+ break if o == STOP
+ @listener.call(o)
+ rescue Qpid::Closed => e
+ @exc_listener.call(e) if @exc_listener
+ break
+ end
+ end
+ end
+ end
+ end
+
+end
Added: incubator/qpid/trunk/qpid/ruby/lib/qpid/session.rb
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/ruby/lib/qpid/session.rb?rev=713616&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/ruby/lib/qpid/session.rb (added)
+++ incubator/qpid/trunk/qpid/ruby/lib/qpid/session.rb Wed Nov 12 18:45:18 2008
@@ -0,0 +1,458 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+require 'monitor'
+
+module Qpid
+
+ class Session < Invoker
+
+ def log; Qpid::logger["qpid.io.cmd"]; end
+ def msg; Qpid::logger["qpid.io.msg"]; end
+
+
+ class Exception < RuntimeError; end
+ class Closed < Qpid::Session::Exception; end
+ class Detached < Qpid::Session::Exception; end
+
+
+ INCOMPLETE = Object.new
+
+ def self.client(*args)
+ return Qpid::Client(*args)
+ end
+
+ def self.server(*args)
+ return Server(*args)
+ end
+
+ attr_reader :name, :spec, :auto_sync, :timeout, :channel
+ attr_reader :results, :exceptions
+ attr_accessor :channel, :auto_sync, :send_id, :receiver, :sender
+
+ # FIXME: Pass delegate through a block ?
+ def initialize(name, spec, kwargs = {})
+ auto_sync = true
+ auto_sync = kwargs[:auto_sync] if kwargs.key?(:auto_sync)
+ timeout = kwargs[:timeout] || 10
+ delegate = kwargs[:delegate]
+
+ @name = name
+ @spec = spec
+ @auto_sync = auto_sync
+ @timeout = timeout
+ @invoke_lock = Monitor.new
+ @closing = false
+ @closed = false
+
+ @cond_lock = Monitor.new
+ @condition = @cond_lock.new_cond
+
+ @send_id = true
+ @receiver = Receiver.new(self)
+ @sender = Sender.new(self)
+
+ @lock = Monitor.new
+ @incoming = {}
+ @results = {}
+ @exceptions = []
+
+ @assembly = nil
+
+ @delegate = delegate.call(self) if delegate
+
+ @ctl_seg = spec[:segment_type].enum[:control].value
+ @cmd_seg = spec[:segment_type].enum[:command].value
+ @hdr_seg = spec[:segment_type].enum[:header].value
+ @body_seg = spec[:segment_type].enum[:body].value
+ end
+
+ def incoming(destination)
+ @lock.synchronize do
+ queue = @incoming[destination]
+ unless queue
+ queue = Incoming.new(self, destination)
+ @incoming[destination] = queue
+ end
+ return queue
+ end
+ end
+
+ def error?
+ @exceptions.size > 0
+ end
+
+ def sync(timeout=nil)
+ if channel && Thread.current == channel.connection.thread
+ raise Qpid::Session::Exception, "deadlock detected"
+ end
+ unless @auto_sync
+ execution_sync(:sync => true)
+ end
+ last = @sender.next_id - 1
+ @cond_lock.synchronize do
+ unless @condition.wait_for(timeout) {
+ @sender.completed.include?(last) || error?
+ }
+ raise Qpid::Timeout
+ end
+ end
+ if error?
+ raise Qpid::Session::Exception, exceptions
+ end
+ end
+
+ def close(timeout=nil)
+ @invoke_lock.synchronize do
+ @closing = true
+ channel.session_detach(name)
+ end
+ @cond_lock.synchronize do
+ unless @condition.wait_for(timeout) { @closed }
+ raise Qpid::Timeout
+ end
+ end
+ end
+
+ def closed
+ @lock.synchronize do
+ return if @closed
+
+ @results.each { |id, f| f.error(exceptions) }
+ @results.clear
+
+ @incoming.values.each { |q| q.close(exceptions) }
+ @closed = true
+ @cond_lock.synchronize { @condition.signal }
+ end
+ end
+
+ def resolve_method(name)
+ o = @spec.children[name]
+ case o
+ when Qpid::Spec010::Command
+ return invocation(:method, o)
+ when Qpid::Spec010::Struct
+ return invocation(:method, o)
+ when Qpid::Spec010::Domain
+ return invocation(:value, o.enum) unless o.enum.nil?
+ end
+
+ matches = @spec.children.select { |x|
+ x.name.to_s.include?(name.to_s)
+ }.collect { |x| x.name.to_s }.sort
+ if matches.size == 0
+ msg = nil
+ elsif matches.size == 1
+ msg = "Did you mean #{matches[0]} ? "
+ else
+ msg = "Did you mean one of #{matches.join(",")} ? "
+ end
+ return invocation(:error, msg)
+ end
+
+ def invoke(type, args)
+ # XXX
+ unless type.respond_to?(:track)
+ return type.create(*args)
+ end
+ @invoke_lock.synchronize do
+ return do_invoke(type, args)
+ end
+ end
+
+ def do_invoke(type, args)
+ raise Qpid::Session::Closed if @closing
+ raise Qpid::Session::Detached unless channel
+
+ # Clumsy simulation of Python's keyword args
+ kwargs = {}
+ if args.size > 0 && args[-1].is_a?(Hash)
+ if args.size > type.fields.size
+ kwargs = args.pop
+ elsif type.fields[args.size - 1].type != @spec[:map]
+ kwargs = args.pop
+ end
+ end
+
+ if type.payload
+ if args.size == type.fields.size + 1
+ message = args.pop
+ else
+ message = kwargs.delete(:message) # XXX Really ?
+ end
+ else
+ message = nil
+ end
+
+ hdr = Qpid::struct(@spec[:header])
+ hdr.sync = @auto_sync || kwargs.delete(:sync)
+
+ cmd = type.create(*args.push(kwargs))
+ sc = Qpid::StringCodec.new(@spec)
+ sc.write_command(hdr, cmd)
+
+ seg = Segment.new(true, (message.nil? ||
+ (message.headers.nil? && message.body.nil?)),
+ type.segment_type, type.track, @channel.id, sc.encoded)
+
+ unless type.result.nil?
+ result = Future.new(exception=Exception)
+ @results[@sender.next_id] = result
+ end
+ emit(seg)
+
+ log.debug("SENT %s %s %s" % [seg.id, hdr, cmd]) if log
+
+ unless message.nil?
+ unless message.headers.nil?
+ sc = Qpid::StringCodec.new(@spec)
+ message.headers.each { |st| sc.write_struct32(st) }
+
+ seg = Segment.new(false, message.body.nil?, @hdr_seg,
+ type.track, @channel.id, sc.encoded)
+ emit(seg)
+ end
+ unless message.body.nil?
+ seg = Segment.new(false, true, @body_seg, type.track,
+ @channel.id, message.body)
+ emit(seg)
+ end
+ msg.debug("SENT %s" % message) if msg
+ end
+
+ if !type.result.nil?
+ return @auto_sync ? result.get(@timeout) : result
+ elsif @auto_sync
+ sync(@timeout)
+ end
+ end
+
+ def received(seg)
+ @receiver.received(seg)
+ if seg.first_segment?
+ raise Qpid::Session::Exception unless @assembly.nil?
+ @assembly = []
+ end
+ @assembly << seg
+ if seg.last_segment?
+ dispatch(@assembly)
+ @assembly = nil
+ end
+ end
+
+ def dispatch(assembly)
+ hdr = nil
+ cmd = nil
+ header = nil
+ body = nil
+ assembly.each do |seg|
+ d = seg.decode(@spec)
+ case seg.type
+ when @cmd_seg
+ hdr, cmd = d
+ when @hdr_seg
+ header = d
+ when @body_seg
+ body = d
+ else
+ raise Qpid::Session::Exception
+ end
+ end
+ log.debug("RECV %s %s %s" % [cmd.id, hdr, cmd]) if log
+
+ if cmd.type.payload
+ result = @delegate.send(cmd.type.name, cmd, header, body)
+ else
+ result = @delegate.send(cmd.type.name, cmd)
+ end
+
+ unless cmd.type.result.nil?
+ execution_result(cmd.id, result)
+ end
+
+ if result != INCOMPLETE
+ assembly.each do |seg|
+ @receiver.has_completed(seg)
+ # XXX: don't forget to obey sync for manual completion as well
+ if hdr.sync
+ @channel.session_completed(@receiver.completed)
+ end
+ end
+ end
+ end
+
+ # Python calls this 'send', but that has a special meaning
+ # in Ruby, so we call it 'emit'
+ def emit(seg)
+ @sender.emit(seg)
+ end
+
+ def signal
+ @cond_lock.synchronize { @condition.signal }
+ end
+
+ def wait_for(timeout = nil, &block)
+ @cond_lock.synchronize { @condition.wait_for(timeout, &block) }
+ end
+
+ def to_s
+ "<Session: #{name}, #{channel}>"
+ end
+
+ class Receiver
+
+ attr_reader :completed
+ attr_accessor :next_id, :next_offset
+
+ def initialize(session)
+ @session = session
+ @next_id = nil
+ @next_offset = nil
+ @completed = Qpid::RangedSet.new()
+ end
+
+ def received(seg)
+ if @next_id.nil? || @next_offset.nil?
+ raise Exception, "todo"
+ end
+ seg.id = @next_id
+ seg.offset = @next_offset
+ if seg.last_segment?
+ @next_id += 1
+ @next_offset = 0
+ else
+ @next_offset += seg.payload.size
+ end
+ end
+
+ def has_completed(seg)
+ if seg.id.nil?
+ raise ArgumentError, "cannot complete unidentified segment"
+ end
+ if seg.last_segment?
+ @completed.add(seg.id)
+ end
+ end
+
+ def known_completed(commands)
+ completed = Qpid::RangedSet.new()
+ @completed.ranges.each do |c|
+ unless commands.ranges.find { |kc|
+ kc.contains(c.lower) && kc.contains(c.upper)
+ }
+ completed.add_range(c)
+ end
+ end
+ @completed = completed
+ end
+ end
+
+ class Sender
+
+ def initialize(session)
+ @session = session
+ @next_id = 0.to_serial
+ @next_offset = 0
+ @segments = []
+ @completed = RangedSet.new()
+ end
+
+ attr_reader :next_id, :completed
+
+ def emit(seg)
+ seg.id = @next_id
+ seg.offset = @next_offset
+ if seg.last_segment?
+ @next_id += 1
+ @next_offset = 0
+ else
+ @next_offset += seg.payload.size
+ end
+ @segments << seg
+ if @session.send_id
+ @session.send_id = false
+ @session.channel.session_command_point(seg.id, seg.offset)
+ end
+ @session.channel.connection.write_segment(seg)
+ end
+
+ def has_completed(commands)
+ @segments = @segments.reject { |seg| commands.include?(seg.id) }
+ commands.ranges.each do |range|
+ @completed.add(range.lower, range.upper)
+ end
+ end
+ end
+
+ class Incoming < Qpid::Queue
+
+ def initialize(session, destination)
+ super()
+ @session = session
+ @destination = destination
+ end
+
+ def start
+ @session.message_credit_unit.choices.each do |unit|
+ @session.message_flow(@destination, unit.value, 0xFFFFFFFF)
+ end
+ end
+
+ def stop
+ @session.message_cancel(@destination)
+ listen # Kill the listener
+ end
+ end
+
+ class Delegate
+
+ def initialize(session)
+ @session = session
+ end
+
+ #XXX: do something with incoming accepts
+ def message_accept(ma) nil; end
+
+ def execution_result(er)
+ future = @session.results.delete(er.command_id)
+ future.set(er.value)
+ end
+
+ def execution_exception(ex)
+ @session.exceptions.append(ex)
+ end
+ end
+
+ class Client < Delegate
+
+ def log ; Qpid::logger["qpid.io.msg"]; end
+
+ def message_transfer(cmd, headers, body)
+ m = Qpid::Message.new(body)
+ m.headers = headers
+ m.id = cmd.id
+ messages = @session.incoming(cmd.destination)
+ messages.put(m)
+ log.debug("RECV %s" % m) if log
+ return INCOMPLETE
+ end
+ end
+ end
+end
Added: incubator/qpid/trunk/qpid/ruby/lib/qpid/spec.rb
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/ruby/lib/qpid/spec.rb?rev=713616&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/ruby/lib/qpid/spec.rb (added)
+++ incubator/qpid/trunk/qpid/ruby/lib/qpid/spec.rb Wed Nov 12 18:45:18 2008
@@ -0,0 +1,183 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+require "set"
+require "rexml/document"
+require "qpid/fields"
+require "qpid/traverse"
+
+module Qpid
+ module Spec
+
+ include REXML
+
+ class Container < Array
+
+ def initialize()
+ @cache = {}
+ end
+
+ def [](key)
+ return @cache[key] if @cache.include?(key)
+ value = do_lookup(key)
+ @cache[key] = value
+ return value
+ end
+
+ def do_lookup(key)
+ case key
+ when String
+ return find {|x| x.name == key.intern()}
+ when Symbol
+ return find {|x| x.name == key}
+ else
+ return slice(key)
+ end
+ end
+
+ def +(other)
+ copy = clone()
+ copy.concat(other)
+ return copy
+ end
+
+ end
+
+ class Reference
+
+ fields(:name)
+
+ def init(&block)
+ @resolver = block
+ end
+
+ def resolve(*args)
+ @resolver.call(*args)
+ end
+
+ end
+
+ class Loader
+
+ def initialize()
+ @stack = []
+ end
+
+ def container()
+ return Container.new()
+ end
+
+ def load(obj)
+ case obj
+ when String
+ elem = @stack[-1]
+ result = container()
+ elem.elements.each(obj) {|e|
+ @index = result.size
+ result << load(e)
+ }
+ @index = nil
+ return result
+ else
+ elem = obj
+ @stack << elem
+ begin
+ result = send(:"load_#{elem.name}")
+ ensure
+ @stack.pop()
+ end
+ return result
+ end
+ end
+
+ def element
+ @stack[-1]
+ end
+
+ def text
+ element.text
+ end
+
+ def attr(name, type = :string, default = nil, path = nil)
+ if path.nil?
+ elem = element
+ else
+ elem = nil
+ element.elements.each(path) {|elem|}
+ if elem.nil?
+ return default
+ end
+ end
+
+ value = elem.attributes[name]
+ value = value.strip() unless value.nil?
+ if value.nil?
+ default
+ else
+ send(:"parse_#{type}", value)
+ end
+ end
+
+ def parse_int(value)
+ if value.nil?
+ return nil
+ else
+ value.to_i(0)
+ end
+ end
+
+ TRUE = ["yes", "true", "1"].to_set
+ FALSE = ["no", "false", "0", nil].to_set
+
+ def parse_bool(value)
+ if TRUE.include?(value)
+ true
+ elsif FALSE.include?(value)
+ false
+ else
+ raise Exception.new("parse error, expecting boolean: #{value}")
+ end
+ end
+
+ def parse_string(value)
+ value.to_s
+ end
+
+ def parse_symbol(value)
+ value.intern() unless value.nil?
+ end
+
+ REPLACE = {" " => "_", "-" => "_"}
+ KEYWORDS = {"global" => "global_", "return" => "return_"}
+
+ def parse_name(value)
+ return if value.nil?
+
+ REPLACE.each do |k, v|
+ value = value.gsub(k, v)
+ end
+
+ value = KEYWORDS[value] if KEYWORDS.has_key? value
+ return value.intern()
+ end
+
+ end
+
+ end
+end
Added: incubator/qpid/trunk/qpid/ruby/lib/qpid/spec010.rb
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/ruby/lib/qpid/spec010.rb?rev=713616&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/ruby/lib/qpid/spec010.rb (added)
+++ incubator/qpid/trunk/qpid/ruby/lib/qpid/spec010.rb Wed Nov 12 18:45:18 2008
@@ -0,0 +1,497 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+require "qpid/spec"
+require 'pathname'
+require 'fileutils'
+
+module Qpid::Spec010
+
+ include Qpid::Spec
+
+ AMQP_SPEC_DEFAULT_DIR = "/usr/share/amqp"
+
+ # XXX: workaround for ruby bug/missfeature
+ Reference = Reference
+ Loader = Loader
+
+ class Spec
+
+ ENCODINGS = {
+ String => "vbin16",
+ Fixnum => "int64",
+ Bignum => "int64",
+ Float => "float",
+ NilClass => "void",
+ Array => "list",
+ Hash => "map"
+ }
+
+ fields(:major, :minor, :port, :children)
+
+ def init()
+ @controls = {}
+ @commands = {}
+ @structs = {}
+ @types = {}
+ children.each {|c|
+ case c
+ when Control
+ @controls[c.code] = c
+ when Command
+ @commands[c.code] = c
+ when Struct
+ @structs[c.code] = c
+ when Type
+ @types[c.code] = c unless c.code.nil?
+ end
+ }
+ end
+
+ attr_reader :controls, :commands, :structs, :types
+
+ def [](key)
+ return @children[key]
+ end
+
+ def encoding(klass)
+ if ENCODINGS.has_key?(klass)
+ return self[ENCODINGS[klass]]
+ end
+ for base in klass.__bases__
+ result = encoding(base)
+ return result unless result.nil?
+ end
+ end
+
+ def inspect; "spec"; end
+ end
+
+ class Constant
+
+ fields(:name, :value)
+
+ attr :parent, true
+
+ end
+
+ class Type
+
+ fields(:name, :code, :fixed, :variable)
+
+ attr :parent, true
+
+ def present?(value)
+ if @fixed == 0
+ return value
+ else
+ return !value.nil?
+ end
+ end
+
+ def encode(codec, value)
+ codec.send("write_#{name}", value)
+ end
+
+ def decode(codec)
+ return codec.send("read_#{name}")
+ end
+
+ def inspect; name; end
+
+ end
+
+ class Domain < Type
+
+ fields(:name, :type, :enum)
+
+ attr :parent, true
+
+ def encode(codec, value)
+ @type.encode(codec, value)
+ end
+
+ def decode(codec)
+ return @type.decode(codec)
+ end
+
+ end
+
+ class Enum
+ fields(:choices)
+
+ def [](choice)
+ case choice
+ when String
+ choice = choice.to_sym
+ return choices.find { |c| c.name == choice }
+ when Symbol
+ return choices.find { |c| c.name == choice }
+ else
+ return choices.find { |c| c.value == choice }
+ end
+ end
+
+ def method_missing(name, *args)
+ raise ArgumentError.new("wrong number of arguments") unless args.empty?
+ return self[name].value
+ end
+
+ end
+
+ class Choice
+ fields(:name, :value)
+ end
+
+ class Composite
+
+ fields(:name, :code, :size, :pack, :fields)
+
+ attr :parent, true
+
+ # Python calls this 'new', but that has special meaning in Ruby
+ def create(*args)
+ return Qpid::struct(self, *args)
+ end
+
+ def decode(codec)
+ codec.read_size(@size)
+ codec.read_uint16() unless @code.nil?
+ return Qpid::struct(self, self.decode_fields(codec))
+ end
+
+ def decode_fields(codec)
+ flags = 0
+ pack.times {|i| flags |= (codec.read_uint8() << 8*i)}
+
+ result = {}
+
+ fields.each_index {|i|
+ f = @fields[i]
+ if flags & (0x1 << i) != 0
+ result[f.name] = f.type.decode(codec)
+ else
+ result[f.name] = nil
+ end
+ }
+
+ return result
+ end
+
+ def encode(codec, value)
+ sc = Qpid::StringCodec.new(@spec)
+ sc.write_uint16(@code) unless @code.nil?
+ encode_fields(sc, value)
+ codec.write_size(@size, sc.encoded.size)
+ codec.write(sc.encoded)
+ end
+
+ def encode_fields(codec, values)
+ # FIXME: This could be written cleaner using select
+ # instead of flags
+ flags = 0
+ fields.each_index do |i|
+ f = fields[i]
+ flags |= (0x1 << i) if f.type.present?(values[f.name])
+ end
+
+ pack.times { |i| codec.write_uint8((flags >> 8*i) & 0xFF) }
+
+ fields.each_index do |i|
+ f = fields[i]
+ f.type.encode(codec, values[f.name]) if flags & (0x1 << i) != 0
+ end
+ end
+
+ def inspect; name; end
+
+ end
+
+ class Field
+
+ fields(:name, :type, :exceptions)
+
+ def default()
+ return nil
+ end
+
+ end
+
+ class Struct < Composite
+
+ def present?(value)
+ return !value.nil?
+ end
+
+ end
+
+ class Action < Composite; end
+
+ class Control < Action
+
+ def segment_type
+ @parent[:segment_type].enum[:control].value
+ end
+
+ def track
+ @parent[:track].enum[:control].value
+ end
+
+ end
+
+ class Command < Action
+
+ attr_accessor :payload, :result
+
+ def segment_type
+ @parent["segment_type"].enum["command"].value
+ end
+
+ def track
+ @parent["track"].enum["command"].value
+ end
+
+ end
+
+ class Doc
+ fields(:type, :title, :text)
+ end
+
+ class Loader010 < Loader
+
+ def initialize()
+ super()
+ end
+
+ def klass
+ cls = element
+ until cls.nil?
+ break if cls.name == "class"
+ cls = cls.parent
+ end
+ return cls
+ end
+
+ def scope
+ if element.name == "struct"
+ return nil
+ else
+ return class_name
+ end
+ end
+
+ def class_name
+ cls = klass
+ if cls.nil?
+ return nil
+ else
+ return parse_name(cls.attributes["name"].strip)
+ end
+ end
+
+ def class_code
+ cls = klass
+ if cls.nil?
+ return 0
+ else
+ return parse_int(cls.attributes["code"].strip)
+ end
+ end
+
+ def parse_decl(value)
+ name = parse_name(value)
+
+ s = scope
+ if s.nil?
+ return name
+ else
+ return :"#{s}_#{name}"
+ end
+ end
+
+ def parse_code(value)
+ c = parse_int(value)
+ if c.nil?
+ return nil
+ else
+ return c | (class_code << 8)
+ end
+ end
+
+ def parse_type(value)
+ name = parse_name(value.sub(".", "_"))
+ cls = class_name
+ return Reference.new {|spec|
+ candidates = [name]
+ candidates << :"#{cls}_#{name}" unless cls.nil?
+ for c in candidates
+ child = spec[c]
+ break unless child.nil?
+ end
+ if child.nil?
+ raise Exception.new("unresolved type: #{name}")
+ else
+ child
+ end
+}
+ end
+
+ def load_amqp()
+ children = nil
+
+ for s in ["constant", "type", "domain", "struct", "control",
+ "command"]
+ ch = load(s)
+ if children.nil?
+ children = ch
+ else
+ children += ch
+ end
+ children += load("class/#{s}")
+ end
+ children += load("class/command/result/struct")
+ Spec.new(attr("major", :int), attr("minor", :int), attr("port", :int),
+ children)
+ end
+
+ def load_constant()
+ Constant.new(attr("name", :decl), attr("value", :int))
+ end
+
+ def load_type()
+ Type.new(attr("name", :decl), attr("code", :code),
+ attr("fixed-width", :int), attr("variable-width", :int))
+ end
+
+ def load_domain()
+ Domain.new(attr("name", :decl), attr("type", :type), load("enum").first)
+ end
+
+ def load_enum()
+ Enum.new(load("choice"))
+ end
+
+ def load_choice()
+ Choice.new(attr("name", :name), attr("value", :int))
+ end
+
+ def load_field()
+ Field.new(attr("name", :name), attr("type", :type))
+ end
+
+ def load_struct()
+ Struct.new(attr("name", :decl), attr("code", :code), attr("size", :int),
+ attr("pack", :int), load("field"))
+ end
+
+ def load_action(cls)
+ cls.new(attr("name", :decl), attr("code", :code), 0, 2, load("field"))
+ end
+
+ def load_control()
+ load_action(Control)
+ end
+
+ def load_command()
+ result = attr("type", :type, nil, "result")
+ result = attr("name", :type, nil, "result/struct") if result.nil?
+ segs = load("segments")
+ cmd = load_action(Command)
+ cmd.result = result
+ cmd.payload = !segs.empty?
+ return cmd
+ end
+
+ def load_result()
+ true
+ end
+
+ def load_segments()
+ true
+ end
+
+ end
+
+ def self.spec_cache(specfile)
+ File::join(File::dirname(__FILE__), "spec_cache",
+ File::basename(specfile, ".xml") + ".rb_marshal")
+ end
+
+ # XXX: could be shared
+ def self.load(spec = nil)
+ return spec if spec.is_a?(Qpid::Spec010::Spec)
+ if spec.nil?
+ # FIXME: Need to add a packaging setup in here so we know where
+ # the installed spec is going to be.
+ specfile = nil
+ if ENV['AMQP_SPEC']
+ specfile = ENV['AMQP_SPEC']
+ else
+ topdir = File::dirname(File::dirname(File::expand_path(__FILE__)))
+ specfile = File::join(topdir, "../../specs", "amqp.0-10-qpid-errata.xml")
+ end
+ else
+ specfile = spec
+ end
+
+ unless Pathname.new(specfile).absolute?
+ path = ENV["AMQP_SPEC_PATH"] || AMQP_SPEC_DEFAULT_DIR
+
+ p = path.split(File::PATH_SEPARATOR).collect { |p|
+ Pathname.new(p).join(specfile)
+ }.find { |p| p.file? }
+ raise "Can not find file for spec #{spec}" unless p
+ specfile = p.to_s
+ end
+
+ specfile_cache = spec_cache(specfile)
+ # FIXME: Check that cache is newer than specfile
+ if File::exist?(specfile_cache)
+ begin
+ spec = File::open(specfile_cache, "r") do |f|
+ Marshal::load(f)
+ end
+ return spec
+ rescue
+ # Ignore, will load from XML
+ end
+ end
+
+ doc = File::open(specfile, "r") { |f| Document.new(f) }
+ spec = Loader010.new().load(doc.root)
+ spec.traverse! do |o|
+ if o.is_a?(Reference)
+ o.resolve(spec)
+ else
+ o
+ end
+ end
+
+ spec.children.each { |c| c.parent = spec }
+
+ begin
+ FileUtils::mkdir_p(File::dirname(specfile_cache))
+ File::open(specfile_cache, "w") { |f| Marshal::dump(spec, f) }
+ rescue
+ # Ignore, we are fine without the cached spec
+ end
+ return spec
+ end
+
+end
Added: incubator/qpid/trunk/qpid/ruby/lib/qpid/spec08.rb
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/ruby/lib/qpid/spec08.rb?rev=713616&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/ruby/lib/qpid/spec08.rb (added)
+++ incubator/qpid/trunk/qpid/ruby/lib/qpid/spec08.rb Wed Nov 12 18:45:18 2008
@@ -0,0 +1,190 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+require "qpid/spec"
+
+module Qpid08
+
+ module Spec
+
+ include Qpid::Spec
+
+ # XXX: workaround for ruby bug/missfeature
+ Reference = Reference
+
+ class Root
+ fields(:major, :minor, :classes, :constants, :domains)
+
+ def find_method(name)
+ classes.each do |c|
+ c.methods.each do |m|
+ if name == m.qname
+ return m
+ end
+ end
+ end
+
+ return nil
+ end
+ end
+
+ class Constant
+ fields(:name, :id, :type, :docs)
+ end
+
+ class Domain
+ fields(:name, :type)
+ end
+
+ class Class
+ fields(:name, :id, :handler, :fields, :methods, :docs)
+ end
+
+ class Method
+ fields(:name, :id, :content?, :responses, :synchronous?, :fields,
+ :docs)
+
+ def init()
+ @response = false
+ end
+
+ attr :parent, true
+
+ def response?; @response end
+ def response=(b); @response = b end
+
+ def qname
+ :"#{parent.name}_#{name}"
+ end
+ end
+
+ class Field
+ fields(:name, :id, :type, :docs)
+
+ def default
+ case type
+ when :bit then false
+ when :octet, :short, :long, :longlong then 0
+ when :shortstr, :longstr then ""
+ when :table then {}
+ end
+ end
+ end
+
+ class Doc
+ fields(:type, :text)
+ end
+
+ class Container08 < Container
+ def do_lookup(key)
+ case key
+ when Integer
+ return find {|x| x.id == key}
+ else
+ return super(key)
+ end
+ end
+ end
+
+ class Loader08 < Loader
+
+ def container()
+ return Container08.new()
+ end
+
+ def load_amqp()
+ Root.new(attr("major", :int), attr("minor", :int), load("class"),
+ load("constant"), load("domain"))
+ end
+
+ def load_class()
+ Class.new(attr("name", :name), attr("index", :int), attr("handler", :name),
+ load("field"), load("method"), load("doc"))
+ end
+
+ def load_method()
+ Method.new(attr("name", :name), attr("index", :int),
+ attr("content", :bool), load("response"),
+ attr("synchronous", :bool), load("field"), load("docs"))
+ end
+
+ def load_response()
+ name = attr("name", :name)
+ Reference.new {|spec, klass|
+ response = klass.methods[name]
+ if response.nil?
+ raise Exception.new("no such method: #{name}")
+ end
+ response
+ }
+ end
+
+ def load_field()
+ type = attr("type", :name)
+ if type.nil?
+ domain = attr("domain", :name)
+ type = Reference.new {|spec, klass|
+ spec.domains[domain].type
+ }
+ end
+ Field.new(attr("name", :name), @index, type, load("docs"))
+ end
+
+ def load_constant()
+ Constant.new(attr("name", :name), attr("value", :int), attr("class", :name),
+ load("doc"))
+ end
+
+ def load_domain()
+ Domain.new(attr("name", :name), attr("type", :name))
+ end
+
+ def load_doc()
+ Doc.new(attr("type", :symbol), text)
+ end
+
+ end
+
+ def self.load(spec)
+ case spec
+ when String
+ spec = File.new(spec)
+ end
+ doc = Document.new(spec)
+ spec = Loader08.new().load(doc.root)
+ spec.classes.each do |klass|
+ klass.traverse! do |o|
+ case o
+ when Reference
+ o.resolve(spec, klass)
+ else
+ o
+ end
+ end
+ klass.methods.each do |m|
+ m.parent = klass
+ m.responses.each do |r|
+ r.response = true
+ end
+ end
+ end
+ return spec
+ end
+ end
+end
Added: incubator/qpid/trunk/qpid/ruby/lib/qpid/test.rb
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/ruby/lib/qpid/test.rb?rev=713616&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/ruby/lib/qpid/test.rb (added)
+++ incubator/qpid/trunk/qpid/ruby/lib/qpid/test.rb Wed Nov 12 18:45:18 2008
@@ -0,0 +1,35 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+require "qpid/spec08"
+require "qpid/client"
+
+module Qpid08
+
+ module Test
+
+ def connect()
+ spec = Spec.load("../specs/amqp.0-8.xml")
+ c = Client.new("0.0.0.0", 5672, spec)
+ c.start({"LOGIN" => "guest", "PASSWORD" => "guest"})
+ return c
+ end
+
+ end
+
+end
Added: incubator/qpid/trunk/qpid/ruby/lib/qpid/traverse.rb
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/ruby/lib/qpid/traverse.rb?rev=713616&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/ruby/lib/qpid/traverse.rb (added)
+++ incubator/qpid/trunk/qpid/ruby/lib/qpid/traverse.rb Wed Nov 12 18:45:18 2008
@@ -0,0 +1,64 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+class Object
+
+ public
+
+ def traverse()
+ traverse! {|o| yield(o); o}
+ end
+
+ def traverse_children!()
+ instance_variables.each {|v|
+ value = instance_variable_get(v)
+ replacement = yield(value)
+ instance_variable_set(v, replacement) unless replacement.equal? value
+ }
+ end
+
+ def traverse!(replacements = {})
+ return replacements[__id__] if replacements.has_key? __id__
+ replacement = yield(self)
+ replacements[__id__] = replacement
+ traverse_children! {|o| o.traverse!(replacements) {|c| yield(c)}}
+ return replacement
+ end
+
+end
+
+class Array
+ def traverse_children!()
+ map! {|o| yield(o)}
+ end
+end
+
+class Hash
+ def traverse_children!()
+ mods = {}
+ each_pair {|k, v|
+ key = yield(k)
+ value = yield(v)
+ mods[key] = value unless key.equal? k and value.equal? v
+ delete(k) unless key.equal? k
+ }
+
+ merge!(mods)
+ end
+end
Added: incubator/qpid/trunk/qpid/ruby/lib/qpid/util.rb
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/ruby/lib/qpid/util.rb?rev=713616&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/ruby/lib/qpid/util.rb (added)
+++ incubator/qpid/trunk/qpid/ruby/lib/qpid/util.rb Wed Nov 12 18:45:18 2008
@@ -0,0 +1,75 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+require 'thread'
+require 'monitor'
+
+# Monkeypatch
+class MonitorMixin::ConditionVariable
+
+ # Wait until BLOCK returns TRUE or TIMEOUT seconds have passed
+ # Return TRUE if BLOCK returned TRUE within the TIMEOUT, FALSE
+ # otherswise
+ def wait_for(timeout=nil, &block)
+ start = Time.now
+ passed = 0
+ until yield
+ if timeout.nil?
+ wait
+ elsif passed < timeout
+ wait(timeout)
+ else
+ return false
+ end
+ passed = Time.now - start
+ end
+ return true
+ end
+end
+
+module Qpid::Util
+
+ # Similar to Python's threading.Event
+ class Event
+ def initialize
+ @monitor = Monitor.new
+ @cond = @monitor.new_cond
+ @set = false
+ end
+
+ def set
+ @monitor.synchronize do
+ @set = true
+ @cond.signal
+ end
+ end
+
+ def clear
+ @monitor.synchronize { @set = false }
+ end
+
+ def wait(timeout = nil)
+ @monitor.synchronize do
+ unless @set
+ @cond.wait_for(timeout) { @set }
+ end
+ end
+ end
+ end
+end