You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2008/11/13 03:45:19 UTC

svn commit: r713616 [2/3] - in /incubator/qpid/trunk/qpid/ruby: ./ examples/ lib/ lib/qpid/ qpid/ tests/ tests_0-8/

Added: incubator/qpid/trunk/qpid/ruby/lib/qpid/qmf.rb
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/ruby/lib/qpid/qmf.rb?rev=713616&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/ruby/lib/qpid/qmf.rb (added)
+++ incubator/qpid/trunk/qpid/ruby/lib/qpid/qmf.rb Wed Nov 12 18:45:18 2008
@@ -0,0 +1,1503 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Console API for Qpid Management Framework
+
+require 'socket'
+require 'monitor'
+require 'uri'
+require 'time'
+
+module Qpid::Qmf
+
+  # To access the asynchronous operations, a class must be derived from
+  # Console with overrides of any combination of the available methods.
+  class Console
+
+    # Invoked when a connection is established to a broker
+    def broker_connected(broker); end
+
+    # Invoked when the connection to a broker is lost
+    def broker_disconnected(broker); end
+
+    # Invoked when a QMF package is discovered
+    def new_package(name); end
+
+    # Invoked when a new class is discovered.  Session.getSchema can be
+    # used to obtain details about the class
+    def new_class(kind, klass_key); end
+
+    # Invoked when a QMF agent is discovered
+    def new_agent(agent); end
+
+    # Invoked when a QMF agent disconects
+    def del_agent(agent); end
+
+    # Invoked when an object is updated
+    def object_props(broker, record); end
+
+    # Invoked when an object is updated
+    def objectStats(broker, record); end
+
+    # Invoked when an event is raised
+    def event(broker, event); end
+
+    def heartbeat(agent, timestamp); end
+
+    def broker_info(broker); end
+  end
+
+  class BrokerURL
+    def initialize(text)
+      uri = URI.parse(text)
+
+      @host = uri.host
+      @port = uri.port ? uri.port : 5672
+      @auth_name = uri.user ? uri.user : "guest"
+      @auth_pass = uri.password ? uri.password: "guest"
+      @auth_mech = "PLAIN"
+
+      return uri
+    end
+
+    def name
+      "#{@host}:#{@port}"
+    end
+
+    def match(host, port)
+      # FIXME: Unlcear what the Python code is actually checking for
+      # here, especially since HOST can resolve to multiple IP's
+      @port == port &&
+        (host == @host || ipaddr(host, port) == ipaddr(@host, @port))
+    end
+
+    private
+    def ipaddr(host, port)
+      s = Socket::getaddrinfo(host, port,
+                              Socket::AF_INET, Socket::SOCK_STREAM)
+      s[0][2]
+    end
+  end
+
+  # An instance of the Session class represents a console session running
+  # against one or more QMF brokers.  A single instance of Session is
+  # needed to interact with the management framework as a console.
+  class Session
+    CONTEXT_SYNC     = 1
+    CONTEXT_STARTUP  = 2
+    CONTEXT_MULTIGET = 3
+
+    GET_WAIT_TIME = 60
+
+    include MonitorMixin
+
+    attr_reader :binding_key_list, :select, :seq_mgr, :console, :packages
+
+    # Initialize a session.  If the console argument is provided, the
+    # more advanced asynchronous features are available.  If console is
+    # defaulted, the session will operate in a simpler, synchronous
+    # manner. The rcvObjects, rcvEvents, and rcvHeartbeats arguments
+    # are meaningful only if 'console' is provided.  They control
+    # whether object updates, events, and agent-heartbeats are
+    # subscribed to.  If the console is not interested in receiving one
+    # or more of the above, setting the argument to False will reduce
+    # tha bandwidth used by the API. If manageConnections is set to
+    # True, the Session object will manage connections to the brokers.
+    # This means that if a broker is unreachable, it will retry until a
+    # connection can be established.  If a connection is lost, the
+    # Session will attempt to reconnect.
+    #
+    # If manageConnections is set to False, the user is responsible for
+    # handing failures. In this case, an unreachable broker will cause
+    # addBroker to raise an exception. If userBindings is set to False
+    # (the default) and rcvObjects is True, the console will receive
+    # data for all object classes.  If userBindings is set to True, the
+    # user must select which classes the console shall receive by
+    # invoking the bindPackage or bindClass methods. This allows the
+    # console to be configured to receive only information that is
+    # relavant to a particular application.  If rcvObjects id False,
+    # userBindings has no meaning.
+    #
+    # Accept a hash of parameters, where keys can be :console,
+    # :rcv_objects, :rcv_events, :rcv_heartbeats, :manage_connections,
+    # and :user_bindings
+    def initialize(kwargs = {})
+      super()
+      @console           = kwargs[:console] || nil
+      @brokers           = []
+      @packages          = {}
+      @seq_mgr           = SequenceManager.new
+      @cv                = new_cond
+      @sync_sequence_list = []
+      @result         = []
+      @select         = []
+      @error             = nil
+      @rcv_objects       = kwargs[:rcv_objects] || true
+      @rcv_events        = kwargs[:rcv_events] || true
+      @rcv_heartbeats    = kwargs[:rcv_heartbeats] || true
+      @user_bindings     = kwargs[:user_bindings] || false
+      unless @console
+        @rcv_objects    = false
+        @rcv_events     = false
+        @rcv_heartbeats = false
+      end
+      @binding_key_list    = binding_keys
+      @manage_connections  = kwargs[:manage_connections] || false
+
+      if @user_bindings && ! @rcv_objects
+        raise ArgumentError, "user_bindings can't be set unless rcv_objects is set and a console is provided"
+      end
+
+      #raise NotImplementedError, @manage_connections
+    end
+
+    def to_s
+      "QMF Console Session Manager (brokers connected: #{@brokers.size})"
+    end
+
+    # Connect to a Qpid broker.  Returns an object of type Broker
+    def add_broker(target="amqp://localhost")
+      uri = URI.parse(target)
+      broker = Broker.new(self, uri.host, uri.port, "PLAIN", uri.user, uri.password)
+      unless broker.connected? || @manage_connections
+        raise broker.error
+      end
+
+      @brokers << broker
+      objects(:broker => broker, :class => "agent")
+      return broker
+    end
+
+    # Disconnect from a broker.  The 'broker' argument is the object
+    # returned from the addBroker call
+    def del_broker(broker)
+      broker.shutdown
+      @brokers.delete(broker)
+    end
+
+    # Get the list of known classes within a QMF package
+    def classes(package_name)
+      @brokers.each { |broker| broker.wait_for_stable }
+      if @packages.include?(package_name)
+        # FIXME What's the actual structure of @packages[package_name]
+        @packages[package_name].inject([]) do |list, cname, hash|
+          list << [ package_name, cname, hash]
+        end
+      end
+    end
+
+    # Get the schema for a QMF class
+    def schema(klass_key)
+      @brokers.each { |broker| broker.wait_for_stable }
+      pname, cname, hash = klass_key
+      if @packages.include?(pname)
+        @packages[pname][ [cname, hash] ]
+      end
+    end
+
+    def bind_package(package_name)
+      unless @user_bindings && @rcv_objects
+        raise "userBindings option not set for Session"
+      end
+      @brokers.each do |broker|
+        args = { :exchange => "qpid.management",
+          :queue => broker.topicName,
+          :binding_key => "console.obj.#{package_name}" }
+        broker.amqpSession.exchange_bind(args)
+      end
+    end
+
+    def bind_class(klass_key)
+      unless @user_bindings && @rcv_objects
+        raise "userBindings option not set for Session"
+      end
+      pname, cname, hash = klass_key
+      @brokers.each do |broker|
+        args = { :exchange => "qpid.management",
+          :queue => broker.topicName,
+          :binding_key => "console.obj.#{pname}.#{cname}" }
+        broker.amqpSession.exchange_bind(args)
+      end
+    end
+
+    # Get a list of currently known agents
+    def agents(broker=nil)
+      broker_list = []
+      if broker.nil?
+        broker_list = @brokers.dup
+      else
+        broker_list << broker
+      end
+      broker_list.each { |b| b.wait_for_stable }
+      agent_list = []
+      broker_list.each { |b| agent_list += b.agents }
+      return agent_list
+    end
+
+    # Get a list of objects from QMF agents.
+    # All arguments are passed by name(keyword).
+    #
+    # The class for queried objects may be specified in one of the
+    # following ways:
+    # :schema => <schema> - supply a schema object returned from getSchema.
+    # :key => <key>       - supply a klass_key from the list returned by getClasses.
+    # :class => <name>    - supply a class name as a string.  If the class name exists
+    #                       in multiple packages, a _package argument may also be supplied.
+    # :object_id = <id>   - get the object referenced by the object-id
+    #
+    # If objects should be obtained from only one agent, use the following argument.
+    # Otherwise, the query will go to all agents.
+    #
+    # :agent = <agent> - supply an agent from the list returned by getAgents.
+    # If the get query is to be restricted to one broker (as opposed to
+    # all connected brokers), add the following argument:
+    #
+    # :broker = <broker> - supply a broker as returned by addBroker.
+    #
+    # If additional arguments are supplied, they are used as property
+    # selectors, as long as their keys are strings.  For example, if
+    # the argument "name" => "test" is supplied, only objects whose
+    # "name" property is "test" will be returned in the result.
+    def objects(kwargs)
+      if kwargs.include?(:broker)
+        broker_list = []
+        broker_list << kwargs[:broker]
+      else
+        broker_list = @brokers
+      end
+      broker_list.each { |broker| broker.wait_for_stable }
+
+      agent_list = []
+      if kwargs.include?(:agent)
+        agent = kwargs[:agent]
+        unless broker_list.include?(agent.broker)
+          raise ArgumentError, "Supplied agent is not accessible through the supplied broker"
+        end
+        agent_list << agent
+      else
+        broker_list.each { |broker| agent_list += broker.agents }
+      end
+
+      cname = nil
+      if kwargs.include?(:schema)
+        # FIXME: What kind of object is kwargs[:schema]
+        pname, cname, hash = kwargs[:schema].getKey()
+      elsif kwargs.include?(:key)
+        pname, cname, hash = kwargs[:key]
+      elsif kwargs.include?(:class)
+        pname, cname, hash = [kwargs[:package], kwargs[:class], nil]
+      end
+      if cname.nil? && ! kwargs.include?(:object_id)
+        raise ArgumentError,
+        "No class supplied, use :schema, :key, :class, or :object_id' argument"
+      end
+
+      map = {}
+      @select = []
+      if kwargs.include?(:object_id)
+        map["_objectId"] = kwargs[:object_id].to_str
+      else
+        map["_class"] = cname
+        map["_package"] = pname if pname
+        map["_hash"]    = hash  if hash
+        kwargs.each do |k,v|
+          @select << [k, v] if k.is_a?(String)
+        end
+      end
+
+      @result = []
+      agent_list.each do |agent|
+        broker = agent.broker
+        send_codec = Qpid::StringCodec.new(broker.conn.spec)
+        seq = nil
+        synchronize do
+          seq = @seq_mgr.reserve(CONTEXT_MULTIGET)
+          @sync_sequence_list << seq
+        end
+        broker.set_header(send_codec, ?G, seq)
+        send_codec.write_map(map)
+        smsg = broker.message(send_codec.encoded, "agent.#{agent.bank}")
+        broker.emit(smsg)
+      end
+
+      timeout = false
+      synchronize do
+        unless @cv.wait_for(GET_WAIT_TIME) {
+            @sync_sequence_list.empty? || @error }
+          @sync_sequence_list.each do |pending_seq|
+            @seq_mgr.release(pending_seq)
+          end
+          @sync_sequence_list = []
+          timeout = true
+        end
+      end
+
+      if @error
+        errorText = @error
+        @error = nil
+        raise errorText
+      end
+
+      if @result.empty? && timeout
+        raise "No agent responded within timeout period"
+      end
+      @result
+    end
+
+    def set_event_filter(kwargs); end
+
+    def handle_broker_connect(broker); end
+
+    def handle_broker_resp(broker, codec, seq)
+      broker.broker_id = codec.read_uuid
+      @console.broker_info(broker) if @console
+
+      # Send a package request
+      # (effectively inc and dec outstanding by not doing anything)
+      send_codec = Qpid::StringCodec.new(broker.conn.spec)
+      seq = @seq_mgr.reserve(CONTEXT_STARTUP)
+      broker.set_header(send_codec, ?P, seq)
+      smsg = broker.message(send_codec.encoded)
+      broker.emit(smsg)
+    end
+
+    def handle_package_ind(broker, codec, seq)
+      pname = codec.read_str8
+      new_package = false
+      synchronize do
+        new_package = ! @packages.include?(pname)
+        @packages[pname] = {} if new_package
+      end
+      @console.new_package(pname) if @console
+
+      # Send a class request
+      broker.inc_outstanding
+      send_codec = Qpid::StringCodec.new(broker.conn.spec)
+      seq = @seq_mgr.reserve(CONTEXT_STARTUP)
+      broker.set_header(send_codec, ?Q, seq)
+      send_codec.write_str8(pname)
+      smsg = broker.message(send_codec.encoded)
+      broker.emit(smsg)
+    end
+
+    def handle_command_complete(broker, codec, seq)
+      code = codec.read_uint32
+      text = codec.read_str8
+      context = @seq_mgr.release(seq)
+      if context == CONTEXT_STARTUP
+        broker.dec_outstanding
+      elsif context == CONTEXT_SYNC && seq == broker.sync_sequence
+        broker.sync_done
+      elsif context == CONTEXT_MULTIGET && @sync_sequence_list.include?(seq)
+        synchronize do
+          @sync_sequence_list.delete(seq)
+          @cv.signal if @sync_sequence_list.empty?
+        end
+      end
+    end
+
+    def handle_class_ind(broker, codec, seq)
+      kind  = codec.read_uint8
+      pname = codec.read_str8
+      cname = codec.read_str8
+      hash  = codec.read_bin128
+      unknown = false
+
+      synchronize do
+        return unless @packages.include?(pname)
+        unknown = true unless @packages[pname].include?([cname, hash])
+      end
+
+      if unknown
+        # Send a schema request for the unknown class
+        broker.inc_outstanding
+        send_codec = Qpid::StringCodec.new(broker.conn.spec)
+        seq = @seq_mgr.reserve(CONTEXT_STARTUP)
+        broker.set_header(send_codec, ?S, seq)
+        send_codec.write_str8(pname)
+        send_codec.write_str8(cname)
+        send_codec.write_bin128(hash)
+        smsg = broker.message(send_codec.encoded)
+        broker.emit(smsg)
+      end
+    end
+
+    def handle_method_resp(broker, codec, seq)
+      code = codec.read_uint32
+
+      text = codec.read_str16
+      out_args = {}
+      method, synchronous = @seq_mgr.release(seq)
+      if code == 0
+        method.arguments.each do |arg|
+          if arg.dir.index(?O)
+            out_args[arg.name] = decode_value(codec, arg.type)
+          end
+        end
+      end
+      result = MethodResult.new(code, text, out_args)
+      if synchronous:
+          broker.synchronize do
+          broker.sync_result = MethodResult.new(code, text, out_args)
+          broker.sync_done
+        end
+      else
+        @console.method_response(broker, seq, result) if @console
+      end
+    end
+
+    def handle_heartbeat_ind(broker, codec, seq, msg)
+      if @console
+        broker_bank = 1
+        agent_bank = 0
+        dp = msg.get("delivery_properties")
+        if dp
+          key = dp["routing_key"]
+          key_elements = key.split(".")
+          if key_elements.length == 4
+            broker_bank = key_elements[2].to_i
+            agent_bank = key_lements[2].to_i
+          end
+        end
+        agent = broker.agent(broker_bank, agent_bank)
+        timestamp = codec.read_uint64
+        @console.heartbeat(agent, timestamp)
+      end
+    end
+
+    def handle_event_ind(broker, codec, seq)
+      if @console
+        event = Event.new(self, broker, codec)
+        @console.event(broker, event)
+      end
+    end
+
+    def handle_schema_resp(broker, codec, seq)
+      kind  = codec.read_uint8
+      pname = codec.read_str8
+      cname = codec.read_str8
+      hash  = codec.read_bin128
+      klass_key = [pname, cname, hash]
+      klass = SchemaClass.new(kind, klass_key, codec)
+      synchronize { @packages[pname][ [cname, hash] ] = klass }
+
+      @seq_mgr.release(seq)
+      broker.dec_outstanding
+      @console.new_class(kind, klass_key) if @console
+    end
+
+    def handle_content_ind(broker, codec, seq, prop=false, stat=false)
+      pname = codec.read_str8
+      cname = codec.read_str8
+      hash  = codec.read_bin128
+      klass_key = [pname, cname, hash]
+
+      schema = nil
+      synchronize do
+        return unless @packages.include?(pname)
+        return unless @packages[pname].include?([cname, hash])
+        schema = @packages[pname][ [cname, hash] ]
+      end
+
+      object = Qpid::Qmf::Object.new(self, broker, schema, codec, prop, stat)
+      if pname == "org.apache.qpid.broker" && cname == "agent"
+        broker.update_agent(object)
+      end
+
+      synchronize do
+        if @sync_sequence_list.include?(seq)
+          if object.timestamps()[2] == 0 && select_match(object)
+            @result << object
+          end
+          return
+        end
+      end
+
+      @console.object_props(broker, object) if @console && prop
+      @console.object_stats(broker, object) if @console && stat
+    end
+
+    def handle_broker_disconnect(broker); end
+
+    def handle_error(error)
+      @error = error
+      synchronize do
+        @sync_sequence_list = []
+        @cv.signal
+      end
+    end
+
+    # Decode, from the codec, a value based on its typecode
+    def decode_value(codec, typecode)
+      case typecode
+      when 1:  data = codec.read_uint8      # U8
+      when 2:  data = codec.read_uint16     # U16
+      when 3:  data = codec.read_uint32     # U32
+      when 4:  data = codec.read_uint64     # U64
+      when 6:  data = codec.read_str8       # SSTR
+      when 7:  data = codec.read_str16      # LSTR
+      when 8:  data = codec.read_int64      # ABSTIME
+      when 9:  data = codec.read_uint64     # DELTATIME
+      when 10: data = ObjectId.new(codec)   # REF
+      when 11: data = codec.read_uint8 != 0 # BOOL
+      when 12: data = codec.read_float      # FLOAT
+      when 13: data = codec.read_double     # DOUBLE
+      when 14: data = codec.read_uuid       # UUID
+      when 15: data = codec.read_map        # FTABLE
+      when 16: data = codec.read_int8       # S8
+      when 17: data = codec.read_int16      # S16
+      when 18: data = codec.read_int32      # S32
+      when 19: data = codec.read_int64      # S64
+      else
+        raise ArgumentError, "Invalid type code: #{typecode} - #{typecode.inspect}"
+      end
+      return data
+    end
+
+    # Encode, into the codec, a value based on its typecode
+    def encode_value(codec, value, typecode)
+      # FIXME: Python does a lot of magic type conversions
+      # We just assume that value has the right type; this is safer
+      # than coercing explicitly, since Array::pack will complain
+      # loudly about various type errors
+      case typecode
+      when 1:  codec.write_uint8(value)         # U8
+      when 2:  codec.write_uint16(value)        # U16
+      when 3:  codec.write_uint32(value)        # U32
+      when 4:  codec.write_uint64(value)        # U64
+      when 6:  codec.write_str8(value)          # SSTR
+      when 7:  codec.write_str16(value)         # LSTR
+      when 8:  codec.write_int64(value)         # ABSTIME
+      when 9:  codec.write_uint64(value)        # DELTATIME
+      when 10: value.encode(codec)              # REF
+      when 11: codec.write_uint8(value ? 1 : 0) # BOOL
+      when 12: codec.write_float(value)         # FLOAT
+      when 13: codec.write_double(value)        # DOUBLE
+      when 14: codec.write_uuid(value)          # UUID
+      when 15: codec.write_map(value)           # FTABLE
+      when 16: codec.write_int8(value)          # S8
+      when 17: codec.write_int16(value)         # S16
+      when 18: codec.write_int32(value)         # S32
+      when 19: codec.write_int64(value)         # S64
+      else
+        raise ValueError, "Invalid type code: %d" % typecode
+      end
+    end
+
+    def display_value(value, typecode)
+      case typecode
+      when 1:  return value.to_s
+      when 2:  return value.to_s
+      when 3:  return value.to_s
+      when 4:  return value.to_s
+      when 6:  return value.to_s
+      when 7:  return value.to_s
+      when 8:  return strftime("%c", gmtime(value / 1000000000))
+      when 9:  return value.to_s
+      when 10: return value.to_s
+      when 11: return value ? 'T' : 'F'
+      when 12: return value.to_s
+      when 13: return value.to_s
+      when 14: return Qpid::UUID::format(hash)
+      when 15: return value.to_s
+      when 16: return value.to_s
+      when 17: return value.to_s
+      when 18: return value.to_s
+      when 19: return value.to_s
+      else
+        raise ValueError, "Invalid type code: %d" % typecode
+      end
+    end
+
+    private
+
+    def binding_keys
+      key_list = []
+      key_list << "schema.#"
+      if @rcv_objects && @rcv_events && @rcv_heartbeats &&
+          ! @user_bindings
+        key_list << "console.#"
+      else
+        if @rcv_objects && ! @user_bindings
+          key_list << "console.obj.#"
+        else
+          key_list << "console.obj.org.apache.qpid.broker.agent"
+        end
+        key_list << "console.event.#" if @rcv_events
+        key_list << "console.heartbeat" if @rcv_heartbeats
+      end
+      return key_list
+    end
+
+    # Check the object against select to check for a match
+    def select_match(object)
+      select.each do |key, value|
+        object.properties.each do |prop, propval|
+          return false if key == prop.name && value != propval
+        end
+      end
+      return true
+    end
+
+  end
+
+  class Package
+    attr_reader :name
+
+    def initialize(name)
+      @name = name
+    end
+  end
+
+  class ClassKey
+    attr_reader :package, :klass_name, :hash
+
+    def initialize(package, klass_name, hash)
+      @package = package
+      @klass_name = klass_name
+      @hash = hash
+    end
+  end
+
+  class SchemaClass
+
+    CLASS_KIND_TABLE = 1
+    CLASS_KIND_EVENT = 2
+
+    attr_reader :klass_key, :properties, :statistics, :methods, :arguments
+
+    def initialize(kind, key, codec)
+      @kind = kind
+      @klass_key = key
+      @properties = []
+      @statistics = []
+      @methods = []
+      @arguments = []
+
+      if @kind == CLASS_KIND_TABLE
+        prop_count   = codec.read_uint16
+        stat_count   = codec.read_uint16
+        method_count = codec.read_uint16
+        prop_count.times { |idx|
+          @properties << SchemaProperty.new(codec) }
+        stat_count.times { |idx|
+          @statistics << SchemaStatistic.new(codec) }
+        method_count.times { |idx|
+          @methods<< SchemaMethod.new(codec) }
+      elsif @kind == CLASS_KIND_EVENT
+        arg_count = codec.read_uint16
+        arg_count.times { |idx|
+          sa = SchemaArgument.new(codec, false)
+          @arguments << sa
+        }
+      end
+    end
+
+    def to_s
+      pname, cname, hash = @klass_key
+      if @kind == CLASS_KIND_TABLE
+        kind_str = "Table"
+      elsif @kind == CLASS_KIND_EVENT
+        kind_str = "Event"
+      else
+        kind_str = "Unsupported"
+      end
+      result = "%s Class: %s:%s " % [kind_str, pname, cname]
+      result += Qpid::UUID::format(hash)
+      return result
+    end
+  end
+
+  class SchemaProperty
+
+    attr_reader :name, :type, :access, :index, :optional,
+    :unit, :min, :max, :maxlan, :desc
+
+    def initialize(codec)
+      map = codec.read_map
+      @name     = map["name"]
+      @type     = map["type"]
+      @access   = map["access"]
+      @index    = map["index"] != 0
+      @optional = map["optional"] != 0
+      @unit     = map["unit"]
+      @min      = map["min"]
+      @max      = map["max"]
+      @maxlan   = map["maxlen"]
+      @desc     = map["desc"]
+    end
+
+    def to_s
+      @name
+    end
+  end
+
+  class SchemaStatistic
+
+    attr_reader :name, :type, :unit, :desc
+
+    def initialize(codec)
+      map = codec.read_map
+      @name     = map["name"]
+      @type     = map["type"]
+      @unit     = map["unit"]
+      @desc     = map["desc"]
+    end
+
+    def to_s
+      @name
+    end
+  end
+
+  class SchemaMethod
+
+    attr_reader :name, :desc, :arguments
+
+    def initialize(codec)
+      map = codec.read_map
+      @name = map["name"]
+      arg_count  = map["argCount"]
+      @desc = map["desc"]
+      @arguments = []
+      arg_count.times { |idx|
+        @arguments << SchemaArgument.new(codec, true)
+      }
+    end
+
+    def to_s
+      result = @name + "("
+      first = true
+      result += @arguments.select { |arg| arg.dir.index(?I) }.join(", ")
+      result += ")"
+      return result
+    end
+  end
+
+  class SchemaArgument
+
+    attr_reader :name, :type, :dir, :unit, :min, :max, :maxlen
+    attr_reader :desc, :default
+
+    def initialize(codec, method_arg)
+      map = codec.read_map
+      @name    = map["name"]
+      @type    = map["type"]
+      @dir     = map["dir"].upcase if method_arg
+      @unit    = map["unit"]
+      @min     = map["min"]
+      @max     = map["max"]
+      @maxlen  = map["maxlen"]
+      @desc    = map["desc"]
+      @default = map["default"]
+    end
+  end
+
+  # Object that represents QMF object identifiers
+  class ObjectId
+
+    include Comparable
+
+    attr_reader :first, :second
+
+    def initialize(codec, first=0, second=0)
+      if codec
+        @first  = codec.read_uint64
+        @second = codec.read_uint64
+      else
+        @first = first
+        @second = second
+      end
+    end
+
+    def <=>(other)
+      return 1 unless other.is_a?(ObjectId)
+      return -1 if first < other.first
+      return 1  if first > other.first
+      return second <=> other.second
+    end
+
+    def to_s
+      "%d-%d-%d-%d-%d" % [flags, sequence, broker_bank, agent_bank, object]
+    end
+
+    def index
+      [first, second]
+    end
+
+    def flags
+      (first & 0xF000000000000000) >> 60
+    end
+
+    def sequence
+      (first & 0x0FFF000000000000) >> 48
+    end
+
+    def broker_bank
+      (first & 0x0000FFFFF0000000) >> 28
+    end
+
+    def agent_bank
+      first & 0x000000000FFFFFFF
+    end
+
+    def object
+      second
+    end
+
+    def durable?
+      sequence == 0
+    end
+
+    def encode(codec)
+      codec.write_uint64(first)
+      codec.write_uint64(second)
+    end
+  end
+
+  class Object
+
+    attr_reader :object_id, :schema, :properties, :statistics,
+    :current_time, :create_time, :delete_time, :broker
+
+    def initialize(session, broker, schema, codec, prop, stat)
+      @session = session
+      @broker  = broker
+      @schema  = schema
+      @current_time = codec.read_uint64
+      @create_time  = codec.read_uint64
+      @delete_time  = codec.read_uint64
+      @object_id   = ObjectId.new(codec)
+      @properties  = []
+      @statistics  = []
+      if prop
+        missing = parse_presence_masks(codec, schema)
+        schema.properties.each do |property|
+          v = nil
+          unless missing.include?(property.name)
+            v = @session.decode_value(codec, property.type)
+          end
+          @properties << [property, v]
+        end
+      end
+
+      if stat
+        schema.statistics.each do |statistic|
+          s = @session.decode_value(codec, statistic.type)
+          @statistics << [statistic, s]
+        end
+      end
+    end
+
+    def klass_key
+      @schema.klass_key
+    end
+
+
+    def methods
+      @schema.methods
+    end
+
+    # Return the current, creation, and deletion times for this object
+    def timestamps
+      return [@current_time, @create_time, @delete_time]
+    end
+
+    # Return a string describing this object's primary key
+    def index
+      @properties.select { |property, value|
+        property.index
+      }.collect { |property,value|
+        value.to_s }.join(":")
+    end
+
+    # Replace properties and/or statistics with a newly received update
+    def merge_update(newer)
+      unless object_id == newer.object_id
+        raise "Objects with different object-ids"
+      end
+      @properties = newer.getProperties unless newer.properties.empty?
+      @statistics = newer.getStatistics unless newer.statistics.empty?
+    end
+
+    def to_s
+      index
+    end
+
+    # This must be defined because ruby has this (deprecated) method built in.
+    def id
+      method_missing(:id)
+    end
+
+    # Same here..
+    def type
+      method_missing(:type)
+    end
+
+    def name
+      method_missing(:name)
+    end
+
+    def method_missing(name, *args)
+      name = name.to_s
+
+      if method = @schema.methods.find { |method| name == method.name }
+        return invoke(method, name, args)
+      end
+
+      @properties.each do |property, value|
+        return value if name == property.name
+        if name == "_#{property.name}_" && property.type == 10
+          # Dereference references
+          deref = @session.objects(:object_id => value, :broker => @broker)
+          return nil unless deref.size == 1
+          return deref[0]
+        end
+      end
+      @statistics.each do |statistic, value|
+        if name == statistic.name
+          return value
+        end
+      end
+      raise "Type Object has no attribute '#{name}'"
+    end
+
+    private
+
+    def send_method_request(method, name, args, synchronous = false)
+      @schema.methods.each do |schema_method|
+        if name == schema_method.name
+          send_codec = Qpid::StringCodec.new(@broker.conn.spec)
+          seq = @session.seq_mgr.reserve([schema_method, synchronous])
+          @broker.set_header(send_codec, ?M, seq)
+          @object_id.encode(send_codec)
+          pname, cname, hash = @schema.klass_key
+          send_codec.write_str8(pname)
+          send_codec.write_str8(cname)
+          send_codec.write_bin128(hash)
+          send_codec.write_str8(name)
+
+          formals = method.arguments.select { |arg| arg.dir.index(?I) }
+          count = method.arguments.select { |arg| arg.dir.index(?I) }.size
+          unless formals.size == args.size
+            raise "Incorrect number of arguments: expected #{formals.size}, got #{args.size}"
+          end
+
+          formals.zip(args).each do |formal, actual|
+            @session.encode_value(send_codec, actual, formal.type)
+          end
+
+          smsg = @broker.message(send_codec.encoded,
+                                 "agent.#{object_id.broker_bank}.#{object_id.agent_bank}")
+
+          @broker.sync_start if synchronous
+          @broker.emit(smsg)
+
+          return seq
+        end
+      end
+    end
+
+    def invoke(method, name, args)
+      if send_method_request(method, name, args, synchronous = true)
+        unless @broker.wait_for_sync_done
+          @session.seq_mgr.release(seq)
+          raise "Timed out waiting for method to respond"
+        end
+
+        if @broker.error
+          error_text = @broker.error
+          @broker.error = nil
+          raise error_text
+        end
+
+        return @broker.sync_result
+      end
+      raise "Invalid Method (software defect) [#{name}]"
+    end
+
+    def parse_presence_masks(codec, schema)
+      exclude_list = []
+      bit = 0
+      schema.properties.each do |property|
+        if property.optional
+          if bit == 0
+            mask = codec.read_uint8
+            bit = 1
+          end
+          if (mask & bit) == 0
+            exclude_list << property.name
+          end
+          bit *= 2
+          bit = 0 if bit == 256
+        end
+      end
+      return exclude_list
+    end
+  end
+
+  class MethodResult
+
+    attr_reader :status, :text
+
+    def initialize(status, text, out_args)
+      @status   = status
+      @text     = text
+      @out_args = out_args
+    end
+
+    def method_missing(name)
+      name = name.to_s()
+      if @out_args.include?(name)
+        return @out_args[name]
+      else
+        raise "Unknown method result arg #{name}"
+      end
+    end
+
+    def to_s
+      "#{text} (#{status}) - #{out_args.inspect}"
+    end
+  end
+
+  class Broker
+
+    SYNC_TIME = 60
+
+    include MonitorMixin
+
+    attr_accessor :error
+
+    attr_reader :amqp_session_id, :amqp_session, :conn
+
+    attr_accessor :broker_id, :sync_result
+
+    def initialize(session, host, port, auth_mech, auth_user, auth_pass)
+      super()
+
+      # For debugging..
+      Thread.abort_on_exception = true
+
+      @session  = session
+      @host     = host
+      @port     = port
+      @auth_user = auth_user
+      @auth_pass = auth_pass
+      @agents   = {}
+      @agents["1.0"] = Agent.new(self, "1.0", "BrokerAgent")
+      @topic_bound = false
+      @cv = new_cond
+      @sync_in_flight = false
+      @sync_request = 0
+      @sync_result = nil
+      @reqs_outstanding = 1
+      @error     = nil
+      @broker_id  = nil
+      @is_connected = false
+      @conn = nil
+      try_to_connect
+    end
+
+    def connected?
+      @is_connected
+    end
+
+    def agent(broker_bank, agent_bank)
+      bank_key = "%d.%d" % [broker_bank, agent_bank]
+      return @agents[bank_key]
+    end
+
+    # Get the list of agents reachable via this broker
+    def agents
+      @agents.values
+    end
+
+    def url
+      "#{@host}:#{@port}"
+    end
+
+    def to_s
+      if connected?
+        "Broker connected at: #{url}"
+      else
+        "Disconnected Broker"
+      end
+    end
+
+    def wait_for_sync_done
+      synchronize do
+        return @cv.wait_for(SYNC_TIME) { ! @sync_in_flight || @error }
+      end
+    end
+
+    def wait_for_stable
+      synchronize do
+        return if @reqs_outstanding == 0
+        @sync_in_flight = true
+        unless @cv.wait_for(SYNC_TIME) { @reqs_outstanding == 0 }
+          raise "Timed out waiting for broker to synchronize"
+        end
+      end
+    end
+
+    # Compose the header of a management message
+    def set_header(codec, opcode, seq=0)
+      codec.write_uint8(?A)
+      codec.write_uint8(?M)
+      codec.write_uint8(?2)
+      codec.write_uint8(opcode)
+      codec.write_uint32(seq)
+    end
+
+    def message(body, routing_key="broker")
+      dp = @amqp_session.delivery_properties
+      dp.routing_key = routing_key
+      mp = @amqp_session.message_properties
+      mp.content_type = "x-application/qmf"
+      mp.reply_to = amqp_session.reply_to("amq.direct", @reply_name)
+      return Qpid::Message.new(dp, mp, body)
+    end
+
+    def emit(msg, dest="qpid.management")
+      @amqp_session.message_transfer(:destination => dest,
+                                     :message => msg)
+    end
+
+    def inc_outstanding
+      synchronize { @reqs_outstanding += 1 }
+    end
+
+    def dec_outstanding
+      synchronize do
+        @reqs_outstanding -= 1
+        if @reqs_outstanding == 0 && ! @topic_bound
+          @topic_bound = true
+          @session.binding_key_list.each do |key|
+            args = {
+              :exchange => "qpid.management",
+              :queue => @topic_name,
+              :binding_key => key }
+            @amqp_session.exchange_bind(args)
+          end
+        end
+        if @reqs_outstanding == 0 && @sync_in_flight
+          sync_done
+        end
+      end
+    end
+
+    def sync_start
+      synchronize { @sync_in_flight = true }
+    end
+
+    def sync_done
+      synchronize do
+        @sync_in_flight = false
+        @cv.signal
+      end
+    end
+
+    def update_agent(obj)
+      bank_key = "%d.%d" % [obj.brokerBank, obj.agentBank]
+      if obj.delete_time == 0
+        unless @agents.include?(bank_key)
+          agent = Agent.new(self, bank_key, obj.label)
+          @agents[bank_key] = agent
+          @session.console.new_agent(agent) if @session.console
+        end
+      else
+        agent = @agents.delete(bank_key)
+        @session.console.del_agent(agent) if agent && @session.console
+      end
+    end
+
+    def shutdown
+      if connected?
+        @amqp_session.incoming("rdest").stop
+        if @session.console
+          @amqp_session.incoming("tdest").stop
+        end
+        @amqp_session.close
+        @is_connected = false
+      else
+        raise "Broker already disconnected"
+      end
+    end
+
+    private
+
+    def try_to_connect
+      #begin
+      @amqp_session_id = "%s.%d" % [Socket.gethostname, Process::pid]
+      # FIXME: Need sth for Qpid::Util::connect
+
+      @conn = Qpid::Connection.new(TCPSocket.new(@host, @port),
+                                   :username => @auth_user,
+                                   :password => @auth_pass)
+      @conn.start
+      @reply_name = "reply-%s" % amqp_session_id
+      @amqp_session = @conn.session(@amqp_session_id)
+      @amqp_session.auto_sync = true
+
+      @amqp_session.queue_declare(:queue => @reply_name,
+                                  :exclusive => true,
+                                  :auto_delete => true)
+
+      @amqp_session.exchange_bind(:exchange => "amq.direct",
+                                  :queue => @reply_name,
+                                  :binding_key => @reply_name)
+      @amqp_session.message_subscribe(:queue => @reply_name,
+                                      :destination => "rdest",
+                                      :accept_mode => @amqp_session.message_accept_mode.none,
+                                      :acquire_mode => @amqp_session.message_acquire_mode.pre_acquired)
+      q = @amqp_session.incoming("rdest")
+      q.exc_listen(& method(:exception_cb))
+      q.listen(& method(:reply_cb))
+      @amqp_session.message_set_flow_mode(:destination => "rdest",
+                                          :flow_mode => 1)
+      @amqp_session.message_flow(:destination => "rdest",
+                                 :unit => 0,
+                                 :value => 0xFFFFFFFF)
+      @amqp_session.message_flow(:destination => "rdest",
+                                 :unit => 1,
+                                 :value => 0xFFFFFFFF)
+
+      @topic_name = "topic-#{@amqp_session_id}"
+      @amqp_session.queue_declare(:queue => @topic_name,
+                                  :exclusive => true,
+                                  :auto_delete => true)
+      @amqp_session.message_subscribe(:queue => @topic_name,
+                                      :destination => "tdest",
+                                      :accept_mode => @amqp_session.message_accept_mode.none,
+                                      :acquire_mode => @amqp_session.message_acquire_mode.pre_acquired)
+      @amqp_session.incoming("tdest").listen(& method(:reply_cb))
+      @amqp_session.message_set_flow_mode(:destination => "tdest",
+                                          :flow_mode => 1)
+      @amqp_session.message_flow(:destination => "tdest",
+                                 :unit => 0,
+                                 :value => 0xFFFFFFFF)
+      @amqp_session.message_flow(:destination => "tdest",
+                                 :unit => 1,
+                                 :value => 0xFFFFFFFF)
+
+      @is_connected = true
+      @session.handle_broker_connect(self)
+
+      codec = Qpid::StringCodec.new(@conn.spec)
+      set_header(codec, ?B)
+      msg = message(codec.encoded)
+      emit(msg)
+      # FIXME: These exceptions are bogus here
+      #rescue socket.error => e
+      #   @error = "Socket Error %s - %s" % [e[0], e[1]]
+      #rescue Closed => e
+      #    @error = "Connect Failed %d - %s" % [e[0], e[1]]
+      #rescue ConnectionFailed => e
+      #    @error = "Connect Failed %d - %s" % [e[0], e[1]]
+      #end
+    end
+
+    # Check the header of a management message and extract the opcode and
+    # class
+    def check_header(codec)
+      begin
+        return [nil, nil] unless codec.read_uint8 == ?A
+        return [nil, nil] unless codec.read_uint8 == ?M
+        return [nil, nil] unless codec.read_uint8 == ?2
+        opcode = codec.read_uint8
+        seq    = codec.read_uint32
+        return [opcode, seq]
+      rescue
+        return [nil, nil]
+      end
+    end
+
+    def reply_cb(msg)
+      codec = Qpid::StringCodec.new(@conn.spec, msg.body)
+      loop do
+        opcode, seq = check_header(codec)
+        return unless opcode
+        case opcode
+        when ?b: @session.handle_broker_resp(self, codec, seq)
+        when ?p: @session.handle_package_ind(self, codec, seq)
+        when ?z: @session.handle_command_complete(self, codec, seq)
+        when ?q: @session.handle_class_ind(self, codec, seq)
+        when ?m: @session.handle_method_resp(self, codec, seq)
+        when ?h: @session.handle_heartbeat_ind(self, codec, seq, msg)
+        when ?e: @session.handle_event_ind(self, codec, seq)
+        when ?s: @session.handle_schema_resp(self, codec, seq)
+        when ?c: @session.handle_content_ind(self, codec, seq, true, false)
+        when ?i: @session.handle_content_ind(self, codec, seq, false, true)
+        when ?g: @session.handle_content_ind(self, codec, seq, true, true)
+        else
+          raise "Unexpected opcode #{opcode.inspect}"
+        end
+      end
+    end
+
+    def exception_cb(data)
+      @is_connected = false
+      @error = data
+      synchronize { @cv.signal if @sync_in_flight }
+      @session.handle_error(@error)
+      @session.handle_broker_disconnect(self)
+    end
+  end
+
+  class Agent
+    attr_reader :broker, :bank
+
+    def initialize(broker, bank, label)
+      @broker = broker
+      @bank   = bank
+      @label  = label
+    end
+
+    def to_s
+      "Agent at bank %s (%s)" % [@bank, @label]
+    end
+  end
+
+  class Event
+
+    attr_reader :klass_key, :arguments, :timestamp, :name, :schema
+
+    def initialize(session, broker, codec)
+      @session = session
+      @broker  = broker
+      pname = codec.read_str8
+      cname = codec.read_str8
+      hash  = codec.read_bin128
+      @klass_key = [pname, cname, hash]
+      @timestamp = codec.read_int64
+      @severity = codec.read_uint8
+      @schema = nil
+      session.packages.keys.each do |pname|
+        k = [cname, hash]
+        if session.packages[pname].include?(k)
+          @schema = session.packages[pname][k]
+          @arguments = {}
+          @schema.arguments.each do |arg|
+            v = session.decode_value(codec, arg.type)
+            @arguments[arg.name] = v
+          end
+        end
+      end
+    end
+
+    def to_s
+      return "<uninterpretable>" unless @schema
+      t = Time.at(self.timestamp / 1000000000)
+      out = t.strftime("%c")
+      out += " " + sev_name + " " + @klass_key[0] + ":" + klass_key[1]
+      out += " broker=" + @broker.url
+      @schema.arguments.each do |arg|
+        out += " " + arg.name + "=" + @session.display_value(@arguments[arg.name], arg.type)
+      end
+      return out
+    end
+
+    def sev_name
+      case @severity
+      when 0 : return "EMER "
+      when 1 : return "ALERT"
+      when 2 : return "CRIT "
+      when 3 : return "ERROR"
+      when 4 : return "WARN "
+      when 5 : return "NOTIC"
+      when 6 : return "INFO "
+      when 7 : return "DEBUG"
+      else
+        return "INV-%d" % @severity
+      end
+    end
+
+  end
+
+  # Manage sequence numbers for asynchronous method calls
+  class SequenceManager
+    include MonitorMixin
+
+    def initialize
+      super()
+      @sequence = 0
+      @pending  = {}
+    end
+
+    # Reserve a unique sequence number
+    def reserve (data)
+      synchronize do
+        result = @sequence
+        @sequence += 1
+        @pending[result] = data
+        return result
+      end
+    end
+
+    # Release a reserved sequence number
+    def release (seq)
+      synchronize { @pending.delete(seq) }
+    end
+  end
+
+  class DebugConsole < Console
+
+    def broker_connected(broker)
+      puts "brokerConnected #{broker}"
+    end
+
+    def broker_disconnected(broker)
+      puts "brokerDisconnected #{broker}"
+    end
+
+    def new_package(name)
+      puts "newPackage #{name}"
+    end
+
+    def new_class(kind, klass_key)
+      puts "newClass #{kind} #{klass_key}"
+    end
+
+    def new_agent(agent)
+      puts "new_agent #{agent}"
+    end
+
+    def del_agent(agent)
+      puts "delAgent #{agent}"
+    end
+
+    def object_props(broker, record)
+      puts "objectProps #{record.klass_key}"
+    end
+
+    def object_stats(broker, record)
+      puts "objectStats #{record.klass_key}"
+    end
+
+    def event(broker, event)
+      puts "event #{event}"
+    end
+
+    def heartbeat(agent, timestamp)
+      puts "heartbeat #{agent}"
+    end
+
+    def broker_info(broker)
+      puts "brokerInfo #{broker}"
+    end
+  end
+end

Added: incubator/qpid/trunk/qpid/ruby/lib/qpid/queue.rb
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/ruby/lib/qpid/queue.rb?rev=713616&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/ruby/lib/qpid/queue.rb (added)
+++ incubator/qpid/trunk/qpid/ruby/lib/qpid/queue.rb Wed Nov 12 18:45:18 2008
@@ -0,0 +1,101 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Augment the standard python multithreaded Queue implementation to add a
+# close() method so that threads blocking on the content of a queue can be
+# notified if the queue is no longer in use.
+
+require 'thread'
+
+# Python nominally uses a bounded queue, but the code never establishes
+# a maximum size; we therefore use Ruby's unbounded queue
+class Qpid::Queue < ::Queue
+
+  DONE = Object.new
+  STOP = Object.new
+
+  def initialize
+    super
+    @error = nil
+    @listener = nil
+    @exc_listener = nil
+    @exc_listener_lock = Monitor.new
+    @thread = nil
+  end
+
+  def close(error = nil)
+    @error = error
+    put(DONE)
+    unless @thread.nil?
+      @thread.join()
+      @thread = nil
+    end
+  end
+
+  def get(block = true, timeout = nil)
+    unless timeout.nil?
+      raise NotImplementedError
+    end
+    result = pop(! block)
+    if result == DONE
+      # this guarantees that any other waiting threads or any future
+      # calls to get will also result in a Qpid::Closed exception
+      put(DONE)
+      raise Qpid::Closed.new(@error)
+    else
+      return result
+    end
+  end
+
+  alias :put :push
+
+  def exc_listen(&block)
+    @exc_listener_lock.synchronize do
+      @exc_listener = block
+    end
+  end
+
+  def listen(&block)
+    if ! block_given? && @thread
+      put(STOP)
+      @thread.join()
+      @thread = nil
+    end
+
+    # FIXME: There is a potential race since we could be changing one
+    # non-nil listener to another
+    @listener = block
+
+    if block_given? && @thread.nil?
+      @thread = Thread.new do
+        loop do
+          begin
+            o = get()
+            break if o == STOP
+            @listener.call(o)
+          rescue Qpid::Closed => e
+            @exc_listener.call(e) if @exc_listener
+            break
+          end
+        end
+      end
+    end
+  end
+
+end

Added: incubator/qpid/trunk/qpid/ruby/lib/qpid/session.rb
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/ruby/lib/qpid/session.rb?rev=713616&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/ruby/lib/qpid/session.rb (added)
+++ incubator/qpid/trunk/qpid/ruby/lib/qpid/session.rb Wed Nov 12 18:45:18 2008
@@ -0,0 +1,458 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+require 'monitor'
+
+module Qpid
+
+  class Session < Invoker
+
+    def log; Qpid::logger["qpid.io.cmd"]; end
+    def msg; Qpid::logger["qpid.io.msg"]; end
+
+
+    class Exception < RuntimeError; end
+    class Closed < Qpid::Session::Exception; end
+    class Detached < Qpid::Session::Exception; end
+
+
+    INCOMPLETE = Object.new
+
+    def self.client(*args)
+      return Qpid::Client(*args)
+    end
+
+    def self.server(*args)
+      return Server(*args)
+    end
+
+    attr_reader :name, :spec, :auto_sync, :timeout, :channel
+    attr_reader :results, :exceptions
+    attr_accessor :channel, :auto_sync, :send_id, :receiver, :sender
+
+    # FIXME: Pass delegate through a block ?
+    def initialize(name, spec, kwargs = {})
+      auto_sync = true
+      auto_sync = kwargs[:auto_sync] if kwargs.key?(:auto_sync)
+      timeout = kwargs[:timeout] || 10
+      delegate = kwargs[:delegate]
+
+      @name = name
+      @spec = spec
+      @auto_sync = auto_sync
+      @timeout = timeout
+      @invoke_lock = Monitor.new
+      @closing = false
+      @closed = false
+
+      @cond_lock = Monitor.new
+      @condition = @cond_lock.new_cond
+
+      @send_id = true
+      @receiver = Receiver.new(self)
+      @sender = Sender.new(self)
+
+      @lock = Monitor.new
+      @incoming = {}
+      @results = {}
+      @exceptions = []
+
+      @assembly = nil
+
+      @delegate = delegate.call(self) if delegate
+
+      @ctl_seg = spec[:segment_type].enum[:control].value
+      @cmd_seg = spec[:segment_type].enum[:command].value
+      @hdr_seg = spec[:segment_type].enum[:header].value
+      @body_seg = spec[:segment_type].enum[:body].value
+    end
+
+    def incoming(destination)
+      @lock.synchronize do
+        queue = @incoming[destination]
+        unless queue
+          queue = Incoming.new(self, destination)
+          @incoming[destination] = queue
+        end
+        return queue
+      end
+    end
+
+    def error?
+      @exceptions.size > 0
+    end
+
+    def sync(timeout=nil)
+      if channel && Thread.current == channel.connection.thread
+        raise Qpid::Session::Exception, "deadlock detected"
+      end
+      unless @auto_sync
+        execution_sync(:sync => true)
+      end
+      last = @sender.next_id - 1
+      @cond_lock.synchronize do
+        unless @condition.wait_for(timeout) {
+            @sender.completed.include?(last) || error?
+          }
+          raise Qpid::Timeout
+        end
+      end
+      if error?
+        raise Qpid::Session::Exception, exceptions
+      end
+    end
+
+    def close(timeout=nil)
+      @invoke_lock.synchronize do
+        @closing = true
+        channel.session_detach(name)
+      end
+      @cond_lock.synchronize do
+        unless @condition.wait_for(timeout) { @closed }
+          raise Qpid::Timeout
+        end
+      end
+    end
+
+    def closed
+      @lock.synchronize do
+        return if @closed
+
+        @results.each { |id, f| f.error(exceptions) }
+        @results.clear
+
+        @incoming.values.each { |q| q.close(exceptions) }
+        @closed = true
+        @cond_lock.synchronize { @condition.signal }
+      end
+    end
+
+    def resolve_method(name)
+      o = @spec.children[name]
+      case o
+      when Qpid::Spec010::Command
+        return invocation(:method, o)
+      when Qpid::Spec010::Struct
+        return invocation(:method, o)
+      when Qpid::Spec010::Domain
+        return invocation(:value, o.enum) unless o.enum.nil?
+      end
+
+      matches = @spec.children.select { |x|
+        x.name.to_s.include?(name.to_s)
+      }.collect { |x| x.name.to_s }.sort
+      if matches.size == 0
+        msg = nil
+      elsif matches.size == 1
+        msg = "Did you mean #{matches[0]} ? "
+      else
+        msg =  "Did you mean one of #{matches.join(",")} ? "
+      end
+      return invocation(:error, msg)
+    end
+
+    def invoke(type, args)
+      # XXX
+      unless type.respond_to?(:track)
+        return type.create(*args)
+      end
+      @invoke_lock.synchronize do
+        return do_invoke(type, args)
+      end
+    end
+
+    def do_invoke(type, args)
+      raise Qpid::Session::Closed if @closing
+      raise Qpid::Session::Detached unless channel
+
+      # Clumsy simulation of Python's keyword args
+      kwargs = {}
+      if args.size > 0 && args[-1].is_a?(Hash)
+        if args.size > type.fields.size
+          kwargs = args.pop
+        elsif type.fields[args.size - 1].type != @spec[:map]
+          kwargs = args.pop
+        end
+      end
+
+      if type.payload
+        if args.size == type.fields.size + 1
+          message = args.pop
+        else
+          message = kwargs.delete(:message) # XXX Really ?
+        end
+      else
+        message = nil
+      end
+
+      hdr = Qpid::struct(@spec[:header])
+      hdr.sync = @auto_sync || kwargs.delete(:sync)
+
+      cmd = type.create(*args.push(kwargs))
+      sc = Qpid::StringCodec.new(@spec)
+      sc.write_command(hdr, cmd)
+
+      seg = Segment.new(true, (message.nil? ||
+                               (message.headers.nil? && message.body.nil?)),
+                        type.segment_type, type.track, @channel.id, sc.encoded)
+
+      unless type.result.nil?
+        result = Future.new(exception=Exception)
+        @results[@sender.next_id] = result
+      end
+      emit(seg)
+
+      log.debug("SENT %s %s %s" % [seg.id, hdr, cmd]) if log
+
+      unless message.nil?
+        unless message.headers.nil?
+          sc = Qpid::StringCodec.new(@spec)
+          message.headers.each { |st| sc.write_struct32(st) }
+
+          seg = Segment.new(false, message.body.nil?, @hdr_seg,
+                            type.track, @channel.id, sc.encoded)
+          emit(seg)
+        end
+        unless message.body.nil?
+          seg = Segment.new(false, true, @body_seg, type.track,
+                            @channel.id, message.body)
+          emit(seg)
+        end
+        msg.debug("SENT %s" % message) if msg
+      end
+
+      if !type.result.nil?
+        return @auto_sync ? result.get(@timeout) : result
+      elsif @auto_sync
+        sync(@timeout)
+      end
+    end
+
+    def received(seg)
+      @receiver.received(seg)
+      if seg.first_segment?
+        raise Qpid::Session::Exception unless @assembly.nil?
+        @assembly = []
+      end
+      @assembly << seg
+      if seg.last_segment?
+        dispatch(@assembly)
+        @assembly = nil
+      end
+    end
+
+    def dispatch(assembly)
+      hdr = nil
+      cmd = nil
+      header = nil
+      body = nil
+      assembly.each do |seg|
+        d = seg.decode(@spec)
+        case seg.type
+        when @cmd_seg
+          hdr, cmd = d
+        when @hdr_seg
+          header = d
+        when @body_seg
+          body = d
+        else
+          raise Qpid::Session::Exception
+        end
+      end
+      log.debug("RECV %s %s %s" % [cmd.id, hdr, cmd]) if log
+
+      if cmd.type.payload
+        result = @delegate.send(cmd.type.name, cmd, header, body)
+      else
+        result = @delegate.send(cmd.type.name, cmd)
+      end
+
+      unless cmd.type.result.nil?
+        execution_result(cmd.id, result)
+      end
+
+      if result != INCOMPLETE
+        assembly.each do |seg|
+          @receiver.has_completed(seg)
+          # XXX: don't forget to obey sync for manual completion as well
+          if hdr.sync
+            @channel.session_completed(@receiver.completed)
+          end
+        end
+      end
+    end
+
+    # Python calls this 'send', but that has a special meaning
+    # in Ruby, so we call it 'emit'
+    def emit(seg)
+      @sender.emit(seg)
+    end
+
+    def signal
+      @cond_lock.synchronize { @condition.signal }
+    end
+
+    def wait_for(timeout = nil, &block)
+      @cond_lock.synchronize { @condition.wait_for(timeout, &block) }
+    end
+
+    def to_s
+      "<Session: #{name}, #{channel}>"
+    end
+
+    class Receiver
+
+      attr_reader :completed
+      attr_accessor :next_id, :next_offset
+
+      def initialize(session)
+        @session = session
+        @next_id = nil
+        @next_offset = nil
+        @completed = Qpid::RangedSet.new()
+      end
+
+      def received(seg)
+        if @next_id.nil? || @next_offset.nil?
+          raise Exception, "todo"
+        end
+        seg.id = @next_id
+        seg.offset = @next_offset
+        if seg.last_segment?
+          @next_id += 1
+          @next_offset = 0
+        else
+          @next_offset += seg.payload.size
+        end
+      end
+
+      def has_completed(seg)
+        if seg.id.nil?
+          raise ArgumentError, "cannot complete unidentified segment"
+        end
+        if seg.last_segment?
+          @completed.add(seg.id)
+        end
+      end
+
+      def known_completed(commands)
+        completed = Qpid::RangedSet.new()
+        @completed.ranges.each do |c|
+          unless commands.ranges.find { |kc|
+              kc.contains(c.lower) && kc.contains(c.upper)
+            }
+            completed.add_range(c)
+          end
+        end
+        @completed = completed
+      end
+    end
+
+    class Sender
+
+      def initialize(session)
+        @session = session
+        @next_id = 0.to_serial
+        @next_offset = 0
+        @segments = []
+        @completed = RangedSet.new()
+      end
+
+      attr_reader :next_id, :completed
+
+      def emit(seg)
+        seg.id = @next_id
+        seg.offset = @next_offset
+        if seg.last_segment?
+          @next_id += 1
+          @next_offset = 0
+        else
+          @next_offset += seg.payload.size
+        end
+        @segments << seg
+        if @session.send_id
+          @session.send_id = false
+          @session.channel.session_command_point(seg.id, seg.offset)
+        end
+        @session.channel.connection.write_segment(seg)
+      end
+
+      def has_completed(commands)
+        @segments = @segments.reject { |seg| commands.include?(seg.id) }
+        commands.ranges.each do |range|
+          @completed.add(range.lower, range.upper)
+        end
+      end
+    end
+
+    class Incoming < Qpid::Queue
+
+      def initialize(session, destination)
+        super()
+        @session = session
+        @destination = destination
+      end
+
+      def start
+        @session.message_credit_unit.choices.each do |unit|
+          @session.message_flow(@destination, unit.value, 0xFFFFFFFF)
+        end
+      end
+
+      def stop
+        @session.message_cancel(@destination)
+        listen             # Kill the listener
+      end
+    end
+
+    class Delegate
+
+      def initialize(session)
+        @session = session
+      end
+
+      #XXX: do something with incoming accepts
+      def message_accept(ma) nil; end
+
+      def execution_result(er)
+        future = @session.results.delete(er.command_id)
+        future.set(er.value)
+      end
+
+      def execution_exception(ex)
+        @session.exceptions.append(ex)
+      end
+    end
+
+    class Client < Delegate
+
+      def log ; Qpid::logger["qpid.io.msg"]; end
+
+      def message_transfer(cmd, headers, body)
+        m = Qpid::Message.new(body)
+        m.headers = headers
+        m.id = cmd.id
+        messages = @session.incoming(cmd.destination)
+        messages.put(m)
+        log.debug("RECV %s" % m) if log
+        return INCOMPLETE
+      end
+    end
+  end
+end

Added: incubator/qpid/trunk/qpid/ruby/lib/qpid/spec.rb
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/ruby/lib/qpid/spec.rb?rev=713616&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/ruby/lib/qpid/spec.rb (added)
+++ incubator/qpid/trunk/qpid/ruby/lib/qpid/spec.rb Wed Nov 12 18:45:18 2008
@@ -0,0 +1,183 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+require "set"
+require "rexml/document"
+require "qpid/fields"
+require "qpid/traverse"
+
+module Qpid
+  module Spec
+
+    include REXML
+
+    class Container < Array
+
+      def initialize()
+        @cache = {}
+      end
+
+      def [](key)
+        return @cache[key] if @cache.include?(key)
+        value = do_lookup(key)
+        @cache[key] = value
+        return value
+      end
+
+      def do_lookup(key)
+        case key
+        when String
+          return find {|x| x.name == key.intern()}
+        when Symbol
+          return find {|x| x.name == key}
+        else
+          return slice(key)
+        end
+      end
+
+      def +(other)
+        copy = clone()
+        copy.concat(other)
+        return copy
+      end
+
+    end
+
+    class Reference
+
+      fields(:name)
+
+      def init(&block)
+        @resolver = block
+      end
+
+      def resolve(*args)
+        @resolver.call(*args)
+      end
+
+    end
+
+    class Loader
+
+      def initialize()
+        @stack = []
+      end
+
+      def container()
+        return Container.new()
+      end
+
+      def load(obj)
+        case obj
+        when String
+          elem = @stack[-1]
+          result = container()
+          elem.elements.each(obj) {|e|
+            @index = result.size
+            result << load(e)
+          }
+          @index = nil
+          return result
+        else
+          elem = obj
+          @stack << elem
+          begin
+            result = send(:"load_#{elem.name}")
+          ensure
+            @stack.pop()
+          end
+          return result
+        end
+      end
+
+      def element
+        @stack[-1]
+      end
+
+      def text
+        element.text
+      end
+
+      def attr(name, type = :string, default = nil, path = nil)
+        if path.nil?
+          elem = element
+        else
+          elem = nil
+          element.elements.each(path) {|elem|}
+          if elem.nil?
+            return default
+          end
+        end
+
+        value = elem.attributes[name]
+        value = value.strip() unless value.nil?
+        if value.nil?
+          default
+        else
+          send(:"parse_#{type}", value)
+        end
+      end
+
+      def parse_int(value)
+        if value.nil?
+          return nil
+        else
+          value.to_i(0)
+        end
+      end
+
+      TRUE = ["yes", "true", "1"].to_set
+      FALSE = ["no", "false", "0", nil].to_set
+
+      def parse_bool(value)
+        if TRUE.include?(value)
+          true
+        elsif FALSE.include?(value)
+          false
+        else
+          raise Exception.new("parse error, expecting boolean: #{value}")
+        end
+      end
+
+      def parse_string(value)
+        value.to_s
+      end
+
+      def parse_symbol(value)
+        value.intern() unless value.nil?
+      end
+
+      REPLACE = {" " => "_", "-" => "_"}
+      KEYWORDS = {"global" => "global_", "return" => "return_"}
+
+      def parse_name(value)
+        return if value.nil?
+
+        REPLACE.each do |k, v|
+          value = value.gsub(k, v)
+        end
+
+        value = KEYWORDS[value] if KEYWORDS.has_key? value
+        return value.intern()
+      end
+
+    end
+
+  end
+end

Added: incubator/qpid/trunk/qpid/ruby/lib/qpid/spec010.rb
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/ruby/lib/qpid/spec010.rb?rev=713616&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/ruby/lib/qpid/spec010.rb (added)
+++ incubator/qpid/trunk/qpid/ruby/lib/qpid/spec010.rb Wed Nov 12 18:45:18 2008
@@ -0,0 +1,497 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+require "qpid/spec"
+require 'pathname'
+require 'fileutils'
+
+module Qpid::Spec010
+
+  include Qpid::Spec
+
+  AMQP_SPEC_DEFAULT_DIR = "/usr/share/amqp"
+
+  # XXX: workaround for ruby bug/missfeature
+  Reference = Reference
+  Loader = Loader
+
+  class Spec
+
+    ENCODINGS = {
+      String => "vbin16",
+      Fixnum => "int64",
+      Bignum => "int64",
+      Float => "float",
+      NilClass => "void",
+      Array => "list",
+      Hash => "map"
+    }
+
+    fields(:major, :minor, :port, :children)
+
+    def init()
+      @controls = {}
+      @commands = {}
+      @structs = {}
+      @types = {}
+      children.each {|c|
+        case c
+        when Control
+          @controls[c.code] = c
+        when Command
+          @commands[c.code] = c
+        when Struct
+          @structs[c.code] = c
+        when Type
+          @types[c.code] = c unless c.code.nil?
+        end
+      }
+    end
+
+    attr_reader :controls, :commands, :structs, :types
+
+    def [](key)
+      return @children[key]
+    end
+
+    def encoding(klass)
+      if ENCODINGS.has_key?(klass)
+        return self[ENCODINGS[klass]]
+      end
+      for base in klass.__bases__
+        result = encoding(base)
+        return result unless result.nil?
+      end
+    end
+
+    def inspect; "spec"; end
+  end
+
+  class Constant
+
+    fields(:name, :value)
+
+    attr :parent, true
+
+  end
+
+  class Type
+
+    fields(:name, :code, :fixed, :variable)
+
+    attr :parent, true
+
+    def present?(value)
+      if @fixed == 0
+        return value
+      else
+        return !value.nil?
+      end
+    end
+
+    def encode(codec, value)
+      codec.send("write_#{name}", value)
+    end
+
+    def decode(codec)
+      return codec.send("read_#{name}")
+    end
+
+    def inspect; name; end
+
+  end
+
+  class Domain < Type
+
+    fields(:name, :type, :enum)
+
+    attr :parent, true
+
+    def encode(codec, value)
+      @type.encode(codec, value)
+    end
+
+    def decode(codec)
+      return @type.decode(codec)
+    end
+
+  end
+
+  class Enum
+    fields(:choices)
+
+    def [](choice)
+      case choice
+      when String
+        choice = choice.to_sym
+        return choices.find { |c| c.name == choice }
+      when Symbol
+        return choices.find { |c| c.name == choice }
+      else
+        return choices.find { |c| c.value == choice }
+      end
+    end
+
+    def method_missing(name, *args)
+      raise ArgumentError.new("wrong number of arguments") unless  args.empty?
+      return self[name].value
+    end
+
+  end
+
+  class Choice
+    fields(:name, :value)
+  end
+
+  class Composite
+
+    fields(:name, :code, :size, :pack, :fields)
+
+    attr :parent, true
+
+    # Python calls this 'new', but that has special meaning in Ruby
+    def create(*args)
+      return Qpid::struct(self, *args)
+    end
+
+    def decode(codec)
+      codec.read_size(@size)
+      codec.read_uint16() unless @code.nil?
+      return Qpid::struct(self, self.decode_fields(codec))
+    end
+
+    def decode_fields(codec)
+      flags = 0
+      pack.times {|i| flags |= (codec.read_uint8() << 8*i)}
+
+      result = {}
+
+      fields.each_index {|i|
+        f = @fields[i]
+        if flags & (0x1 << i) != 0
+          result[f.name] = f.type.decode(codec)
+        else
+          result[f.name] = nil
+        end
+      }
+
+      return result
+    end
+
+    def encode(codec, value)
+      sc = Qpid::StringCodec.new(@spec)
+      sc.write_uint16(@code) unless @code.nil?
+      encode_fields(sc, value)
+      codec.write_size(@size, sc.encoded.size)
+      codec.write(sc.encoded)
+    end
+
+    def encode_fields(codec, values)
+      # FIXME: This could be written cleaner using select
+      # instead of flags
+      flags = 0
+      fields.each_index do |i|
+        f = fields[i]
+        flags |= (0x1 << i) if f.type.present?(values[f.name])
+      end
+
+      pack.times { |i| codec.write_uint8((flags >> 8*i) & 0xFF) }
+
+      fields.each_index do |i|
+        f = fields[i]
+        f.type.encode(codec, values[f.name]) if flags & (0x1 << i) != 0
+      end
+    end
+
+    def inspect; name; end
+
+  end
+
+  class Field
+
+    fields(:name, :type, :exceptions)
+
+    def default()
+      return nil
+    end
+
+  end
+
+  class Struct < Composite
+
+    def present?(value)
+      return !value.nil?
+    end
+
+  end
+
+  class Action < Composite; end
+
+  class Control < Action
+
+    def segment_type
+      @parent[:segment_type].enum[:control].value
+    end
+
+    def track
+      @parent[:track].enum[:control].value
+    end
+
+  end
+
+  class Command < Action
+
+    attr_accessor :payload, :result
+
+    def segment_type
+      @parent["segment_type"].enum["command"].value
+    end
+
+    def track
+      @parent["track"].enum["command"].value
+    end
+
+  end
+
+  class Doc
+    fields(:type, :title, :text)
+  end
+
+  class Loader010 < Loader
+
+    def initialize()
+      super()
+    end
+
+    def klass
+      cls = element
+      until cls.nil?
+        break if cls.name == "class"
+        cls = cls.parent
+      end
+      return cls
+    end
+
+    def scope
+      if element.name == "struct"
+        return nil
+      else
+        return class_name
+      end
+    end
+
+    def class_name
+      cls = klass
+      if cls.nil?
+        return nil
+      else
+        return parse_name(cls.attributes["name"].strip)
+      end
+    end
+
+    def class_code
+      cls = klass
+      if cls.nil?
+        return 0
+      else
+        return parse_int(cls.attributes["code"].strip)
+      end
+    end
+
+    def parse_decl(value)
+      name = parse_name(value)
+
+      s = scope
+      if s.nil?
+        return name
+      else
+        return :"#{s}_#{name}"
+      end
+    end
+
+    def parse_code(value)
+      c = parse_int(value)
+      if c.nil?
+        return nil
+      else
+        return c | (class_code << 8)
+      end
+    end
+
+    def parse_type(value)
+      name = parse_name(value.sub(".", "_"))
+      cls = class_name
+      return Reference.new {|spec|
+        candidates = [name]
+        candidates << :"#{cls}_#{name}" unless cls.nil?
+        for c in candidates
+          child = spec[c]
+          break unless child.nil?
+        end
+        if child.nil?
+          raise Exception.new("unresolved type: #{name}")
+  else
+    child
+  end
+}
+    end
+
+    def load_amqp()
+      children = nil
+
+      for s in ["constant", "type", "domain", "struct", "control",
+                  "command"]
+        ch = load(s)
+        if children.nil?
+          children = ch
+        else
+          children += ch
+        end
+        children += load("class/#{s}")
+      end
+      children += load("class/command/result/struct")
+      Spec.new(attr("major", :int), attr("minor", :int), attr("port", :int),
+               children)
+    end
+
+    def load_constant()
+      Constant.new(attr("name", :decl), attr("value", :int))
+    end
+
+    def load_type()
+      Type.new(attr("name", :decl), attr("code", :code),
+               attr("fixed-width", :int), attr("variable-width", :int))
+    end
+
+    def load_domain()
+      Domain.new(attr("name", :decl), attr("type", :type), load("enum").first)
+    end
+
+    def load_enum()
+      Enum.new(load("choice"))
+    end
+
+    def load_choice()
+      Choice.new(attr("name", :name), attr("value", :int))
+    end
+
+    def load_field()
+      Field.new(attr("name", :name), attr("type", :type))
+    end
+
+    def load_struct()
+      Struct.new(attr("name", :decl), attr("code", :code), attr("size", :int),
+                 attr("pack", :int), load("field"))
+    end
+
+    def load_action(cls)
+      cls.new(attr("name", :decl), attr("code", :code), 0, 2, load("field"))
+    end
+
+    def load_control()
+      load_action(Control)
+    end
+
+    def load_command()
+      result = attr("type", :type, nil, "result")
+      result = attr("name", :type, nil, "result/struct") if result.nil?
+      segs = load("segments")
+      cmd = load_action(Command)
+      cmd.result = result
+      cmd.payload = !segs.empty?
+      return cmd
+    end
+
+    def load_result()
+      true
+    end
+
+    def load_segments()
+      true
+    end
+
+  end
+
+  def self.spec_cache(specfile)
+      File::join(File::dirname(__FILE__), "spec_cache",
+                 File::basename(specfile, ".xml") + ".rb_marshal")
+  end
+
+  # XXX: could be shared
+  def self.load(spec = nil)
+      return spec if spec.is_a?(Qpid::Spec010::Spec)
+      if spec.nil?
+          # FIXME: Need to add a packaging setup in here so we know where
+          # the installed spec is going to be.
+          specfile = nil
+          if ENV['AMQP_SPEC']
+              specfile = ENV['AMQP_SPEC']
+          else
+              topdir = File::dirname(File::dirname(File::expand_path(__FILE__)))
+              specfile = File::join(topdir, "../../specs", "amqp.0-10-qpid-errata.xml")
+          end
+      else
+          specfile = spec
+      end
+
+      unless Pathname.new(specfile).absolute?
+          path = ENV["AMQP_SPEC_PATH"] || AMQP_SPEC_DEFAULT_DIR
+
+          p = path.split(File::PATH_SEPARATOR).collect { |p|
+              Pathname.new(p).join(specfile)
+          }.find { |p| p.file? }
+          raise "Can not find file for spec #{spec}" unless p
+          specfile = p.to_s
+      end
+
+      specfile_cache = spec_cache(specfile)
+      # FIXME: Check that cache is newer than specfile
+      if File::exist?(specfile_cache)
+          begin
+              spec = File::open(specfile_cache, "r") do |f|
+                  Marshal::load(f)
+              end
+              return spec
+          rescue
+              # Ignore, will load from XML
+          end
+      end
+
+      doc = File::open(specfile, "r") { |f| Document.new(f) }
+      spec = Loader010.new().load(doc.root)
+      spec.traverse! do |o|
+          if o.is_a?(Reference)
+              o.resolve(spec)
+          else
+              o
+          end
+      end
+
+      spec.children.each { |c| c.parent = spec }
+
+      begin
+          FileUtils::mkdir_p(File::dirname(specfile_cache))
+          File::open(specfile_cache, "w") { |f| Marshal::dump(spec, f) }
+      rescue
+          # Ignore, we are fine without the cached spec
+      end
+      return spec
+  end
+
+end

Added: incubator/qpid/trunk/qpid/ruby/lib/qpid/spec08.rb
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/ruby/lib/qpid/spec08.rb?rev=713616&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/ruby/lib/qpid/spec08.rb (added)
+++ incubator/qpid/trunk/qpid/ruby/lib/qpid/spec08.rb Wed Nov 12 18:45:18 2008
@@ -0,0 +1,190 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+require "qpid/spec"
+
+module Qpid08
+
+  module Spec
+
+    include Qpid::Spec
+
+    # XXX: workaround for ruby bug/missfeature
+    Reference = Reference
+
+    class Root
+      fields(:major, :minor, :classes, :constants, :domains)
+
+      def find_method(name)
+        classes.each do |c|
+          c.methods.each do |m|
+            if name == m.qname
+              return m
+            end
+          end
+        end
+
+        return nil
+      end
+    end
+
+    class Constant
+      fields(:name, :id, :type, :docs)
+    end
+
+    class Domain
+      fields(:name, :type)
+    end
+
+    class Class
+      fields(:name, :id, :handler, :fields, :methods, :docs)
+    end
+
+    class Method
+      fields(:name, :id, :content?, :responses, :synchronous?, :fields,
+             :docs)
+
+      def init()
+        @response = false
+      end
+
+      attr :parent, true
+
+      def response?; @response end
+      def response=(b); @response = b end
+
+      def qname
+        :"#{parent.name}_#{name}"
+      end
+    end
+
+    class Field
+      fields(:name, :id, :type, :docs)
+
+      def default
+        case type
+        when :bit then false
+        when :octet, :short, :long, :longlong then 0
+        when :shortstr, :longstr then ""
+        when :table then {}
+        end
+      end
+    end
+
+    class Doc
+      fields(:type, :text)
+    end
+
+    class Container08 < Container
+      def do_lookup(key)
+        case key
+        when Integer
+          return find {|x| x.id == key}
+        else
+          return super(key)
+        end
+      end
+    end
+
+    class Loader08 < Loader
+
+      def container()
+        return Container08.new()
+      end
+
+      def load_amqp()
+        Root.new(attr("major", :int), attr("minor", :int), load("class"),
+                 load("constant"), load("domain"))
+      end
+
+      def load_class()
+        Class.new(attr("name", :name), attr("index", :int), attr("handler", :name),
+                  load("field"), load("method"), load("doc"))
+      end
+
+      def load_method()
+        Method.new(attr("name", :name), attr("index", :int),
+                   attr("content", :bool), load("response"),
+                   attr("synchronous", :bool), load("field"), load("docs"))
+      end
+
+      def load_response()
+        name = attr("name", :name)
+        Reference.new {|spec, klass|
+          response = klass.methods[name]
+          if response.nil?
+            raise Exception.new("no such method: #{name}")
+          end
+          response
+        }
+      end
+
+      def load_field()
+        type = attr("type", :name)
+        if type.nil?
+          domain = attr("domain", :name)
+          type = Reference.new {|spec, klass|
+            spec.domains[domain].type
+          }
+        end
+        Field.new(attr("name", :name), @index, type, load("docs"))
+      end
+
+      def load_constant()
+        Constant.new(attr("name", :name), attr("value", :int), attr("class", :name),
+                     load("doc"))
+      end
+
+      def load_domain()
+        Domain.new(attr("name", :name), attr("type", :name))
+      end
+
+      def load_doc()
+        Doc.new(attr("type", :symbol), text)
+      end
+
+    end
+
+    def self.load(spec)
+      case spec
+      when String
+        spec = File.new(spec)
+      end
+      doc = Document.new(spec)
+      spec = Loader08.new().load(doc.root)
+      spec.classes.each do |klass|
+        klass.traverse! do |o|
+          case o
+          when Reference
+            o.resolve(spec, klass)
+          else
+            o
+          end
+        end
+        klass.methods.each do |m|
+          m.parent = klass
+          m.responses.each do |r|
+            r.response = true
+          end
+        end
+      end
+      return spec
+    end
+  end
+end

Added: incubator/qpid/trunk/qpid/ruby/lib/qpid/test.rb
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/ruby/lib/qpid/test.rb?rev=713616&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/ruby/lib/qpid/test.rb (added)
+++ incubator/qpid/trunk/qpid/ruby/lib/qpid/test.rb Wed Nov 12 18:45:18 2008
@@ -0,0 +1,35 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+require "qpid/spec08"
+require "qpid/client"
+
+module Qpid08
+
+  module Test
+
+    def connect()
+      spec = Spec.load("../specs/amqp.0-8.xml")
+      c = Client.new("0.0.0.0", 5672, spec)
+      c.start({"LOGIN" => "guest", "PASSWORD" => "guest"})
+      return c
+    end
+
+  end
+
+end

Added: incubator/qpid/trunk/qpid/ruby/lib/qpid/traverse.rb
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/ruby/lib/qpid/traverse.rb?rev=713616&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/ruby/lib/qpid/traverse.rb (added)
+++ incubator/qpid/trunk/qpid/ruby/lib/qpid/traverse.rb Wed Nov 12 18:45:18 2008
@@ -0,0 +1,64 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+class Object
+
+  public
+
+  def traverse()
+    traverse! {|o| yield(o); o}
+  end
+
+  def traverse_children!()
+    instance_variables.each {|v|
+      value = instance_variable_get(v)
+      replacement = yield(value)
+      instance_variable_set(v, replacement) unless replacement.equal? value
+    }
+  end
+
+  def traverse!(replacements = {})
+    return replacements[__id__] if replacements.has_key? __id__
+    replacement = yield(self)
+    replacements[__id__] = replacement
+    traverse_children! {|o| o.traverse!(replacements) {|c| yield(c)}}
+    return replacement
+  end
+
+end
+
+class Array
+  def traverse_children!()
+    map! {|o| yield(o)}
+  end
+end
+
+class Hash
+  def traverse_children!()
+    mods = {}
+    each_pair {|k, v|
+      key = yield(k)
+      value = yield(v)
+      mods[key] = value unless key.equal? k and value.equal? v
+      delete(k) unless key.equal? k
+    }
+
+    merge!(mods)
+  end
+end

Added: incubator/qpid/trunk/qpid/ruby/lib/qpid/util.rb
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/ruby/lib/qpid/util.rb?rev=713616&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/ruby/lib/qpid/util.rb (added)
+++ incubator/qpid/trunk/qpid/ruby/lib/qpid/util.rb Wed Nov 12 18:45:18 2008
@@ -0,0 +1,75 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+require 'thread'
+require 'monitor'
+
+# Monkeypatch
+class MonitorMixin::ConditionVariable
+
+  # Wait until BLOCK returns TRUE or TIMEOUT seconds have passed
+  # Return TRUE if BLOCK returned TRUE within the TIMEOUT, FALSE
+  # otherswise
+  def wait_for(timeout=nil, &block)
+    start = Time.now
+    passed = 0
+    until yield
+      if timeout.nil?
+        wait
+      elsif passed < timeout
+        wait(timeout)
+      else
+        return false
+      end
+      passed = Time.now - start
+    end
+    return true
+  end
+end
+
+module Qpid::Util
+
+  # Similar to Python's threading.Event
+  class Event
+    def initialize
+      @monitor = Monitor.new
+      @cond = @monitor.new_cond
+      @set = false
+    end
+
+    def set
+      @monitor.synchronize do
+        @set = true
+        @cond.signal
+      end
+    end
+
+    def clear
+      @monitor.synchronize { @set = false }
+    end
+
+    def wait(timeout = nil)
+      @monitor.synchronize do
+        unless @set
+          @cond.wait_for(timeout) { @set }
+        end
+      end
+    end
+  end
+end