You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2012/08/02 17:10:00 UTC

svn commit: r1368514 - in /qpid/branches/0.18/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol: ProtocolEngine_1_0_0.java ProtocolEngine_1_0_0_SASL.java v1_0/Connection_1_0.java v1_0/Session_1_0.java v1_0/Subscription_1_0.java

Author: rgodfrey
Date: Thu Aug  2 15:10:00 2012
New Revision: 1368514

URL: http://svn.apache.org/viewvc?rev=1368514&view=rev
Log:
QPID-4183 : [Merge from trunk] Implement Session/ConnectionModel interfaces in AMQP 1.0 code to restore functionality

Modified:
    qpid/branches/0.18/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java   (contents, props changed)
    qpid/branches/0.18/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java   (contents, props changed)
    qpid/branches/0.18/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
    qpid/branches/0.18/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
    qpid/branches/0.18/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java

Modified: qpid/branches/0.18/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.18/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java?rev=1368514&r1=1368513&r2=1368514&view=diff
==============================================================================
--- qpid/branches/0.18/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java (original)
+++ qpid/branches/0.18/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java Thu Aug  2 15:10:00 2012
@@ -146,7 +146,7 @@ public class ProtocolEngine_1_0_0 implem
 
         _conn = new ConnectionEndpoint(container, asSaslServerProvider(_appRegistry.getAuthenticationManager(
                 getLocalAddress())));
-        _conn.setConnectionEventListener(new Connection_1_0(_appRegistry));
+        _conn.setConnectionEventListener(new Connection_1_0(_appRegistry, _conn, _connectionId));
         _conn.setFrameOutputHandler(this);
         _conn.setRemoteAddress(_network.getRemoteAddress());
 

Propchange: qpid/branches/0.18/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java:r1368414

Modified: qpid/branches/0.18/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.18/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java?rev=1368514&r1=1368513&r2=1368514&view=diff
==============================================================================
--- qpid/branches/0.18/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java (original)
+++ qpid/branches/0.18/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java Thu Aug  2 15:10:00 2012
@@ -166,7 +166,7 @@ public class ProtocolEngine_1_0_0_SASL i
 
         _conn = new ConnectionEndpoint(container, asSaslServerProvider(ApplicationRegistry.getInstance()
                 .getAuthenticationManager(getLocalAddress())));
-        _conn.setConnectionEventListener(new Connection_1_0(_appRegistry));
+        _conn.setConnectionEventListener(new Connection_1_0(_appRegistry, _conn, _connectionId));
         _conn.setRemoteAddress(getRemoteAddress());
 
 

Propchange: qpid/branches/0.18/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java:r1368414

Modified: qpid/branches/0.18/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.18/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java?rev=1368514&r1=1368513&r2=1368514&view=diff
==============================================================================
--- qpid/branches/0.18/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java (original)
+++ qpid/branches/0.18/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java Thu Aug  2 15:10:00 2012
@@ -20,21 +20,35 @@
  */
 package org.apache.qpid.server.protocol.v1_0;
 
+import java.text.MessageFormat;
+import java.util.Collection;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint;
 import org.apache.qpid.amqp_1_0.transport.ConnectionEventListener;
 import org.apache.qpid.amqp_1_0.transport.SessionEndpoint;
 
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.protocol.AMQConnectionModel;
+import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.stats.StatisticsCounter;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
+import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CONNECTION_FORMAT;
+
 public class Connection_1_0 implements ConnectionEventListener
 {
 
     private IApplicationRegistry _appRegistry;
     private VirtualHost _vhost;
+    private final ConnectionEndpoint _conn;
+    private final long _connectionId;
+    private final Collection<Session_1_0> _sessions = Collections.synchronizedCollection(new ArrayList<Session_1_0>());
 
 
     public static interface Task
@@ -48,18 +62,27 @@ public class Connection_1_0 implements C
 
 
 
-    public Connection_1_0(IApplicationRegistry appRegistry)
+    public Connection_1_0(IApplicationRegistry appRegistry, ConnectionEndpoint conn, long connectionId)
     {
         _appRegistry = appRegistry;
         _vhost = _appRegistry.getVirtualHostRegistry().getDefaultVirtualHost();
+        _conn = conn;
+        _connectionId = connectionId;
+        _vhost.getConnectionRegistry().registerConnection(_model);
+
     }
 
     public void remoteSessionCreation(SessionEndpoint endpoint)
     {
         Session_1_0 session = new Session_1_0(_vhost, _appRegistry, this);
+        _sessions.add(session);
         endpoint.setSessionEventListener(session);
     }
 
+    void sessionEnded(Session_1_0 session)
+    {
+        _sessions.remove(session);
+    }
 
     void removeConnectionCloseTask(final Task task)
     {
@@ -86,6 +109,8 @@ public class Connection_1_0 implements C
         {
             _closeTasks.clear();
         }
+        _vhost.getConnectionRegistry().deregisterConnection(_model);
+
 
     }
 
@@ -94,5 +119,174 @@ public class Connection_1_0 implements C
         closeReceived();
     }
 
+    private final AMQConnectionModel _model = new AMQConnectionModel()
+    {
+        private final StatisticsCounter _messageDeliveryStatistics = new StatisticsCounter();
+        private final StatisticsCounter _messageReceiptStatistics = new StatisticsCounter();
+        private final StatisticsCounter _dataDeliveryStatistics = new StatisticsCounter();
+        private final StatisticsCounter _dataReceiptStatistics = new StatisticsCounter();
+
+        private final LogSubject _logSubject = new LogSubject()
+        {
+            @Override
+            public String toLogString()
+            {
+                return "[" +
+                        MessageFormat.format(CONNECTION_FORMAT,
+                                             getConnectionId(),
+                                             getClientId(),
+                                             getRemoteAddressString(),
+                                             _vhost.getName())
+                     + "] ";
+
+            }
+        };
+
+        @Override
+        public void close(AMQConstant cause, String message) throws AMQException
+        {
+            // TODO
+        }
+
+        @Override
+        public void block()
+        {
+            // TODO
+        }
+
+        @Override
+        public void unblock()
+        {
+            // TODO
+        }
+
+        @Override
+        public void closeSession(AMQSessionModel session, AMQConstant cause, String message) throws AMQException
+        {
+            // TODO
+        }
+
+        @Override
+        public long getConnectionId()
+        {
+            return _connectionId;
+        }
+
+        @Override
+        public List<AMQSessionModel> getSessionModels()
+        {
+            return new ArrayList<AMQSessionModel>(_sessions);
+        }
+
+        @Override
+        public LogSubject getLogSubject()
+        {
+            return _logSubject;
+        }
+
+        @Override
+        public String getUserName()
+        {
+            return getPrincipalAsString();
+        }
+
+        @Override
+        public boolean isSessionNameUnique(byte[] name)
+        {
+            return true;  // TODO
+        }
+
+        @Override
+        public String getRemoteAddressString()
+        {
+            return String.valueOf(_conn.getRemoteAddress());
+        }
+
+        @Override
+        public String getClientId()
+        {
+            return _conn.getRemoteContainerId();
+        }
+
+        @Override
+        public String getClientVersion()
+        {
+            return "";  //TODO
+        }
+
+        @Override
+        public String getPrincipalAsString()
+        {
+            return String.valueOf(_conn.getUser());
+        }
+
+        @Override
+        public long getSessionCountLimit()
+        {
+            return 0;  // TODO
+        }
+
+        @Override
+        public long getLastIoTime()
+        {
+            return 0;  // TODO
+        }
+
+        @Override
+        public void initialiseStatistics()
+        {
+            // TODO
+        }
+
+        @Override
+        public void registerMessageReceived(long messageSize, long timestamp)
+        {
+            // TODO
+        }
+
+        @Override
+        public void registerMessageDelivered(long messageSize)
+        {
+            // TODO
+        }
+
+        @Override
+        public StatisticsCounter getMessageDeliveryStatistics()
+        {
+            return _messageDeliveryStatistics;
+        }
+
+        @Override
+        public StatisticsCounter getMessageReceiptStatistics()
+        {
+            return _messageReceiptStatistics;
+        }
+
+        @Override
+        public StatisticsCounter getDataDeliveryStatistics()
+        {
+            return _dataDeliveryStatistics;
+        }
+
+        @Override
+        public StatisticsCounter getDataReceiptStatistics()
+        {
+            return _dataReceiptStatistics;
+        }
+
+        @Override
+        public void resetStatistics()
+        {
+            // TODO
+        }
+
+
+    };
+
+    AMQConnectionModel getModel()
+    {
+        return _model;
+    }
+
 
 }

Modified: qpid/branches/0.18/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.18/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1368514&r1=1368513&r2=1368514&view=diff
==============================================================================
--- qpid/branches/0.18/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java (original)
+++ qpid/branches/0.18/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java Thu Aug  2 15:10:00 2012
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.server.protocol.v1_0;
 
+import java.text.MessageFormat;
 import org.apache.qpid.amqp_1_0.transport.LinkEndpoint;
 import org.apache.qpid.amqp_1_0.transport.ReceivingLinkEndpoint;
 import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint;
@@ -35,18 +36,26 @@ import org.apache.qpid.amqp_1_0.type.tra
 import org.apache.qpid.amqp_1_0.type.transport.Error;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQSecurityException;
+import org.apache.qpid.protocol.ProtocolEngine;
 import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.message.InboundMessage;
 import org.apache.qpid.server.model.UUIDGenerator;
+import org.apache.qpid.server.protocol.AMQConnectionModel;
+import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.AMQQueueFactory;
 import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.transport.ServerConnection;
 import org.apache.qpid.server.txn.AutoCommitTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 import java.util.*;
 
-public class Session_1_0 implements SessionEventListener
+import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT;
+
+public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSubject
 {
     private static final Symbol LIFETIME_POLICY = Symbol.valueOf("lifetime-policy");
     private IApplicationRegistry _appRegistry;
@@ -56,6 +65,7 @@ public class Session_1_0 implements Sess
     private final LinkedHashMap<Integer, ServerTransaction> _openTransactions =
             new LinkedHashMap<Integer, ServerTransaction>();
     private final Connection_1_0 _connection;
+    private UUID _id = UUID.randomUUID();
 
 
     public Session_1_0(VirtualHost vhost, IApplicationRegistry appRegistry, final Connection_1_0 connection)
@@ -405,6 +415,8 @@ public class Session_1_0 implements Sess
             iter.remove();
         }
 
+        _connection.sessionEnded(this);
+
     }
 
     Integer binaryToInteger(final Binary txnId)
@@ -443,4 +455,153 @@ public class Session_1_0 implements Sess
     public void forceEnd()
     {
     }
+
+    @Override
+    public UUID getQMFId()
+    {
+        return _id;
+    }
+
+    @Override
+    public AMQConnectionModel getConnectionModel()
+    {
+        return _connection.getModel();
+    }
+
+    @Override
+    public String getClientID()
+    {
+        // TODO
+        return "";
+    }
+
+    @Override
+    public void close() throws AMQException
+    {
+        // TODO - required for AMQSessionModel / management initiated closing
+    }
+
+    @Override
+    public LogSubject getLogSubject()
+    {
+        return this;
+    }
+
+    @Override
+    public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) throws AMQException
+    {
+        // TODO - required for AMQSessionModel / long running transaction detection
+    }
+
+    @Override
+    public void block(AMQQueue queue)
+    {
+        // TODO - required for AMQSessionModel / producer side flow control
+    }
+
+    @Override
+    public void unblock(AMQQueue queue)
+    {
+        // TODO - required for AMQSessionModel / producer side flow control
+    }
+
+    @Override
+    public void block()
+    {
+        // TODO - required for AMQSessionModel / producer side flow control
+    }
+
+    @Override
+    public void unblock()
+    {
+        // TODO - required for AMQSessionModel / producer side flow control
+    }
+
+    @Override
+    public boolean getBlocking()
+    {
+        // TODO
+        return false;
+    }
+
+    @Override
+    public boolean onSameConnection(InboundMessage inbound)
+    {
+        // TODO
+        return false;
+    }
+
+    @Override
+    public int getUnacknowledgedMessageCount()
+    {
+        // TODO
+        return 0;
+    }
+
+    @Override
+    public Long getTxnCount()
+    {
+        // TODO
+        return 0l;
+    }
+
+    @Override
+    public Long getTxnStart()
+    {
+        // TODO
+        return 0l;
+    }
+
+    @Override
+    public Long getTxnCommits()
+    {
+        // TODO
+        return 0l;
+    }
+
+    @Override
+    public Long getTxnRejects()
+    {
+        // TODO
+        return 0l;
+    }
+
+    @Override
+    public int getChannelId()
+    {
+        // TODO
+        return 0;
+    }
+
+    @Override
+    public int getConsumerCount()
+    {
+        // TODO
+        return 0;
+    }
+
+    @Override
+    public int compareTo(AMQSessionModel o)
+    {
+        return getQMFId().compareTo(o.getQMFId());
+    }
+
+
+
+    public String toLogString()
+    {
+        long connectionId = getConnectionModel().getConnectionId();
+
+        String remoteAddress = getConnectionModel().getRemoteAddressString();
+
+        return "[" +
+               MessageFormat.format(CHANNEL_FORMAT,
+                                    connectionId,
+                                    getClientID(),
+                                    remoteAddress,
+                                    _vhost.getName(), // TODO - virtual host
+                                    0) // TODO - channel)
+            + "] ";
+    }
+
 }

Modified: qpid/branches/0.18/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.18/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java?rev=1368514&r1=1368513&r2=1368514&view=diff
==============================================================================
--- qpid/branches/0.18/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java (original)
+++ qpid/branches/0.18/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java Thu Aug  2 15:10:00 2012
@@ -637,7 +637,7 @@ class Subscription_1_0 implements Subscr
     public AMQSessionModel getSessionModel()
     {
         // TODO
-        return null;
+        return getSession();
     }
 
     @Override



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org