You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2012/08/02 17:10:00 UTC
svn commit: r1368514 - in
/qpid/branches/0.18/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol:
ProtocolEngine_1_0_0.java ProtocolEngine_1_0_0_SASL.java
v1_0/Connection_1_0.java v1_0/Session_1_0.java v1_0/Subscription_1_0.java
Author: rgodfrey
Date: Thu Aug 2 15:10:00 2012
New Revision: 1368514
URL: http://svn.apache.org/viewvc?rev=1368514&view=rev
Log:
QPID-4183 : [Merge from trunk] Implement Session/ConnectionModel interfaces in AMQP 1.0 code to restore functionality
Modified:
qpid/branches/0.18/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java (contents, props changed)
qpid/branches/0.18/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java (contents, props changed)
qpid/branches/0.18/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
qpid/branches/0.18/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
qpid/branches/0.18/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java
Modified: qpid/branches/0.18/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.18/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java?rev=1368514&r1=1368513&r2=1368514&view=diff
==============================================================================
--- qpid/branches/0.18/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java (original)
+++ qpid/branches/0.18/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java Thu Aug 2 15:10:00 2012
@@ -146,7 +146,7 @@ public class ProtocolEngine_1_0_0 implem
_conn = new ConnectionEndpoint(container, asSaslServerProvider(_appRegistry.getAuthenticationManager(
getLocalAddress())));
- _conn.setConnectionEventListener(new Connection_1_0(_appRegistry));
+ _conn.setConnectionEventListener(new Connection_1_0(_appRegistry, _conn, _connectionId));
_conn.setFrameOutputHandler(this);
_conn.setRemoteAddress(_network.getRemoteAddress());
Propchange: qpid/branches/0.18/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java:r1368414
Modified: qpid/branches/0.18/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.18/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java?rev=1368514&r1=1368513&r2=1368514&view=diff
==============================================================================
--- qpid/branches/0.18/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java (original)
+++ qpid/branches/0.18/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java Thu Aug 2 15:10:00 2012
@@ -166,7 +166,7 @@ public class ProtocolEngine_1_0_0_SASL i
_conn = new ConnectionEndpoint(container, asSaslServerProvider(ApplicationRegistry.getInstance()
.getAuthenticationManager(getLocalAddress())));
- _conn.setConnectionEventListener(new Connection_1_0(_appRegistry));
+ _conn.setConnectionEventListener(new Connection_1_0(_appRegistry, _conn, _connectionId));
_conn.setRemoteAddress(getRemoteAddress());
Propchange: qpid/branches/0.18/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java:r1368414
Modified: qpid/branches/0.18/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.18/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java?rev=1368514&r1=1368513&r2=1368514&view=diff
==============================================================================
--- qpid/branches/0.18/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java (original)
+++ qpid/branches/0.18/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java Thu Aug 2 15:10:00 2012
@@ -20,21 +20,35 @@
*/
package org.apache.qpid.server.protocol.v1_0;
+import java.text.MessageFormat;
+import java.util.Collection;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint;
import org.apache.qpid.amqp_1_0.transport.ConnectionEventListener;
import org.apache.qpid.amqp_1_0.transport.SessionEndpoint;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.protocol.AMQConnectionModel;
+import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.stats.StatisticsCounter;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CONNECTION_FORMAT;
+
public class Connection_1_0 implements ConnectionEventListener
{
private IApplicationRegistry _appRegistry;
private VirtualHost _vhost;
+ private final ConnectionEndpoint _conn;
+ private final long _connectionId;
+ private final Collection<Session_1_0> _sessions = Collections.synchronizedCollection(new ArrayList<Session_1_0>());
public static interface Task
@@ -48,18 +62,27 @@ public class Connection_1_0 implements C
- public Connection_1_0(IApplicationRegistry appRegistry)
+ public Connection_1_0(IApplicationRegistry appRegistry, ConnectionEndpoint conn, long connectionId)
{
_appRegistry = appRegistry;
_vhost = _appRegistry.getVirtualHostRegistry().getDefaultVirtualHost();
+ _conn = conn;
+ _connectionId = connectionId;
+ _vhost.getConnectionRegistry().registerConnection(_model);
+
}
public void remoteSessionCreation(SessionEndpoint endpoint)
{
Session_1_0 session = new Session_1_0(_vhost, _appRegistry, this);
+ _sessions.add(session);
endpoint.setSessionEventListener(session);
}
+ void sessionEnded(Session_1_0 session)
+ {
+ _sessions.remove(session);
+ }
void removeConnectionCloseTask(final Task task)
{
@@ -86,6 +109,8 @@ public class Connection_1_0 implements C
{
_closeTasks.clear();
}
+ _vhost.getConnectionRegistry().deregisterConnection(_model);
+
}
@@ -94,5 +119,174 @@ public class Connection_1_0 implements C
closeReceived();
}
+ private final AMQConnectionModel _model = new AMQConnectionModel()
+ {
+ private final StatisticsCounter _messageDeliveryStatistics = new StatisticsCounter();
+ private final StatisticsCounter _messageReceiptStatistics = new StatisticsCounter();
+ private final StatisticsCounter _dataDeliveryStatistics = new StatisticsCounter();
+ private final StatisticsCounter _dataReceiptStatistics = new StatisticsCounter();
+
+ private final LogSubject _logSubject = new LogSubject()
+ {
+ @Override
+ public String toLogString()
+ {
+ return "[" +
+ MessageFormat.format(CONNECTION_FORMAT,
+ getConnectionId(),
+ getClientId(),
+ getRemoteAddressString(),
+ _vhost.getName())
+ + "] ";
+
+ }
+ };
+
+ @Override
+ public void close(AMQConstant cause, String message) throws AMQException
+ {
+ // TODO
+ }
+
+ @Override
+ public void block()
+ {
+ // TODO
+ }
+
+ @Override
+ public void unblock()
+ {
+ // TODO
+ }
+
+ @Override
+ public void closeSession(AMQSessionModel session, AMQConstant cause, String message) throws AMQException
+ {
+ // TODO
+ }
+
+ @Override
+ public long getConnectionId()
+ {
+ return _connectionId;
+ }
+
+ @Override
+ public List<AMQSessionModel> getSessionModels()
+ {
+ return new ArrayList<AMQSessionModel>(_sessions);
+ }
+
+ @Override
+ public LogSubject getLogSubject()
+ {
+ return _logSubject;
+ }
+
+ @Override
+ public String getUserName()
+ {
+ return getPrincipalAsString();
+ }
+
+ @Override
+ public boolean isSessionNameUnique(byte[] name)
+ {
+ return true; // TODO
+ }
+
+ @Override
+ public String getRemoteAddressString()
+ {
+ return String.valueOf(_conn.getRemoteAddress());
+ }
+
+ @Override
+ public String getClientId()
+ {
+ return _conn.getRemoteContainerId();
+ }
+
+ @Override
+ public String getClientVersion()
+ {
+ return ""; //TODO
+ }
+
+ @Override
+ public String getPrincipalAsString()
+ {
+ return String.valueOf(_conn.getUser());
+ }
+
+ @Override
+ public long getSessionCountLimit()
+ {
+ return 0; // TODO
+ }
+
+ @Override
+ public long getLastIoTime()
+ {
+ return 0; // TODO
+ }
+
+ @Override
+ public void initialiseStatistics()
+ {
+ // TODO
+ }
+
+ @Override
+ public void registerMessageReceived(long messageSize, long timestamp)
+ {
+ // TODO
+ }
+
+ @Override
+ public void registerMessageDelivered(long messageSize)
+ {
+ // TODO
+ }
+
+ @Override
+ public StatisticsCounter getMessageDeliveryStatistics()
+ {
+ return _messageDeliveryStatistics;
+ }
+
+ @Override
+ public StatisticsCounter getMessageReceiptStatistics()
+ {
+ return _messageReceiptStatistics;
+ }
+
+ @Override
+ public StatisticsCounter getDataDeliveryStatistics()
+ {
+ return _dataDeliveryStatistics;
+ }
+
+ @Override
+ public StatisticsCounter getDataReceiptStatistics()
+ {
+ return _dataReceiptStatistics;
+ }
+
+ @Override
+ public void resetStatistics()
+ {
+ // TODO
+ }
+
+
+ };
+
+ AMQConnectionModel getModel()
+ {
+ return _model;
+ }
+
}
Modified: qpid/branches/0.18/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.18/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1368514&r1=1368513&r2=1368514&view=diff
==============================================================================
--- qpid/branches/0.18/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java (original)
+++ qpid/branches/0.18/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java Thu Aug 2 15:10:00 2012
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.protocol.v1_0;
+import java.text.MessageFormat;
import org.apache.qpid.amqp_1_0.transport.LinkEndpoint;
import org.apache.qpid.amqp_1_0.transport.ReceivingLinkEndpoint;
import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint;
@@ -35,18 +36,26 @@ import org.apache.qpid.amqp_1_0.type.tra
import org.apache.qpid.amqp_1_0.type.transport.Error;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQSecurityException;
+import org.apache.qpid.protocol.ProtocolEngine;
import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.message.InboundMessage;
import org.apache.qpid.server.model.UUIDGenerator;
+import org.apache.qpid.server.protocol.AMQConnectionModel;
+import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.transport.ServerConnection;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.*;
-public class Session_1_0 implements SessionEventListener
+import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT;
+
+public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSubject
{
private static final Symbol LIFETIME_POLICY = Symbol.valueOf("lifetime-policy");
private IApplicationRegistry _appRegistry;
@@ -56,6 +65,7 @@ public class Session_1_0 implements Sess
private final LinkedHashMap<Integer, ServerTransaction> _openTransactions =
new LinkedHashMap<Integer, ServerTransaction>();
private final Connection_1_0 _connection;
+ private UUID _id = UUID.randomUUID();
public Session_1_0(VirtualHost vhost, IApplicationRegistry appRegistry, final Connection_1_0 connection)
@@ -405,6 +415,8 @@ public class Session_1_0 implements Sess
iter.remove();
}
+ _connection.sessionEnded(this);
+
}
Integer binaryToInteger(final Binary txnId)
@@ -443,4 +455,153 @@ public class Session_1_0 implements Sess
public void forceEnd()
{
}
+
+ @Override
+ public UUID getQMFId()
+ {
+ return _id;
+ }
+
+ @Override
+ public AMQConnectionModel getConnectionModel()
+ {
+ return _connection.getModel();
+ }
+
+ @Override
+ public String getClientID()
+ {
+ // TODO
+ return "";
+ }
+
+ @Override
+ public void close() throws AMQException
+ {
+ // TODO - required for AMQSessionModel / management initiated closing
+ }
+
+ @Override
+ public LogSubject getLogSubject()
+ {
+ return this;
+ }
+
+ @Override
+ public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) throws AMQException
+ {
+ // TODO - required for AMQSessionModel / long running transaction detection
+ }
+
+ @Override
+ public void block(AMQQueue queue)
+ {
+ // TODO - required for AMQSessionModel / producer side flow control
+ }
+
+ @Override
+ public void unblock(AMQQueue queue)
+ {
+ // TODO - required for AMQSessionModel / producer side flow control
+ }
+
+ @Override
+ public void block()
+ {
+ // TODO - required for AMQSessionModel / producer side flow control
+ }
+
+ @Override
+ public void unblock()
+ {
+ // TODO - required for AMQSessionModel / producer side flow control
+ }
+
+ @Override
+ public boolean getBlocking()
+ {
+ // TODO
+ return false;
+ }
+
+ @Override
+ public boolean onSameConnection(InboundMessage inbound)
+ {
+ // TODO
+ return false;
+ }
+
+ @Override
+ public int getUnacknowledgedMessageCount()
+ {
+ // TODO
+ return 0;
+ }
+
+ @Override
+ public Long getTxnCount()
+ {
+ // TODO
+ return 0l;
+ }
+
+ @Override
+ public Long getTxnStart()
+ {
+ // TODO
+ return 0l;
+ }
+
+ @Override
+ public Long getTxnCommits()
+ {
+ // TODO
+ return 0l;
+ }
+
+ @Override
+ public Long getTxnRejects()
+ {
+ // TODO
+ return 0l;
+ }
+
+ @Override
+ public int getChannelId()
+ {
+ // TODO
+ return 0;
+ }
+
+ @Override
+ public int getConsumerCount()
+ {
+ // TODO
+ return 0;
+ }
+
+ @Override
+ public int compareTo(AMQSessionModel o)
+ {
+ return getQMFId().compareTo(o.getQMFId());
+ }
+
+
+
+ public String toLogString()
+ {
+ long connectionId = getConnectionModel().getConnectionId();
+
+ String remoteAddress = getConnectionModel().getRemoteAddressString();
+
+ return "[" +
+ MessageFormat.format(CHANNEL_FORMAT,
+ connectionId,
+ getClientID(),
+ remoteAddress,
+ _vhost.getName(), // TODO - virtual host
+ 0) // TODO - channel)
+ + "] ";
+ }
+
}
Modified: qpid/branches/0.18/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.18/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java?rev=1368514&r1=1368513&r2=1368514&view=diff
==============================================================================
--- qpid/branches/0.18/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java (original)
+++ qpid/branches/0.18/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java Thu Aug 2 15:10:00 2012
@@ -637,7 +637,7 @@ class Subscription_1_0 implements Subscr
public AMQSessionModel getSessionModel()
{
// TODO
- return null;
+ return getSession();
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org