You are viewing a plain text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@qpid.apache.org by rg...@apache.org on 2015/06/27 23:13:27 UTC
svn commit: r1687962 [2/4] - in /qpid/java/trunk:
bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/
broker-codegen/src/main/java/org/apache/qpid/server/model/validation/
broker-core/src/main/java/org/apache/qpid/server/connection/ b...
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java Sat Jun 27 21:13:25 2015
@@ -48,13 +48,14 @@ import org.apache.qpid.server.model.Stat
import org.apache.qpid.server.model.VirtualHostAlias;
import org.apache.qpid.server.model.VirtualHostNode;
import org.apache.qpid.server.model.port.AmqpPort;
-import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.LinkRegistry;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.stats.StatisticsCounter;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.transport.AMQPConnection;
+import org.apache.qpid.server.transport.AbstractAMQPConnection;
import org.apache.qpid.server.txn.DtxRegistry;
import org.apache.qpid.server.virtualhost.*;
@@ -502,7 +503,7 @@ class RedirectingVirtualHostImpl
}
@Override
- public boolean authoriseCreateConnection(final AMQConnectionModel<?, ?> connection)
+ public boolean authoriseCreateConnection(final AMQPConnection<?> connection)
{
return false;
}
@@ -549,13 +550,13 @@ class RedirectingVirtualHostImpl
}
@Override
- public void registerConnection(final Connection<?> connection)
+ public void registerConnection(final AMQPConnection<?> connection)
{
throwUnsupportedForRedirector();
}
@Override
- public void deregisterConnection(final Connection<?> connection)
+ public void deregisterConnection(final AMQPConnection<?> connection)
{
throwUnsupportedForRedirector();
}
Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/connection/ConnectionVersionValidatorTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/connection/ConnectionVersionValidatorTest.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/connection/ConnectionVersionValidatorTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/connection/ConnectionVersionValidatorTest.java Sat Jun 27 21:13:25 2015
@@ -40,14 +40,14 @@ import org.apache.qpid.server.logging.Lo
import org.apache.qpid.server.logging.messages.ConnectionMessages;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.VirtualHost;
-import org.apache.qpid.server.protocol.AMQConnectionModel;
+import org.apache.qpid.server.transport.AMQPConnection;
import org.apache.qpid.test.utils.QpidTestCase;
public class ConnectionVersionValidatorTest extends QpidTestCase
{
private VirtualHost _virtualHostMock;
- private AMQConnectionModel _connectionMock;
+ private AMQPConnection _connectionMock;
private EventLogger _eventLoggerMock;
private ConnectionVersionValidator _connectionValidator;
@@ -57,7 +57,7 @@ public class ConnectionVersionValidatorT
_connectionValidator = new ConnectionVersionValidator();
_virtualHostMock = mock(VirtualHost.class);
- _connectionMock = mock(AMQConnectionModel.class);
+ _connectionMock = mock(AMQPConnection.class);
_eventLoggerMock = mock(EventLogger.class);
Broker brokerMock = mock(Broker.class);
Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java Sat Jun 27 21:13:25 2015
@@ -24,15 +24,21 @@ package org.apache.qpid.server.consumer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import java.lang.reflect.Type;
import java.net.SocketAddress;
+import java.security.AccessControlException;
import java.security.Principal;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import com.google.common.util.concurrent.ListenableFuture;
+
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
@@ -43,20 +49,23 @@ import org.apache.qpid.server.logging.Lo
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.BrokerModel;
+import org.apache.qpid.server.model.ConfigurationChangeListener;
+import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.ConfiguredObjectFactory;
import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl;
import org.apache.qpid.server.model.Consumer;
+import org.apache.qpid.server.model.LifetimePolicy;
+import org.apache.qpid.server.model.Model;
import org.apache.qpid.server.model.Session;
import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.model.port.AmqpPort;
-import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.ConsumerListener;
-import org.apache.qpid.server.transport.ProtocolEngine;
-import org.apache.qpid.server.protocol.SessionModelListener;
+import org.apache.qpid.server.store.ConfiguredObjectRecord;
+import org.apache.qpid.server.transport.AMQPConnection;
+import org.apache.qpid.server.transport.AbstractAMQPConnection;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.stats.StatisticsCounter;
import org.apache.qpid.server.transport.NetworkConnectionScheduler;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.StateChangeListener;
@@ -314,7 +323,7 @@ public class MockConsumer implements Con
}
- private static class MockSessionModel implements AMQSessionModel
+ private static class MockSessionModel implements AMQSessionModel<MockSessionModel>
{
private final UUID _id = UUID.randomUUID();
private Session _modelObject;
@@ -338,9 +347,9 @@ public class MockConsumer implements Con
}
@Override
- public AMQConnectionModel getConnectionModel()
+ public AMQPConnection<?> getAMQPConnection()
{
- return new MockConnectionModel();
+ return null;
}
@Override
@@ -499,11 +508,6 @@ public class MockConsumer implements Con
}
- @Override
- public int compareTo(final Object o)
- {
- return 0;
- }
@Override
public void transportStateChanged()
@@ -528,9 +532,15 @@ public class MockConsumer implements Con
{
}
+
+ @Override
+ public int compareTo(final AMQSessionModel o)
+ {
+ return 0;
+ }
}
- private static class MockConnectionModel implements AMQConnectionModel
+ private static class MockConnectionModel implements AMQPConnection<MockConnectionModel>
{
@Override
@@ -544,81 +554,112 @@ public class MockConsumer implements Con
}
@Override
- public StatisticsCounter getMessageDeliveryStatistics()
+ public void closeAsync(AMQConstant cause, String message)
+ {
+ }
+
+ @Override
+ public void closeSessionAsync(AMQSessionModel<?> session, AMQConstant cause,
+ String message)
+ {
+ }
+
+ @Override
+ public long getConnectionId()
+ {
+ return 0;
+ }
+
+ @Override
+ public void block()
{
- return null;
}
@Override
- public StatisticsCounter getMessageReceiptStatistics()
+ public void unblock()
+ {
+
+ }
+
+ @Override
+ public String getRemoteAddressString()
+ {
+ return "remoteAddress:1234";
+ }
+
+ public SocketAddress getRemoteSocketAddress()
{
return null;
}
@Override
- public StatisticsCounter getDataDeliveryStatistics()
+ public String getClientId()
{
return null;
}
@Override
- public StatisticsCounter getDataReceiptStatistics()
+ public String getRemoteContainerName()
{
return null;
}
@Override
- public void resetStatistics()
+ public void notifyWork()
{
}
@Override
- public void closeAsync(AMQConstant cause, String message)
+ public boolean isMessageAssignmentSuspended()
{
+ return false;
}
@Override
- public void closeSessionAsync(AMQSessionModel session, AMQConstant cause,
- String message)
+ public boolean hasSessionWithName(final byte[] name)
{
+ return false;
}
@Override
- public long getConnectionId()
+ public void setScheduler(final NetworkConnectionScheduler networkConnectionScheduler)
{
- return 0;
+
}
@Override
- public List<AMQSessionModel> getSessionModels()
+ public String getClientVersion()
{
return null;
}
@Override
- public void block()
+ public boolean isIncoming()
{
+ return false;
}
@Override
- public void unblock()
+ public String getLocalAddress()
{
+ return null;
}
@Override
- public LogSubject getLogSubject()
+ public String getPrincipal()
{
return null;
}
@Override
- public String getRemoteAddressString()
+ public String getRemoteAddress()
{
- return "remoteAddress:1234";
+ return null;
}
- public SocketAddress getRemoteAddress()
+ @Override
+ public String getRemoteProcessName()
{
return null;
}
@@ -630,130 +671,395 @@ public class MockConsumer implements Con
}
@Override
- public String getClientId()
+ public long getSessionCountLimit()
+ {
+ return 0;
+ }
+
+ @Override
+ public Principal getAuthorizedPrincipal()
{
return null;
}
@Override
- public String getRemoteContainerName()
+ public AmqpPort<?> getPort()
{
return null;
}
@Override
- public void addSessionListener(final SessionModelListener listener)
+ public long getBytesIn()
+ {
+ return 0;
+ }
+
+ @Override
+ public long getBytesOut()
{
+ return 0;
+ }
+ @Override
+ public long getMessagesIn()
+ {
+ return 0;
}
@Override
- public void removeSessionListener(final SessionModelListener listener)
+ public long getMessagesOut()
{
+ return 0;
+ }
+ @Override
+ public long getLastIoTime()
+ {
+ return 0;
}
@Override
- public void notifyWork()
+ public int getSessionCount()
{
+ return 0;
+ }
+ @Override
+ public Collection<Session> getSessions()
+ {
+ return null;
}
@Override
- public boolean isMessageAssignmentSuspended()
+ public AbstractAMQPConnection<?> getUnderlyingConnection()
+ {
+ return null;
+ }
+
+ @Override
+ public Transport getTransport()
+ {
+ return null;
+ }
+
+ @Override
+ public boolean isConnectionStopped()
{
return false;
}
@Override
- public ProtocolEngine getProtocolEngine()
+ public String getVirtualHostName()
{
return null;
}
@Override
- public void setScheduler(final NetworkConnectionScheduler networkConnectionScheduler)
+ public VirtualHost<?, ?, ?> getVirtualHost()
+ {
+ return null;
+ }
+
+ @Override
+ public void addDeleteTask(final Action task)
{
}
@Override
- public String getClientVersion()
+ public void removeDeleteTask(final Action task)
+ {
+
+ }
+
+
+ @Override
+ public UUID getId()
{
return null;
}
@Override
- public String getClientProduct()
+ public String getName()
{
return null;
}
@Override
- public Principal getAuthorizedPrincipal()
+ public String getDescription()
{
return null;
}
@Override
- public long getSessionCountLimit()
+ public String getType()
+ {
+ return null;
+ }
+
+ @Override
+ public Map<String, String> getContext()
+ {
+ return null;
+ }
+
+ @Override
+ public <T> T getContextValue(final Class<T> clazz, final String propertyName)
+ {
+ return null;
+ }
+
+ @Override
+ public <T> T getContextValue(final Class<T> clazz, final Type t, final String propertyName)
+ {
+ return null;
+ }
+
+ @Override
+ public Set<String> getContextKeys(final boolean excludeSystem)
+ {
+ return null;
+ }
+
+ @Override
+ public String getLastUpdatedBy()
+ {
+ return null;
+ }
+
+ @Override
+ public long getLastUpdatedTime()
{
return 0;
}
@Override
- public long getLastIoTime()
+ public String getCreatedBy()
+ {
+ return null;
+ }
+
+ @Override
+ public long getCreatedTime()
{
return 0;
}
@Override
- public AmqpPort<?> getPort()
+ public org.apache.qpid.server.model.State getDesiredState()
{
return null;
}
@Override
- public Transport getTransport()
+ public org.apache.qpid.server.model.State getState()
{
return null;
}
@Override
- public void stop()
+ public void addChangeListener(final ConfigurationChangeListener listener)
{
+
}
@Override
- public boolean isStopped()
+ public boolean removeChangeListener(final ConfigurationChangeListener listener)
{
return false;
}
@Override
- public String getVirtualHostName()
+ public <T extends ConfiguredObject> T getParent(final Class<T> clazz)
{
return null;
}
@Override
- public VirtualHost<?, ?, ?> getVirtualHost()
+ public boolean isDurable()
+ {
+ return false;
+ }
+
+ @Override
+ public LifetimePolicy getLifetimePolicy()
{
return null;
}
@Override
- public void addDeleteTask(final Action task)
+ public Collection<String> getAttributeNames()
{
+ return null;
+ }
+ @Override
+ public Object getAttribute(final String name)
+ {
+ return null;
}
@Override
- public void removeDeleteTask(final Action task)
+ public Map<String, Object> getActualAttributes()
+ {
+ return null;
+ }
+
+ @Override
+ public Object setAttribute(final String name, final Object expected, final Object desired)
+ throws IllegalStateException, AccessControlException, IllegalArgumentException
+ {
+ return null;
+ }
+
+ @Override
+ public Map<String, Number> getStatistics()
+ {
+ return null;
+ }
+
+ @Override
+ public <C extends ConfiguredObject> Collection<C> getChildren(final Class<C> clazz)
+ {
+ return null;
+ }
+
+ @Override
+ public <C extends ConfiguredObject> C getChildById(final Class<C> clazz, final UUID id)
+ {
+ return null;
+ }
+
+ @Override
+ public <C extends ConfiguredObject> C getChildByName(final Class<C> clazz, final String name)
+ {
+ return null;
+ }
+
+ @Override
+ public <C extends ConfiguredObject> C createChild(final Class<C> childClass,
+ final Map<String, Object> attributes,
+ final ConfiguredObject... otherParents)
+ {
+ return null;
+ }
+
+ @Override
+ public <C extends ConfiguredObject> ListenableFuture<C> createChildAsync(final Class<C> childClass,
+ final Map<String, Object> attributes,
+ final ConfiguredObject... otherParents)
+ {
+ return null;
+ }
+
+ @Override
+ public void setAttributes(final Map<String, Object> attributes)
+ throws IllegalStateException, AccessControlException, IllegalArgumentException
{
}
+ @Override
+ public ListenableFuture<Void> setAttributesAsync(final Map<String, Object> attributes)
+ throws IllegalStateException, AccessControlException, IllegalArgumentException
+ {
+ return null;
+ }
+
+ @Override
+ public Class<? extends ConfiguredObject> getCategoryClass()
+ {
+ return null;
+ }
+ @Override
+ public Class<? extends ConfiguredObject> getTypeClass()
+ {
+ return null;
+ }
+
+ @Override
+ public boolean managesChildStorage()
+ {
+ return false;
+ }
+
+ @Override
+ public <C extends ConfiguredObject<C>> C findConfiguredObject(final Class<C> clazz, final String name)
+ {
+ return null;
+ }
+
+ @Override
+ public ConfiguredObjectRecord asObjectRecord()
+ {
+ return null;
+ }
+
+ @Override
+ public void open()
+ {
+
+ }
+
+ @Override
+ public ListenableFuture<Void> openAsync()
+ {
+ return null;
+ }
+
+ @Override
+ public void close()
+ {
+
+ }
+
+ @Override
+ public ListenableFuture<Void> closeAsync()
+ {
+ return null;
+ }
+
+ @Override
+ public ListenableFuture<Void> deleteAsync()
+ {
+ return null;
+ }
+
+ @Override
+ public TaskExecutor getTaskExecutor()
+ {
+ return null;
+ }
+
+ @Override
+ public TaskExecutor getChildExecutor()
+ {
+ return null;
+ }
+
+ @Override
+ public ConfiguredObjectFactory getObjectFactory()
+ {
+ return null;
+ }
+
+ @Override
+ public Model getModel()
+ {
+ return null;
+ }
+
+ @Override
+ public void delete()
+ {
+
+ }
+
+ @Override
+ public void decryptSecrets()
+ {
+
+ }
}
}
Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/logging/CompositeFilterTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/logging/CompositeFilterTest.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/logging/CompositeFilterTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/logging/CompositeFilterTest.java Sat Jun 27 21:13:25 2015
@@ -27,8 +27,6 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-import java.util.Arrays;
-
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.filter.Filter;
import ch.qos.logback.core.spi.FilterReply;
Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/logging/actors/BaseConnectionActorTestCase.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/logging/actors/BaseConnectionActorTestCase.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/logging/actors/BaseConnectionActorTestCase.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/logging/actors/BaseConnectionActorTestCase.java Sat Jun 27 21:13:25 2015
@@ -21,13 +21,13 @@
package org.apache.qpid.server.logging.actors;
import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.server.protocol.AMQConnectionModel;
+import org.apache.qpid.server.transport.AMQPConnection;
import org.apache.qpid.server.util.BrokerTestHelper;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
public abstract class BaseConnectionActorTestCase extends BaseActorTestCase
{
- private AMQConnectionModel _session;
+ private AMQPConnection<?> _connection;
private VirtualHostImpl _virtualHost;
@Override
@@ -35,7 +35,7 @@ public abstract class BaseConnectionActo
{
super.setUp();
BrokerTestHelper.setUp();
- _session = BrokerTestHelper.createConnection();
+ _connection = BrokerTestHelper.createConnection();
_virtualHost = BrokerTestHelper.createVirtualHost("test");
}
@@ -53,9 +53,9 @@ public abstract class BaseConnectionActo
{
_virtualHost.close();
}
- if (_session != null)
+ if (_connection != null)
{
- _session.closeAsync(AMQConstant.CONNECTION_FORCED, "");
+ _connection.closeAsync(AMQConstant.CONNECTION_FORCED, "");
}
}
finally
@@ -65,9 +65,9 @@ public abstract class BaseConnectionActo
}
}
- public AMQConnectionModel getConnection()
+ public AMQPConnection<?> getConnection()
{
- return _session;
+ return _connection;
}
}
Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/ChannelLogSubjectTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/ChannelLogSubjectTest.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/ChannelLogSubjectTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/ChannelLogSubjectTest.java Sat Jun 27 21:13:25 2015
@@ -38,7 +38,7 @@ public class ChannelLogSubjectTest exten
super.setUp();
AMQSessionModel session = mock(AMQSessionModel.class);
- when(session.getConnectionModel()).thenReturn(getConnection());
+ when(session.getAMQPConnection()).thenReturn(getConnection());
when(session.getChannelId()).thenReturn(_channelID);
_subject = new ChannelLogSubject(session);
}
Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubjectTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubjectTest.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubjectTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubjectTest.java Sat Jun 27 21:13:25 2015
@@ -20,7 +20,7 @@
*/
package org.apache.qpid.server.logging.subjects;
-import org.apache.qpid.server.protocol.AMQConnectionModel;
+import org.apache.qpid.server.transport.AMQPConnection;
import java.security.Principal;
@@ -38,7 +38,7 @@ public class ConnectionLogSubjectTest ex
private static final String IP_STRING = "127.0.0.1:1";
private static final String VHOST = "test";
- private AMQConnectionModel _connection;
+ private AMQPConnection _connection;
@Override
public void setUp() throws Exception
@@ -48,7 +48,7 @@ public class ConnectionLogSubjectTest ex
final Principal principal = mock(Principal.class);
when(principal.getName()).thenReturn(USER);
- _connection = mock(AMQConnectionModel.class);
+ _connection = mock(AMQPConnection.class);
when(_connection.getConnectionId()).thenReturn(CONNECTION_ID);
when(_connection.getAuthorizedPrincipal()).thenReturn(principal);
when(_connection.getRemoteAddressString()).thenReturn("/"+IP_STRING);
@@ -66,7 +66,7 @@ public class ConnectionLogSubjectTest ex
verifyConnection(CONNECTION_ID, USER, IP_STRING, VHOST, message);
}
- public AMQConnectionModel getConnection()
+ public AMQPConnection getConnection()
{
return _connection;
}
Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java Sat Jun 27 21:13:25 2015
@@ -53,12 +53,12 @@ import org.apache.qpid.protocol.AMQConst
import org.apache.qpid.server.configuration.store.StoreConfigurationChangeListener;
import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
-import org.apache.qpid.server.model.adapter.ConnectionAdapter;
-import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.store.ConfiguredObjectRecord;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
+import org.apache.qpid.server.transport.AMQPConnection;
+import org.apache.qpid.server.transport.AbstractAMQPConnection;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.BrokerTestHelper;
import org.apache.qpid.server.virtualhost.TestMemoryVirtualHost;
@@ -230,10 +230,10 @@ public class VirtualHostTest extends Qpi
VirtualHost<?, ?, ?> virtualHost = createVirtualHost(virtualHostName);
assertEquals("Unexpected state", State.ACTIVE, virtualHost.getState());
- AMQConnectionModel connection = createMockProtocolConnection(virtualHost);
+ AbstractAMQPConnection connection = createMockProtocolConnection(virtualHost);
assertEquals("Unexpected number of connections before connection registered", 0, virtualHost.getConnectionCount());
- Connection modelConnection = mock(Connection.class);
+ AMQPConnection modelConnection = mock(AMQPConnection.class);
when(modelConnection.getUnderlyingConnection()).thenReturn(connection);
when(modelConnection.closeAsync()).thenReturn(Futures.immediateFuture(null));
virtualHost.registerConnection(modelConnection);
@@ -257,12 +257,12 @@ public class VirtualHostTest extends Qpi
VirtualHost<?, ?, ?> virtualHost = createVirtualHost(virtualHostName);
assertEquals("Unexpected state", State.ACTIVE, virtualHost.getState());
- AMQConnectionModel connection = createMockProtocolConnection(virtualHost);
+ AbstractAMQPConnection connection = createMockProtocolConnection(virtualHost);
assertEquals("Unexpected number of connections before connection registered",
0,
virtualHost.getConnectionCount());
- Connection modelConnection = mock(Connection.class);
+ AMQPConnection modelConnection = mock(AMQPConnection.class);
when(modelConnection.getUnderlyingConnection()).thenReturn(connection);
virtualHost.registerConnection(modelConnection);
@@ -397,9 +397,9 @@ public class VirtualHostTest extends Qpi
return host;
}
- private AMQConnectionModel createMockProtocolConnection(final VirtualHost<?, ?, ?> virtualHost)
+ private AbstractAMQPConnection createMockProtocolConnection(final VirtualHost<?, ?, ?> virtualHost)
{
- final AMQConnectionModel connection = mock(AMQConnectionModel.class);
+ final AbstractAMQPConnection connection = mock(AbstractAMQPConnection.class);
final List<Action<?>> tasks = new ArrayList<>();
final ArgumentCaptor<Action> deleteTaskCaptor = ArgumentCaptor.forClass(Action.class);
Answer answer = new Answer()
Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/security/SecurityManagerTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/security/SecurityManagerTest.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/security/SecurityManagerTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/security/SecurityManagerTest.java Sat Jun 27 21:13:25 2015
@@ -59,13 +59,13 @@ import org.apache.qpid.server.model.Virt
import org.apache.qpid.server.model.VirtualHostLogger;
import org.apache.qpid.server.model.VirtualHostLoggerFilter;
import org.apache.qpid.server.model.VirtualHostNode;
-import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.queue.QueueConsumer;
import org.apache.qpid.server.security.access.ObjectProperties;
import org.apache.qpid.server.security.access.ObjectProperties.Property;
import org.apache.qpid.server.security.access.ObjectType;
import org.apache.qpid.server.security.access.Operation;
import org.apache.qpid.server.security.access.OperationLoggingDetails;
+import org.apache.qpid.server.transport.AMQPConnection;
import org.apache.qpid.test.utils.QpidTestCase;
public class SecurityManagerTest extends QpidTestCase
@@ -190,7 +190,7 @@ public class SecurityManagerTest extends
public void testAuthoriseCreateConnection()
{
- AMQConnectionModel<?,?> connection = mock(AMQConnectionModel.class);
+ AMQPConnection<?> connection = mock(AMQPConnection.class);
when(connection.getVirtualHostName()).thenReturn(TEST_VIRTUAL_HOST);
ObjectProperties properties = new ObjectProperties();
Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java Sat Jun 27 21:13:25 2015
@@ -43,12 +43,12 @@ import org.apache.qpid.server.model.Syst
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.model.VirtualHostNode;
-import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.security.SubjectCreator;
import org.apache.qpid.server.store.DurableConfigurationStore;
+import org.apache.qpid.server.transport.AMQPConnection;
import org.apache.qpid.server.virtualhost.AbstractVirtualHost;
import org.apache.qpid.server.virtualhost.QueueExistsException;
import org.apache.qpid.server.virtualhost.TestMemoryVirtualHost;
@@ -181,34 +181,34 @@ public class BrokerTestHelper
return createVirtualHost(attributes, broker, defaultVHN);
}
- public static AMQSessionModel<?,?> createSession(int channelId, AMQConnectionModel<?,?> connection)
+ public static AMQSessionModel<?> createSession(int channelId, AMQPConnection<?> connection)
{
@SuppressWarnings("rawtypes")
AMQSessionModel session = mock(AMQSessionModel.class);
- when(session.getConnectionModel()).thenReturn(connection);
+ when(session.getAMQPConnection()).thenReturn(connection);
when(session.getChannelId()).thenReturn(channelId);
return session;
}
- public static AMQSessionModel<?,?> createSession(int channelId) throws Exception
+ public static AMQSessionModel<?> createSession(int channelId) throws Exception
{
- AMQConnectionModel<?,?> session = createConnection();
+ AMQPConnection<?> session = createConnection();
return createSession(channelId, session);
}
- public static AMQSessionModel<?,?> createSession() throws Exception
+ public static AMQSessionModel<?> createSession() throws Exception
{
return createSession(1);
}
- public static AMQConnectionModel<?,?> createConnection() throws Exception
+ public static AMQPConnection<?> createConnection() throws Exception
{
return createConnection("test");
}
- public static AMQConnectionModel<?,?> createConnection(String hostName) throws Exception
+ public static AMQPConnection<?> createConnection(String hostName) throws Exception
{
- return mock(AMQConnectionModel.class);
+ return mock(AMQPConnection.class);
}
public static ExchangeImpl<?> createExchange(String hostName, final boolean durable, final EventLogger eventLogger) throws Exception
Modified: qpid/java/trunk/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/plugins/DefaultAccessControl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/plugins/DefaultAccessControl.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/plugins/DefaultAccessControl.java (original)
+++ qpid/java/trunk/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/plugins/DefaultAccessControl.java Sat Jun 27 21:13:25 2015
@@ -175,7 +175,7 @@ public class DefaultAccessControl implem
Set<ConnectionPrincipal> principals = subject.getPrincipals(ConnectionPrincipal.class);
if(!principals.isEmpty())
{
- SocketAddress address = principals.iterator().next().getConnection().getRemoteAddress();
+ SocketAddress address = principals.iterator().next().getConnection().getRemoteSocketAddress();
if(address instanceof InetSocketAddress)
{
addressOfClient = ((InetSocketAddress) address).getAddress();
Modified: qpid/java/trunk/broker-plugins/access-control/src/test/java/org/apache/qpid/server/security/access/plugins/DefaultAccessControlTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/access-control/src/test/java/org/apache/qpid/server/security/access/plugins/DefaultAccessControlTest.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/access-control/src/test/java/org/apache/qpid/server/security/access/plugins/DefaultAccessControlTest.java (original)
+++ qpid/java/trunk/broker-plugins/access-control/src/test/java/org/apache/qpid/server/security/access/plugins/DefaultAccessControlTest.java Sat Jun 27 21:13:25 2015
@@ -35,7 +35,6 @@ import org.apache.qpid.server.connection
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.EventLoggerProvider;
import org.apache.qpid.server.logging.UnitTestMessageLogger;
-import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.security.Result;
import org.apache.qpid.server.security.access.ObjectProperties;
import org.apache.qpid.server.security.access.ObjectType;
@@ -44,6 +43,7 @@ import org.apache.qpid.server.security.a
import org.apache.qpid.server.security.access.config.Rule;
import org.apache.qpid.server.security.access.config.RuleSet;
import org.apache.qpid.server.security.auth.TestPrincipalUtils;
+import org.apache.qpid.server.transport.AMQPConnection;
import org.apache.qpid.test.utils.QpidTestCase;
/**
@@ -239,8 +239,8 @@ public class DefaultAccessControlTest ex
final InetAddress inetAddress = InetAddress.getLocalHost();
final InetSocketAddress inetSocketAddress = new InetSocketAddress(inetAddress, 1);
- AMQConnectionModel connectionModel = mock(AMQConnectionModel.class);
- when(connectionModel.getRemoteAddress()).thenReturn(inetSocketAddress);
+ AMQPConnection connectionModel = mock(AMQPConnection.class);
+ when(connectionModel.getRemoteSocketAddress()).thenReturn(inetSocketAddress);
subject.getPrincipals().add(new ConnectionPrincipal(connectionModel));
@@ -269,8 +269,8 @@ public class DefaultAccessControlTest ex
final InetAddress inetAddress = InetAddress.getLocalHost();
final InetSocketAddress inetSocketAddress = new InetSocketAddress(inetAddress, 1);
- AMQConnectionModel connectionModel = mock(AMQConnectionModel.class);
- when(connectionModel.getRemoteAddress()).thenReturn(inetSocketAddress);
+ AMQPConnection connectionModel = mock(AMQPConnection.class);
+ when(connectionModel.getRemoteSocketAddress()).thenReturn(inetSocketAddress);
subject.getPrincipals().add(new ConnectionPrincipal(connectionModel));
Added: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java?rev=1687962&view=auto
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java (added)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java Sat Jun 27 21:13:25 2015
@@ -0,0 +1,371 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v0_10;
+
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.security.Principal;
+import java.security.PrivilegedAction;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.security.auth.Subject;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.Transport;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.model.port.AmqpPort;
+import org.apache.qpid.server.security.SubjectCreator;
+import org.apache.qpid.server.transport.AbstractAMQPConnection;
+import org.apache.qpid.server.transport.ProtocolEngine;
+import org.apache.qpid.server.consumer.ConsumerImpl;
+import org.apache.qpid.server.logging.messages.ConnectionMessages;
+import org.apache.qpid.server.model.Consumer;
+import org.apache.qpid.server.model.Port;
+import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.util.Action;
+import org.apache.qpid.transport.ByteBufferSender;
+import org.apache.qpid.transport.ConnectionDelegate;
+import org.apache.qpid.transport.Constant;
+import org.apache.qpid.transport.network.AggregateTicker;
+import org.apache.qpid.transport.network.InputHandler;
+import org.apache.qpid.transport.network.NetworkConnection;
+
+
+public class AMQPConnection_0_10 extends AbstractAMQPConnection<AMQPConnection_0_10>
+{
+ private static final Logger _logger = LoggerFactory.getLogger(AMQPConnection_0_10.class);
+ private final InputHandler _inputHandler;
+
+
+ private final NetworkConnection _network;
+ private ServerConnection _connection;
+
+ private long _createTime = System.currentTimeMillis();
+ private volatile long _lastReadTime = _createTime;
+ private volatile long _lastWriteTime = _createTime;
+ private volatile boolean _transportBlockedForWriting;
+
+ private final AtomicReference<Thread> _messageAssignmentSuspended = new AtomicReference<>();
+
+ private final AtomicBoolean _stateChanged = new AtomicBoolean();
+ private final AtomicReference<Action<ProtocolEngine>> _workListener = new AtomicReference<>();
+
+
+ public AMQPConnection_0_10(final Broker<?> broker,
+ NetworkConnection network,
+ final AmqpPort<?> port,
+ final Transport transport,
+ final long id,
+ final AggregateTicker aggregateTicker)
+ { // creates the wrapped 0-10 ServerConnection and wires its delegate, addresses, input handler and sender
+ super(broker, network, port, transport, id, aggregateTicker);
+
+ _connection = new ServerConnection(id, broker, port, transport);
+ _connection.setAmqpConnection(this); // NOTE(review): 'this' is handed out before _inputHandler and _network are assigned below
+ SocketAddress address = network.getLocalAddress();
+ String fqdn = null;
+
+ if (address instanceof InetSocketAddress)
+ {
+ fqdn = ((InetSocketAddress) address).getHostName(); // host name of the local socket; fqdn stays null for non-inet addresses
+ }
+ SubjectCreator subjectCreator = port.getAuthenticationProvider().getSubjectCreator(transport.isSecure());
+ ConnectionDelegate connDelegate = new ServerConnectionDelegate(broker, fqdn, subjectCreator);
+
+ _connection.setConnectionDelegate(connDelegate);
+ _connection.setRemoteAddress(network.getRemoteAddress());
+ _connection.setLocalAddress(network.getLocalAddress());
+
+ _inputHandler = new InputHandler(new ServerAssembler(_connection));
+ _network = network;
+
+ Subject.doAs(getSubject(), new PrivilegedAction<Object>() // remaining wiring and the OPEN log messages run as this connection's subject
+ {
+ @Override
+ public Object run()
+ {
+ _connection.getEventLogger().message(ConnectionMessages.OPEN(null, null, null, null, false, false, false, false));
+
+ _connection.setNetworkConnection(_network);
+ ServerDisassembler disassembler = new ServerDisassembler(wrapSender(_network.getSender()), Constant.MIN_MAX_FRAME_SIZE); // wrapSender stamps _lastWriteTime on every send
+ _connection.setSender(disassembler);
+ _connection.addFrameSizeObserver(disassembler);
+ // FIXME Two log messages to maintain compatibility with earlier protocol versions
+ _connection.getEventLogger().message(ConnectionMessages.OPEN(null, "0-10", null, null, false, true, false, false));
+
+ return null;
+ }
+ });
+ }
+
+ @Override
+ public boolean isMessageAssignmentSuspended() // true only while a thread other than the caller holds the suspension
+ {
+ Thread lock = _messageAssignmentSuspended.get(); // read once: the null check and the owner check must see the same value
+ return lock != null && lock != Thread.currentThread();
+ }
+
+ @Override
+ public void setMessageAssignmentSuspended(final boolean messageAssignmentSuspended)
+ {
+ _messageAssignmentSuspended.set(messageAssignmentSuspended ? Thread.currentThread() : null); // record the calling thread as the suspender, or clear it on resume
+
+ for(AMQSessionModel<?> session : _connection.getSessionModels())
+ {
+ for (Consumer<?> consumer : session.getConsumers())
+ {
+ ConsumerImpl consumerImpl = (ConsumerImpl) consumer; // unchecked downcast: every Consumer here is expected to be a ConsumerImpl
+ if (!messageAssignmentSuspended)
+ {
+ consumerImpl.getTarget().notifyCurrentState(); // resuming: notifyCurrentState() is invoked on every consumer's target
+ }
+ else
+ {
+ // ensure that by the time the method returns, no consumer can be in the process of
+ // delivering a message.
+ consumerImpl.getSendLock(); // taken and released immediately, purely as the barrier described above
+ consumerImpl.releaseSendLock();
+ }
+ }
+ }
+ }
+
+
+ private ByteBufferSender wrapSender(final ByteBufferSender sender) // returns a sender that forwards to 'sender' and records the time of each send
+ {
+ return new ByteBufferSender()
+ {
+ @Override
+ public void send(ByteBuffer msg)
+ {
+ _lastWriteTime = System.currentTimeMillis(); // stamped before delegating; this is the value getLastWriteTime() reports
+ sender.send(msg);
+ }
+
+ @Override
+ public void flush()
+ {
+ sender.flush(); // plain pass-through; flushing does not touch _lastWriteTime
+
+ }
+
+ @Override
+ public void close()
+ {
+ sender.close(); // plain pass-through
+ }
+ };
+ }
+
+ @Override
+ public long getLastReadTime()
+ {
+ return _lastReadTime;
+ }
+
+ @Override
+ public long getLastWriteTime()
+ {
+ return _lastWriteTime;
+ }
+
+ public void received(final ByteBuffer buf) // hands inbound bytes to the 0-10 input handler, running as the connection's authorized subject
+ {
+ Subject.doAs(_connection.getAuthorizedSubject(), new PrivilegedAction<Object>()
+ {
+ @Override
+ public Object run()
+ {
+ _lastReadTime = System.currentTimeMillis();
+ if (_connection.getAuthorizedPrincipal() == null &&
+ (_lastReadTime - _createTime) > _connection.getPort().getContextValue(Long.class,
+ Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY))
+ { // no authorized principal yet and the port's maximum authentication delay has elapsed since creation
+
+ _logger.warn("Connection has taken more than "
+ + _connection.getPort()
+ .getContextValue(Long.class, Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY)
+ + "ms to establish identity. Closing as possible DoS.");
+ _connection.getEventLogger().message(ConnectionMessages.IDLE_CLOSE());
+ _network.close(); // NOTE(review): no return follows, so buf is still passed to the input handler below — confirm intended
+
+ }
+ _inputHandler.received(buf);
+ _connection.receivedComplete();
+ return null;
+ }
+ });
+ }
+
+ @Override
+ public void encryptedTransport()
+ {
+ }
+
+ public void writerIdle()
+ {
+ _connection.doHeartBeat();
+ }
+
+ public void readerIdle() // logs IDLE_CLOSE and closes the network connection
+ {
+ Subject.doAs(_connection.getAuthorizedSubject(), new PrivilegedAction<Object>() // executed as the connection's authorized subject
+ {
+ @Override
+ public Object run()
+ {
+ _connection.getEventLogger().message(ConnectionMessages.IDLE_CLOSE());
+ _network.close();
+ return null;
+ }
+ });
+
+ }
+
+ public String getAddress() // textual form of the peer address; yields "null" instead of throwing when the network reports none
+ {
+ return String.valueOf(_network.getRemoteAddress());
+ }
+
+ @Override
+ public void closed()
+ {
+ _inputHandler.closed();
+ }
+
+ @Override
+ protected void performDeleteTasks() // NOTE(review): pure delegation; presumably redeclared so ServerConnection (same package) may call this protected method — confirm
+ {
+ super.performDeleteTasks();
+ }
+
+ @Override
+ public boolean isTransportBlockedForWriting()
+ {
+ return _transportBlockedForWriting;
+ }
+
+ @Override
+ public void setTransportBlockedForWriting(final boolean blocked)
+ {
+ _transportBlockedForWriting = blocked;
+ _connection.transportStateChanged();
+ }
+
+ @Override
+ public void processPending()
+ {
+ _connection.processPending();
+ }
+
+ @Override
+ public boolean hasWork()
+ {
+ return _stateChanged.get();
+ }
+
+ @Override
+ public void notifyWork() // marks this connection as having work and informs the registered work listener, if any
+ {
+ _stateChanged.set(true); // raised before the listener runs, so hasWork() is already true inside the callback
+
+ final Action<ProtocolEngine> listener = _workListener.get(); // single read; the listener may be replaced via setWorkListener
+ if(listener != null)
+ {
+ listener.performAction(this);
+ }
+ }
+
+ public void clearWork()
+ {
+ _stateChanged.set(false);
+ }
+
+ public void setWorkListener(final Action<ProtocolEngine> listener)
+ {
+ _workListener.set(listener);
+ }
+
+ public boolean hasSessionWithName(final byte[] name)
+ {
+ return _connection.hasSessionWithName(name);
+ }
+
+ public void closeAsync(final AMQConstant cause, final String message)
+ {
+ _connection.closeAsync(cause, message);
+ }
+
+ public Principal getAuthorizedPrincipal()
+ {
+ return _connection.getAuthorizedPrincipal();
+ }
+
+ public void closeSessionAsync(final AMQSessionModel<?> session, // must actually be a ServerSession: it is downcast below
+ final AMQConstant cause, final String message)
+ {
+ _connection.closeSessionAsync((ServerSession)session, cause, message); // ClassCastException if a non-0-10 session is passed
+ }
+
+ public void block()
+ {
+ _connection.block();
+ }
+
+ public String getRemoteContainerName()
+ {
+ return _connection.getRemoteContainerName();
+ }
+
+ public VirtualHost<?, ?, ?> getVirtualHost()
+ {
+ return _connection.getVirtualHost();
+ }
+
+ public List<ServerSession> getSessionModels()
+ {
+ return _connection.getSessionModels();
+ }
+
+ public void unblock()
+ {
+ _connection.unblock();
+ }
+
+ public LogSubject getLogSubject()
+ {
+ return _connection.getLogSubject();
+ }
+
+ public long getSessionCountLimit()
+ {
+ return _connection.getSessionCountLimit();
+ }
+
+}
Propchange: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
------------------------------------------------------------------------------
svn:executable = *
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java Sat Jun 27 21:13:25 2015
@@ -107,7 +107,7 @@ public class ConsumerTarget_0_10 extends
@Override
public boolean doIsSuspended()
{
- return getState()!=State.ACTIVE || _deleted.get() || _session.isClosing() || _session.getConnectionModel().isStopped(); // TODO check for Session suspension
+ return getState()!=State.ACTIVE || _deleted.get() || _session.isClosing() || _session.getAMQPConnection().isConnectionStopped(); // TODO check for Session suspension
}
public boolean close()
@@ -559,10 +559,10 @@ public class ConsumerTarget_0_10 extends
switch(flowMode)
{
case CREDIT:
- _creditManager = new CreditCreditManager(0l, 0l, _session.getConnection().getProtocolEngine());
+ _creditManager = new CreditCreditManager(0l, 0l, _session.getConnection().getAmqpConnection());
break;
case WINDOW:
- _creditManager = new WindowCreditManager(0l, 0l, _session.getConnection().getProtocolEngine());
+ _creditManager = new WindowCreditManager(0l, 0l, _session.getConnection().getAmqpConnection());
break;
default:
// this should never happen, as 0-10 is finalised and so the enum should never change
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java Sat Jun 27 21:13:25 2015
@@ -20,9 +20,6 @@
*/
package org.apache.qpid.server.protocol.v0_10;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-
import org.apache.qpid.server.transport.ProtocolEngine;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.Protocol;
@@ -30,8 +27,6 @@ import org.apache.qpid.server.model.Tran
import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.plugin.PluggableService;
import org.apache.qpid.server.plugin.ProtocolEngineCreator;
-import org.apache.qpid.server.security.SubjectCreator;
-import org.apache.qpid.transport.ConnectionDelegate;
import org.apache.qpid.transport.network.AggregateTicker;
import org.apache.qpid.transport.network.NetworkConnection;
@@ -72,25 +67,11 @@ public class ProtocolEngineCreator_0_10
Transport transport,
long id, final AggregateTicker aggregateTicker)
{
- String fqdn = null;
- SocketAddress address = network.getLocalAddress();
- if (address instanceof InetSocketAddress)
- {
- fqdn = ((InetSocketAddress) address).getHostName();
- }
- SubjectCreator subjectCreator = port.getAuthenticationProvider().getSubjectCreator(transport.isSecure());
- ConnectionDelegate connDelegate = new ServerConnectionDelegate(broker, fqdn, subjectCreator);
-
- ServerConnection conn = new ServerConnection(id, broker, port, transport);
-
- conn.setConnectionDelegate(connDelegate);
- conn.setRemoteAddress(network.getRemoteAddress());
- conn.setLocalAddress(network.getLocalAddress());
-
- ProtocolEngine_0_10 protocolEngine = new ProtocolEngine_0_10(conn, network, aggregateTicker);
- conn.setProtocolEngine(protocolEngine);
- return protocolEngine;
+ final AMQPConnection_0_10 protocolEngine_0_10 =
+ new AMQPConnection_0_10(broker, network, port, transport, id, aggregateTicker);
+ protocolEngine_0_10.create();
+ return protocolEngine_0_10;
}
@Override
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java Sat Jun 27 21:13:25 2015
@@ -34,7 +34,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -44,23 +43,16 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.server.model.adapter.ConnectionAdapter;
import org.apache.qpid.server.protocol.ConnectionClosingTicker;
-import org.apache.qpid.server.transport.ProtocolEngine;
-import org.apache.qpid.server.connection.ConnectionPrincipal;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.messages.ConnectionMessages;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.model.port.AmqpPort;
-import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.protocol.SessionModelListener;
import org.apache.qpid.server.security.AuthorizationHolder;
import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
-import org.apache.qpid.server.stats.StatisticsCounter;
-import org.apache.qpid.server.transport.NetworkConnectionScheduler;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
@@ -75,7 +67,7 @@ import org.apache.qpid.transport.Option;
import org.apache.qpid.transport.ProtocolEvent;
import org.apache.qpid.transport.Session;
-public class ServerConnection extends Connection implements AMQConnectionModel<ServerConnection, ServerSession>,
+public class ServerConnection extends Connection implements //AMQConnectionModel<ServerConnection, ServerSession>,
LogSubject, AuthorizationHolder
{
private static final Logger LOGGER = LoggerFactory.getLogger(ServerConnection.class);
@@ -83,9 +75,7 @@ public class ServerConnection extends Co
private final Broker<?> _broker;
private AtomicBoolean _logClosed = new AtomicBoolean(false);
- private final Subject _authorizedSubject = new Subject();
private Principal _authorizedPrincipal = null;
- private final StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
private final long _connectionId;
private final Object _reference = new Object();
private VirtualHostImpl<?,?,?> _virtualHost;
@@ -94,23 +84,15 @@ public class ServerConnection extends Co
private boolean _blocking;
private final Transport _transport;
- private final CopyOnWriteArrayList<Action<? super ServerConnection>> _connectionCloseTaskList =
- new CopyOnWriteArrayList<Action<? super ServerConnection>>();
-
private final Queue<Action<? super ServerConnection>> _asyncTaskList =
new ConcurrentLinkedQueue<>();
- private final CopyOnWriteArrayList<SessionModelListener> _sessionListeners =
- new CopyOnWriteArrayList<SessionModelListener>();
-
- private volatile boolean _stopped;
private int _messageCompressionThreshold;
private final int _maxMessageSize;
- private ProtocolEngine_0_10 _protocolEngine;
+ private AMQPConnection_0_10 _amqpConnection;
private boolean _ignoreFutureInput;
private boolean _ignoreAllButConnectionCloseOk;
- private ConnectionAdapter _adapter;
public ServerConnection(final long connectionId,
Broker<?> broker,
@@ -118,7 +100,6 @@ public class ServerConnection extends Co
final Transport transport)
{
_connectionId = connectionId;
- _authorizedSubject.getPrincipals().add(new ConnectionPrincipal(this));
_broker = broker;
_port = port;
@@ -127,13 +108,6 @@ public class ServerConnection extends Co
int maxMessageSize = port.getContextValue(Integer.class, AmqpPort.PORT_MAX_MESSAGE_SIZE);
_maxMessageSize = (maxMessageSize > 0) ? maxMessageSize : Integer.MAX_VALUE;
-
- _messagesDelivered = new StatisticsCounter("messages-delivered-" + getConnectionId());
- _dataDelivered = new StatisticsCounter("data-delivered-" + getConnectionId());
- _messagesReceived = new StatisticsCounter("messages-received-" + getConnectionId());
- _dataReceived = new StatisticsCounter("data-received-" + getConnectionId());
- _adapter = new ConnectionAdapter(this);
- _adapter.create();
}
public Object getReference()
@@ -177,19 +151,19 @@ public class ServerConnection extends Co
true,
true));
- _adapter.virtualHostAssociated();
+ _amqpConnection.virtualHostAssociated();
}
if (state == State.CLOSE_RCVD || state == State.CLOSED || state == State.CLOSING)
{
if(_virtualHost != null)
{
- _virtualHost.deregisterConnection(_adapter);
+ _virtualHost.deregisterConnection(_amqpConnection);
}
}
if(state == State.CLOSING)
{
- getProtocolEngine().getAggregateTicker().addTicker(new ConnectionClosingTicker(System.currentTimeMillis() + CLOSE_OK_TIMEOUT, getNetworkConnection()));
+ getAmqpConnection().getAggregateTicker().addTicker(new ConnectionClosingTicker(System.currentTimeMillis() + CLOSE_OK_TIMEOUT, getNetworkConnection()));
}
if (state == State.CLOSED)
{
@@ -211,26 +185,14 @@ public class ServerConnection extends Co
return (ServerConnectionDelegate) super.getConnectionDelegate();
}
- public void setConnectionDelegate(ServerConnectionDelegate delegate)
- {
- super.setConnectionDelegate(delegate);
- }
-
- @Override
- public ProtocolEngine getProtocolEngine()
- {
- return _protocolEngine;
- }
-
- @Override
- public void setScheduler(final NetworkConnectionScheduler networkConnectionScheduler)
+ public AMQPConnection_0_10 getAmqpConnection()
{
- _protocolEngine.setScheduler(networkConnectionScheduler);
+ return _amqpConnection;
}
- public void setProtocolEngine(final ProtocolEngine_0_10 serverProtocolEngine)
+ public void setAmqpConnection(final AMQPConnection_0_10 serverProtocolEngine)
{
- _protocolEngine = serverProtocolEngine;
+ _amqpConnection = serverProtocolEngine;
}
public VirtualHostImpl<?,?,?> getVirtualHost()
@@ -249,39 +211,19 @@ public class ServerConnection extends Co
{
_messageCompressionThreshold = Integer.MAX_VALUE;
}
- _authorizedSubject.getPrincipals().add(_virtualHost.getPrincipal());
+ _amqpConnection.getSubject().getPrincipals().add(_virtualHost.getPrincipal());
}
- @Override
- public String getVirtualHostName()
- {
- return _virtualHost == null ? null : _virtualHost.getName();
- }
-
- @Override
public AmqpPort<?> getPort()
{
return _port;
}
- @Override
public Transport getTransport()
{
return _transport;
}
- @Override
- public void stop()
- {
- _stopped = true;
- }
-
- @Override
- public boolean isStopped()
- {
- return _stopped;
- }
-
public void closeSessionAsync(final ServerSession session, final AMQConstant cause, final String message)
{
addAsyncTask(new Action<ServerConnection>()
@@ -345,7 +287,7 @@ public class ServerConnection extends Co
Subject subject;
if (event.isConnectionControl())
{
- subject = _authorizedSubject;
+ subject = _amqpConnection.getSubject();
}
else
{
@@ -356,7 +298,7 @@ public class ServerConnection extends Co
}
else
{
- subject = _authorizedSubject;
+ subject = _amqpConnection.getSubject();
}
}
@@ -446,10 +388,7 @@ public class ServerConnection extends Co
protected void performDeleteTasks()
{
- for(Action<? super ServerConnection> task : _connectionCloseTaskList)
- {
- task.performAction(this);
- }
+ _amqpConnection.performDeleteTasks();
}
public synchronized void block()
@@ -480,7 +419,7 @@ public class ServerConnection extends Co
public synchronized void registerSession(final Session ssn)
{
super.registerSession(ssn);
- sessionAdded((ServerSession)ssn);
+ _amqpConnection.sessionAdded((ServerSession) ssn);
if(_blocking)
{
((ServerSession)ssn).block();
@@ -490,13 +429,13 @@ public class ServerConnection extends Co
@Override
public synchronized void removeSession(final Session ssn)
{
- sessionRemoved((ServerSession)ssn);
+ _amqpConnection.sessionRemoved((ServerSession) ssn);
super.removeSession(ssn);
}
public List<ServerSession> getSessionModels()
{
- List<ServerSession> sessions = new ArrayList<ServerSession>();
+ List<ServerSession> sessions = new ArrayList<>();
for (Session ssn : getChannels())
{
sessions.add((ServerSession) ssn);
@@ -504,54 +443,13 @@ public class ServerConnection extends Co
return sessions;
}
- public void registerMessageDelivered(long messageSize)
- {
- _messagesDelivered.registerEvent(1L);
- _dataDelivered.registerEvent(messageSize);
- _virtualHost.registerMessageDelivered(messageSize);
- }
-
- public void registerMessageReceived(long messageSize, long timestamp)
- {
- _messagesReceived.registerEvent(1L, timestamp);
- _dataReceived.registerEvent(messageSize, timestamp);
- _virtualHost.registerMessageReceived(messageSize, timestamp);
- }
-
- public StatisticsCounter getMessageReceiptStatistics()
- {
- return _messagesReceived;
- }
-
- public StatisticsCounter getDataReceiptStatistics()
- {
- return _dataReceived;
- }
-
- public StatisticsCounter getMessageDeliveryStatistics()
- {
- return _messagesDelivered;
- }
-
- public StatisticsCounter getDataDeliveryStatistics()
- {
- return _dataDelivered;
- }
-
- public void resetStatistics()
- {
- _messagesDelivered.reset();
- _dataDelivered.reset();
- _messagesReceived.reset();
- _dataReceived.reset();
- }
/**
* @return authorizedSubject
*/
public Subject getAuthorizedSubject()
{
- return _authorizedSubject;
+ return _amqpConnection.getSubject();
}
/**
@@ -568,7 +466,7 @@ public class ServerConnection extends Co
}
else
{
- _authorizedSubject.getPrincipals().addAll(authorizedSubject.getPrincipals());
+ getAuthorizedSubject().getPrincipals().addAll(authorizedSubject.getPrincipals());
_authorizedPrincipal = AuthenticatedPrincipal.getAuthenticatedPrincipalFromSubject(authorizedSubject);
}
@@ -586,13 +484,7 @@ public class ServerConnection extends Co
public String getRemoteAddressString()
{
- return String.valueOf(getRemoteAddress());
- }
-
- @Override
- public String getRemoteProcessPid()
- {
- return getConnectionDelegate().getRemoteProcessPid();
+ return String.valueOf(getRemoteSocketAddress());
}
@Override
@@ -636,59 +528,22 @@ public class ServerConnection extends Co
super.send(event);
}
- public long getLastIoTime()
- {
- return _lastIoTime.longValue();
- }
-
- @Override
public String getClientId()
{
return getConnectionDelegate().getClientId();
}
- @Override
public String getRemoteContainerName()
{
return getConnectionDelegate().getClientId();
}
- @Override
- public void addSessionListener(final SessionModelListener listener)
- {
- _sessionListeners.add(listener);
- }
-
- @Override
- public void removeSessionListener(final SessionModelListener listener)
- {
- _sessionListeners.remove(listener);
- }
-
- private void sessionAdded(final AMQSessionModel<?,?> session)
- {
- for(SessionModelListener l : _sessionListeners)
- {
- l.sessionAdded(session);
- }
- }
- private void sessionRemoved(final AMQSessionModel<?,?> session)
- {
- for(SessionModelListener l : _sessionListeners)
- {
- l.sessionRemoved(session);
- }
- }
-
-
- @Override
public String getClientVersion()
{
return getConnectionDelegate().getClientVersion();
}
- @Override
public String getClientProduct()
{
return getConnectionDelegate().getClientProduct();
@@ -721,24 +576,12 @@ public class ServerConnection extends Co
super.doHeartBeat();
}
- @Override
- public void addDeleteTask(final Action<? super ServerConnection> task)
- {
- _connectionCloseTaskList.add(task);
- }
-
private void addAsyncTask(final Action<ServerConnection> action)
{
_asyncTaskList.add(action);
notifyWork();
}
- @Override
- public void removeDeleteTask(final Action<? super ServerConnection> task)
- {
- _connectionCloseTaskList.remove(task);
- }
-
public int getMessageCompressionThreshold()
{
return _messageCompressionThreshold;
@@ -757,26 +600,18 @@ public class ServerConnection extends Co
}
}
- @Override
public void notifyWork()
{
- _protocolEngine.notifyWork();
- }
-
-
- @Override
- public boolean isMessageAssignmentSuspended()
- {
- return _protocolEngine.isMessageAssignmentSuspended();
+ _amqpConnection.notifyWork();
}
public void processPending()
{
- List<? extends AMQSessionModel<?,?>> sessionsWithPending = new ArrayList<>(getSessionModels());
+ List<? extends AMQSessionModel<?>> sessionsWithPending = new ArrayList<>(getSessionModels());
while(!sessionsWithPending.isEmpty())
{
- final Iterator<? extends AMQSessionModel<?, ?>> iter = sessionsWithPending.iterator();
- AMQSessionModel<?, ?> session;
+ final Iterator<? extends AMQSessionModel<?>> iter = sessionsWithPending.iterator();
+ AMQSessionModel<?> session;
while(iter.hasNext())
{
session = iter.next();
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java Sat Jun 27 21:13:25 2015
@@ -45,10 +45,10 @@ import org.apache.qpid.server.configurat
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.port.AmqpPort;
-import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.security.SubjectCreator;
import org.apache.qpid.server.security.auth.AuthenticationResult.AuthenticationStatus;
import org.apache.qpid.server.security.auth.SubjectAuthenticationResult;
+import org.apache.qpid.server.transport.AMQPConnection;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
import org.apache.qpid.transport.*;
import org.apache.qpid.transport.network.NetworkConnection;
@@ -211,7 +211,7 @@ public class ServerConnectionDelegate ex
sconn.setVirtualHost(vhost);
try
{
- if(!vhost.authoriseCreateConnection(sconn))
+ if(!vhost.authoriseCreateConnection(sconn.getAmqpConnection()))
{
sconn.setState(Connection.State.CLOSING);
sconn.sendConnectionClose(ConnectionCloseCode.CONNECTION_FORCED, "Connection not authorized");
@@ -367,23 +367,19 @@ public class ServerConnectionDelegate ex
final Principal authorizedPrincipal = sconn.getAuthorizedPrincipal();
final String userId = authorizedPrincipal == null ? "" : authorizedPrincipal.getName();
- final Iterator<org.apache.qpid.server.model.Connection<?>> connections =
+ final Iterator<? extends org.apache.qpid.server.model.Connection<?>> connections =
((ServerConnection)conn).getVirtualHost().getConnections().iterator();
while(connections.hasNext())
{
final org.apache.qpid.server.model.Connection<?> modelConnnection = connections.next();
- final AMQConnectionModel amqConnectionModel = modelConnnection.getUnderlyingConnection();
- if (amqConnectionModel instanceof ServerConnection)
- {
- ServerConnection otherConnection = (ServerConnection)amqConnectionModel;
+ final AMQPConnection<?> amqConnectionModel = modelConnnection.getUnderlyingConnection();
- final String userName = amqConnectionModel.getAuthorizedPrincipal() == null
- ? ""
- : amqConnectionModel.getAuthorizedPrincipal().getName();
- if (userId.equals(userName) && otherConnection.hasSessionWithName(name))
- {
- return false;
- }
+ final String userName = amqConnectionModel.getAuthorizedPrincipal() == null
+ ? ""
+ : amqConnectionModel.getAuthorizedPrincipal().getName();
+ if (userId.equals(userName) && amqConnectionModel.hasSessionWithName(name))
+ {
+ return false;
}
}
return true;
@@ -402,10 +398,20 @@ public class ServerConnectionDelegate ex
_compressionSupported = Boolean.parseBoolean(String.valueOf(compressionSupported));
}
+ final AMQPConnection_0_10 protocolEngine = ((ServerConnection) conn).getAmqpConnection();
+ protocolEngine.setClientId(getStringClientProperty(ConnectionStartProperties.CLIENT_ID_0_10));
+ protocolEngine.setClientProduct(getStringClientProperty(ConnectionStartProperties.PRODUCT));
+ protocolEngine.setClientVersion(getStringClientProperty(ConnectionStartProperties.VERSION_0_10));
+ protocolEngine.setRemoteProcessPid(getStringClientProperty(ConnectionStartProperties.PID));
}
super.connectionStartOk(conn, ok);
}
+ private String getStringClientProperty(final String name)
+ {
+ return (_clientProperties == null || _clientProperties.get(name) == null) ? null : String.valueOf(_clientProperties.get(name));
+ }
+
public Map<String,Object> getClientProperties()
{
return _clientProperties;
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java Sat Jun 27 21:13:25 2015
@@ -75,6 +75,7 @@ import org.apache.qpid.server.queue.AMQQ
import org.apache.qpid.server.security.AuthorizationHolder;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreException;
+import org.apache.qpid.server.transport.AMQPConnection;
import org.apache.qpid.server.util.FutureResult;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.TransactionLogResource;
@@ -113,7 +114,7 @@ import org.apache.qpid.transport.network
public class ServerSession extends Session
implements AuthorizationHolder,
- AMQSessionModel<ServerSession,ServerConnection>, LogSubject, AsyncAutoCommitTransaction.FutureRecorder,
+ AMQSessionModel<ServerSession>, LogSubject, AsyncAutoCommitTransaction.FutureRecorder,
Deletable<ServerSession>
{
@@ -192,7 +193,7 @@ public class ServerSession extends Sessi
@Override
public void doTimeoutAction(String reason)
{
- getConnectionModel().closeSessionAsync(ServerSession.this, AMQConstant.RESOURCE_ERROR, reason);
+ getAMQPConnection().closeSessionAsync(ServerSession.this, AMQConstant.RESOURCE_ERROR, reason);
}
}, getVirtualHost());
@@ -275,7 +276,7 @@ public class ServerSession extends Sessi
message.getInitialRoutingAddress(),
instanceProperties, _transaction, _checkCapacityAction
);
- getConnectionModel().registerMessageReceived(message.getSize(), message.getArrivalTime());
+ getAMQPConnection().registerMessageReceived(message.getSize(), message.getArrivalTime());
incrementOutstandingTxnsIfNecessary();
incrementUncommittedMessageSize(message.getStoredMessage());
return enqueues;
@@ -321,7 +322,7 @@ public class ServerSession extends Sessi
public void sendMessage(MessageTransfer xfr,
Runnable postIdSettingAction)
{
- getConnectionModel().registerMessageDelivered(xfr.getBodySize());
+ getAMQPConnection().registerMessageDelivered(xfr.getBodySize());
invoke(xfr, postIdSettingAction);
}
@@ -787,9 +788,10 @@ public class ServerSession extends Sessi
return _id;
}
- public ServerConnection getConnectionModel()
+ @Override
+ public AMQPConnection<?> getAMQPConnection()
{
- return getConnection();
+ return getConnection().getAmqpConnection();
}
public String getClientID()
@@ -897,7 +899,7 @@ public class ServerSession extends Sessi
? getConnection().getConnectionId()
: -1;
- String remoteAddress = String.valueOf(getConnection().getRemoteAddress());
+ String remoteAddress = String.valueOf(getConnection().getRemoteSocketAddress());
return "[" +
MessageFormat.format(CHANNEL_FORMAT,
connectionId,
@@ -1188,13 +1190,13 @@ public class ServerSession extends Sessi
@Override
public void addTicker(final Ticker ticker)
{
- getConnection().getProtocolEngine().getAggregateTicker().addTicker(ticker);
+ getConnection().getAmqpConnection().getAggregateTicker().addTicker(ticker);
}
@Override
public void removeTicker(final Ticker ticker)
{
- getConnection().getProtocolEngine().getAggregateTicker().removeTicker(ticker);
+ getConnection().getAmqpConnection().getAggregateTicker().removeTicker(ticker);
}
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java Sat Jun 27 21:13:25 2015
@@ -255,7 +255,7 @@ public class ServerSessionDelegate exten
}
else
{
- ProtocolEngine protocolEngine = getServerConnection(session).getProtocolEngine();
+ ProtocolEngine protocolEngine = getServerConnection(session).getAmqpConnection();
FlowCreditManager_0_10 creditManager = new WindowCreditManager(0L,0L, protocolEngine);
FilterManager filterManager = null;
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java?rev=1687962&r1=1687961&r2=1687962&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java Sat Jun 27 21:13:25 2015
@@ -25,7 +25,8 @@ import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.List;
-import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor;
+import javax.security.auth.Subject;
+
import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.configuration.updater.TaskExecutorImpl;
import org.apache.qpid.server.model.Broker;
@@ -80,15 +81,17 @@ public class ServerSessionTest extends Q
AmqpPort amqpPort = createMockPort(AmqpPort.DEFAULT_MAX_MESSAGE_SIZE);
ServerConnection connection = new ServerConnection(1, broker, amqpPort, Transport.TCP);
- final ProtocolEngine_0_10 protocolEngine = mock(ProtocolEngine_0_10.class);
- connection.setProtocolEngine(protocolEngine);
+ final AMQPConnection_0_10 protocolEngine = mock(AMQPConnection_0_10.class);
+ Subject subject = new Subject();
+ when(protocolEngine.getSubject()).thenReturn(subject);
+ connection.setAmqpConnection(protocolEngine);
connection.setVirtualHost(_virtualHost);
ServerSession session1 = new ServerSession(connection, new ServerSessionDelegate(),
new Binary(getName().getBytes()), 0);
// create a session with the same name but on a different connection
ServerConnection connection2 = new ServerConnection(2, broker, amqpPort, Transport.TCP);
- connection2.setProtocolEngine(protocolEngine);
+ connection2.setAmqpConnection(protocolEngine);
connection2.setVirtualHost(_virtualHost);
ServerSession session2 = new ServerSession(connection2, new ServerSessionDelegate(),
new Binary(getName().getBytes()), 0);
@@ -105,8 +108,11 @@ public class ServerSessionTest extends Q
AmqpPort port = createMockPort(1024);
ServerConnection connection = new ServerConnection(1, broker, port, Transport.TCP);
- final ProtocolEngine_0_10 protocolEngine = mock(ProtocolEngine_0_10.class);
- connection.setProtocolEngine(protocolEngine);
+ final AMQPConnection_0_10 protocolEngine = mock(AMQPConnection_0_10.class);
+ Subject subject = new Subject();
+ when(protocolEngine.getSubject()).thenReturn(subject);
+
+ connection.setAmqpConnection(protocolEngine);
connection.setVirtualHost(_virtualHost);
final List<Method> invokedMethods = new ArrayList<>();
ServerSession session = new ServerSession(connection, new ServerSessionDelegate(),
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org