You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2016/11/19 16:36:59 UTC

svn commit: r1770505 - in /qpid/java/trunk: broker-core/src/main/java/org/apache/qpid/server/logging/subjects/ broker-core/src/main/java/org/apache/qpid/server/model/ broker-core/src/main/java/org/apache/qpid/server/transport/ broker-core/src/test/java...

Author: rgodfrey
Date: Sat Nov 19 16:36:59 2016
New Revision: 1770505

URL: http://svn.apache.org/viewvc?rev=1770505&view=rev
Log:
QPID-7531 : Improve AMQP 1.0 implementation

Removed:
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/DeliveryStateHandler.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ErrorHandler.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Predicate.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StateChangeListener.java
Modified:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/AbstractLogSubject.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/NamedAddressSpace.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/logging/actors/BaseConnectionActorTestCase.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/FrameOutputHandler.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Link_1_0.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkAttachment.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkAttachment.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLink_1_0.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLink_1_0.java
    qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementAddressSpace.java

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/AbstractLogSubject.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/AbstractLogSubject.java?rev=1770505&r1=1770504&r2=1770505&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/AbstractLogSubject.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/AbstractLogSubject.java Sat Nov 19 16:36:59 2016
@@ -20,10 +20,10 @@
  */
 package org.apache.qpid.server.logging.subjects;
 
-import org.apache.qpid.server.logging.LogSubject;
-
 import java.text.MessageFormat;
 
+import org.apache.qpid.server.logging.LogSubject;
+
 /**
  * The LogSubjects all have a similar requirement to format their output and
  * provide the String value.
@@ -69,4 +69,10 @@ public abstract class AbstractLogSubject
     {
         _logString = logString;
     }
+
+    @Override
+    public String toString()
+    {
+        return toLogString();
+    }
 }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/NamedAddressSpace.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/NamedAddressSpace.java?rev=1770505&r1=1770504&r2=1770505&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/NamedAddressSpace.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/NamedAddressSpace.java Sat Nov 19 16:36:59 2016
@@ -24,6 +24,7 @@ import java.security.Principal;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 
 import org.apache.qpid.server.message.MessageDestination;
 import org.apache.qpid.server.message.MessageSource;
@@ -36,6 +37,8 @@ import org.apache.qpid.server.txn.DtxReg
 public interface NamedAddressSpace extends Named
 {
 
+    UUID getId();
+
     MessageSource getAttainedMessageSource(String name);
 
     MessageDestination getAttainedMessageDestination(String name);

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java?rev=1770505&r1=1770504&r2=1770505&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java Sat Nov 19 16:36:59 2016
@@ -72,7 +72,13 @@ public interface AMQPConnection<C extend
 
     boolean hasSessionWithName(byte[] name);
 
-    void sendConnectionCloseAsync(AMQConstant connectionForced, String reason);
+    enum ConnectionCloseReason
+    {
+        MANAGEMENT,
+        TRANSACTION_TIMEOUT
+    }
+
+    void sendConnectionCloseAsync(ConnectionCloseReason reason, String description);
 
     boolean isIOThread();
     ListenableFuture<Void> doOnIOThreadAsync(final Runnable task);

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java?rev=1770505&r1=1770504&r2=1770505&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java Sat Nov 19 16:36:59 2016
@@ -45,7 +45,6 @@ import com.google.common.util.concurrent
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.configuration.updater.TaskExecutor;
 import org.apache.qpid.server.connection.ConnectionPrincipal;
 import org.apache.qpid.server.logging.EventLogger;
@@ -630,7 +629,7 @@ public abstract class AbstractAMQPConnec
     {
         if (_modelClosing.compareAndSet(false, true))
         {
-            sendConnectionCloseAsync(AMQConstant.CONNECTION_FORCED, "Connection closed by external action");
+            sendConnectionCloseAsync(ConnectionCloseReason.MANAGEMENT, "Connection closed by external action");
         }
         return _modelClosedFuture;
     }

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/logging/actors/BaseConnectionActorTestCase.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/logging/actors/BaseConnectionActorTestCase.java?rev=1770505&r1=1770504&r2=1770505&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/logging/actors/BaseConnectionActorTestCase.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/logging/actors/BaseConnectionActorTestCase.java Sat Nov 19 16:36:59 2016
@@ -20,10 +20,9 @@
  */
 package org.apache.qpid.server.logging.actors;
 
-import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.model.BrokerTestHelper;
 import org.apache.qpid.server.model.VirtualHost;
 import org.apache.qpid.server.transport.AMQPConnection;
-import org.apache.qpid.server.model.BrokerTestHelper;
 
 public abstract class BaseConnectionActorTestCase extends BaseActorTestCase
 {
@@ -55,7 +54,7 @@ public abstract class BaseConnectionActo
             }
             if (_connection != null)
             {
-                _connection.sendConnectionCloseAsync(AMQConstant.CONNECTION_FORCED, "");
+                _connection.sendConnectionCloseAsync(AMQPConnection.ConnectionCloseReason.MANAGEMENT, "");
             }
         }
         finally

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java?rev=1770505&r1=1770504&r2=1770505&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java Sat Nov 19 16:36:59 2016
@@ -49,7 +49,6 @@ import org.mockito.invocation.Invocation
 import org.mockito.stubbing.Answer;
 
 import org.apache.qpid.exchange.ExchangeDefaults;
-import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.configuration.IllegalConfigurationException;
 import org.apache.qpid.server.configuration.store.StoreConfigurationChangeListener;
 import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor;
@@ -580,7 +579,7 @@ public class VirtualHostTest extends Qpi
                 }
                 return null;
             }
-        }).when(connection).sendConnectionCloseAsync(any(AMQConstant.class), anyString());
+        }).when(connection).sendConnectionCloseAsync(any(AMQPConnection.ConnectionCloseReason.class), anyString());
         when(connection.getRemoteAddressString()).thenReturn("peer:1234");
         return connection;
     }

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java?rev=1770505&r1=1770504&r2=1770505&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java Sat Nov 19 16:36:59 2016
@@ -37,24 +37,25 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.logging.messages.ConnectionMessages;
 import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.model.Protocol;
 import org.apache.qpid.server.model.Transport;
 import org.apache.qpid.server.model.port.AmqpPort;
+import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.security.SubjectCreator;
 import org.apache.qpid.server.store.StoreException;
 import org.apache.qpid.server.transport.AbstractAMQPConnection;
+import org.apache.qpid.server.transport.AggregateTicker;
 import org.apache.qpid.server.transport.ProtocolEngine;
-import org.apache.qpid.server.logging.messages.ConnectionMessages;
-import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.transport.ServerNetworkConnection;
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
 import org.apache.qpid.server.util.ServerScopedRuntimeException;
 import org.apache.qpid.transport.ByteBufferSender;
+import org.apache.qpid.transport.ConnectionCloseCode;
 import org.apache.qpid.transport.ConnectionDelegate;
 import org.apache.qpid.transport.Constant;
-import org.apache.qpid.server.transport.AggregateTicker;
 
 
 public class AMQPConnection_0_10 extends AbstractAMQPConnection<AMQPConnection_0_10, ServerConnection>
@@ -304,10 +305,13 @@ public class AMQPConnection_0_10 extends
         return _connection.hasSessionWithName(name);
     }
 
-    public void sendConnectionCloseAsync(final AMQConstant cause, final String message)
+    @Override
+    public void sendConnectionCloseAsync(final ConnectionCloseReason reason, final String description)
     {
         stopConnection();
-        _connection.sendConnectionCloseAsync(cause, message);
+        // Best mapping for all reasons is "forced"
+        _connection.sendConnectionCloseAsync(ConnectionCloseCode.CONNECTION_FORCED, description);
+
     }
 
     public void closeSessionAsync(final AMQSessionModel<?> session,

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java?rev=1770505&r1=1770504&r2=1770505&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java Sat Nov 19 16:36:59 2016
@@ -251,7 +251,7 @@ public class ServerConnection extends Co
         }
     }
 
-    public void sendConnectionCloseAsync(final AMQConstant cause, final String message)
+    void sendConnectionCloseAsync(final ConnectionCloseCode replyCode, final String message)
     {
         addAsyncTask(new Action<ServerConnection>()
         {
@@ -263,15 +263,6 @@ public class ServerConnection extends Co
                     markAllSessionsClosed();
 
                     setState(CLOSING);
-                    ConnectionCloseCode replyCode = ConnectionCloseCode.NORMAL;
-                    try
-                    {
-                        replyCode = ConnectionCloseCode.get(cause.getCode());
-                    }
-                    catch (IllegalArgumentException iae)
-                    {
-                        // Ignore
-                    }
                     sendConnectionClose(replyCode, message);
                 }
             }

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java?rev=1770505&r1=1770504&r2=1770505&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java Sat Nov 19 16:36:59 2016
@@ -519,7 +519,7 @@ public class ServerSessionDelegate exten
                 }
                 catch (VirtualHostUnavailableException e)
                 {
-                    getServerConnection(serverSession).sendConnectionCloseAsync(AMQConstant.CONNECTION_FORCED, e.getMessage());
+                    getServerConnection(serverSession).sendConnectionCloseAsync(ConnectionCloseCode.CONNECTION_FORCED, e.getMessage());
                 }
                 finally
                 {

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1770505&r1=1770504&r2=1770505&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Sat Nov 19 16:36:59 2016
@@ -104,6 +104,7 @@ import org.apache.qpid.server.store.Mess
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.StoredMessage;
 import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.server.transport.AMQPConnection;
 import org.apache.qpid.server.txn.AsyncAutoCommitTransaction;
 import org.apache.qpid.server.txn.LocalTransaction;
 import org.apache.qpid.server.txn.LocalTransaction.ActivityTimeAccessor;
@@ -286,7 +287,7 @@ public class AMQChannel
     @Override
     public void doTimeoutAction(String reason)
     {
-        _connection.sendConnectionCloseAsync(AMQConstant.RESOURCE_ERROR, reason);
+        _connection.sendConnectionCloseAsync(AMQPConnection.ConnectionCloseReason.TRANSACTION_TIMEOUT, reason);
     }
 
     private void message(final LogMessage message)

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java?rev=1770505&r1=1770504&r2=1770505&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java Sat Nov 19 16:36:59 2016
@@ -545,7 +545,7 @@ public class AMQPConnection_0_8Impl
     }
 
     public void sendConnectionClose(AMQConstant errorCode,
-                             String message, int channelId)
+                                    String message, int channelId)
     {
         sendConnectionClose(channelId, new AMQFrame(0, new ConnectionCloseBody(getProtocolVersion(), errorCode.getCode(), AMQShortString.validValueOf(message), _currentClassId, _currentMethodId)));
     }
@@ -799,17 +799,29 @@ public class AMQPConnection_0_8Impl
     }
 
     @Override
-    public void sendConnectionCloseAsync(final AMQConstant cause, final String message)
+    public void sendConnectionCloseAsync(final ConnectionCloseReason reason, final String description)
     {
         stopConnection();
+        final AMQConstant cause;
+        switch(reason)
+        {
+            case MANAGEMENT:
+                cause = AMQConstant.CONNECTION_FORCED;
+                break;
+            case TRANSACTION_TIMEOUT:
+                cause = AMQConstant.RESOURCE_ERROR;
+                break;
+            default:
+                cause = AMQConstant.INTERNAL_ERROR;
+        }
         Action<AMQPConnection_0_8Impl> action = new Action<AMQPConnection_0_8Impl>()
         {
             @Override
             public void performAction(final AMQPConnection_0_8Impl object)
             {
-                AMQConnectionException e = new AMQConnectionException(cause, message, 0, 0,
-                        getMethodRegistry(),
-                        null);
+                AMQConnectionException e = new AMQConnectionException(cause, description, 0, 0,
+                                                                      getMethodRegistry(),
+                                                                      null);
                 sendConnectionClose(0, e.getCloseFrame());
             }
         };

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java?rev=1770505&r1=1770504&r2=1770505&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java Sat Nov 19 16:36:59 2016
@@ -37,6 +37,7 @@ import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Queue;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -48,9 +49,20 @@ import javax.security.sasl.SaslServer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.common.ServerPropertyNames;
+import org.apache.qpid.configuration.CommonProperties;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.logging.messages.ConnectionMessages;
 import org.apache.qpid.server.model.AuthenticationProvider;
+import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.model.Connection;
 import org.apache.qpid.server.model.NamedAddressSpace;
+import org.apache.qpid.server.model.Protocol;
+import org.apache.qpid.server.model.Transport;
+import org.apache.qpid.server.model.port.AmqpPort;
+import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.protocol.ConnectionClosingTicker;
 import org.apache.qpid.server.protocol.v1_0.codec.DescribedTypeConstructorRegistry;
 import org.apache.qpid.server.protocol.v1_0.codec.FrameWriter;
 import org.apache.qpid.server.protocol.v1_0.codec.ProtocolHandler;
@@ -61,20 +73,10 @@ import org.apache.qpid.server.protocol.v
 import org.apache.qpid.server.protocol.v1_0.framing.SASLFrame;
 import org.apache.qpid.server.protocol.v1_0.framing.TransportFrame;
 import org.apache.qpid.server.protocol.v1_0.type.Binary;
+import org.apache.qpid.server.protocol.v1_0.type.ErrorCondition;
 import org.apache.qpid.server.protocol.v1_0.type.FrameBody;
 import org.apache.qpid.server.protocol.v1_0.type.SaslFrameBody;
 import org.apache.qpid.server.protocol.v1_0.type.Symbol;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.common.ServerPropertyNames;
-import org.apache.qpid.configuration.CommonProperties;
-import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.server.logging.messages.ConnectionMessages;
-import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.model.Protocol;
-import org.apache.qpid.server.model.Transport;
-import org.apache.qpid.server.model.port.AmqpPort;
-import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.protocol.ConnectionClosingTicker;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedShort;
 import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
@@ -104,6 +106,7 @@ import org.apache.qpid.server.security.a
 import org.apache.qpid.server.security.auth.manager.ExternalAuthenticationManagerImpl;
 import org.apache.qpid.server.store.StoreException;
 import org.apache.qpid.server.transport.AbstractAMQPConnection;
+import org.apache.qpid.server.transport.AggregateTicker;
 import org.apache.qpid.server.transport.ProtocolEngine;
 import org.apache.qpid.server.transport.ServerNetworkConnection;
 import org.apache.qpid.server.util.Action;
@@ -111,12 +114,11 @@ import org.apache.qpid.server.util.Conne
 import org.apache.qpid.server.util.ServerScopedRuntimeException;
 import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
 import org.apache.qpid.transport.ByteBufferSender;
-import org.apache.qpid.server.transport.AggregateTicker;
+import org.apache.qpid.transport.util.Functions;
 
 public class AMQPConnection_1_0 extends AbstractAMQPConnection<AMQPConnection_1_0, ConnectionHandler>
         implements FrameOutputHandler, DescribedTypeConstructorRegistry.Source,
                    ValueWriter.Registry.Source,
-                   ErrorHandler,
                    SASLEndpoint,
                    ConnectionHandler
 {
@@ -156,6 +158,8 @@ public class AMQPConnection_1_0 extends
     private ProtocolHandler _frameHandler;
     private volatile boolean _transportBlockedForWriting;
     private volatile SubjectAuthenticationResult _successfulAuthenticationResult;
+    private boolean _blocking;
+    private final Object _blockingLock = new Object();
 
     private enum FrameReceivingState
     {
@@ -178,10 +182,6 @@ public class AMQPConnection_1_0 extends
 
     private AmqpPort<?> _port;
     private SubjectCreator _subjectCreator;
-    private Transport _transport;
-    private long _connectionId;
-
-    private Container _container;
 
     private int _channelMax = DEFAULT_CHANNEL_MAX;
     private int _maxFrameSize = 4096;
@@ -202,10 +202,10 @@ public class AMQPConnection_1_0 extends
     private ConnectionState _connectionState = ConnectionState.UNOPENED;
 
     private AMQPDescribedTypeRegistry _describedTypeRegistry = AMQPDescribedTypeRegistry.newInstance()
-            .registerTransportLayer()
-            .registerMessagingLayer()
-            .registerTransactionLayer()
-            .registerSecurityLayer();
+                                                                                        .registerTransportLayer()
+                                                                                        .registerMessagingLayer()
+                                                                                        .registerTransactionLayer()
+                                                                                        .registerSecurityLayer();
 
 
     private Map _properties;
@@ -215,7 +215,7 @@ public class AMQPConnection_1_0 extends
     private SaslServer _saslServer;
     private String _localHostname;
     private long _desiredIdleTimeout;
-    private UnsignedInteger _handleMax = UnsignedInteger.MAX_VALUE;
+
     private Error _remoteError;
 
     private static final long MINIMUM_SUPPORTED_IDLE_TIMEOUT = 1000L;
@@ -239,21 +239,20 @@ public class AMQPConnection_1_0 extends
 
     AMQPConnection_1_0(final Broker<?> broker,
                        final ServerNetworkConnection network,
-                       AmqpPort<?> port, Transport transport, long id,
+                       AmqpPort<?> port,
+                       Transport transport,
+                       long id,
                        final AggregateTicker aggregateTicker,
                        final boolean useSASL)
     {
         super(broker, network, port, transport, Protocol.AMQP_1_0, id, aggregateTicker);
-        _container = new Container(broker.getId().toString());
 
         _subjectCreator = port.getAuthenticationProvider().getSubjectCreator(transport.isSecure());
 
         _saslServerProvider = useSASL ? asSaslServerProvider(_subjectCreator, network) : null;
         _port = port;
-        _transport = transport;
-        _connectionId = id;
 
-        Map<Symbol,Object> serverProperties = new LinkedHashMap<>();
+        Map<Symbol, Object> serverProperties = new LinkedHashMap<>();
         serverProperties.put(Symbol.valueOf(ServerPropertyNames.PRODUCT), CommonProperties.getProductName());
         serverProperties.put(Symbol.valueOf(ServerPropertyNames.VERSION), CommonProperties.getReleaseVersion());
         serverProperties.put(Symbol.valueOf(ServerPropertyNames.QPID_BUILD), CommonProperties.getBuildVersion());
@@ -265,9 +264,7 @@ public class AMQPConnection_1_0 extends
 
         setDesiredIdleTimeout(1000L * broker.getConnection_heartBeatDelay());
 
-        _frameWriter =  new FrameWriter(getDescribedTypeRegistry(), getSender());
-
-
+        _frameWriter = new FrameWriter(getDescribedTypeRegistry(), getSender());
     }
 
 
@@ -299,7 +296,7 @@ public class AMQPConnection_1_0 extends
         }
         else
         {
-            // TODO - error
+            closeConnection(AmqpError.INVALID_FIELD, "Channel " + channel + " is not associated with a session");
         }
     }
 
@@ -321,12 +318,12 @@ public class AMQPConnection_1_0 extends
         _saslComplete = true;
         _frameReceivingState = FrameReceivingState.CLOSED;
         setClosedForInput(true);
-        close();
+        addCloseTicker();
     }
 
     public void receiveSaslChallenge(final SaslChallenge saslChallenge)
     {
-        // TODO - log unexpected frame
+        LOGGER.info("{} : Unexpected frame sasl-challenge", getLogSubject());
         closeSaslWithFailure();
     }
 
@@ -340,17 +337,22 @@ public class AMQPConnection_1_0 extends
         {
             case UNOPENED:
             case AWAITING_OPEN:
-                Error error = new Error();
-                error.setCondition(ConnectionError.CONNECTION_FORCED);
-                error.setDescription("Connection close sent before connection was opened");
-                closeConnection(error);
+                closeConnection(ConnectionError.CONNECTION_FORCED,
+                                "Connection close sent before connection was opened");
                 break;
             case OPEN:
                 _connectionState = ConnectionState.CLOSE_RECEIVED;
-                // TODO - we should log the error we received from the client if present
+                if(close.getError() != null)
+                {
+                    ErrorCondition condition = _remoteError.getCondition();
+                    Symbol errorCondition = condition == null ? null : condition.getValue();
+                    LOGGER.info("{} : Connection closed with error : {} - {}", getLogSubject(),
+                                errorCondition, _remoteError.getDescription());
+                }
                 sendClose(new Close());
                 _connectionState = ConnectionState.CLOSED;
                 _orderlyClose.set(true);
+                addCloseTicker();
                 break;
             case CLOSE_SENT:
                 _connectionState = ConnectionState.CLOSED;
@@ -359,14 +361,13 @@ public class AMQPConnection_1_0 extends
             default:
         }
         _remoteError = close.getError();
-
     }
 
     private void closeReceived()
     {
         Collection<Session_1_0> sessions = new ArrayList<>(_sessions);
 
-        for(final Session_1_0 session : sessions)
+        for (final Session_1_0 session : sessions)
         {
             AccessController.doPrivileged(new PrivilegedAction<Object>()
             {
@@ -387,7 +388,7 @@ public class AMQPConnection_1_0 extends
 
     public void receiveSaslMechanisms(final SaslMechanisms saslMechanisms)
     {
-        // TODO - log unexpected frame
+        LOGGER.info("{} : Unexpected frame sasl-mechanisms", getLogSubject());
         closeSaslWithFailure();
     }
 
@@ -437,7 +438,7 @@ public class AMQPConnection_1_0 extends
 
     void sessionEnded(final Session_1_0 session)
     {
-        if(!_closedOnOpen)
+        if (!_closedOnOpen)
         {
             _sessions.remove(session);
             sessionRemoved(session);
@@ -451,8 +452,6 @@ public class AMQPConnection_1_0 extends
 
     private void inputClosed()
     {
-        List<Runnable> postLockActions;
-
         if (!_closedForInput)
         {
             _closedForInput = true;
@@ -474,14 +473,11 @@ public class AMQPConnection_1_0 extends
             }
             closeReceived();
         }
-
-
     }
 
     private void closeSender()
     {
         setClosedForOutput(true);
-        close();
     }
 
     String getRemoteContainerId()
@@ -506,12 +502,11 @@ public class AMQPConnection_1_0 extends
         {
             _sendingSessions[channel] = null;
         }
-
     }
 
     public void receiveSaslOutcome(final SaslOutcome saslOutcome)
     {
-        // TODO - log unexpected frame
+        LOGGER.info("{} : Unexpected frame sasl-outcome", getLogSubject());
         closeSaslWithFailure();
     }
 
@@ -528,10 +523,15 @@ public class AMQPConnection_1_0 extends
         }
         else
         {
-            // TODO error
+            closeConnectionWithInvalidChannel(channel, end);
         }
     }
 
+    private void closeConnectionWithInvalidChannel(final short channel, final FrameBody frame)
+    {
+        closeConnection(AmqpError.INVALID_FIELD, String.format("%s frame received on channel %d which is not mapped", frame.getClass().getSimpleName().toLowerCase(), channel));
+    }
+
     public void receiveDisposition(final short channel,
                                    final Disposition disposition)
     {
@@ -551,7 +551,7 @@ public class AMQPConnection_1_0 extends
         }
         else
         {
-            // TODO - error
+            closeConnectionWithInvalidChannel(channel, disposition);
         }
 
     }
@@ -563,11 +563,12 @@ public class AMQPConnection_1_0 extends
         short myChannelId;
         if (begin.getRemoteChannel() != null)
         {
-            final Error error = new Error();
-            error.setCondition(ConnectionError.FRAMING_ERROR);
-            error.setDescription("BEGIN received on channel " + channel + " with given remote-channel "
-                                 + begin.getRemoteChannel() + ". Since the broker does not spontaneously start channels, this must be an error.");
-            closeConnection(error);
+            closeConnection(ConnectionError.FRAMING_ERROR,
+                            "BEGIN received on channel "
+                            + channel
+                            + " with given remote-channel "
+                            + begin.getRemoteChannel()
+                            + ". Since the broker does not spontaneously start channels, this must be an error.");
 
         }
         else // Peer requesting session creation
@@ -578,10 +579,11 @@ public class AMQPConnection_1_0 extends
                 myChannelId = getFirstFreeChannel();
                 if (myChannelId == -1)
                 {
-                    final Error error = new Error();
-                    error.setCondition(ConnectionError.FRAMING_ERROR);
-                    error.setDescription("BEGIN received on channel " + channel + ". There are no free channels for the broker to responsd on.");
-                    closeConnection(error);
+
+                    closeConnection(ConnectionError.FRAMING_ERROR,
+                                    "BEGIN received on channel "
+                                    + channel
+                                    + ". There are no free channels for the broker to respond on.");
 
                 }
                 Session_1_0 session = new Session_1_0(this, begin);
@@ -605,10 +607,8 @@ public class AMQPConnection_1_0 extends
             }
             else
             {
-                final Error error = new Error();
-                error.setCondition(ConnectionError.FRAMING_ERROR);
-                error.setDescription("BEGIN received on channel " + channel + " which is already in use.");
-                closeConnection(error);
+                closeConnection(ConnectionError.FRAMING_ERROR,
+                                "BEGIN received on channel " + channel + " which is already in use.");
             }
 
         }
@@ -658,7 +658,7 @@ public class AMQPConnection_1_0 extends
         }
         else
         {
-            // TODO - error
+            closeConnectionWithInvalidChannel(channel, transfer);
         }
     }
 
@@ -680,7 +680,7 @@ public class AMQPConnection_1_0 extends
         }
         else
         {
-            // TODO - error
+            closeConnectionWithInvalidChannel(channel, flow);
         }
 
     }
@@ -720,12 +720,11 @@ public class AMQPConnection_1_0 extends
         }
         if (_idleTimeout != 0L && _idleTimeout < MINIMUM_SUPPORTED_IDLE_TIMEOUT)
         {
-            closeConnection(new Error(ConnectionError.CONNECTION_FORCED,
-                                      "Requested idle timeout of "
-                                      + _idleTimeout
-                                      + " is too low. The minimum supported timeout is"
-                                      + MINIMUM_SUPPORTED_IDLE_TIMEOUT));
-            close();
+            closeConnection(ConnectionError.CONNECTION_FORCED,
+                            "Requested idle timeout of "
+                            + _idleTimeout
+                            + " is too low. The minimum supported timeout is "
+                            + MINIMUM_SUPPORTED_IDLE_TIMEOUT);
             _closedOnOpen = true;
         }
         else
@@ -741,18 +740,14 @@ public class AMQPConnection_1_0 extends
             {
                 if (!addressSpace.isActive())
                 {
-                    final Error err = new Error();
-                    err.setCondition(AmqpError.NOT_FOUND);
-                    closeConnection(err);
-
                     _closedOnOpen = true;
 
+                    final Error err = new Error();
+                    err.setCondition(AmqpError.NOT_FOUND);
                     populateConnectionRedirect(addressSpace, err);
 
                     closeConnection(err);
 
-                    close();
-
                     _closedOnOpen = true;
 
                 }
@@ -863,7 +858,7 @@ public class AMQPConnection_1_0 extends
         }
         else
         {
-            // TODO - error
+            closeConnectionWithInvalidChannel(channel, detach);
         }
     }
 
@@ -996,19 +991,9 @@ public class AMQPConnection_1_0 extends
         }
     }
 
-    private void closeConnection()
+    private void closeConnection(ErrorCondition errorCondition, String description)
     {
-        switch (_connectionState)
-        {
-            case AWAITING_OPEN:
-            case OPEN:
-                Close closeToSend = new Close();
-                sendClose(closeToSend);
-                _connectionState = ConnectionState.CLOSE_SENT;
-                break;
-            case CLOSE_SENT:
-            default:
-        }
+        closeConnection(new Error(errorCondition, description));
     }
 
     private void closeConnection(final Error error)
@@ -1026,12 +1011,13 @@ public class AMQPConnection_1_0 extends
             case OPEN:
                 sendClose(close);
                 _connectionState = ConnectionState.CLOSE_SENT;
+                addCloseTicker();
             case CLOSE_SENT:
             case CLOSED:
                 // already sent our close - too late to do anything more
                 break;
             default:
-                // TODO Unknown state
+                throw new ServerScopedRuntimeException("Unknown state: " + _connectionState);
         }
     }
 
@@ -1045,7 +1031,7 @@ public class AMQPConnection_1_0 extends
             int payloadSent = _maxFrameSize - (size + 9);
             try
             {
-                if (payloadSent < (payload == null ? 0 : payload.remaining()))
+                if (payload != null && payloadSent < payload.remaining())
                 {
 
                     if (body instanceof Transfer)
@@ -1218,7 +1204,7 @@ public class AMQPConnection_1_0 extends
                 getSender().send(QpidByteBuffer.wrap(SASL_HEADER));
 
                 SaslMechanisms mechanisms = new SaslMechanisms();
-                ArrayList<Symbol> mechanismsList = new ArrayList<Symbol>();
+                ArrayList<Symbol> mechanismsList = new ArrayList<>();
                 for (String name :  subjectCreator.getMechanisms())
                 {
                     mechanismsList.add(Symbol.valueOf(name));
@@ -1245,7 +1231,8 @@ public class AMQPConnection_1_0 extends
                     }
                     else
                     {
-                        // TODO - log auth failure / close
+                        LOGGER.warn("{} : attempt to initiate AMQP connection without correctly authenticating", getLogSubject());
+                        _connectionState = ConnectionState.CLOSED;
                         getNetwork().close();
                     }
 
@@ -1257,7 +1244,9 @@ public class AMQPConnection_1_0 extends
             }
             else
             {
-                throw new ConnectionScopedRuntimeException("Unknown protocol header");
+                LOGGER.warn("{} : unknown AMQP header {}", getLogSubject(), Functions.str(header));
+                _connectionState = ConnectionState.CLOSED;
+                getNetwork().close();
             }
 
         }
@@ -1288,11 +1277,6 @@ public class AMQPConnection_1_0 extends
         }
     }
 
-    public boolean canSend()
-    {
-        return true;
-    }
-
     public void send(final AMQFrame amqFrame)
     {
         send(amqFrame, null);
@@ -1322,7 +1306,7 @@ public class AMQPConnection_1_0 extends
 
     }
 
-    public void close()
+    private void addCloseTicker()
     {
         long timeoutTime = System.currentTimeMillis() + getContextValue(Long.class, Connection.CLOSE_RESPONSE_TIMEOUT);
 
@@ -1403,15 +1387,29 @@ public class AMQPConnection_1_0 extends
         return false;
     }
 
-    public void sendConnectionCloseAsync(final AMQConstant cause, final String message)
+    @Override
+    public void sendConnectionCloseAsync(final ConnectionCloseReason reason, final String description)
     {
+
         stopConnection();
+        final ErrorCondition cause;
+        switch(reason)
+        {
+            case MANAGEMENT:
+                cause = ConnectionError.CONNECTION_FORCED;
+                break;
+            case TRANSACTION_TIMEOUT:
+                cause = AmqpError.RESOURCE_LIMIT_EXCEEDED;
+                break;
+            default:
+                cause = AmqpError.INTERNAL_ERROR;
+        }
         Action<ConnectionHandler> action = new Action<ConnectionHandler>()
         {
             @Override
             public void performAction(final ConnectionHandler object)
             {
-                closeConnection();
+                closeConnection(cause, description);
 
             }
         };
@@ -1426,7 +1424,30 @@ public class AMQPConnection_1_0 extends
 
     public void block()
     {
-        // TODO
+        synchronized (_blockingLock)
+        {
+            if (!_blocking)
+            {
+                _blocking = true;
+                doOnIOThreadAsync(
+                        new Runnable()
+                        {
+                            @Override
+                            public void run()
+                            {
+                                doBlock();
+                            }
+                        });
+            }
+        }
+    }
+
+    private void doBlock()
+    {
+        for(Session_1_0 session : _sessions)
+        {
+            session.block();
+        }
     }
 
     public String getRemoteContainerName()
@@ -1441,12 +1462,36 @@ public class AMQPConnection_1_0 extends
 
     public void unblock()
     {
-        // TODO
+        synchronized (_blockingLock)
+        {
+            if(_blocking)
+            {
+                _blocking = false;
+                doOnIOThreadAsync(
+                        new Runnable()
+                        {
+                            @Override
+                            public void run()
+                            {
+                                doUnblock();
+                            }
+                        });
+            }
+        }
     }
 
+    private void doUnblock()
+    {
+        for(Session_1_0 session : _sessions)
+        {
+            session.unblock();
+        }
+    }
+
+    @Override
     public long getSessionCountLimit()
     {
-        return 0;  // TODO
+        return _channelMax+1;
     }
 
     @Override
@@ -1477,7 +1522,7 @@ public class AMQPConnection_1_0 extends
             _channelMax = channelMax;
         }
         open.setChannelMax(UnsignedShort.valueOf((short) channelMax));
-        open.setContainerId(_container.getId());
+        open.setContainerId(getAddressSpace() == null ? UUID.randomUUID().toString() : getAddressSpace().getId().toString());
         open.setMaxFrameSize(UnsignedInteger.valueOf(maxFrameSize));
         // TODO - should we try to set the hostname based on the connection information?
         // open.setHostname();
@@ -1496,7 +1541,6 @@ public class AMQPConnection_1_0 extends
         err.setCondition(amqpError);
         err.setDescription(errorDescription);
         closeConnection(err);
-        close();
         _closedOnOpen = true;
     }
 

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/FrameOutputHandler.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/FrameOutputHandler.java?rev=1770505&r1=1770504&r2=1770505&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/FrameOutputHandler.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/FrameOutputHandler.java Sat Nov 19 16:36:59 2016
@@ -20,17 +20,14 @@
  */
 package org.apache.qpid.server.protocol.v1_0;
 
-import org.apache.qpid.server.protocol.v1_0.framing.AMQFrame;
-
 import java.nio.ByteBuffer;
 
+import org.apache.qpid.server.protocol.v1_0.framing.AMQFrame;
+
 public interface FrameOutputHandler<T>
 {
-    boolean canSend();
 
     void send(AMQFrame<T> frame);
     void send(AMQFrame<T> frame, ByteBuffer payload);
 
-    void close();
-
 }

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java?rev=1770505&r1=1770504&r2=1770505&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java Sat Nov 19 16:36:59 2016
@@ -27,7 +27,6 @@ import java.util.Map;
 
 import org.apache.qpid.server.protocol.v1_0.type.Binary;
 import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
-import org.apache.qpid.server.protocol.v1_0.type.Outcome;
 import org.apache.qpid.server.protocol.v1_0.type.Source;
 import org.apache.qpid.server.protocol.v1_0.type.Symbol;
 import org.apache.qpid.server.protocol.v1_0.type.Target;
@@ -45,7 +44,6 @@ public abstract class LinkEndpoint<T ext
 {
 
     private T _link;
-    private DeliveryStateHandler _deliveryStateHandler;
     private Object _flowTransactionId;
     private SenderSettleMode _sendingSettlementMode;
     private ReceiverSettleMode _receivingSettlementMode;
@@ -84,16 +82,6 @@ public abstract class LinkEndpoint<T ext
 
     private Map<Binary,Delivery> _unsettledTransfers = new HashMap<Binary,Delivery>();
 
-    LinkEndpoint(final Session_1_0 sessionEndpoint, String name, Map<Binary, Outcome> unsettled, DeliveryStateHandler deliveryStateHandler)
-    {
-        _name = name;
-        _session = sessionEndpoint;
-        _linkCredit = UnsignedInteger.valueOf(0);
-        _drain = Boolean.FALSE;
-        _localUnsettled = unsettled;
-        _deliveryStateHandler = deliveryStateHandler;
-    }
-
     LinkEndpoint(final Session_1_0 sessionEndpoint,final Attach attach)
     {
         _session = sessionEndpoint;
@@ -215,9 +203,9 @@ public abstract class LinkEndpoint<T ext
                                      final Boolean settled)
     {
         // TODO
-        if (_deliveryStateHandler != null)
+        if (_link != null)
         {
-            _deliveryStateHandler.handle(unsettled.getDeliveryTag(), state, settled);
+            _link.handle(unsettled.getDeliveryTag(), state, settled);
         }
 
         if (settled)
@@ -478,11 +466,6 @@ public abstract class LinkEndpoint<T ext
         _link = link;
     }
 
-    public void setDeliveryStateHandler(final DeliveryStateHandler deliveryStateHandler)
-    {
-        _deliveryStateHandler = deliveryStateHandler;
-    }
-
     public void setSendingSettlementMode(SenderSettleMode sendingSettlementMode)
     {
         _sendingSettlementMode = sendingSettlementMode;

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Link_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Link_1_0.java?rev=1770505&r1=1770504&r2=1770505&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Link_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Link_1_0.java Sat Nov 19 16:36:59 2016
@@ -21,6 +21,8 @@
 package org.apache.qpid.server.protocol.v1_0;
 
 import org.apache.qpid.server.protocol.LinkModel;
+import org.apache.qpid.server.protocol.v1_0.type.Binary;
+import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
 
 public interface Link_1_0 extends LinkModel
@@ -28,5 +30,7 @@ public interface Link_1_0 extends LinkMo
 
     void remoteDetached(final LinkEndpoint endpoint, Detach detach);
 
+    void handle(final Binary deliveryTag, final DeliveryState state, final Boolean settled);
+
     void start();
 }

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkAttachment.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkAttachment.java?rev=1770505&r1=1770504&r2=1770505&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkAttachment.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkAttachment.java Sat Nov 19 16:36:59 2016
@@ -20,8 +20,6 @@
  */
 package org.apache.qpid.server.protocol.v1_0;
 
-import org.apache.qpid.server.protocol.v1_0.type.Binary;
-import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
 import org.apache.qpid.server.protocol.v1_0.type.Source;
 import org.apache.qpid.server.protocol.v1_0.type.Target;
 
@@ -51,16 +49,6 @@ public class ReceivingLinkAttachment
         return getEndpoint().getSource();
     }
 
-    public void setDeliveryStateHandler(final DeliveryStateHandler handler)
-    {
-        getEndpoint().setDeliveryStateHandler(handler);
-    }
-
-    public void updateDisposition(final Binary deliveryTag, final DeliveryState state, final boolean settled)
-    {
-        getEndpoint().updateDisposition(deliveryTag, state, settled);
-    }
-
     public Target getTarget()
     {
         return getEndpoint().getTarget();

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkAttachment.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkAttachment.java?rev=1770505&r1=1770504&r2=1770505&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkAttachment.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkAttachment.java Sat Nov 19 16:36:59 2016
@@ -50,11 +50,6 @@ public class SendingLinkAttachment
         return getEndpoint().getSource();
     }
 
-    public void setDeliveryStateHandler(final DeliveryStateHandler handler)
-    {
-        getEndpoint().setDeliveryStateHandler(handler);
-    }
-
     public void updateDisposition(final Binary deliveryTag, final DeliveryState state, final boolean settled)
     {
         getEndpoint().updateDisposition(deliveryTag, state, settled);

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java?rev=1770505&r1=1770504&r2=1770505&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java Sat Nov 19 16:36:59 2016
@@ -77,7 +77,7 @@ import org.apache.qpid.server.txn.Server
 import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
 import org.apache.qpid.server.virtualhost.QueueExistsException;
 
-public class SendingLink_1_0 implements Link_1_0, DeliveryStateHandler
+public class SendingLink_1_0 implements Link_1_0
 {
     private static final Logger _logger = LoggerFactory.getLogger(SendingLink_1_0.class);
 
@@ -111,7 +111,6 @@ public class SendingLink_1_0 implements
         _linkAttachment = linkAttachment;
         final Source source = (Source) linkAttachment.getSource();
         _durability = source.getDurable();
-        linkAttachment.setDeliveryStateHandler(this);
         QueueDestination qd = null;
 
         EnumSet<ConsumerImpl.Option> options = EnumSet.noneOf(ConsumerImpl.Option.class);
@@ -595,7 +594,6 @@ public class SendingLink_1_0 implements
         if (linkAttachment.getSession() != null)
         {
             SendingLinkEndpoint endpoint = linkAttachment.getEndpoint();
-            endpoint.setDeliveryStateHandler(this);
             Map initialUnsettledMap = endpoint.getInitialUnsettledMap();
             Map<Binary, MessageInstance> unsettledCopy = new HashMap<Binary, MessageInstance>(_unsettledMap);
             _resumeAcceptedTransfers.clear();

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLink_1_0.java?rev=1770505&r1=1770504&r2=1770505&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLink_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLink_1_0.java Sat Nov 19 16:36:59 2016
@@ -48,7 +48,7 @@ import org.apache.qpid.server.store.Stor
 import org.apache.qpid.server.txn.AutoCommitTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
 
-public class StandardReceivingLink_1_0 implements ReceivingLink_1_0, DeliveryStateHandler
+public class StandardReceivingLink_1_0 implements ReceivingLink_1_0
 {
     private NamedAddressSpace _addressSpace;
 
@@ -73,9 +73,6 @@ public class StandardReceivingLink_1_0 i
         _destination = destination;
         _attachment = receivingLinkAttachment;
         _receivingSettlementMode = receivingLinkAttachment.getEndpoint().getReceivingSettlementMode();
-
-        receivingLinkAttachment.setDeliveryStateHandler(this);
-
         _durability = ((Target)receivingLinkAttachment.getTarget()).getDurable();
 
         _sectionDecoder = new SectionDecoderImpl(receivingLinkAttachment.getEndpoint().getSession().getConnection().getDescribedTypeRegistry());

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLink_1_0.java?rev=1770505&r1=1770504&r2=1770505&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLink_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLink_1_0.java Sat Nov 19 16:36:59 2016
@@ -33,6 +33,7 @@ import org.apache.qpid.server.protocol.v
 import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoderImpl;
 import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
 import org.apache.qpid.server.protocol.v1_0.type.Binary;
+import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
 import org.apache.qpid.server.protocol.v1_0.type.Section;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
@@ -179,6 +180,12 @@ public class TxnCoordinatorReceivingLink
         endpoint.detach();
     }
 
+    @Override
+    public void handle(final Binary deliveryTag, final DeliveryState state, final Boolean settled)
+    {
+
+    }
+
     private Error discharge(Integer transactionId, boolean fail)
     {
         Error error = null;

Modified: qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementAddressSpace.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementAddressSpace.java?rev=1770505&r1=1770504&r2=1770505&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementAddressSpace.java (original)
+++ qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementAddressSpace.java Sat Nov 19 16:36:59 2016
@@ -20,12 +20,14 @@
  */
 package org.apache.qpid.server.management.amqp;
 
+import java.nio.charset.StandardCharsets;
 import java.security.AccessControlException;
 import java.security.Principal;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 import java.util.concurrent.CopyOnWriteArrayList;
 
 import org.apache.qpid.exchange.ExchangeDefaults;
@@ -70,6 +72,7 @@ public class ManagementAddressSpace impl
     private final LinkRegistry _linkRegistry = new NonDurableLinkRegistry();
     private final Broker<?> _broker;
     private final Principal _principal;
+    private final UUID _id;
 
     public ManagementAddressSpace(final SystemAddressSpaceCreator.AddressSpaceRegistry addressSpaceRegistry)
     {
@@ -86,10 +89,17 @@ public class ManagementAddressSpace impl
         _propertiesNode = new VirtualHostPropertiesNode(this);
         _messageStore = new MemoryMessageStore();
         _principal = new ManagementAddressSpacePrincipal(this);
+        _id = UUID.nameUUIDFromBytes((_broker.getId().toString()+"/"+name).getBytes(StandardCharsets.UTF_8));
     }
 
 
     @Override
+    public UUID getId()
+    {
+        return _id;
+    }
+
+    @Override
     public MessageSource getAttainedMessageSource(final String name)
     {
         if(_managementNode.getName().equals(name))



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org