You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2016/11/19 16:36:59 UTC
svn commit: r1770505 - in /qpid/java/trunk:
broker-core/src/main/java/org/apache/qpid/server/logging/subjects/
broker-core/src/main/java/org/apache/qpid/server/model/
broker-core/src/main/java/org/apache/qpid/server/transport/
broker-core/src/test/java...
Author: rgodfrey
Date: Sat Nov 19 16:36:59 2016
New Revision: 1770505
URL: http://svn.apache.org/viewvc?rev=1770505&view=rev
Log:
QPID-7531 : Improve AMQP 1.0 implementation
Removed:
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/DeliveryStateHandler.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ErrorHandler.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Predicate.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StateChangeListener.java
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/AbstractLogSubject.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/NamedAddressSpace.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/logging/actors/BaseConnectionActorTestCase.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/FrameOutputHandler.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Link_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkAttachment.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkAttachment.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLink_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLink_1_0.java
qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementAddressSpace.java
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/AbstractLogSubject.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/AbstractLogSubject.java?rev=1770505&r1=1770504&r2=1770505&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/AbstractLogSubject.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/AbstractLogSubject.java Sat Nov 19 16:36:59 2016
@@ -20,10 +20,10 @@
*/
package org.apache.qpid.server.logging.subjects;
-import org.apache.qpid.server.logging.LogSubject;
-
import java.text.MessageFormat;
+import org.apache.qpid.server.logging.LogSubject;
+
/**
* The LogSubjects all have a similar requirement to format their output and
* provide the String value.
@@ -69,4 +69,10 @@ public abstract class AbstractLogSubject
{
_logString = logString;
}
+
+ @Override
+ public String toString()
+ {
+ return toLogString();
+ }
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/NamedAddressSpace.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/NamedAddressSpace.java?rev=1770505&r1=1770504&r2=1770505&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/NamedAddressSpace.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/NamedAddressSpace.java Sat Nov 19 16:36:59 2016
@@ -24,6 +24,7 @@ import java.security.Principal;
import java.util.Collection;
import java.util.List;
import java.util.Map;
+import java.util.UUID;
import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.MessageSource;
@@ -36,6 +37,8 @@ import org.apache.qpid.server.txn.DtxReg
public interface NamedAddressSpace extends Named
{
+ UUID getId();
+
MessageSource getAttainedMessageSource(String name);
MessageDestination getAttainedMessageDestination(String name);
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java?rev=1770505&r1=1770504&r2=1770505&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java Sat Nov 19 16:36:59 2016
@@ -72,7 +72,13 @@ public interface AMQPConnection<C extend
boolean hasSessionWithName(byte[] name);
- void sendConnectionCloseAsync(AMQConstant connectionForced, String reason);
+ enum ConnectionCloseReason
+ {
+ MANAGEMENT,
+ TRANSACTION_TIMEOUT
+ }
+
+ void sendConnectionCloseAsync(ConnectionCloseReason reason, String description);
boolean isIOThread();
ListenableFuture<Void> doOnIOThreadAsync(final Runnable task);
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java?rev=1770505&r1=1770504&r2=1770505&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java Sat Nov 19 16:36:59 2016
@@ -45,7 +45,6 @@ import com.google.common.util.concurrent
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.connection.ConnectionPrincipal;
import org.apache.qpid.server.logging.EventLogger;
@@ -630,7 +629,7 @@ public abstract class AbstractAMQPConnec
{
if (_modelClosing.compareAndSet(false, true))
{
- sendConnectionCloseAsync(AMQConstant.CONNECTION_FORCED, "Connection closed by external action");
+ sendConnectionCloseAsync(ConnectionCloseReason.MANAGEMENT, "Connection closed by external action");
}
return _modelClosedFuture;
}
Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/logging/actors/BaseConnectionActorTestCase.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/logging/actors/BaseConnectionActorTestCase.java?rev=1770505&r1=1770504&r2=1770505&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/logging/actors/BaseConnectionActorTestCase.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/logging/actors/BaseConnectionActorTestCase.java Sat Nov 19 16:36:59 2016
@@ -20,10 +20,9 @@
*/
package org.apache.qpid.server.logging.actors;
-import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.model.BrokerTestHelper;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.transport.AMQPConnection;
-import org.apache.qpid.server.model.BrokerTestHelper;
public abstract class BaseConnectionActorTestCase extends BaseActorTestCase
{
@@ -55,7 +54,7 @@ public abstract class BaseConnectionActo
}
if (_connection != null)
{
- _connection.sendConnectionCloseAsync(AMQConstant.CONNECTION_FORCED, "");
+ _connection.sendConnectionCloseAsync(AMQPConnection.ConnectionCloseReason.MANAGEMENT, "");
}
}
finally
Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java?rev=1770505&r1=1770504&r2=1770505&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java Sat Nov 19 16:36:59 2016
@@ -49,7 +49,6 @@ import org.mockito.invocation.Invocation
import org.mockito.stubbing.Answer;
import org.apache.qpid.exchange.ExchangeDefaults;
-import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.configuration.store.StoreConfigurationChangeListener;
import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor;
@@ -580,7 +579,7 @@ public class VirtualHostTest extends Qpi
}
return null;
}
- }).when(connection).sendConnectionCloseAsync(any(AMQConstant.class), anyString());
+ }).when(connection).sendConnectionCloseAsync(any(AMQPConnection.ConnectionCloseReason.class), anyString());
when(connection.getRemoteAddressString()).thenReturn("peer:1234");
return connection;
}
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java?rev=1770505&r1=1770504&r2=1770505&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java Sat Nov 19 16:36:59 2016
@@ -37,24 +37,25 @@ import org.slf4j.LoggerFactory;
import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.logging.messages.ConnectionMessages;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.model.port.AmqpPort;
+import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.security.SubjectCreator;
import org.apache.qpid.server.store.StoreException;
import org.apache.qpid.server.transport.AbstractAMQPConnection;
+import org.apache.qpid.server.transport.AggregateTicker;
import org.apache.qpid.server.transport.ProtocolEngine;
-import org.apache.qpid.server.logging.messages.ConnectionMessages;
-import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.transport.ServerNetworkConnection;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.transport.ByteBufferSender;
+import org.apache.qpid.transport.ConnectionCloseCode;
import org.apache.qpid.transport.ConnectionDelegate;
import org.apache.qpid.transport.Constant;
-import org.apache.qpid.server.transport.AggregateTicker;
public class AMQPConnection_0_10 extends AbstractAMQPConnection<AMQPConnection_0_10, ServerConnection>
@@ -304,10 +305,13 @@ public class AMQPConnection_0_10 extends
return _connection.hasSessionWithName(name);
}
- public void sendConnectionCloseAsync(final AMQConstant cause, final String message)
+ @Override
+ public void sendConnectionCloseAsync(final ConnectionCloseReason reason, final String description)
{
stopConnection();
- _connection.sendConnectionCloseAsync(cause, message);
+ // Best mapping for all reasons is "forced"
+ _connection.sendConnectionCloseAsync(ConnectionCloseCode.CONNECTION_FORCED, description);
+
}
public void closeSessionAsync(final AMQSessionModel<?> session,
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java?rev=1770505&r1=1770504&r2=1770505&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java Sat Nov 19 16:36:59 2016
@@ -251,7 +251,7 @@ public class ServerConnection extends Co
}
}
- public void sendConnectionCloseAsync(final AMQConstant cause, final String message)
+ void sendConnectionCloseAsync(final ConnectionCloseCode replyCode, final String message)
{
addAsyncTask(new Action<ServerConnection>()
{
@@ -263,15 +263,6 @@ public class ServerConnection extends Co
markAllSessionsClosed();
setState(CLOSING);
- ConnectionCloseCode replyCode = ConnectionCloseCode.NORMAL;
- try
- {
- replyCode = ConnectionCloseCode.get(cause.getCode());
- }
- catch (IllegalArgumentException iae)
- {
- // Ignore
- }
sendConnectionClose(replyCode, message);
}
}
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java?rev=1770505&r1=1770504&r2=1770505&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java Sat Nov 19 16:36:59 2016
@@ -519,7 +519,7 @@ public class ServerSessionDelegate exten
}
catch (VirtualHostUnavailableException e)
{
- getServerConnection(serverSession).sendConnectionCloseAsync(AMQConstant.CONNECTION_FORCED, e.getMessage());
+ getServerConnection(serverSession).sendConnectionCloseAsync(ConnectionCloseCode.CONNECTION_FORCED, e.getMessage());
}
finally
{
Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1770505&r1=1770504&r2=1770505&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Sat Nov 19 16:36:59 2016
@@ -104,6 +104,7 @@ import org.apache.qpid.server.store.Mess
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.server.transport.AMQPConnection;
import org.apache.qpid.server.txn.AsyncAutoCommitTransaction;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.LocalTransaction.ActivityTimeAccessor;
@@ -286,7 +287,7 @@ public class AMQChannel
@Override
public void doTimeoutAction(String reason)
{
- _connection.sendConnectionCloseAsync(AMQConstant.RESOURCE_ERROR, reason);
+ _connection.sendConnectionCloseAsync(AMQPConnection.ConnectionCloseReason.TRANSACTION_TIMEOUT, reason);
}
private void message(final LogMessage message)
Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java?rev=1770505&r1=1770504&r2=1770505&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java Sat Nov 19 16:36:59 2016
@@ -545,7 +545,7 @@ public class AMQPConnection_0_8Impl
}
public void sendConnectionClose(AMQConstant errorCode,
- String message, int channelId)
+ String message, int channelId)
{
sendConnectionClose(channelId, new AMQFrame(0, new ConnectionCloseBody(getProtocolVersion(), errorCode.getCode(), AMQShortString.validValueOf(message), _currentClassId, _currentMethodId)));
}
@@ -799,17 +799,29 @@ public class AMQPConnection_0_8Impl
}
@Override
- public void sendConnectionCloseAsync(final AMQConstant cause, final String message)
+ public void sendConnectionCloseAsync(final ConnectionCloseReason reason, final String description)
{
stopConnection();
+ final AMQConstant cause;
+ switch(reason)
+ {
+ case MANAGEMENT:
+ cause = AMQConstant.CONNECTION_FORCED;
+ break;
+ case TRANSACTION_TIMEOUT:
+ cause = AMQConstant.RESOURCE_ERROR;
+ break;
+ default:
+ cause = AMQConstant.INTERNAL_ERROR;
+ }
Action<AMQPConnection_0_8Impl> action = new Action<AMQPConnection_0_8Impl>()
{
@Override
public void performAction(final AMQPConnection_0_8Impl object)
{
- AMQConnectionException e = new AMQConnectionException(cause, message, 0, 0,
- getMethodRegistry(),
- null);
+ AMQConnectionException e = new AMQConnectionException(cause, description, 0, 0,
+ getMethodRegistry(),
+ null);
sendConnectionClose(0, e.getCloseFrame());
}
};
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java?rev=1770505&r1=1770504&r2=1770505&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java Sat Nov 19 16:36:59 2016
@@ -37,6 +37,7 @@ import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -48,9 +49,20 @@ import javax.security.sasl.SaslServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.common.ServerPropertyNames;
+import org.apache.qpid.configuration.CommonProperties;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.logging.messages.ConnectionMessages;
import org.apache.qpid.server.model.AuthenticationProvider;
+import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.Connection;
import org.apache.qpid.server.model.NamedAddressSpace;
+import org.apache.qpid.server.model.Protocol;
+import org.apache.qpid.server.model.Transport;
+import org.apache.qpid.server.model.port.AmqpPort;
+import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.protocol.ConnectionClosingTicker;
import org.apache.qpid.server.protocol.v1_0.codec.DescribedTypeConstructorRegistry;
import org.apache.qpid.server.protocol.v1_0.codec.FrameWriter;
import org.apache.qpid.server.protocol.v1_0.codec.ProtocolHandler;
@@ -61,20 +73,10 @@ import org.apache.qpid.server.protocol.v
import org.apache.qpid.server.protocol.v1_0.framing.SASLFrame;
import org.apache.qpid.server.protocol.v1_0.framing.TransportFrame;
import org.apache.qpid.server.protocol.v1_0.type.Binary;
+import org.apache.qpid.server.protocol.v1_0.type.ErrorCondition;
import org.apache.qpid.server.protocol.v1_0.type.FrameBody;
import org.apache.qpid.server.protocol.v1_0.type.SaslFrameBody;
import org.apache.qpid.server.protocol.v1_0.type.Symbol;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.common.ServerPropertyNames;
-import org.apache.qpid.configuration.CommonProperties;
-import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.server.logging.messages.ConnectionMessages;
-import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.model.Protocol;
-import org.apache.qpid.server.model.Transport;
-import org.apache.qpid.server.model.port.AmqpPort;
-import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.protocol.ConnectionClosingTicker;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedShort;
import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
@@ -104,6 +106,7 @@ import org.apache.qpid.server.security.a
import org.apache.qpid.server.security.auth.manager.ExternalAuthenticationManagerImpl;
import org.apache.qpid.server.store.StoreException;
import org.apache.qpid.server.transport.AbstractAMQPConnection;
+import org.apache.qpid.server.transport.AggregateTicker;
import org.apache.qpid.server.transport.ProtocolEngine;
import org.apache.qpid.server.transport.ServerNetworkConnection;
import org.apache.qpid.server.util.Action;
@@ -111,12 +114,11 @@ import org.apache.qpid.server.util.Conne
import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
import org.apache.qpid.transport.ByteBufferSender;
-import org.apache.qpid.server.transport.AggregateTicker;
+import org.apache.qpid.transport.util.Functions;
public class AMQPConnection_1_0 extends AbstractAMQPConnection<AMQPConnection_1_0, ConnectionHandler>
implements FrameOutputHandler, DescribedTypeConstructorRegistry.Source,
ValueWriter.Registry.Source,
- ErrorHandler,
SASLEndpoint,
ConnectionHandler
{
@@ -156,6 +158,8 @@ public class AMQPConnection_1_0 extends
private ProtocolHandler _frameHandler;
private volatile boolean _transportBlockedForWriting;
private volatile SubjectAuthenticationResult _successfulAuthenticationResult;
+ private boolean _blocking;
+ private final Object _blockingLock = new Object();
private enum FrameReceivingState
{
@@ -178,10 +182,6 @@ public class AMQPConnection_1_0 extends
private AmqpPort<?> _port;
private SubjectCreator _subjectCreator;
- private Transport _transport;
- private long _connectionId;
-
- private Container _container;
private int _channelMax = DEFAULT_CHANNEL_MAX;
private int _maxFrameSize = 4096;
@@ -202,10 +202,10 @@ public class AMQPConnection_1_0 extends
private ConnectionState _connectionState = ConnectionState.UNOPENED;
private AMQPDescribedTypeRegistry _describedTypeRegistry = AMQPDescribedTypeRegistry.newInstance()
- .registerTransportLayer()
- .registerMessagingLayer()
- .registerTransactionLayer()
- .registerSecurityLayer();
+ .registerTransportLayer()
+ .registerMessagingLayer()
+ .registerTransactionLayer()
+ .registerSecurityLayer();
private Map _properties;
@@ -215,7 +215,7 @@ public class AMQPConnection_1_0 extends
private SaslServer _saslServer;
private String _localHostname;
private long _desiredIdleTimeout;
- private UnsignedInteger _handleMax = UnsignedInteger.MAX_VALUE;
+
private Error _remoteError;
private static final long MINIMUM_SUPPORTED_IDLE_TIMEOUT = 1000L;
@@ -239,21 +239,20 @@ public class AMQPConnection_1_0 extends
AMQPConnection_1_0(final Broker<?> broker,
final ServerNetworkConnection network,
- AmqpPort<?> port, Transport transport, long id,
+ AmqpPort<?> port,
+ Transport transport,
+ long id,
final AggregateTicker aggregateTicker,
final boolean useSASL)
{
super(broker, network, port, transport, Protocol.AMQP_1_0, id, aggregateTicker);
- _container = new Container(broker.getId().toString());
_subjectCreator = port.getAuthenticationProvider().getSubjectCreator(transport.isSecure());
_saslServerProvider = useSASL ? asSaslServerProvider(_subjectCreator, network) : null;
_port = port;
- _transport = transport;
- _connectionId = id;
- Map<Symbol,Object> serverProperties = new LinkedHashMap<>();
+ Map<Symbol, Object> serverProperties = new LinkedHashMap<>();
serverProperties.put(Symbol.valueOf(ServerPropertyNames.PRODUCT), CommonProperties.getProductName());
serverProperties.put(Symbol.valueOf(ServerPropertyNames.VERSION), CommonProperties.getReleaseVersion());
serverProperties.put(Symbol.valueOf(ServerPropertyNames.QPID_BUILD), CommonProperties.getBuildVersion());
@@ -265,9 +264,7 @@ public class AMQPConnection_1_0 extends
setDesiredIdleTimeout(1000L * broker.getConnection_heartBeatDelay());
- _frameWriter = new FrameWriter(getDescribedTypeRegistry(), getSender());
-
-
+ _frameWriter = new FrameWriter(getDescribedTypeRegistry(), getSender());
}
@@ -299,7 +296,7 @@ public class AMQPConnection_1_0 extends
}
else
{
- // TODO - error
+ closeConnection(AmqpError.INVALID_FIELD, "Channel " + channel + " is not associated with a session");
}
}
@@ -321,12 +318,12 @@ public class AMQPConnection_1_0 extends
_saslComplete = true;
_frameReceivingState = FrameReceivingState.CLOSED;
setClosedForInput(true);
- close();
+ addCloseTicker();
}
public void receiveSaslChallenge(final SaslChallenge saslChallenge)
{
- // TODO - log unexpected frame
+ LOGGER.info("{} : Unexpected frame sasl-challenge", getLogSubject());
closeSaslWithFailure();
}
@@ -340,17 +337,22 @@ public class AMQPConnection_1_0 extends
{
case UNOPENED:
case AWAITING_OPEN:
- Error error = new Error();
- error.setCondition(ConnectionError.CONNECTION_FORCED);
- error.setDescription("Connection close sent before connection was opened");
- closeConnection(error);
+ closeConnection(ConnectionError.CONNECTION_FORCED,
+ "Connection close sent before connection was opened");
break;
case OPEN:
_connectionState = ConnectionState.CLOSE_RECEIVED;
- // TODO - we should log the error we received from the client if present
+ if(close.getError() != null)
+ {
+ ErrorCondition condition = _remoteError.getCondition();
+ Symbol errorCondition = condition == null ? null : condition.getValue();
+ LOGGER.info("{} : Connection closed with error : {} - {}", getLogSubject(),
+ errorCondition, _remoteError.getDescription());
+ }
sendClose(new Close());
_connectionState = ConnectionState.CLOSED;
_orderlyClose.set(true);
+ addCloseTicker();
break;
case CLOSE_SENT:
_connectionState = ConnectionState.CLOSED;
@@ -359,14 +361,13 @@ public class AMQPConnection_1_0 extends
default:
}
_remoteError = close.getError();
-
}
private void closeReceived()
{
Collection<Session_1_0> sessions = new ArrayList<>(_sessions);
- for(final Session_1_0 session : sessions)
+ for (final Session_1_0 session : sessions)
{
AccessController.doPrivileged(new PrivilegedAction<Object>()
{
@@ -387,7 +388,7 @@ public class AMQPConnection_1_0 extends
public void receiveSaslMechanisms(final SaslMechanisms saslMechanisms)
{
- // TODO - log unexpected frame
+ LOGGER.info("{} : Unexpected frame sasl-mechanisms", getLogSubject());
closeSaslWithFailure();
}
@@ -437,7 +438,7 @@ public class AMQPConnection_1_0 extends
void sessionEnded(final Session_1_0 session)
{
- if(!_closedOnOpen)
+ if (!_closedOnOpen)
{
_sessions.remove(session);
sessionRemoved(session);
@@ -451,8 +452,6 @@ public class AMQPConnection_1_0 extends
private void inputClosed()
{
- List<Runnable> postLockActions;
-
if (!_closedForInput)
{
_closedForInput = true;
@@ -474,14 +473,11 @@ public class AMQPConnection_1_0 extends
}
closeReceived();
}
-
-
}
private void closeSender()
{
setClosedForOutput(true);
- close();
}
String getRemoteContainerId()
@@ -506,12 +502,11 @@ public class AMQPConnection_1_0 extends
{
_sendingSessions[channel] = null;
}
-
}
public void receiveSaslOutcome(final SaslOutcome saslOutcome)
{
- // TODO - log unexpected frame
+ LOGGER.info("{} : Unexpected frame sasl-outcome", getLogSubject());
closeSaslWithFailure();
}
@@ -528,10 +523,15 @@ public class AMQPConnection_1_0 extends
}
else
{
- // TODO error
+ closeConnectionWithInvalidChannel(channel, end);
}
}
+ private void closeConnectionWithInvalidChannel(final short channel, final FrameBody frame)
+ {
+ closeConnection(AmqpError.INVALID_FIELD, String.format("%s frame received on channel %d which is not mapped", frame.getClass().getSimpleName().toLowerCase(), channel));
+ }
+
public void receiveDisposition(final short channel,
final Disposition disposition)
{
@@ -551,7 +551,7 @@ public class AMQPConnection_1_0 extends
}
else
{
- // TODO - error
+ closeConnectionWithInvalidChannel(channel, disposition);
}
}
@@ -563,11 +563,12 @@ public class AMQPConnection_1_0 extends
short myChannelId;
if (begin.getRemoteChannel() != null)
{
- final Error error = new Error();
- error.setCondition(ConnectionError.FRAMING_ERROR);
- error.setDescription("BEGIN received on channel " + channel + " with given remote-channel "
- + begin.getRemoteChannel() + ". Since the broker does not spontaneously start channels, this must be an error.");
- closeConnection(error);
+ closeConnection(ConnectionError.FRAMING_ERROR,
+ "BEGIN received on channel "
+ + channel
+ + " with given remote-channel "
+ + begin.getRemoteChannel()
+ + ". Since the broker does not spontaneously start channels, this must be an error.");
}
else // Peer requesting session creation
@@ -578,10 +579,11 @@ public class AMQPConnection_1_0 extends
myChannelId = getFirstFreeChannel();
if (myChannelId == -1)
{
- final Error error = new Error();
- error.setCondition(ConnectionError.FRAMING_ERROR);
- error.setDescription("BEGIN received on channel " + channel + ". There are no free channels for the broker to responsd on.");
- closeConnection(error);
+
+ closeConnection(ConnectionError.FRAMING_ERROR,
+ "BEGIN received on channel "
+ + channel
+ + ". There are no free channels for the broker to responsd on.");
}
Session_1_0 session = new Session_1_0(this, begin);
@@ -605,10 +607,8 @@ public class AMQPConnection_1_0 extends
}
else
{
- final Error error = new Error();
- error.setCondition(ConnectionError.FRAMING_ERROR);
- error.setDescription("BEGIN received on channel " + channel + " which is already in use.");
- closeConnection(error);
+ closeConnection(ConnectionError.FRAMING_ERROR,
+ "BEGIN received on channel " + channel + " which is already in use.");
}
}
@@ -658,7 +658,7 @@ public class AMQPConnection_1_0 extends
}
else
{
- // TODO - error
+ closeConnectionWithInvalidChannel(channel, transfer);
}
}
@@ -680,7 +680,7 @@ public class AMQPConnection_1_0 extends
}
else
{
- // TODO - error
+ closeConnectionWithInvalidChannel(channel, flow);
}
}
@@ -720,12 +720,11 @@ public class AMQPConnection_1_0 extends
}
if (_idleTimeout != 0L && _idleTimeout < MINIMUM_SUPPORTED_IDLE_TIMEOUT)
{
- closeConnection(new Error(ConnectionError.CONNECTION_FORCED,
- "Requested idle timeout of "
- + _idleTimeout
- + " is too low. The minimum supported timeout is"
- + MINIMUM_SUPPORTED_IDLE_TIMEOUT));
- close();
+ closeConnection(ConnectionError.CONNECTION_FORCED,
+ "Requested idle timeout of "
+ + _idleTimeout
+ + " is too low. The minimum supported timeout is"
+ + MINIMUM_SUPPORTED_IDLE_TIMEOUT);
_closedOnOpen = true;
}
else
@@ -741,18 +740,14 @@ public class AMQPConnection_1_0 extends
{
if (!addressSpace.isActive())
{
- final Error err = new Error();
- err.setCondition(AmqpError.NOT_FOUND);
- closeConnection(err);
-
_closedOnOpen = true;
+ final Error err = new Error();
+ err.setCondition(AmqpError.NOT_FOUND);
populateConnectionRedirect(addressSpace, err);
closeConnection(err);
- close();
-
_closedOnOpen = true;
}
@@ -863,7 +858,7 @@ public class AMQPConnection_1_0 extends
}
else
{
- // TODO - error
+ closeConnectionWithInvalidChannel(channel, detach);
}
}
@@ -996,19 +991,9 @@ public class AMQPConnection_1_0 extends
}
}
- private void closeConnection()
+ private void closeConnection(ErrorCondition errorCondition, String description)
{
- switch (_connectionState)
- {
- case AWAITING_OPEN:
- case OPEN:
- Close closeToSend = new Close();
- sendClose(closeToSend);
- _connectionState = ConnectionState.CLOSE_SENT;
- break;
- case CLOSE_SENT:
- default:
- }
+ closeConnection(new Error(errorCondition, description));
}
private void closeConnection(final Error error)
@@ -1026,12 +1011,13 @@ public class AMQPConnection_1_0 extends
case OPEN:
sendClose(close);
_connectionState = ConnectionState.CLOSE_SENT;
+ addCloseTicker();
case CLOSE_SENT:
case CLOSED:
// already sent our close - too late to do anything more
break;
default:
- // TODO Unknown state
+ throw new ServerScopedRuntimeException("Unknown state: " + _connectionState);
}
}
@@ -1045,7 +1031,7 @@ public class AMQPConnection_1_0 extends
int payloadSent = _maxFrameSize - (size + 9);
try
{
- if (payloadSent < (payload == null ? 0 : payload.remaining()))
+ if (payload != null && payloadSent < payload.remaining())
{
if (body instanceof Transfer)
@@ -1218,7 +1204,7 @@ public class AMQPConnection_1_0 extends
getSender().send(QpidByteBuffer.wrap(SASL_HEADER));
SaslMechanisms mechanisms = new SaslMechanisms();
- ArrayList<Symbol> mechanismsList = new ArrayList<Symbol>();
+ ArrayList<Symbol> mechanismsList = new ArrayList<>();
for (String name : subjectCreator.getMechanisms())
{
mechanismsList.add(Symbol.valueOf(name));
@@ -1245,7 +1231,8 @@ public class AMQPConnection_1_0 extends
}
else
{
- // TODO - log auth failure / close
+ LOGGER.warn("{} : attempt to initiate AMQP connection without correctly authenticating", getLogSubject());
+ _connectionState = ConnectionState.CLOSED;
getNetwork().close();
}
@@ -1257,7 +1244,9 @@ public class AMQPConnection_1_0 extends
}
else
{
- throw new ConnectionScopedRuntimeException("Unknown protocol header");
+ LOGGER.warn("{} : unknown AMQP header {}", getLogSubject(), Functions.str(header));
+ _connectionState = ConnectionState.CLOSED;
+ getNetwork().close();
}
}
@@ -1288,11 +1277,6 @@ public class AMQPConnection_1_0 extends
}
}
- public boolean canSend()
- {
- return true;
- }
-
public void send(final AMQFrame amqFrame)
{
send(amqFrame, null);
@@ -1322,7 +1306,7 @@ public class AMQPConnection_1_0 extends
}
- public void close()
+ private void addCloseTicker()
{
long timeoutTime = System.currentTimeMillis() + getContextValue(Long.class, Connection.CLOSE_RESPONSE_TIMEOUT);
@@ -1403,15 +1387,29 @@ public class AMQPConnection_1_0 extends
return false;
}
- public void sendConnectionCloseAsync(final AMQConstant cause, final String message)
+ @Override
+ public void sendConnectionCloseAsync(final ConnectionCloseReason reason, final String description)
{
+
stopConnection();
+ final ErrorCondition cause;
+ switch(reason)
+ {
+ case MANAGEMENT:
+ cause = ConnectionError.CONNECTION_FORCED;
+ break;
+ case TRANSACTION_TIMEOUT:
+ cause = AmqpError.RESOURCE_LIMIT_EXCEEDED;
+ break;
+ default:
+ cause = AmqpError.INTERNAL_ERROR;
+ }
Action<ConnectionHandler> action = new Action<ConnectionHandler>()
{
@Override
public void performAction(final ConnectionHandler object)
{
- closeConnection();
+ closeConnection(cause, description);
}
};
@@ -1426,7 +1424,30 @@ public class AMQPConnection_1_0 extends
public void block()
{
- // TODO
+ synchronized (_blockingLock)
+ {
+ if (!_blocking)
+ {
+ _blocking = true;
+ doOnIOThreadAsync(
+ new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ doBlock();
+ }
+ });
+ }
+ }
+ }
+
+ private void doBlock()
+ {
+ for(Session_1_0 session : _sessions)
+ {
+ session.block();
+ }
}
public String getRemoteContainerName()
@@ -1441,12 +1462,36 @@ public class AMQPConnection_1_0 extends
public void unblock()
{
- // TODO
+ synchronized (_blockingLock)
+ {
+ if(_blocking)
+ {
+ _blocking = false;
+ doOnIOThreadAsync(
+ new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ doUnblock();
+ }
+ });
+ }
+ }
}
+ private void doUnblock()
+ {
+ for(Session_1_0 session : _sessions)
+ {
+ session.unblock();
+ }
+ }
+
+ @Override
public long getSessionCountLimit()
{
- return 0; // TODO
+ return _channelMax+1;
}
@Override
@@ -1477,7 +1522,7 @@ public class AMQPConnection_1_0 extends
_channelMax = channelMax;
}
open.setChannelMax(UnsignedShort.valueOf((short) channelMax));
- open.setContainerId(_container.getId());
+ open.setContainerId(getAddressSpace() == null ? UUID.randomUUID().toString() : getAddressSpace().getId().toString());
open.setMaxFrameSize(UnsignedInteger.valueOf(maxFrameSize));
// TODO - should we try to set the hostname based on the connection information?
// open.setHostname();
@@ -1496,7 +1541,6 @@ public class AMQPConnection_1_0 extends
err.setCondition(amqpError);
err.setDescription(errorDescription);
closeConnection(err);
- close();
_closedOnOpen = true;
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/FrameOutputHandler.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/FrameOutputHandler.java?rev=1770505&r1=1770504&r2=1770505&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/FrameOutputHandler.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/FrameOutputHandler.java Sat Nov 19 16:36:59 2016
@@ -20,17 +20,14 @@
*/
package org.apache.qpid.server.protocol.v1_0;
-import org.apache.qpid.server.protocol.v1_0.framing.AMQFrame;
-
import java.nio.ByteBuffer;
+import org.apache.qpid.server.protocol.v1_0.framing.AMQFrame;
+
public interface FrameOutputHandler<T>
{
- boolean canSend();
void send(AMQFrame<T> frame);
void send(AMQFrame<T> frame, ByteBuffer payload);
- void close();
-
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java?rev=1770505&r1=1770504&r2=1770505&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java Sat Nov 19 16:36:59 2016
@@ -27,7 +27,6 @@ import java.util.Map;
import org.apache.qpid.server.protocol.v1_0.type.Binary;
import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
-import org.apache.qpid.server.protocol.v1_0.type.Outcome;
import org.apache.qpid.server.protocol.v1_0.type.Source;
import org.apache.qpid.server.protocol.v1_0.type.Symbol;
import org.apache.qpid.server.protocol.v1_0.type.Target;
@@ -45,7 +44,6 @@ public abstract class LinkEndpoint<T ext
{
private T _link;
- private DeliveryStateHandler _deliveryStateHandler;
private Object _flowTransactionId;
private SenderSettleMode _sendingSettlementMode;
private ReceiverSettleMode _receivingSettlementMode;
@@ -84,16 +82,6 @@ public abstract class LinkEndpoint<T ext
private Map<Binary,Delivery> _unsettledTransfers = new HashMap<Binary,Delivery>();
- LinkEndpoint(final Session_1_0 sessionEndpoint, String name, Map<Binary, Outcome> unsettled, DeliveryStateHandler deliveryStateHandler)
- {
- _name = name;
- _session = sessionEndpoint;
- _linkCredit = UnsignedInteger.valueOf(0);
- _drain = Boolean.FALSE;
- _localUnsettled = unsettled;
- _deliveryStateHandler = deliveryStateHandler;
- }
-
LinkEndpoint(final Session_1_0 sessionEndpoint,final Attach attach)
{
_session = sessionEndpoint;
@@ -215,9 +203,9 @@ public abstract class LinkEndpoint<T ext
final Boolean settled)
{
// TODO
- if (_deliveryStateHandler != null)
+ if (_link != null)
{
- _deliveryStateHandler.handle(unsettled.getDeliveryTag(), state, settled);
+ _link.handle(unsettled.getDeliveryTag(), state, settled);
}
if (settled)
@@ -478,11 +466,6 @@ public abstract class LinkEndpoint<T ext
_link = link;
}
- public void setDeliveryStateHandler(final DeliveryStateHandler deliveryStateHandler)
- {
- _deliveryStateHandler = deliveryStateHandler;
- }
-
public void setSendingSettlementMode(SenderSettleMode sendingSettlementMode)
{
_sendingSettlementMode = sendingSettlementMode;
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Link_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Link_1_0.java?rev=1770505&r1=1770504&r2=1770505&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Link_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Link_1_0.java Sat Nov 19 16:36:59 2016
@@ -21,6 +21,8 @@
package org.apache.qpid.server.protocol.v1_0;
import org.apache.qpid.server.protocol.LinkModel;
+import org.apache.qpid.server.protocol.v1_0.type.Binary;
+import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
public interface Link_1_0 extends LinkModel
@@ -28,5 +30,7 @@ public interface Link_1_0 extends LinkMo
void remoteDetached(final LinkEndpoint endpoint, Detach detach);
+ void handle(final Binary deliveryTag, final DeliveryState state, final Boolean settled);
+
void start();
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkAttachment.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkAttachment.java?rev=1770505&r1=1770504&r2=1770505&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkAttachment.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkAttachment.java Sat Nov 19 16:36:59 2016
@@ -20,8 +20,6 @@
*/
package org.apache.qpid.server.protocol.v1_0;
-import org.apache.qpid.server.protocol.v1_0.type.Binary;
-import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
import org.apache.qpid.server.protocol.v1_0.type.Source;
import org.apache.qpid.server.protocol.v1_0.type.Target;
@@ -51,16 +49,6 @@ public class ReceivingLinkAttachment
return getEndpoint().getSource();
}
- public void setDeliveryStateHandler(final DeliveryStateHandler handler)
- {
- getEndpoint().setDeliveryStateHandler(handler);
- }
-
- public void updateDisposition(final Binary deliveryTag, final DeliveryState state, final boolean settled)
- {
- getEndpoint().updateDisposition(deliveryTag, state, settled);
- }
-
public Target getTarget()
{
return getEndpoint().getTarget();
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkAttachment.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkAttachment.java?rev=1770505&r1=1770504&r2=1770505&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkAttachment.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkAttachment.java Sat Nov 19 16:36:59 2016
@@ -50,11 +50,6 @@ public class SendingLinkAttachment
return getEndpoint().getSource();
}
- public void setDeliveryStateHandler(final DeliveryStateHandler handler)
- {
- getEndpoint().setDeliveryStateHandler(handler);
- }
-
public void updateDisposition(final Binary deliveryTag, final DeliveryState state, final boolean settled)
{
getEndpoint().updateDisposition(deliveryTag, state, settled);
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java?rev=1770505&r1=1770504&r2=1770505&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java Sat Nov 19 16:36:59 2016
@@ -77,7 +77,7 @@ import org.apache.qpid.server.txn.Server
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.virtualhost.QueueExistsException;
-public class SendingLink_1_0 implements Link_1_0, DeliveryStateHandler
+public class SendingLink_1_0 implements Link_1_0
{
private static final Logger _logger = LoggerFactory.getLogger(SendingLink_1_0.class);
@@ -111,7 +111,6 @@ public class SendingLink_1_0 implements
_linkAttachment = linkAttachment;
final Source source = (Source) linkAttachment.getSource();
_durability = source.getDurable();
- linkAttachment.setDeliveryStateHandler(this);
QueueDestination qd = null;
EnumSet<ConsumerImpl.Option> options = EnumSet.noneOf(ConsumerImpl.Option.class);
@@ -595,7 +594,6 @@ public class SendingLink_1_0 implements
if (linkAttachment.getSession() != null)
{
SendingLinkEndpoint endpoint = linkAttachment.getEndpoint();
- endpoint.setDeliveryStateHandler(this);
Map initialUnsettledMap = endpoint.getInitialUnsettledMap();
Map<Binary, MessageInstance> unsettledCopy = new HashMap<Binary, MessageInstance>(_unsettledMap);
_resumeAcceptedTransfers.clear();
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLink_1_0.java?rev=1770505&r1=1770504&r2=1770505&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLink_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLink_1_0.java Sat Nov 19 16:36:59 2016
@@ -48,7 +48,7 @@ import org.apache.qpid.server.store.Stor
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
-public class StandardReceivingLink_1_0 implements ReceivingLink_1_0, DeliveryStateHandler
+public class StandardReceivingLink_1_0 implements ReceivingLink_1_0
{
private NamedAddressSpace _addressSpace;
@@ -73,9 +73,6 @@ public class StandardReceivingLink_1_0 i
_destination = destination;
_attachment = receivingLinkAttachment;
_receivingSettlementMode = receivingLinkAttachment.getEndpoint().getReceivingSettlementMode();
-
- receivingLinkAttachment.setDeliveryStateHandler(this);
-
_durability = ((Target)receivingLinkAttachment.getTarget()).getDurable();
_sectionDecoder = new SectionDecoderImpl(receivingLinkAttachment.getEndpoint().getSession().getConnection().getDescribedTypeRegistry());
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLink_1_0.java?rev=1770505&r1=1770504&r2=1770505&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLink_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLink_1_0.java Sat Nov 19 16:36:59 2016
@@ -33,6 +33,7 @@ import org.apache.qpid.server.protocol.v
import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoderImpl;
import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
import org.apache.qpid.server.protocol.v1_0.type.Binary;
+import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
import org.apache.qpid.server.protocol.v1_0.type.Section;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
@@ -179,6 +180,12 @@ public class TxnCoordinatorReceivingLink
endpoint.detach();
}
+ @Override
+ public void handle(final Binary deliveryTag, final DeliveryState state, final Boolean settled)
+ {
+
+ }
+
private Error discharge(Integer transactionId, boolean fail)
{
Error error = null;
Modified: qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementAddressSpace.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementAddressSpace.java?rev=1770505&r1=1770504&r2=1770505&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementAddressSpace.java (original)
+++ qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementAddressSpace.java Sat Nov 19 16:36:59 2016
@@ -20,12 +20,14 @@
*/
package org.apache.qpid.server.management.amqp;
+import java.nio.charset.StandardCharsets;
import java.security.AccessControlException;
import java.security.Principal;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.qpid.exchange.ExchangeDefaults;
@@ -70,6 +72,7 @@ public class ManagementAddressSpace impl
private final LinkRegistry _linkRegistry = new NonDurableLinkRegistry();
private final Broker<?> _broker;
private final Principal _principal;
+ private final UUID _id;
public ManagementAddressSpace(final SystemAddressSpaceCreator.AddressSpaceRegistry addressSpaceRegistry)
{
@@ -86,10 +89,17 @@ public class ManagementAddressSpace impl
_propertiesNode = new VirtualHostPropertiesNode(this);
_messageStore = new MemoryMessageStore();
_principal = new ManagementAddressSpacePrincipal(this);
+ _id = UUID.nameUUIDFromBytes((_broker.getId().toString()+"/"+name).getBytes(StandardCharsets.UTF_8));
}
@Override
+ public UUID getId()
+ {
+ return _id;
+ }
+
+ @Override
public MessageSource getAttainedMessageSource(final String name)
{
if(_managementNode.getName().equals(name))
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org