You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ar...@apache.org on 2008/10/13 11:29:55 UTC

svn commit: r703989 - in /incubator/qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management: configuration/QpidDatasource.java domain/model/QpidClass.java domain/services/QpidService.java

Author: arnaudsimon
Date: Mon Oct 13 02:29:54 2008
New Revision: 703989

URL: http://svn.apache.org/viewvc?rev=703989&view=rev
Log:
qpid-1284: on behalf of Andrea: revert to previous revision so as to include latest changes from rhs

Modified:
    incubator/qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/QpidDatasource.java
    incubator/qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/domain/model/QpidClass.java
    incubator/qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/domain/services/QpidService.java

Modified: incubator/qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/QpidDatasource.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/QpidDatasource.java?rev=703989&r1=703988&r2=703989&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/QpidDatasource.java (original)
+++ incubator/qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/QpidDatasource.java Mon Oct 13 02:29:54 2008
@@ -28,158 +28,95 @@
 import org.apache.commons.pool.ObjectPool;
 import org.apache.commons.pool.impl.GenericObjectPool;
 import org.apache.commons.pool.impl.GenericObjectPoolFactory;
-import org.apache.qpid.ErrorCode;
-import org.apache.qpid.QpidException;
-import org.apache.qpid.nclient.Client;
-import org.apache.qpid.nclient.ClosedListener;
-import org.apache.qpid.nclient.Connection;
-import org.apache.qpid.nclient.DtxSession;
-import org.apache.qpid.nclient.Session;
+import org.apache.qpid.transport.Connection;
+import org.apache.qpid.transport.ConnectionException;
 import org.apache.qpid.transport.util.Logger;
 
 /**
  * Qpid datasource.
- * Basically it is a connection pool manager used for optimizing broker connections usage. 
- * 
+ * Basically it is a connection pool manager used for optimizing broker connections usage.
+ *
  * @author Andrea Gazzarini
  */
-public final class QpidDatasource 
+public final class QpidDatasource
 {
     private final static Logger LOGGER = Logger.get(QpidDatasource.class);
-    
+
     /**
      * A connection decorator used for adding pool interaction behaviour to an existing connection.
-     * 
+     *
      * @author Andrea Gazzarini
      */
-    public class ConnectionDecorator implements Connection,ClosedListener 
+    class PooledConnection extends Connection
     {
-        private final Connection _decoratee;
         private final UUID _brokerId;
         private boolean _valid;
-        private BrokerConnectionData _connectionData;
-      
+
         /**
          * Builds a new decorator with the given connection.
-         * 
+         *
          * @param brokerId the broker identifier.
          * @param decoratee the underlying connection.
-         * @param connectionData connection details used for logging purposes.
          */
-        private ConnectionDecorator(UUID brokerId, Connection decoratee, BrokerConnectionData connectionData)
+        private PooledConnection(UUID brokerId)
         {
-            this._decoratee = decoratee;
             this._brokerId = brokerId;
-            this._decoratee.setClosedListener(this);
-            this._connectionData = connectionData;
             _valid = true;
-            LOGGER.debug("<QMAN-200045> : Connection %s for pool %s created.",this,_connectionData);
         }
-        
+
         /**
          * Returns true if the underlying connection is still valid and can be used.
-         * 
+         *
          * @return true if the underlying connection is still valid and can be used.
          */
         boolean isValid()
         {
-            if (_valid)
-            {
-                LOGGER.debug("<QMAN-200013> : Pooled connection %s for %s  seems to be valid.",this,_connectionData);
-            } else 
-            {
-                LOGGER.debug("<QMAN-200013> : Pooled connection %s for %s  has been marked as invalid.",this,_connectionData);                
-            }
             return _valid;
         }
-        
+
+        void reallyClose()
+        {
+            super.close();
+        }
+
         /**
          * Returns the connection to the pool. That is, marks this connections as available.
          * After that, this connection will be available for further operations.
          */
-        public void close () throws QpidException
+        public void close()
         {
             try
             {
                 pools.get(_brokerId).returnObject(this);
-                LOGGER.debug("<QMAN-200012> : <Connection for pool %s released.", _connectionData);
-            } catch (Exception exception)
+                LOGGER.debug("<QMAN-200012> : Connection %s returned to the pool.", this);
+            }
+            catch (Exception e)
             {
-                throw new QpidException("<QMAN-100203> : Error while releasing pooled connection.",ErrorCode.CONNECTION_ERROR,exception);
-            }  
-        }
-
-        /**
-         * Do nothing : underlying connection is already connected.
-         */
-        public void connect (String host, int port, String virtualHost, String username, String password)
-                throws QpidException
-        {
-            // DO NOTHING : DECORATEE CONNECTION IS ALREADY CONNECTED.
-        }
-        
-        /**
-         * Do nothing : underlying connection is already connected.
-         */
-        public void connect (String url) throws QpidException
-        {
-            // DO NOTHING : DECORATEE CONNECTION IS ALREADY CONNECTED.
-        }
-
-        /**
-         * @see Connection#createDTXSession(int)
-         */
-        public DtxSession createDTXSession (int expiryInSeconds)
-        {
-            return _decoratee.createDTXSession(expiryInSeconds);
+                throw new ConnectionException(e);
+            }
         }
 
-        /**
-         * @see Connection#createSession(long)
-         */
-        public Session createSession (long expiryInSeconds)
+        public void exception(Throwable t)
         {
-            return _decoratee.createSession(expiryInSeconds);
+            super.exception(t);
+            _valid = false;
         }
+    }
 
-        /**
-         * Do nothing : closed listener  has been already injected.
-         */
-        public void setClosedListener (ClosedListener exceptionListner)
-        {
-        }
-        
-        /**
-         * Callback method used for error notifications while underlying connection is closing.
-         */
-        public void onClosed (ErrorCode errorCode, String reason, Throwable throwable)
-        {
-            _valid = false;
-            LOGGER.error(
-                    throwable,
-                    "<QMAN-100012> : Error on closing connection. Reason is : %s, error code is %s. Connection %s to %s will be " +
-                    "marked as invalid and therefore will be purged..",
-                    reason,
-                    errorCode.getCode(),
-                    this,
-                    _connectionData);
-        }        
-    };
-        
     /**
-     * This is the connection factory, that is, the factory used to manage the lifecycle (create, validate & destroy) of 
+     * This is the connection factory, that is, the factory used to manage the lifecycle (create, validate & destroy) of
      * the broker connection(s).
-     * 
+     *
      * @author Andrea Gazzarini
      */
     class QpidConnectionFactory extends BasePoolableObjectFactory
-    {    
+    {
         private final BrokerConnectionData _connectionData;
         private final UUID _brokerId;
-        
+
         /**
          * Builds a new connection factory with the given parameters.
-         * 
+         *
          * @param brokerId the broker identifier.
          * @param connectionData the connecton data.
          */
@@ -188,68 +125,67 @@
             this._connectionData = connectionData;
             this._brokerId = brokerId;
         }
-        
+
         /**
          * Creates a new underlying connection.
          */
         @Override
         public Connection makeObject () throws Exception
         {
-            Connection connection = Client.createConnection();
+            PooledConnection connection = new PooledConnection(_brokerId);
             connection.connect(
-                    _connectionData.getHost(), 
-                    _connectionData.getPort(), 
-                    _connectionData.getVirtualHost(), 
-                    _connectionData.getUsername(), 
+                    _connectionData.getHost(),
+                    _connectionData.getPort(),
+                    _connectionData.getVirtualHost(),
+                    _connectionData.getUsername(),
                     _connectionData.getPassword());
-            return new ConnectionDecorator(_brokerId,connection,_connectionData);
+            return connection;
         }
-    
+
         /**
          * Validates the underlying connection.
          */
         @Override
         public boolean validateObject (Object obj)
         {
-            ConnectionDecorator connection = (ConnectionDecorator) obj;
-            return connection.isValid();
+            PooledConnection connection = (PooledConnection) obj;
+            boolean isValid = connection.isValid();
+            LOGGER.debug("<QMAN-200013> : Test connection on reserve. Is valid? %s",isValid);
+            return isValid;
         }
-        
+
         /**
          * Closes the underlying connection.
          */
         @Override
         public void destroyObject (Object obj) throws Exception
         {
-            ConnectionDecorator connection = (ConnectionDecorator) obj;
             try
             {
-                connection._decoratee.close();
-                LOGGER.debug("<QMAN-200014> : Connection for %s has been closed.",connection._connectionData);
-            } catch (Exception exception)
+                PooledConnection connection = (PooledConnection) obj;
+                connection.reallyClose();
+                LOGGER.debug("<QMAN-200014> : Connection has been destroyed.");
+            } catch (Exception e)
             {
-                LOGGER.debug(
-                        exception,
-                        "<QMAN-200015> : Unable to close an underlying qpid connection (target address is %s) .",
-                        connection._connectionData);
+                LOGGER.debug(e, "<QMAN-200015> : Unable to destroy a connection object");
             }
         }
     }
-   
+
     // Singleton instance.
     private static QpidDatasource instance = new QpidDatasource();
 
     // Each entry contains a connection pool for a specific broker.
     private Map<UUID, ObjectPool> pools = new HashMap<UUID, ObjectPool>();
-    
+
     // Private constructor.
     private QpidDatasource()
     {
     }
-    
+
     /**
      * Gets an available connection from the pool of the given broker.
-     * 
+     *
      * @param brokerId the broker identifier.
      * @return a valid connection to the broker associated with the given identifier.
      */
@@ -257,20 +193,20 @@
     {
         return (Connection) pools.get(brokerId).borrowObject();
     }
-    
+
     /**
      * Entry point method for retrieving the singleton instance of this datasource.
-     * 
+     *
      * @return the qpid datasource singleton instance.
      */
-    public static QpidDatasource getInstance() 
+    public static QpidDatasource getInstance()
     {
         return instance;
     }
-    
+
     /**
      * Adds a connection pool to this datasource.
-     * 
+     *
      * @param brokerId the broker identifier that will be associated with the new connection pool.
      * @param connectionData the broker connection data.
      * @throws Exception when the pool cannot be created.
@@ -285,20 +221,9 @@
                 false);
         ObjectPool pool = factory.createPool();
 
-        // Open connections at startup according to initial capacity param value.
-        int howManyConnectionAtStartup = connectionData.getInitialPoolCapacity(); 
-        Object [] openStartupList = new Object[howManyConnectionAtStartup];
-        
-        // Open...
-        for (int index  = 0; index < howManyConnectionAtStartup; index++)
-        {
-            openStartupList[index] = pool.borrowObject();
-        }
-        
-        // ...and immediately return them to pool. In this way the pooled connection has been opened.
-        for (int index = 0; index < howManyConnectionAtStartup; index++)
+        for (int i  = 0; i < connectionData.getInitialPoolCapacity(); i++)
         {
-            pool.returnObject(openStartupList[index]);
+            pool.returnObject(pool.borrowObject());
         }
 
         pools.put(brokerId,pool);

Modified: incubator/qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/domain/model/QpidClass.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/domain/model/QpidClass.java?rev=703989&r1=703988&r2=703989&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/domain/model/QpidClass.java (original)
+++ incubator/qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/domain/model/QpidClass.java Mon Oct 13 02:29:54 2008
@@ -739,7 +739,7 @@
         try
         {
             _service.connect();
-            _service.requestSchema(_parent.getName(), _name, _hash);
+           // _service.requestSchema(_parent.getName(), _name, _hash);
             _service.sync();
         } finally
         {
@@ -770,7 +770,7 @@
             
             int sequenceNumber = SequenceNumberGenerator.getNextSequenceNumber();
             _methodInvocationListener.operationIsGoingToBeInvoked(new InvocationEvent(this,sequenceNumber,_exchangeChannelForMethodInvocations));
-            _service.invoke(_parent.getName(), _name, _hash,objectId,parameters, method,sequenceNumber);
+           // _service.invoke(_parent.getName(), _name, _hash,objectId,parameters, method,sequenceNumber);
             
             // TODO : Shoudl be configurable?
             InvocationResult result = _exchangeChannelForMethodInvocations.poll(5000,TimeUnit.MILLISECONDS);

Modified: incubator/qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/domain/services/QpidService.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/domain/services/QpidService.java?rev=703989&r1=703988&r2=703989&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/domain/services/QpidService.java (original)
+++ incubator/qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/domain/services/QpidService.java Mon Oct 13 02:29:54 2008
@@ -20,70 +20,131 @@
  */
 package org.apache.qpid.management.domain.services;
 
-import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.qpid.QpidException;
-import org.apache.qpid.api.Message;
+import org.apache.qpid.management.Constants;
 import org.apache.qpid.management.Names;
+import org.apache.qpid.management.configuration.Configuration;
 import org.apache.qpid.management.configuration.QpidDatasource;
-import org.apache.qpid.management.domain.model.QpidMethod;
-import org.apache.qpid.management.domain.model.type.Binary;
-import org.apache.qpid.management.messages.MethodInvocationRequestMessage;
-import org.apache.qpid.management.messages.SchemaRequestMessage;
-import org.apache.qpid.nclient.Connection;
-import org.apache.qpid.nclient.Session;
 import org.apache.qpid.nclient.util.MessageListener;
 import org.apache.qpid.nclient.util.MessagePartListenerAdapter;
+import org.apache.qpid.transport.Connection;
 import org.apache.qpid.transport.MessageAcceptMode;
 import org.apache.qpid.transport.MessageAcquireMode;
 import org.apache.qpid.transport.MessageCreditUnit;
+import org.apache.qpid.transport.MessageTransfer;
 import org.apache.qpid.transport.Option;
+import org.apache.qpid.transport.Session;
+import org.apache.qpid.transport.SessionException;
+import org.apache.qpid.transport.SessionListener;
 import org.apache.qpid.transport.util.Logger;
 
 /**
  * Qpid Broker facade.
- * 
+ *
  * @author Andrea Gazzarini
  */
-public class QpidService
+public class QpidService implements SessionListener
 {
     private final static Logger LOGGER = Logger.get(QpidService.class);
-    
+
+    // Inner static class used for logging and avoid conditional logic (isDebugEnabled()) duplication.
+    private static class Log
+    {
+        /**
+         * Logs the content of the message.
+         * This will be written on log only if DEBUG level is enabled.
+         *
+         * @param messageContent the raw content of the message.
+         */
+        static void logMessageContent(byte [] messageContent)
+        {
+            if (LOGGER.isDebugEnabled()) {
+                LOGGER.debug(
+                        "<QMAN-200001> : Message has been sent to management exchange. Message content : %s",
+                        Arrays.toString(messageContent));
+            }
+        }
+
+        /**
+         * Logs the content of the message.
+         * This will be written on log only if DEBUG level is enabled.
+         *
+         * @param messageContent the raw content of the message.
+         */
+        static void logMessageContent(ByteBuffer messageContent)
+        {
+            if (LOGGER.isDebugEnabled()) {
+                LOGGER.debug(
+                        "<QMAN-200002> : Message has been sent to management exchange.");
+            }
+        }
+    }
+
     private UUID _brokerId;
     private Connection _connection;
     private Session _session;
-    
+    private Map<String,MessagePartListenerAdapter> _listeners;
+
     /**
      * Builds a new service with the given connection data.
-     * 
+     *
      * @param connectionData the connection data of the broker.
      */
-    public QpidService(UUID brokerId) 
+    public QpidService(UUID brokerId)
     {
         this._brokerId = brokerId;
     }
-    
+
     /**
      * Estabilishes a connection with the broker.
-     * 
+     *
      * @throws QpidException in case of connection failure.
      */
     public void connect() throws Exception
     {
         _connection = QpidDatasource.getInstance().getConnection(_brokerId);
-        _session = _connection.createSession(0);
+        _listeners = new ConcurrentHashMap<String,MessagePartListenerAdapter>();
+        _session = _connection.createSession(Constants.NO_EXPIRATION);
+        _session.setSessionListener(this);
+    }
+
+    public void opened(Session ssn) {}
+
+    public void message(Session ssn, MessageTransfer xfr)
+    {
+        MessagePartListenerAdapter l = _listeners.get(xfr.getDestination());
+        if (l == null)
+        {
+            LOGGER.error("unhandled message: %s", xfr);
+        }
+        else
+        {
+            l.messageTransfer(xfr);
+        }
+    }
+
+    public void exception(Session ssn, SessionException exc)
+    {
+        LOGGER.error(exc, "session %s exception", ssn);
     }
-    
+
+    public void closed(Session ssn) {}
+
     /**
-     * All the previously entered outstanding commands are asynchronous. 
+     * All the previously entered outstanding commands are asynchronous.
      * Synchronous behavior is achieved through invoking this  method.
      */
-    public void sync() 
+    public void sync()
     {
         _session.sync();
     }
-    
+
     /**
      * Closes communication with broker.
      */
@@ -91,51 +152,52 @@
     {
         try
         {
-            
             _session.close();
             _session = null;
+            _listeners = null;
         } catch (Exception e)
         {
         }
         try
         {
-            _connection.close();        
+            _connection.close();
             _connection = null;
         } catch (Exception e)
         {
         }
     }
-    
+
     /**
      * Associate a message listener with a destination therefore creating a new subscription.
-     * 
+     *
      * @param queueName The name of the queue that the subscriber is receiving messages from.
      * @param destinationName the name of the destination, or delivery tag, for the subscriber.
-     * @param listener the listener for this destination. 
-     * 
+     * @param listener the listener for this destination.
+     *
      * @see Session#messageSubscribe(String, String, short, short, org.apache.qpid.nclient.MessagePartListener, java.util.Map, org.apache.qpid.transport.Option...)
      */
-    public void createSubscription(String queueName, String destinationName,MessageListener listener)
+    public void createSubscription(String queueName, String destinationName, MessageListener listener)
     {
-        _session.messageSubscribe(
-                queueName,
-                destinationName,
-                Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED,
-                Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE,
-                new MessagePartListenerAdapter(listener), null);
-        
-        _session.messageFlow(destinationName, MessageCreditUnit.BYTE, Session.MESSAGE_FLOW_MAX_BYTES);
-        _session.messageFlow(destinationName, MessageCreditUnit.MESSAGE, Integer.MAX_VALUE);
-        
+        _listeners.put(destinationName, new MessagePartListenerAdapter(listener));
+        _session.messageSubscribe
+            (queueName,
+             destinationName,
+             MessageAcceptMode.NONE,
+             MessageAcquireMode.PRE_ACQUIRED,
+             null, 0, null);
+
+        _session.messageFlow(destinationName, MessageCreditUnit.BYTE, Session.UNLIMITED_CREDIT);
+        _session.messageFlow(destinationName, MessageCreditUnit.MESSAGE, Session.UNLIMITED_CREDIT);
+
         LOGGER.debug(
-                "<QMAN-200003> : New subscription between queue %s and destination %s has been declared.", 
+                "<QMAN-200003> : New subscription between queue %s and destination %s has been declared.",
                 queueName,
                 destinationName);
     }
-    
+
     /**
      * Removes a previously declared consumer from the broker.
-     * 
+     *
      * @param destinationName the name of the destination, or delivery tag, for the subscriber.
      * @see Session#messageCancel(String, Option...)
      */
@@ -143,10 +205,10 @@
     {
         _session.messageCancel(destinationName);
         LOGGER.debug(
-                "<QMAN-200026> : Subscription named %s has been removed from remote broker.", 
+                "<QMAN-200026> : Subscription named %s has been removed from remote broker.",
                 destinationName);
-    }    
-    
+    }
+
     /**
      * Declares a queue on the broker with the given name.
      *
@@ -170,27 +232,27 @@
         _session.queueDelete(queueName);
         LOGGER.debug("<QMAN-2000025> : Queue with name %s has been removed.",queueName);
     }
-    
+
     /**
      * Binds (on the broker) a queue with an exchange.
      *
-     * @param queueName the name of the queue to bind. 
+     * @param queueName the name of the queue to bind.
      * @param exchangeName the exchange name.
-     * @param routingKey the routing key used for the binding. 
+     * @param routingKey the routing key used for the binding.
      * @see Session#exchangeBind(String, String, String, java.util.Map, Option...)
      */
     public void declareBinding(String queueName, String exchangeName, String routingKey)
     {
         _session.exchangeBind(queueName, exchangeName, routingKey, null);
         LOGGER.debug(
-                "<QMAN-200005> : New binding with %s as routing key has been declared between queue %s and exchange %s.", 
+                "<QMAN-200005> : New binding with %s as routing key has been declared between queue %s and exchange %s.",
                 routingKey,queueName,
                 exchangeName);
     }
-    
+
     /**
      * Removes a previously declare binding between an exchange and a queue.
-     * 
+     *
      * @param queueName the name of the queue.
      * @param exchangeName the name of the exchange.
      * @param routingKey the routing key used for binding.
@@ -199,141 +261,42 @@
     {
         _session.exchangeUnbind(queueName, exchangeName, routingKey);
         LOGGER.debug(
-                "<QMAN-200005> : Binding with %s as routing key has been removed between queue %s and exchange %s.", 
+                "<QMAN-200005> : Binding with %s as routing key has been removed between queue %s and exchange %s.",
                 routingKey,queueName,
                 exchangeName);
     }
-    
+
     /**
-     * Sends a command message.
-     * 
-     * @param message the command message.
-     * @throws IOException when the message cannot be sent.
+     * Sends a command message with the given data on the management queue.
+     *
+     * @param messageData the command message content.
      */
-    public void sendMessage(Message message) throws IOException 
+    public void sendCommandMessage(byte [] messageData)
     {
         _session.messageTransfer(
                 Names.MANAGEMENT_EXCHANGE,
-                message,
-                MessageAcceptMode.EXPLICIT.getValue(),
-                MessageAcquireMode.PRE_ACQUIRED.getValue());
-    }    
-    
-    /**
-     * Requests a schema for the given package.class.hash.
-     * 
-     * @param packageName the package name.
-     * @param className the class name.
-     * @param schemaHash the schema hash.
-     * @throws IOException when the schema request cannot be sent.
-     */
-    public void requestSchema(final String packageName, final String className, final Binary schemaHash) throws IOException
-    {
-        Message message = new SchemaRequestMessage()
-        {
-            @Override
-            protected String className ()
-            {
-                return className;
-            }
+                MessageAcceptMode.EXPLICIT,
+                MessageAcquireMode.PRE_ACQUIRED,
+                Configuration.getInstance().getCommandMessageHeader(),
+                messageData);
 
-            @Override
-            protected String packageName ()
-            {
-                return packageName;
-            }
+        Log.logMessageContent (messageData);
+    }
 
-            @Override
-            protected Binary schemaHash ()
-            {
-                return schemaHash;
-            }
-        };
-        
-        sendMessage(message);
-    }
-    
-    /**
-     * Invokes an operation on a broker object instance.
-     * 
-     * @param packageName the package name.
-     * @param className the class name.
-     * @param schemaHash the schema hash of the corresponding class.
-     * @param objectId the object instance identifier.
-     * @param parameters the parameters for this invocation.
-     * @param method the method (definition) invoked.
-     * @return the sequence number used for this message.
-     * @throws MethodInvocationException when the invoked method returns an error code.
-     * @throws UnableToComplyException when it wasn't possibile to invoke the requested operation. 
-     */
-    public void invoke(
-            final String packageName, 
-            final String className, 
-            final Binary schemaHash, 
-            final Binary objectId, 
-            final Object[] parameters, 
-            final QpidMethod method,
-            final int sequenceNumber) throws MethodInvocationException, UnableToComplyException 
+    /**
+     * Sends a command message with the given data on the management queue.
+     *
+     * @param messageData the command message content.
+     */
+    public void sendCommandMessage(ByteBuffer messageData)
     {
-        Message message = new MethodInvocationRequestMessage()
-        {
-            
-            @Override
-            protected int sequenceNumber ()
-            {
-                return sequenceNumber;
-            }
-            
-            protected Binary objectId() {
-                return objectId;
-            }
-            
-            protected String packageName()
-            {
-                return packageName;
-            }
-            
-            protected String className() 
-            {
-                return className;
-            }
-            
-            @Override
-            protected QpidMethod method ()
-            {
-                return method;
-            }
-
-            @Override
-            protected Object[] parameters ()
-            {
-                return parameters;
-            }
+        _session.messageTransfer(
+                Names.MANAGEMENT_EXCHANGE,
+                MessageAcceptMode.EXPLICIT,
+                MessageAcquireMode.PRE_ACQUIRED,
+                Configuration.getInstance().getCommandMessageHeader(),
+                messageData);
 
-            @Override
-            protected Binary schemaHash ()
-            {
-                return schemaHash;
-            }
-        };
-        
-        try {
-            sendMessage(message);
-            sync();
-//            ReturnValueObject invocationResult = Configuration.getInstance()._resultExchangeChannel.poll(2000,TimeUnit.MILLISECONDS);
-//            if (invocationResult == null) {
-//                return null;
-//            }
-//            if (invocationResult.isException()) 
-//            {
-//                invocationResult.createAndThrowException();
-//            }   
-//            return invocationResult;
-//        } catch(MethodInvocationException exception) 
-//        {
-//            throw exception;
-        } catch(Exception exception) {
-            throw new UnableToComplyException(exception);
-        }
-    }  
+        Log.logMessageContent (messageData);
+    }
 }
\ No newline at end of file