You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2011/02/04 07:38:32 UTC
svn commit: r1067095 - in /qpid/trunk/qpid/cpp/bindings/qmf2:
examples/python/find_agents.py examples/ruby/
examples/ruby/agent_external.rb examples/ruby/agent_internal.rb
examples/ruby/find_agents.rb python/qmf2.py ruby/qmf2.rb
Author: tross
Date: Fri Feb 4 06:38:31 2011
New Revision: 1067095
URL: http://svn.apache.org/viewvc?rev=1067095&view=rev
Log:
Merged missing functionality from the QMFv1 Ruby and Python interfaces to the QMFv2 interfaces:
Added:
qpid/trunk/qpid/cpp/bindings/qmf2/examples/python/find_agents.py
qpid/trunk/qpid/cpp/bindings/qmf2/examples/ruby/
qpid/trunk/qpid/cpp/bindings/qmf2/examples/ruby/agent_external.rb
qpid/trunk/qpid/cpp/bindings/qmf2/examples/ruby/agent_internal.rb
qpid/trunk/qpid/cpp/bindings/qmf2/examples/ruby/find_agents.rb
Modified:
qpid/trunk/qpid/cpp/bindings/qmf2/python/qmf2.py
qpid/trunk/qpid/cpp/bindings/qmf2/ruby/qmf2.rb
Added: qpid/trunk/qpid/cpp/bindings/qmf2/examples/python/find_agents.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/bindings/qmf2/examples/python/find_agents.py?rev=1067095&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/bindings/qmf2/examples/python/find_agents.py (added)
+++ qpid/trunk/qpid/cpp/bindings/qmf2/examples/python/find_agents.py Fri Feb 4 06:38:31 2011
@@ -0,0 +1,54 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import cqpid
+import qmf2
+
+class FindAgents(qmf2.ConsoleHandler):
+
+ def __init__(self, session):
+ qmf2.ConsoleHandler.__init__(self, session)
+
+ def agentAdded(self, agent):
+ print "Agent Added: %r" % agent
+
+ def agentDeleted(self, agent, reason):
+ print "Agent Deleted: %r reason: %s" % (agent, reason)
+
+ def agentRestarted(self, agent):
+ print "Agent Restarted: %r" % agent
+
+ def agentSchemaUpdated(self, agent):
+ print "Agent Schema Updated: %r" % agent
+
+
+
+url = "localhost"
+options = ""
+
+connection = cqpid.Connection(url, options)
+connection.open()
+
+session = qmf2.ConsoleSession(connection)
+session.open()
+session.setAgentFilter("[]")
+
+main = FindAgents(session)
+main.run()
+
Added: qpid/trunk/qpid/cpp/bindings/qmf2/examples/ruby/agent_external.rb
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/bindings/qmf2/examples/ruby/agent_external.rb?rev=1067095&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/bindings/qmf2/examples/ruby/agent_external.rb (added)
+++ qpid/trunk/qpid/cpp/bindings/qmf2/examples/ruby/agent_external.rb Fri Feb 4 06:38:31 2011
@@ -0,0 +1,84 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+require 'cqpid'
+require 'qmf2'
+
+class MyAgent < Qmf2::AgentHandler
+
+ def initialize(session, data)
+ super(session)
+ @data = data
+ end
+
+ def authorize_query(query, user_id)
+ puts "Authorizing #{user_id}"
+ return true
+ end
+
+ def get_query(context, query, user_id)
+ puts "Get Query"
+ context.response(@data)
+ context.complete
+ end
+
+ def method_call(context, method_name, data_addr, args, user_id)
+ puts "Method: #{method_name}"
+ context._success
+ end
+
+end
+
+
+class Program
+
+ def initialize(url)
+ @url = url
+ @sess_options = "{allow-queries:False, external:True}"
+ end
+
+ def setup_schema(agent)
+ @cls_control = Qmf2::Schema.new(Qmf2::SCHEMA_TYPE_DATA, "org.package", "control")
+ @cls_control.add_property(Qmf2::SchemaProperty.new("state", Qmf2::SCHEMA_DATA_STRING))
+ agent.register_schema(@cls_control)
+ end
+
+ def run
+ connection = Cqpid::Connection.new(@url)
+ connection.open
+
+ session = Qmf2::AgentSession.new(connection, @sess_options)
+ session.set_vendor("package.org")
+ session.set_product("external_agent")
+ setup_schema(session)
+ session.open
+
+ @control = Qmf2::Data.new(@cls_control)
+ @control.state = "OPERATIONAL-EXTERNAL"
+ @control.set_addr(Qmf2::DataAddr.new("singleton"))
+
+ main = MyAgent.new(session, @control)
+ main.run
+ end
+end
+
+prog = Program.new("localhost")
+prog.run
+
+
Added: qpid/trunk/qpid/cpp/bindings/qmf2/examples/ruby/agent_internal.rb
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/bindings/qmf2/examples/ruby/agent_internal.rb?rev=1067095&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/bindings/qmf2/examples/ruby/agent_internal.rb (added)
+++ qpid/trunk/qpid/cpp/bindings/qmf2/examples/ruby/agent_internal.rb Fri Feb 4 06:38:31 2011
@@ -0,0 +1,77 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+require 'cqpid'
+require 'qmf2'
+
+class MyAgent < Qmf2::AgentHandler
+
+ def initialize(session)
+ super(session)
+ end
+
+ def authorize_query(query, user_id)
+ puts "Authorizing #{user_id}"
+ return true
+ end
+
+ def method_call(context, method_name, data_addr, args, user_id)
+ puts "Method: #{method_name}"
+ context._success
+ end
+
+end
+
+
+class Program
+
+ def initialize(url)
+ @url = url
+ @sess_options = "{allow-queries:False}"
+ end
+
+ def setup_schema(agent)
+ @cls_control = Qmf2::Schema.new(Qmf2::SCHEMA_TYPE_DATA, "org.package", "control")
+ @cls_control.add_property(Qmf2::SchemaProperty.new("state", Qmf2::SCHEMA_DATA_STRING))
+ agent.register_schema(@cls_control)
+ end
+
+ def run
+ connection = Cqpid::Connection.new(@url)
+ connection.open
+
+ session = Qmf2::AgentSession.new(connection, @sess_options)
+ session.set_vendor("package.org")
+ session.set_product("internal_agent")
+ setup_schema(session)
+ session.open
+
+ control = Qmf2::Data.new(@cls_control)
+ control.state = "OPERATIONAL"
+ session.add_data(control)
+
+ main = MyAgent.new(session)
+ main.run
+ end
+end
+
+prog = Program.new("localhost")
+prog.run
+
+
Added: qpid/trunk/qpid/cpp/bindings/qmf2/examples/ruby/find_agents.rb
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/bindings/qmf2/examples/ruby/find_agents.rb?rev=1067095&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/bindings/qmf2/examples/ruby/find_agents.rb (added)
+++ qpid/trunk/qpid/cpp/bindings/qmf2/examples/ruby/find_agents.rb Fri Feb 4 06:38:31 2011
@@ -0,0 +1,59 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+require 'cqpid'
+require 'qmf2'
+
+class FindAgents < Qmf2::ConsoleHandler
+
+ def initialize(session)
+ super(session)
+ end
+
+ def agent_added(agent)
+ puts "Agent Added: #{agent.to_s}"
+ end
+
+ def agent_deleted(agent, reason)
+ puts "Agent Deleted: #{agent.to_s} reason: #{reason}"
+ end
+
+ def agent_restarted(agent)
+ puts "Agent Restarted: #{agent.to_s} epoch: #{agent.epoch}"
+ end
+
+ def agent_schema_updated(agent)
+ puts "Agent with new Schemata: #{agent.to_s}"
+ end
+end
+
+
+url = "localhost"
+options = ""
+
+connection = Cqpid::Connection.new(url, options)
+connection.open
+
+session = Qmf2::ConsoleSession.new(connection)
+session.open
+session.set_agent_filter("[]")
+
+main = FindAgents.new(session)
+main.run
+
Modified: qpid/trunk/qpid/cpp/bindings/qmf2/python/qmf2.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/bindings/qmf2/python/qmf2.py?rev=1067095&r1=1067094&r2=1067095&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/bindings/qmf2/python/qmf2.py (original)
+++ qpid/trunk/qpid/cpp/bindings/qmf2/python/qmf2.py Fri Feb 4 06:38:31 2011
@@ -137,6 +137,87 @@ class AgentHandler(Thread):
pass
+#===================================================================================================
+# CONSOLE HANDLER
+#===================================================================================================
+class ConsoleHandler(Thread):
+
+ def __init__(self, consoleSession):
+ Thread.__init__(self)
+ self.__session = consoleSession
+ self.__running = True
+
+ def cancel(self):
+ """
+ Stop the handler thread.
+ """
+ self.__running = None
+
+ def run(self):
+ event = cqmf2.ConsoleEvent()
+ while self.__running:
+ valid = self.__session._impl.nextEvent(event, cqpid.Duration.SECOND)
+ if valid and self.__running:
+ if event.getType() == cqmf2.CONSOLE_AGENT_ADD:
+ self.agentAdded(Agent(event.getAgent()))
+
+ elif event.getType() == cqmf2.CONSOLE_AGENT_DEL:
+ reason = 'filter'
+ if event.getAgentDelReason() == cqmf2.AGENT_DEL_AGED:
+ reason = 'aged'
+ self.agentDeleted(Agent(event.getAgent(), reason))
+
+ elif event.getType() == cqmf2.CONSOLE_AGENT_RESTART:
+ self.agentRestarted(Agent(event.getAgent()))
+
+ elif event.getType() == cqmf2.CONSOLE_AGENT_SCHEMA_UPDATE:
+ self.agentSchemaUpdated(Agent(event.getAgent()))
+
+ ##
+ ## The following methods are intended to be overridden in a sub-class. They are
+ ## handlers for events that occur on QMF consoles.
+ ##
+
+ #
+ # A new agent, whose attributes match the console's agent filter, has been discovered.
+ #
+ def agentAdded(self, agent):
+ pass
+
+ #
+ # A known agent has been removed from the agent list. There are two possible reasons
+ # for agent deletion:
+ #
+ # 1) 'aged' - The agent hasn't been heard from for the maximum age interval and is
+ # presumed dead.
+ # 2) 'filter' - The agent no longer matches the console's agent-filter and has been
+ # effectively removed from the agent list. Such occurrences are likely
+ # to be seen immediately after setting the filter to a new value.
+ #
+ def agentDeleted(self, agent, reason):
+ pass
+
+ #
+ # An agent-restart was detected. This occurs when the epoch number advertised by the
+ # agent changes. It indicates that the agent in question was shut-down/crashed and
+ # restarted.
+ #
+ def agentRestarted(self, agent):
+ pass
+
+ #
+ # The agent has registered new schema information which can now be queried, if desired.
+ #
+ def agentSchemaUpdated(self, agent):
+ pass
+
+ #
+ # An agent raised an event. The 'data' argument is a Data object that contains the
+ # content of the event.
+ #
+ def eventRaised(self, agent, data, timestamp, severity):
+ pass
+
#===================================================================================================
# CONSOLE SESSION
@@ -147,6 +228,16 @@ class ConsoleSession(object):
def __init__(self, connection, options=""):
"""
+ ## The options string is of the form "{key:value,key:value}". The following keys are supported:
+ ##
+ ## domain:NAME - QMF Domain to join [default: "default"]
+ ## max-agent-age:N - Maximum time, in minutes, that we will tolerate not hearing from
+ ## an agent before deleting it [default: 5]
+ ## listen-on-direct:{True,False} - If True: Listen on legacy direct-exchange address for backward compatibility [default]
+ ## If False: Listen only on the routable direct address
+ ## strict-security:{True,False} - If True: Cooperate with the broker to enforce strict access control to the network
+ ## - If False: Operate more flexibly with regard to use of messaging facilities [default]
+ ##
"""
self._impl = cqmf2.ConsoleSession(connection, options)
@@ -195,6 +286,24 @@ class AgentSession(object):
def __init__(self, connection, options=""):
"""
+ ## The options string is of the form "{key:value,key:value}". The following keys are supported:
+ ##
+ ## interval:N - Heartbeat interval in seconds [default: 60]
+ ## external:{True,False} - Use external data storage (queries and subscriptions are pass-through) [default: False]
+ ## allow-queries:{True,False} - If True: automatically allow all queries [default]
+ ## If False: generate an AUTH_QUERY event to allow per-query authorization
+ ## allow-methods:{True,False} - If True: automatically allow all methods [default]
+ ## If False: generate an AUTH_METHOD event to allow per-method authorization
+ ## max-subscriptions:N - Maximum number of concurrent subscription queries permitted [default: 64]
+ ## min-sub-interval:N - Minimum publish interval (in milliseconds) permitted for a subscription [default: 3000]
+ ## sub-lifetime:N - Lifetime (in seconds with no keepalive) for a subscription [default: 300]
+ ## public-events:{True,False} - If True: QMF events are sent to the topic exchange [default]
+ ## If False: QMF events are only sent to authorized subscribers
+ ## listen-on-direct:{True,False} - If True: Listen on legacy direct-exchange address for backward compatibility [default]
+ ## If False: Listen only on the routable direct address
+ ## strict-security:{True,False} - If True: Cooperate with the broker to enforce strict access control to the network
+ ## - If False: Operate more flexibly with regard to use of messaging facilities [default]
+ ##
"""
self._impl = cqmf2.AgentSession(connection, options)
@@ -261,8 +370,6 @@ class AgentSession(object):
else:
self._impl.raiseException(handle, data)
- ## TODO: async and external operations
-
#===================================================================================================
# AGENT PROXY
@@ -372,6 +479,29 @@ class Query(object):
if arg1.__class__ == DataAddr:
self._impl = cqmf2.Query(arg1._impl)
+ def getAddr(self):
+ """
+ """
+ return DataAddr(self._impl.getDataAddr())
+
+ def getSchemaId(self):
+ """
+ """
+ return SchemaId(self._impl.getSchemaId())
+
+ def getPredicate(self):
+ """
+ """
+ return self._impl.getPredicate()
+
+ def matches(self, data):
+ """
+ """
+ m = data
+ if data.__class__ == Data:
+ m = data.getProperties()
+ return self._impl.matchesPredicate(m)
+
#===================================================================================================
# DATA
#===================================================================================================
@@ -411,6 +541,17 @@ class Data(object):
"""
return Agent(self._impl.getAgent())
+ def update(self, timeout=5):
+ dur = cqpid.Duration(cqpid.Duration.SECOND.getMilliseconds() * timeout)
+ agent = self._impl.getAgent()
+ query = cqmf2.Query(self._impl.getAddr())
+ result = agent.query(query, dur)
+ if result.getType() != cqmf2.CONSOLE_QUERY_RESPONSE:
+ raise "Update query failed"
+ if result.getDataCount == 0:
+ raise "Object no longer exists on agent"
+ self._impl = cqmf2.Data(result.getData(0))
+
def getProperties(self):
"""
"""
@@ -519,11 +660,13 @@ class DataAddr(object):
"""
"""
- def __init__(self, arg):
+ def __init__(self, arg, agentName=""):
if arg.__class__ == dict:
self._impl = cqmf2.DataAddr(arg)
- else:
+ elif arg.__class__ == cqmf2.DataAddr:
self._impl = arg
+ else:
+ self._impl = cqmf2.DataAddr(arg, agentName)
def __repr__(self):
return "%s:%s" % (self.getAgentName(), self.getName())
Modified: qpid/trunk/qpid/cpp/bindings/qmf2/ruby/qmf2.rb
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/bindings/qmf2/ruby/qmf2.rb?rev=1067095&r1=1067094&r2=1067095&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/bindings/qmf2/ruby/qmf2.rb (original)
+++ qpid/trunk/qpid/cpp/bindings/qmf2/ruby/qmf2.rb Fri Feb 4 06:38:31 2011
@@ -78,12 +78,270 @@ module Qmf2
end
##==============================================================================
- ## AGENT HANDLER TODO
+ ## AGENT HANDLER
##==============================================================================
class AgentHandler
- def get_query(context, query, userId); end
- def method_call(context, name, object_id, args, userId); end
+
+ def initialize(session)
+ @_session = session
+ @_running = false
+ @_thread = nil
+ end
+
+ ##
+ ## Call the "start" method to run the handler on a new thread.
+ ##
+ def start
+ @_thread = Thread.new do
+ run
+ end
+ end
+
+ ##
+ ## Request that the running thread complete and exit.
+ ##
+ def cancel
+ @_running = false
+ @_thread.join if @_thread
+ @_thread = nil
+ end
+
+ ##
+ ## Call the "run" method only if you want the handler to run on your own thread.
+ ##
+ def run
+ @_running = true
+ event = Cqmf2::AgentEvent.new
+ while @_running do
+ valid = @_session.impl.nextEvent(event, Cqpid::Duration.SECOND)
+ if valid and @_running
+ case event.getType
+ when Cqmf2::AGENT_AUTH_QUERY
+ yes = authorize_query(Query.new(event.getQuery()), event.getUserId())
+ if yes == true
+ @_session.impl.authAccept(event)
+ else
+ @_session.impl.authReject(event)
+ end
+
+ when Cqmf2::AGENT_QUERY
+ context = QueryContext.new(@_session, event)
+ get_query(context, Query.new(event.getQuery()), event.getUserId())
+
+ when Cqmf2::AGENT_METHOD
+ context = MethodContext.new(@_session, event)
+ begin
+ method_call(context, event.getMethodName(), event.getDataAddr(), event.getArguments(), event.getUserId())
+ rescue Exception => ex
+ @_session.impl.raiseException(event, "#{ex}")
+ end
+
+ end
+ end
+ end
+ end
+
+
+ ##
+ ## The following methods are intended to be overridden in a sub-class. They are
+ ## handlers for events that occur on QMF consoles.
+ ##
+
+ #
+ # This method will only be invoked if the "allow-queries" option is enabled on the
+ # agent session. When invoked, it provides the query and the authenticated user-id
+ # of the querying client.
+ #
+ # This method must return true if the query is permitted, false otherwise.
+ #
+ def authorize_query(query, user_id); end
+
+ #
+ # This method will only be invoked if the "external" option is "True" on the agent
+ # session. When invoked, the method should begin the process of responding to a data
+ # query. The authenticated user-id of the requestor is provided for informational
+ # purposes. The 'context' variable is used to provide the results back to the requestor.
+ #
+ # For each matching Data object, call context.response(data). When the query is complete,
+ # call context.complete(). After completing the query, you should not use 'context' any
+ # longer.
+ #
+ # Note: It is not necessary to process the query synchronously. If desired, this method
+ # may store the context for asynchronous processing or pass it to another thread for
+ # processing. There is no restriction on the number of contexts that may be in-flight
+ # concurrently.
+ #
+ def get_query(context, query, user_id); end
+
+ #
+ # This method is invoked when a console calls a QMF method on the agent. Supplied are
+ # a context for the response, the method name, the data address of the data object being
+ # called, the input arguments (a dictionary), and the caller's authenticated user-id.
+ #
+ # A method call can end one of two ways: Successful completion, in which the output
+ # arguments (if any) are supplied; and Exceptional completion if there is an error.
+ #
+ # Successful Completion:
+ # For each output argument, assign the value directly to context (context.arg1 = "value")
+ # Once arguments are assigned, call context._success().
+ #
+ # Exceptional Completion:
+ # Method 1: Call context._exception(data) where 'data' is a string or a Data object.
+ # Method 2: Raise an exception (raise "Error Text") synchronously in the method body.
+ #
+ # Note: Like get_query, method_call may process methods synchronously or asynchronously.
+ # This method may store the context for later asynchronous processing. There is no
+ # restriction on the number of contexts that may be in-flight concurrently.
+ #
+ # However, "Method 2" for Exceptional Completion can only be done synchronously.
+ #
+ def method_call(context, method_name, data_addr, args, user_id); end
+ end
+
+ class QueryContext
+ def initialize(agent, context)
+ @agent = agent
+ @context = context
+ end
+
+ def response(data)
+ @agent.impl.response(@context, data.impl)
+ end
+
+ def complete
+ @agent.impl.complete(@context)
+ end
+ end
+
+ class MethodContext
+ def initialize(agent, context)
+ @agent = agent
+ @context = context
+ end
+
+ def _success
+ @agent.impl.methodSuccess(@context)
+ end
+
+ def _exception(ex)
+ if ex.class == Data
+ @agent.impl.raiseException(@context, ex.impl)
+ else
+ @agent.impl.raiseException(@context, ex)
+ end
+ end
+
+ def method_missing(name_in, *args)
+ name = name_in.to_s
+ if name[name.length - 1] == 61
+ name = name[0..name.length - 2]
+ @context.impl.addReturnArgument(name, args[0])
+ else
+ super.method_missing(name_in, args)
+ end
+ end
+ end
+
+ ##==============================================================================
+ ## CONSOLE HANDLER
+ ##==============================================================================
+
+ class ConsoleHandler
+
+ def initialize(session)
+ @_session = session
+ @_running = false
+ @_thread = nil
+ end
+
+ ##
+ ## Call the "start" method to run the handler on a new thread.
+ ##
+ def start
+ @_thread = Thread.new do
+ run
+ end
+ end
+
+ ##
+ ## Request that the running thread complete and exit.
+ ##
+ def cancel
+ @_running = false
+ @_thread.join if @_thread
+ @_thread = nil
+ end
+
+ ##
+ ## Call the "run" method only if you want the handler to run on your own thread.
+ ##
+ def run
+ @_running = true
+ event = Cqmf2::ConsoleEvent.new
+ while @_running do
+ valid = @_session.impl.nextEvent(event, Cqpid::Duration.SECOND)
+ if valid and @_running
+ case event.getType
+ when Cqmf2::CONSOLE_AGENT_ADD
+ agent_added(Agent.new(event.getAgent))
+
+ when Cqmf2::CONSOLE_AGENT_DEL
+ reason = :filter
+ reason = :aged if event.getAgentDelReason == Cqmf2::AGENT_DEL_AGED
+ agent_deleted(Agent.new(event.getAgent), reason)
+
+ when Cqmf2::CONSOLE_AGENT_RESTART
+ agent_restarted(Agent.new(event.getAgent))
+
+ when Cqmf2::CONSOLE_AGENT_SCHEMA_UPDATE
+ agent_schema_updated(Agent.new(event.getAgent))
+
+ end
+ end
+ end
+ end
+
+
+ ##
+ ## The following methods are intended to be overridden in a sub-class. They are
+ ## handlers for events that occur on QMF consoles.
+ ##
+
+ #
+ # A new agent, whose attributes match the console's agent filter, has been discovered.
+ #
+ def agent_added(agent); end
+
+ #
+ # A known agent has been removed from the agent list. There are two possible reasons
+ # for agent deletion:
+ #
+ # 1) :aged - The agent hasn't been heard from for the maximum age interval and is
+ # presumed dead.
+ # 2) :filter - The agent no longer matches the console's agent-filter and has been
+ # effectively removed from the agent list. Such occurrences are likely
+ # to be seen immediately after setting the filter to a new value.
+ #
+ def agent_deleted(agent, reason); end
+
+ #
+ # An agent-restart was detected. This occurs when the epoch number advertised by the
+ # agent changes. It indicates that the agent in question was shut-down/crashed and
+ # restarted.
+ #
+ def agent_restarted(agent); end
+
+ #
+ # The agent has registered new schema information which can now be queried, if desired.
+ #
+ def agent_schema_updated(agent); end
+
+ #
+ # An agent raised an event. The 'data' argument is a Data object that contains the
+ # content of the event.
+ #
+ def event_raised(agent, data, timestamp, severity); end
end
##==============================================================================
@@ -93,6 +351,16 @@ module Qmf2
class ConsoleSession
attr_reader :impl
+ ## The options string is of the form "{key:value,key:value}". The following keys are supported:
+ ##
+ ## domain:NAME - QMF Domain to join [default: "default"]
+ ## max-agent-age:N - Maximum time, in minutes, that we will tolerate not hearing from
+ ## an agent before deleting it [default: 5]
+ ## listen-on-direct:{True,False} - If True: Listen on legacy direct-exchange address for backward compatibility [default]
+ ## If False: Listen only on the routable direct address
+ ## strict-security:{True,False} - If True: Cooperate with the broker to enforce strict access control to the network
+ ## - If False: Operate more flexibly with regard to use of messaging facilities [default]
+ ##
def initialize(connection, options="")
@impl = Cqmf2::ConsoleSession.new(connection, options)
end
@@ -124,6 +392,24 @@ module Qmf2
class AgentSession
attr_reader :impl
+ ## The options string is of the form "{key:value,key:value}". The following keys are supported:
+ ##
+ ## interval:N - Heartbeat interval in seconds [default: 60]
+ ## external:{True,False} - Use external data storage (queries and subscriptions are pass-through) [default: False]
+ ## allow-queries:{True,False} - If True: automatically allow all queries [default]
+ ## If False: generate an AUTH_QUERY event to allow per-query authorization
+ ## allow-methods:{True,False} - If True: automatically allow all methods [default]
+ ## If False: generate an AUTH_METHOD event to allow per-method authorization
+ ## max-subscriptions:N - Maximum number of concurrent subscription queries permitted [default: 64]
+ ## min-sub-interval:N - Minimum publish interval (in milliseconds) permitted for a subscription [default: 3000]
+ ## sub-lifetime:N - Lifetime (in seconds with no keepalive) for a subscription [default: 300]
+ ## public-events:{True,False} - If True: QMF events are sent to the topic exchange [default]
+ ## If False: QMF events are only sent to authorized subscribers
+ ## listen-on-direct:{True,False} - If True: Listen on legacy direct-exchange address for backward compatibility [default]
+ ## If False: Listen only on the routable direct address
+ ## strict-security:{True,False} - If True: Cooperate with the broker to enforce strict access control to the network
+ ## - If False: Operate more flexibly with regard to use of messaging facilities [default]
+ ##
def initialize(connection, options="")
@impl = Cqmf2::AgentSession.new(connection, options)
end
@@ -137,25 +423,13 @@ module Qmf2
def close() @impl.close end
def register_schema(cls) @impl.registerSchema(cls.impl) end
- def add_data(data, name="", persistent=:false)
+ def add_data(data, name="", persistent=false)
DataAddr.new(@impl.addData(data.impl, name, persistent))
end
def del_data(addr)
@impl.del_data(addr.impl)
end
-
- def method_success(handle)
- @impl.methodSuccess(handle)
- end
-
- def raise_exception(handle, data)
- if data.class == Data
- @impl.raiseException(handle, data.impl)
- else
- @impl.raiseException(handle, data)
- end
- end
end
##==============================================================================
@@ -228,7 +502,7 @@ module Qmf2
end
##==============================================================================
- ## QUERY TODO
+ ## QUERY
##==============================================================================
class Query
@@ -238,6 +512,16 @@ module Qmf2
@impl = Cqmf2::Query.new(arg1.impl)
end
end
+
+ def addr() DataAddr.new(@impl.getDataAddr()) end
+ def schema_id() SchemaId.new(@impl.getSchemaId()) end
+ def predicate() @impl.getPredicate() end
+
+ def matches?(data)
+ map = data
+ map = data.properties if data.class == Data
+ @impl.matchesPredicate(map)
+ end
end
##==============================================================================
@@ -248,16 +532,17 @@ module Qmf2
attr_reader :impl
def initialize(arg=nil)
+ @schema = nil
if arg == nil
@impl = Cqmf2::Data.new
elsif arg.class == Cqmf2::Data
@impl = arg
elsif arg.class == Schema
- @impl = Cqmf2::Data(arg.impl)
+ @impl = Cqmf2::Data.new(arg.impl)
+ @schema = arg
else
raise "Unsupported initializer for Data"
end
- @schema = nil
end
def to_s
@@ -271,6 +556,10 @@ module Qmf2
return nil
end
+ def set_addr(addr)
+ @impl.setAddr(addr.impl)
+ end
+
def addr
if @impl.hasAddr
return DataAddr.new(@impl.getAddr)
@@ -282,6 +571,17 @@ module Qmf2
return Agent.new(@impl.getAgent)
end
+ def update(timeout=5)
+ dur = Cqpid::Duration.new(Cqpid::Duration.SECOND.getMilliseconds * timeout)
+ agent = @impl.getAgent
+ query = Cqmf2::Query.new(@impl.getAddr)
+ result = agent.query(query, dur)
+ raise "Update query failed" if result.getType != Cqmf2::CONSOLE_QUERY_RESPONSE
+ raise "Object no longer exists on agent" if result.getDataCount == 0
+ @impl = Cqmf2::Data.new(result.getData(0))
+ return nil
+ end
+
def properties
return @impl.getProperties
end
@@ -392,11 +692,13 @@ module Qmf2
class DataAddr
attr_reader :impl
- def initialize(arg)
+ def initialize(arg, agentName="")
if arg.class == Hash
@impl = Cqmf2::DataAddr.new(arg)
- else
+ elsif arg.class == Cqmf2::DataAddr
@impl = arg
+ else
+ @impl = Cqmf2::DataAddr.new(arg, agentName)
end
end
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org