You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2016/11/19 21:19:12 UTC
svn commit: r1770514 [1/3] - in /qpid/java/trunk:
broker-core/src/main/java/org/apache/qpid/server/protocol/
broker-core/src/main/java/org/apache/qpid/server/transport/
broker-core/src/test/java/org/apache/qpid/server/consumer/
broker-core/src/test/jav...
Author: rgodfrey
Date: Sat Nov 19 21:19:11 2016
New Revision: 1770514
URL: http://svn.apache.org/viewvc?rev=1770514&view=rev
Log:
QPID-7532 : Remove the AMQConstant class
Added:
qpid/java/trunk/common/src/main/java/org/apache/qpid/protocol/ErrorCodes.java
- copied, changed from r1770513, qpid/java/trunk/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java
Removed:
qpid/java/trunk/common/src/main/java/org/apache/qpid/AMQChannelException.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/logging/actors/BaseConnectionActorTestCase.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQAuthenticationException.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnection.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQNoConsumersException.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQNoRouteException.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQProtocolHandler.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
qpid/java/trunk/client/src/test/java/org/apache/qpid/client/AMQConnectionUnitTest.java
qpid/java/trunk/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java
qpid/java/trunk/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/AMQChannelClosedException.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/AMQConnectionClosedException.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/AMQConnectionException.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/AMQException.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/AMQInternalException.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/AMQInvalidArgumentException.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/AMQInvalidRoutingKeyException.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/AMQProtocolException.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/AMQSecurityException.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/AMQTimeoutException.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/AMQUndeliveredException.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/QpidException.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQFrameDecodingException.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java
qpid/java/trunk/common/src/test/java/org/apache/qpid/QpidExceptionTest.java
qpid/java/trunk/jca/src/main/java/org/apache/qpid/ra/inflow/QpidExceptionHandler.java
qpid/java/trunk/systests/src/test/java/org/apache/qpid/client/session/ExchangeDeleteTest.java
qpid/java/trunk/systests/src/test/java/org/apache/qpid/client/session/QueueDeleteTest.java
qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/logging/AccessControlLoggingTest.java
qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/security/acl/AbstractACLTestCase.java
qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/security/acl/ExhaustiveACLTest.java
qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/security/acl/ExternalACLTest.java
qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/client/UnroutableMessageTestExceptionListener.java
qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/client/queue/QueuePolicyTest.java
qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/client/AMQSessionTest.java
qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/client/DynamicQueueExchangeCreateTest.java
qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTestCase.java
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java?rev=1770514&r1=1770513&r2=1770514&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java Sat Nov 19 21:19:11 2016
@@ -24,7 +24,6 @@ import java.util.Collection;
import java.util.UUID;
import java.util.concurrent.ConcurrentSkipListSet;
-import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.model.Consumer;
@@ -47,8 +46,6 @@ public interface AMQSessionModel<T exten
void close();
- void close(AMQConstant cause, String message);
-
LogSubject getLogSubject();
void doTimeoutAction(String reason);
@@ -67,10 +64,9 @@ public interface AMQSessionModel<T exten
int getUnacknowledgedMessageCount();
- Long getTxnCount();
- Long getTxnStart();
- Long getTxnCommits();
- Long getTxnRejects();
+ long getTxnStart();
+ long getTxnCommits();
+ long getTxnRejects();
int getChannelId();
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java?rev=1770514&r1=1770513&r2=1770514&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java Sat Nov 19 21:19:11 2016
@@ -29,7 +29,6 @@ import javax.security.auth.Subject;
import com.google.common.util.concurrent.ListenableFuture;
-import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.logging.EventLoggerProvider;
import org.apache.qpid.server.model.Connection;
import org.apache.qpid.server.protocol.AMQSessionModel;
@@ -58,7 +57,7 @@ public interface AMQPConnection<C extend
void registerMessageDelivered(long size);
- void closeSessionAsync(AMQSessionModel<?> session, AMQConstant cause, String message);
+ void closeSessionAsync(AMQSessionModel<?> session, CloseReason reason, String message);
SocketAddress getRemoteSocketAddress();
@@ -72,13 +71,13 @@ public interface AMQPConnection<C extend
boolean hasSessionWithName(byte[] name);
- enum ConnectionCloseReason
+ enum CloseReason
{
MANAGEMENT,
TRANSACTION_TIMEOUT
}
- void sendConnectionCloseAsync(ConnectionCloseReason reason, String description);
+ void sendConnectionCloseAsync(CloseReason reason, String description);
boolean isIOThread();
ListenableFuture<Void> doOnIOThreadAsync(final Runnable task);
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java?rev=1770514&r1=1770513&r2=1770514&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java Sat Nov 19 21:19:11 2016
@@ -629,7 +629,7 @@ public abstract class AbstractAMQPConnec
{
if (_modelClosing.compareAndSet(false, true))
{
- sendConnectionCloseAsync(ConnectionCloseReason.MANAGEMENT, "Connection closed by external action");
+ sendConnectionCloseAsync(CloseReason.MANAGEMENT, "Connection closed by external action");
}
return _modelClosedFuture;
}
Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java?rev=1770514&r1=1770513&r2=1770514&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java Sat Nov 19 21:19:11 2016
@@ -31,7 +31,6 @@ import java.util.UUID;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
-import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.logging.LogSubject;
@@ -325,27 +324,21 @@ public class TestConsumerTarget implemen
}
@Override
- public Long getTxnCount()
+ public long getTxnStart()
{
- return null;
- }
-
- @Override
- public Long getTxnStart()
- {
- return null;
+ return 0L;
}
@Override
- public Long getTxnCommits()
+ public long getTxnCommits()
{
- return null;
+ return 0L;
}
@Override
- public Long getTxnRejects()
+ public long getTxnRejects()
{
- return null;
+ return 0L;
}
@Override
@@ -403,11 +396,6 @@ public class TestConsumerTarget implemen
}
@Override
- public void close(AMQConstant cause, String message)
- {
- }
-
- @Override
public void addDeleteTask(final Action task)
{
Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/logging/actors/BaseConnectionActorTestCase.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/logging/actors/BaseConnectionActorTestCase.java?rev=1770514&r1=1770513&r2=1770514&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/logging/actors/BaseConnectionActorTestCase.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/logging/actors/BaseConnectionActorTestCase.java Sat Nov 19 21:19:11 2016
@@ -54,7 +54,7 @@ public abstract class BaseConnectionActo
}
if (_connection != null)
{
- _connection.sendConnectionCloseAsync(AMQPConnection.ConnectionCloseReason.MANAGEMENT, "");
+ _connection.sendConnectionCloseAsync(AMQPConnection.CloseReason.MANAGEMENT, "");
}
}
finally
Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java?rev=1770514&r1=1770513&r2=1770514&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java Sat Nov 19 21:19:11 2016
@@ -579,7 +579,7 @@ public class VirtualHostTest extends Qpi
}
return null;
}
- }).when(connection).sendConnectionCloseAsync(any(AMQPConnection.ConnectionCloseReason.class), anyString());
+ }).when(connection).sendConnectionCloseAsync(any(AMQPConnection.CloseReason.class), anyString());
when(connection.getRemoteAddressString()).thenReturn("peer:1234");
return connection;
}
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java?rev=1770514&r1=1770513&r2=1770514&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java Sat Nov 19 21:19:11 2016
@@ -36,7 +36,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.logging.messages.ConnectionMessages;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.Protocol;
@@ -306,7 +305,7 @@ public class AMQPConnection_0_10 extends
}
@Override
- public void sendConnectionCloseAsync(final ConnectionCloseReason reason, final String description)
+ public void sendConnectionCloseAsync(final CloseReason reason, final String description)
{
stopConnection();
// Best mapping for all reasons is "forced"
@@ -314,12 +313,14 @@ public class AMQPConnection_0_10 extends
}
+ @Override
public void closeSessionAsync(final AMQSessionModel<?> session,
- final AMQConstant cause, final String message)
+ final CloseReason reason, final String message)
{
- _connection.closeSessionAsync((ServerSession) session, cause, message);
+ _connection.closeSessionAsync((ServerSession) session, reason, message);
}
+
@Override
protected void addAsyncTask(final Action<? super ServerConnection> action)
{
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java?rev=1770514&r1=1770513&r2=1770514&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java Sat Nov 19 21:19:11 2016
@@ -38,7 +38,7 @@ import java.util.concurrent.atomic.Atomi
import javax.security.auth.Subject;
-import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.protocol.ErrorCodes;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.NamedAddressSpace;
@@ -46,6 +46,7 @@ import org.apache.qpid.server.model.Tran
import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.ConnectionClosingTicker;
+import org.apache.qpid.server.transport.AMQPConnection;
import org.apache.qpid.server.transport.ServerNetworkConnection;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
@@ -164,8 +165,20 @@ public class ServerConnection extends Co
return _transport;
}
- public void closeSessionAsync(final ServerSession session, final AMQConstant cause, final String message)
+ public void closeSessionAsync(final ServerSession session, final AMQPConnection.CloseReason reason, final String message)
{
+ final int cause;
+ switch (reason)
+ {
+ case MANAGEMENT:
+ cause = ErrorCodes.CONNECTION_FORCED;
+ break;
+ case TRANSACTION_TIMEOUT:
+ cause = ErrorCodes.RESOURCE_ERROR;
+ break;
+ default:
+ cause = ErrorCodes.INTERNAL_ERROR;
+ }
addAsyncTask(new Action<ServerConnection>()
{
@@ -178,7 +191,7 @@ public class ServerConnection extends Co
ExecutionErrorCode code = ExecutionErrorCode.INTERNAL_ERROR;
try
{
- code = ExecutionErrorCode.get(cause.getCode());
+ code = ExecutionErrorCode.get(cause);
}
catch (IllegalArgumentException iae)
{
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java?rev=1770514&r1=1770513&r2=1770514&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java Sat Nov 19 21:19:11 2016
@@ -54,7 +54,6 @@ import com.google.common.util.concurrent
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.connection.SessionPrincipal;
import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.consumer.ConsumerTarget;
@@ -82,6 +81,7 @@ import org.apache.qpid.server.store.Mess
import org.apache.qpid.server.store.StoreException;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.server.transport.AMQPConnection;
import org.apache.qpid.server.txn.AlreadyKnownDtxException;
import org.apache.qpid.server.txn.AsyncAutoCommitTransaction;
import org.apache.qpid.server.txn.DistributedTransaction;
@@ -741,12 +741,12 @@ public class ServerSession extends Sessi
}
}
- public Long getTxnCommits()
+ public long getTxnCommits()
{
return _txnCommits.get();
}
- public Long getTxnRejects()
+ public long getTxnRejects()
{
return _txnRejects.get();
}
@@ -756,12 +756,7 @@ public class ServerSession extends Sessi
return getChannel();
}
- public Long getTxnCount()
- {
- return _txnCount.get();
- }
-
- public Long getTxnStart()
+ public long getTxnStart()
{
return _txnStarts.get();
}
@@ -935,20 +930,7 @@ public class ServerSession extends Sessi
+ "] ";
}
- @Override
- public void close(AMQConstant cause, String message)
- {
- if (cause == null)
- {
- close();
- }
- else
- {
- close(cause.getCode(), message);
- }
- }
-
- void close(int cause, String message)
+ public void close(int cause, String message)
{
_forcedCloseLogMessage.compareAndSet(null, ChannelMessages.CLOSE_FORCED(cause, message));
close();
@@ -1260,7 +1242,8 @@ public class ServerSession extends Sessi
@Override
public void doTimeoutAction(final String reason)
{
- getAMQPConnection().closeSessionAsync(ServerSession.this, AMQConstant.RESOURCE_ERROR, reason);
+ getAMQPConnection().closeSessionAsync(ServerSession.this,
+ AMQPConnection.CloseReason.TRANSACTION_TIMEOUT, reason);
}
public final long getMaxUncommittedInMemorySize()
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java?rev=1770514&r1=1770513&r2=1770514&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java Sat Nov 19 21:19:11 2016
@@ -38,7 +38,7 @@ import org.slf4j.LoggerFactory;
import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.exchange.ExchangeDefaults;
-import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.protocol.ErrorCodes;
import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.filter.AMQInvalidArgumentException;
import org.apache.qpid.server.filter.ArrivalTimeFilter;
@@ -421,8 +421,8 @@ public class ServerSessionDelegate exten
{
getEventLogger(ssn).message(ChannelMessages.FLOW_CONTROL_IGNORED());
- serverSession.close(AMQConstant.MESSAGE_TOO_LARGE,
- "Session flow control was requested, but not enforced by sender");
+ serverSession.close(ErrorCodes.MESSAGE_TOO_LARGE,
+ "Session flow control was requested, but not enforced by sender");
}
else if(xfr.getBodySize() > serverSession.getConnection().getMaxMessageSize())
{
Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1770514&r1=1770513&r2=1770514&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Sat Nov 19 21:19:11 2016
@@ -55,7 +55,7 @@ import org.apache.qpid.bytebuffer.QpidBy
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.*;
-import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.protocol.ErrorCodes;
import org.apache.qpid.server.connection.SessionPrincipal;
import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.consumer.ConsumerTarget;
@@ -287,7 +287,7 @@ public class AMQChannel
@Override
public void doTimeoutAction(String reason)
{
- _connection.sendConnectionCloseAsync(AMQPConnection.ConnectionCloseReason.TRANSACTION_TIMEOUT, reason);
+ _connection.sendConnectionCloseAsync(AMQPConnection.CloseReason.TRANSACTION_TIMEOUT, reason);
}
private void message(final LogMessage message)
@@ -377,22 +377,17 @@ public class AMQChannel
}
}
- public Long getTxnCommits()
+ public long getTxnCommits()
{
return _txnCommits.get();
}
- public Long getTxnRejects()
+ public long getTxnRejects()
{
return _txnRejects.get();
}
- public Long getTxnCount()
- {
- return _txnCount.get();
- }
-
- public Long getTxnStart()
+ public long getTxnStart()
{
return _txnStarts.get();
}
@@ -542,7 +537,7 @@ public class AMQChannel
}
catch (AccessControlException e)
{
- _connection.sendConnectionClose(AMQConstant.ACCESS_REFUSED, e.getMessage(), getChannelId());
+ _connection.sendConnectionClose(ErrorCodes.ACCESS_REFUSED, e.getMessage(), getChannelId());
}
}
@@ -614,7 +609,7 @@ public class AMQChannel
@Override
public void run()
{
- _connection.sendConnectionClose(AMQConstant.NO_ROUTE,
+ _connection.sendConnectionClose(ErrorCodes.NO_ROUTE,
"No route for message " + description, _channelId);
}
@@ -628,7 +623,7 @@ public class AMQChannel
{
_connection.writeFrame(new AMQFrame(_channelId, new BasicNackBody(_confirmedMessageCounter, false, false)));
}
- _transaction.addPostTransactionAction(new WriteReturnAction(AMQConstant.NO_ROUTE,
+ _transaction.addPostTransactionAction(new WriteReturnAction(ErrorCodes.NO_ROUTE,
"No Route for message "
+ description,
message));
@@ -654,9 +649,9 @@ public class AMQChannel
long currentSize = _currentMessage.addContentBodyFrame(contentBody);
if(currentSize > _currentMessage.getSize())
{
- _connection.sendConnectionClose(AMQConstant.FRAME_ERROR,
- "More message data received than content header defined",
- _channelId);
+ _connection.sendConnectionClose(ErrorCodes.FRAME_ERROR,
+ "More message data received than content header defined",
+ _channelId);
}
else
{
@@ -891,10 +886,10 @@ public class AMQChannel
@Override
public void close()
{
- close(null, null);
+ close(0, null);
}
- public void close(AMQConstant cause, String message)
+ public void close(int cause, String message)
{
if(!_closing.compareAndSet(false, true))
{
@@ -921,9 +916,9 @@ public class AMQChannel
}
finally
{
- LogMessage operationalLogMessage = cause == null ?
+ LogMessage operationalLogMessage = cause == 0?
ChannelMessages.CLOSE() :
- ChannelMessages.CLOSE_FORCED(cause.getCode(), message);
+ ChannelMessages.CLOSE_FORCED(cause, message);
messageWithSubject(operationalLogMessage);
}
}
@@ -1525,7 +1520,7 @@ public class AMQChannel
message.getContentHeaderBody(),
message,
_channelId,
- AMQConstant.NO_CONSUMERS.getCode(),
+ ErrorCodes.NO_CONSUMERS,
IMMEDIATE_DELIVERY_REPLY_TEXT);
}
@@ -1626,11 +1621,11 @@ public class AMQChannel
private class WriteReturnAction implements ServerTransaction.Action
{
- private final AMQConstant _errorCode;
+ private final int _errorCode;
private final String _description;
private final MessageReference<AMQMessage> _reference;
- public WriteReturnAction(AMQConstant errorCode,
+ public WriteReturnAction(int errorCode,
String description,
AMQMessage message)
{
@@ -1646,7 +1641,7 @@ public class AMQChannel
message.getContentHeaderBody(),
message,
_channelId,
- _errorCode.getCode(),
+ _errorCode,
AMQShortString.validValueOf(_description));
_reference.release();
}
@@ -2015,9 +2010,9 @@ public class AMQChannel
if (ProtocolVersion.v0_91.equals(_connection.getProtocolVersion()))
{
- _connection.sendConnectionClose(AMQConstant.COMMAND_INVALID,
- "AccessRequest not present in AMQP versions other than 0-8, 0-9",
- _channelId);
+ _connection.sendConnectionClose(ErrorCodes.COMMAND_INVALID,
+ "AccessRequest not present in AMQP versions other than 0-8, 0-9",
+ _channelId);
}
else
{
@@ -2121,12 +2116,12 @@ public class AMQChannel
}
if (queueName != null)
{
- closeChannel(AMQConstant.NOT_FOUND, "No such queue, '" + queueName + "'");
+ closeChannel(ErrorCodes.NOT_FOUND, "No such queue, '" + queueName + "'");
}
else
{
- _connection.sendConnectionClose(AMQConstant.NOT_ALLOWED,
- "No queue name provided, no default queue defined.", _channelId);
+ _connection.sendConnectionClose(ErrorCodes.NOT_ALLOWED,
+ "No queue name provided, no default queue defined.", _channelId);
}
}
else
@@ -2150,19 +2145,19 @@ public class AMQChannel
catch (ConsumerTagInUseException cte)
{
- _connection.sendConnectionClose(AMQConstant.NOT_ALLOWED,
+ _connection.sendConnectionClose(ErrorCodes.NOT_ALLOWED,
"Non-unique consumer tag, '" + consumerTag1
+ "'", _channelId);
}
catch (AMQInvalidArgumentException ise)
{
- _connection.sendConnectionClose(AMQConstant.ARGUMENT_INVALID, ise.getMessage(), _channelId);
+ _connection.sendConnectionClose(ErrorCodes.ARGUMENT_INVALID, ise.getMessage(), _channelId);
}
catch (Queue.ExistingExclusiveConsumer e)
{
- _connection.sendConnectionClose(AMQConstant.ACCESS_REFUSED,
+ _connection.sendConnectionClose(ErrorCodes.ACCESS_REFUSED,
"Cannot subscribe to queue '"
+ queue1.getName()
+ "' as it already has an existing exclusive consumer", _channelId);
@@ -2170,7 +2165,7 @@ public class AMQChannel
}
catch (Queue.ExistingConsumerPreventsExclusive e)
{
- _connection.sendConnectionClose(AMQConstant.ACCESS_REFUSED,
+ _connection.sendConnectionClose(ErrorCodes.ACCESS_REFUSED,
"Cannot subscribe to queue '"
+ queue1.getName()
+ "' exclusively as it already has a consumer", _channelId);
@@ -2178,14 +2173,14 @@ public class AMQChannel
}
catch (AccessControlException e)
{
- _connection.sendConnectionClose(AMQConstant.ACCESS_REFUSED, "Cannot subscribe to queue '"
- + queue1.getName()
- + "' permission denied", _channelId);
+ _connection.sendConnectionClose(ErrorCodes.ACCESS_REFUSED, "Cannot subscribe to queue '"
+ + queue1.getName()
+ + "' permission denied", _channelId);
}
catch (MessageSource.ConsumerAccessRefused consumerAccessRefused)
{
- _connection.sendConnectionClose(AMQConstant.ACCESS_REFUSED,
+ _connection.sendConnectionClose(ErrorCodes.ACCESS_REFUSED,
"Cannot subscribe to queue '"
+ queue1.getName()
+ "' as it already has an incompatible exclusivity policy", _channelId);
@@ -2193,7 +2188,7 @@ public class AMQChannel
}
catch (MessageSource.QueueDeleted queueDeleted)
{
- _connection.sendConnectionClose(AMQConstant.NOT_FOUND,
+ _connection.sendConnectionClose(ErrorCodes.NOT_FOUND,
"Cannot subscribe to queue '"
+ queue1.getName()
+ "' as it has been deleted", _channelId);
@@ -2220,13 +2215,13 @@ public class AMQChannel
}
if (queueName != null)
{
- _connection.sendConnectionClose(AMQConstant.NOT_FOUND, "No such queue, '" + queueName + "'", _channelId);
+ _connection.sendConnectionClose(ErrorCodes.NOT_FOUND, "No such queue, '" + queueName + "'", _channelId);
}
else
{
- _connection.sendConnectionClose(AMQConstant.NOT_ALLOWED,
- "No queue name provided, no default queue defined.", _channelId);
+ _connection.sendConnectionClose(ErrorCodes.NOT_ALLOWED,
+ "No queue name provided, no default queue defined.", _channelId);
}
}
@@ -2246,26 +2241,26 @@ public class AMQChannel
}
catch (AccessControlException e)
{
- _connection.sendConnectionClose(AMQConstant.ACCESS_REFUSED, e.getMessage(), _channelId);
+ _connection.sendConnectionClose(ErrorCodes.ACCESS_REFUSED, e.getMessage(), _channelId);
}
catch (MessageSource.ExistingExclusiveConsumer e)
{
- _connection.sendConnectionClose(AMQConstant.NOT_ALLOWED, "Queue has an exclusive consumer", _channelId);
+ _connection.sendConnectionClose(ErrorCodes.NOT_ALLOWED, "Queue has an exclusive consumer", _channelId);
}
catch (MessageSource.ExistingConsumerPreventsExclusive e)
{
- _connection.sendConnectionClose(AMQConstant.INTERNAL_ERROR,
+ _connection.sendConnectionClose(ErrorCodes.INTERNAL_ERROR,
"The GET request has been evaluated as an exclusive consumer, " +
"this is likely due to a programming error in the Qpid broker", _channelId);
}
catch (MessageSource.ConsumerAccessRefused consumerAccessRefused)
{
- _connection.sendConnectionClose(AMQConstant.NOT_ALLOWED,
- "Queue has an incompatible exclusivity policy", _channelId);
+ _connection.sendConnectionClose(ErrorCodes.NOT_ALLOWED,
+ "Queue has an incompatible exclusivity policy", _channelId);
}
catch (MessageSource.QueueDeleted queueDeleted)
{
- _connection.sendConnectionClose(AMQConstant.NOT_FOUND, "Queue has been deleted", _channelId);
+ _connection.sendConnectionClose(ErrorCodes.NOT_FOUND, "Queue has been deleted", _channelId);
}
}
}
@@ -2291,7 +2286,7 @@ public class AMQChannel
if(blockingTimeoutExceeded())
{
message(ChannelMessages.FLOW_CONTROL_IGNORED());
- closeChannel(AMQConstant.MESSAGE_TOO_LARGE,
+ closeChannel(ErrorCodes.MESSAGE_TOO_LARGE,
"Channel flow control was requested, but not enforced by sender");
}
else
@@ -2310,7 +2305,7 @@ public class AMQChannel
// if the exchange does not exist we raise a channel exception
if (destination == null)
{
- closeChannel(AMQConstant.NOT_FOUND, "Unknown exchange name: '" + exchangeName + "'");
+ closeChannel(ErrorCodes.NOT_FOUND, "Unknown exchange name: '" + exchangeName + "'");
}
else
{
@@ -2326,7 +2321,7 @@ public class AMQChannel
}
catch (AccessControlException e)
{
- _connection.sendConnectionClose(AMQConstant.ACCESS_REFUSED, e.getMessage(), getChannelId());
+ _connection.sendConnectionClose(ErrorCodes.ACCESS_REFUSED, e.getMessage(), getChannelId());
}
}
@@ -2519,9 +2514,9 @@ public class AMQChannel
}
else
{
- _connection.sendConnectionClose(AMQConstant.COMMAND_INVALID,
- "Attempt to send a content header without first sending a publish frame",
- _channelId);
+ _connection.sendConnectionClose(ErrorCodes.COMMAND_INVALID,
+ "Attempt to send a content header without first sending a publish frame",
+ _channelId);
}
}
@@ -2537,16 +2532,16 @@ public class AMQChannel
{
if(bodySize > _connection.getMaxMessageSize())
{
- closeChannel(AMQConstant.MESSAGE_TOO_LARGE,
+ closeChannel(ErrorCodes.MESSAGE_TOO_LARGE,
"Message size of " + bodySize + " greater than allowed maximum of " + _connection.getMaxMessageSize());
}
publishContentHeader(new ContentHeaderBody(properties, bodySize));
}
else
{
- _connection.sendConnectionClose(AMQConstant.COMMAND_INVALID,
- "Attempt to send a content header without first sending a publish frame",
- _channelId);
+ _connection.sendConnectionClose(ErrorCodes.COMMAND_INVALID,
+ "Attempt to send a content header without first sending a publish frame",
+ _channelId);
}
}
@@ -2883,10 +2878,10 @@ public class AMQChannel
{
if (!new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_CLASS).equals(type))
{
- _connection.sendConnectionClose(AMQConstant.NOT_ALLOWED, "Attempt to redeclare default exchange: "
- + " of type "
- + ExchangeDefaults.DIRECT_EXCHANGE_CLASS
- + " to " + type + ".", getChannelId());
+ _connection.sendConnectionClose(ErrorCodes.NOT_ALLOWED, "Attempt to redeclare default exchange: "
+ + " of type "
+ + ExchangeDefaults.DIRECT_EXCHANGE_CLASS
+ + " to " + type + ".", getChannelId());
}
else if (!nowait)
{
@@ -2902,18 +2897,18 @@ public class AMQChannel
exchange = getExchange(exchangeName.toString());
if (exchange == null)
{
- closeChannel(AMQConstant.NOT_FOUND, "Unknown exchange: '" + exchangeName + "'");
+ closeChannel(ErrorCodes.NOT_FOUND, "Unknown exchange: '" + exchangeName + "'");
}
else if (!(type == null || type.length() == 0) && !exchange.getType().equals(type.toString()))
{
- _connection.sendConnectionClose(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: '"
- + exchangeName
- + "' of type "
- + exchange.getType()
- + " to "
- + type
- + ".", getChannelId());
+ _connection.sendConnectionClose(ErrorCodes.NOT_ALLOWED, "Attempt to redeclare exchange: '"
+ + exchangeName
+ + "' of type "
+ + exchange.getType()
+ + " to "
+ + type
+ + ".", getChannelId());
}
else if (!nowait)
{
@@ -2957,7 +2952,7 @@ public class AMQChannel
Exchange existing = getExchange(name);
if (existing == null || !existing.getType().equals(typeString))
{
- _connection.sendConnectionClose(AMQConstant.NOT_ALLOWED,
+ _connection.sendConnectionClose(ErrorCodes.NOT_ALLOWED,
"Attempt to declare exchange: '" + exchangeName +
"' which begins with reserved prefix.", getChannelId());
}
@@ -2972,10 +2967,10 @@ public class AMQChannel
exchange = e.getExistingExchange();
if (!exchange.getType().equals(typeString))
{
- _connection.sendConnectionClose(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: '"
- + exchangeName + "' of type "
- + exchange.getType()
- + " to " + type + ".", getChannelId());
+ _connection.sendConnectionClose(ErrorCodes.NOT_ALLOWED, "Attempt to redeclare exchange: '"
+ + exchangeName + "' of type "
+ + exchange.getType()
+ + " to " + type + ".", getChannelId());
}
else
{
@@ -2988,16 +2983,16 @@ public class AMQChannel
}
catch (NoFactoryForTypeException e)
{
- _connection.sendConnectionClose(AMQConstant.COMMAND_INVALID, "Unknown exchange type '"
- + e.getType()
- + "' for exchange '"
- + exchangeName
- + "'", getChannelId());
+ _connection.sendConnectionClose(ErrorCodes.COMMAND_INVALID, "Unknown exchange type '"
+ + e.getType()
+ + "' for exchange '"
+ + exchangeName
+ + "'", getChannelId());
}
catch (AccessControlException e)
{
- _connection.sendConnectionClose(AMQConstant.ACCESS_REFUSED, e.getMessage(), getChannelId());
+ _connection.sendConnectionClose(ErrorCodes.ACCESS_REFUSED, e.getMessage(), getChannelId());
}
catch (UnknownConfiguredObjectException e)
@@ -3007,15 +3002,15 @@ public class AMQChannel
+ (e.getName() != null
? "name: '" + e.getName() + "'"
: "id: " + e.getId());
- _connection.sendConnectionClose(AMQConstant.NOT_FOUND, message, getChannelId());
+ _connection.sendConnectionClose(ErrorCodes.NOT_FOUND, message, getChannelId());
}
catch (IllegalArgumentException e)
{
- _connection.sendConnectionClose(AMQConstant.COMMAND_INVALID, "Error creating exchange '"
- + exchangeName
- + "': "
- + e.getMessage(), getChannelId());
+ _connection.sendConnectionClose(ErrorCodes.COMMAND_INVALID, "Error creating exchange '"
+ + exchangeName
+ + "': "
+ + e.getMessage(), getChannelId());
}
}
@@ -3037,8 +3032,8 @@ public class AMQChannel
if (isDefaultExchange(exchangeStr))
{
- _connection.sendConnectionClose(AMQConstant.NOT_ALLOWED,
- "Default Exchange cannot be deleted", getChannelId());
+ _connection.sendConnectionClose(ErrorCodes.NOT_ALLOWED,
+ "Default Exchange cannot be deleted", getChannelId());
}
@@ -3049,13 +3044,13 @@ public class AMQChannel
final Exchange<?> exchange = getExchange(exchangeName);
if (exchange == null)
{
- closeChannel(AMQConstant.NOT_FOUND, "No such exchange: '" + exchangeStr + "'");
+ closeChannel(ErrorCodes.NOT_FOUND, "No such exchange: '" + exchangeStr + "'");
}
else
{
if (ifUnused && exchange.hasBindings())
{
- closeChannel(AMQConstant.IN_USE, "Exchange has bindings");
+ closeChannel(ErrorCodes.IN_USE, "Exchange has bindings");
}
else
{
@@ -3072,15 +3067,15 @@ public class AMQChannel
}
catch (ExchangeIsAlternateException e)
{
- closeChannel(AMQConstant.NOT_ALLOWED, "Exchange in use as an alternate exchange");
+ closeChannel(ErrorCodes.NOT_ALLOWED, "Exchange in use as an alternate exchange");
}
catch (RequiredExchangeException e)
{
- closeChannel(AMQConstant.NOT_ALLOWED, "Exchange '" + exchangeStr + "' cannot be deleted");
+ closeChannel(ErrorCodes.NOT_ALLOWED, "Exchange '" + exchangeStr + "' cannot be deleted");
}
catch (AccessControlException e)
{
- _connection.sendConnectionClose(AMQConstant.ACCESS_REFUSED, e.getMessage(), getChannelId());
+ _connection.sendConnectionClose(ErrorCodes.ACCESS_REFUSED, e.getMessage(), getChannelId());
}
}
}
@@ -3128,11 +3123,11 @@ public class AMQChannel
String message = queueName == null
? "No default queue defined on channel and queue was null"
: "Queue " + queueName + " does not exist.";
- closeChannel(AMQConstant.NOT_FOUND, message);
+ closeChannel(ErrorCodes.NOT_FOUND, message);
}
else if (isDefaultExchange(exchange))
{
- _connection.sendConnectionClose(AMQConstant.NOT_ALLOWED,
+ _connection.sendConnectionClose(ErrorCodes.NOT_ALLOWED,
"Cannot bind the queue '" + queueName + "' to the default exchange", getChannelId());
}
@@ -3144,7 +3139,7 @@ public class AMQChannel
final Exchange<?> exch = getExchange(exchangeName);
if (exch == null)
{
- closeChannel(AMQConstant.NOT_FOUND,
+ closeChannel(ErrorCodes.NOT_FOUND,
"Exchange '" + exchangeName + "' does not exist.");
}
else
@@ -3187,7 +3182,7 @@ public class AMQChannel
}
catch (AccessControlException e)
{
- _connection.sendConnectionClose(AMQConstant.ACCESS_REFUSED, e.getMessage(), getChannelId());
+ _connection.sendConnectionClose(ErrorCodes.ACCESS_REFUSED, e.getMessage(), getChannelId());
}
}
}
@@ -3235,7 +3230,7 @@ public class AMQChannel
queue = getQueue(queueName.toString());
if (queue == null)
{
- closeChannel(AMQConstant.NOT_FOUND,
+ closeChannel(ErrorCodes.NOT_FOUND,
"Queue: '"
+ queueName
+ "' not found on VirtualHost '"
@@ -3246,9 +3241,9 @@ public class AMQChannel
{
if (!queue.verifySessionAccess(this))
{
- _connection.sendConnectionClose(AMQConstant.NOT_ALLOWED, "Queue '"
- + queue.getName()
- + "' is exclusive, but not created on this Connection.", getChannelId());
+ _connection.sendConnectionClose(ErrorCodes.NOT_ALLOWED, "Queue '"
+ + queue.getName()
+ + "' is exclusive, but not created on this Connection.", getChannelId());
}
else
{
@@ -3335,15 +3330,15 @@ public class AMQChannel
if (!queue.verifySessionAccess(this))
{
- _connection.sendConnectionClose(AMQConstant.NOT_ALLOWED, "Queue '"
- + queue.getName()
- + "' is exclusive, but not created on this Connection.", getChannelId());
+ _connection.sendConnectionClose(ErrorCodes.NOT_ALLOWED, "Queue '"
+ + queue.getName()
+ + "' is exclusive, but not created on this Connection.", getChannelId());
}
else if (queue.isExclusive() != exclusive)
{
- closeChannel(AMQConstant.ALREADY_EXISTS,
+ closeChannel(ErrorCodes.ALREADY_EXISTS,
"Cannot re-declare queue '"
+ queue.getName()
+ "' with different exclusivity (was: "
@@ -3359,7 +3354,7 @@ public class AMQChannel
? LifetimePolicy.DELETE_ON_CONNECTION_CLOSE
: LifetimePolicy.PERMANENT)))
{
- closeChannel(AMQConstant.ALREADY_EXISTS,
+ closeChannel(ErrorCodes.ALREADY_EXISTS,
"Cannot re-declare queue '"
+ queue.getName()
+ "' with different lifetime policy (was: "
@@ -3390,7 +3385,7 @@ public class AMQChannel
}
catch (AccessControlException e)
{
- _connection.sendConnectionClose(AMQConstant.ACCESS_REFUSED, e.getMessage(), getChannelId());
+ _connection.sendConnectionClose(ErrorCodes.ACCESS_REFUSED, e.getMessage(), getChannelId());
}
}
@@ -3423,27 +3418,27 @@ public class AMQChannel
if (queue == null)
{
- closeChannel(AMQConstant.NOT_FOUND, "Queue '" + queueName + "' does not exist.");
+ closeChannel(ErrorCodes.NOT_FOUND, "Queue '" + queueName + "' does not exist.");
}
else
{
if (ifEmpty && !queue.isEmpty())
{
- closeChannel(AMQConstant.IN_USE, "Queue: '" + queueName + "' is not empty.");
+ closeChannel(ErrorCodes.IN_USE, "Queue: '" + queueName + "' is not empty.");
}
else if (ifUnused && !queue.isUnused())
{
// TODO - Error code
- closeChannel(AMQConstant.IN_USE, "Queue: '" + queueName + "' is still used.");
+ closeChannel(ErrorCodes.IN_USE, "Queue: '" + queueName + "' is still used.");
}
else
{
if (!queue.verifySessionAccess(this))
{
- _connection.sendConnectionClose(AMQConstant.NOT_ALLOWED, "Queue '"
- + queue.getName()
- + "' is exclusive, but not created on this Connection.", getChannelId());
+ _connection.sendConnectionClose(ErrorCodes.NOT_ALLOWED, "Queue '"
+ + queue.getName()
+ + "' is exclusive, but not created on this Connection.", getChannelId());
}
else
@@ -3461,7 +3456,7 @@ public class AMQChannel
}
catch (AccessControlException e)
{
- _connection.sendConnectionClose(AMQConstant.ACCESS_REFUSED, e.getMessage(), getChannelId());
+ _connection.sendConnectionClose(ErrorCodes.ACCESS_REFUSED, e.getMessage(), getChannelId());
}
}
@@ -3482,16 +3477,16 @@ public class AMQChannel
if (queueName == null && (queue = getDefaultQueue()) == null)
{
- _connection.sendConnectionClose(AMQConstant.NOT_ALLOWED, "No queue specified.", getChannelId());
+ _connection.sendConnectionClose(ErrorCodes.NOT_ALLOWED, "No queue specified.", getChannelId());
}
else if ((queueName != null) && (queue = getQueue(queueName.toString())) == null)
{
- closeChannel(AMQConstant.NOT_FOUND, "Queue '" + queueName + "' does not exist.");
+ closeChannel(ErrorCodes.NOT_FOUND, "Queue '" + queueName + "' does not exist.");
}
else if (!queue.verifySessionAccess(this))
{
- _connection.sendConnectionClose(AMQConstant.NOT_ALLOWED,
- "Queue is exclusive, but not created on this Connection.", getChannelId());
+ _connection.sendConnectionClose(ErrorCodes.NOT_ALLOWED,
+ "Queue is exclusive, but not created on this Connection.", getChannelId());
}
else
{
@@ -3509,7 +3504,7 @@ public class AMQChannel
}
catch (AccessControlException e)
{
- _connection.sendConnectionClose(AMQConstant.ACCESS_REFUSED, e.getMessage(), getChannelId());
+ _connection.sendConnectionClose(ErrorCodes.ACCESS_REFUSED, e.getMessage(), getChannelId());
}
@@ -3544,13 +3539,13 @@ public class AMQChannel
String message = useDefaultQueue
? "No default queue defined on channel and queue was null"
: "Queue '" + queueName + "' does not exist.";
- closeChannel(AMQConstant.NOT_FOUND, message);
+ closeChannel(ErrorCodes.NOT_FOUND, message);
}
else if (isDefaultExchange(exchange))
{
- _connection.sendConnectionClose(AMQConstant.NOT_ALLOWED, "Cannot unbind the queue '"
- + queue.getName()
- + "' from the default exchange", getChannelId());
+ _connection.sendConnectionClose(ErrorCodes.NOT_ALLOWED, "Cannot unbind the queue '"
+ + queue.getName()
+ + "' from the default exchange", getChannelId());
}
else
@@ -3560,11 +3555,11 @@ public class AMQChannel
if (exch == null)
{
- closeChannel(AMQConstant.NOT_FOUND, "Exchange '" + exchange + "' does not exist.");
+ closeChannel(ErrorCodes.NOT_FOUND, "Exchange '" + exchange + "' does not exist.");
}
else if (!exch.hasBinding(String.valueOf(bindingKey), queue))
{
- closeChannel(AMQConstant.NOT_FOUND, "No such binding");
+ closeChannel(ErrorCodes.NOT_FOUND, "No such binding");
}
else
{
@@ -3578,7 +3573,7 @@ public class AMQChannel
}
catch (AccessControlException e)
{
- _connection.sendConnectionClose(AMQConstant.ACCESS_REFUSED, e.getMessage(), getChannelId());
+ _connection.sendConnectionClose(ErrorCodes.ACCESS_REFUSED, e.getMessage(), getChannelId());
}
}
@@ -3613,7 +3608,7 @@ public class AMQChannel
if (!isTransactional())
{
- closeChannel(AMQConstant.COMMAND_INVALID,
+ closeChannel(ErrorCodes.COMMAND_INVALID,
"Fatal error: commit called on non-transactional channel");
}
commit(new Runnable()
@@ -3638,7 +3633,7 @@ public class AMQChannel
if (!isTransactional())
{
- closeChannel(AMQConstant.COMMAND_INVALID,
+ closeChannel(ErrorCodes.COMMAND_INVALID,
"Fatal error: rollback called on non-transactional channel");
}
@@ -3676,7 +3671,8 @@ public class AMQChannel
}
- private void closeChannel(final AMQConstant cause, final String message)
+
+ private void closeChannel(int cause, final String message)
{
_connection.closeChannelAndWriteFrame(this, cause, message);
}
Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java?rev=1770514&r1=1770513&r2=1770514&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java Sat Nov 19 21:19:11 2016
@@ -23,7 +23,6 @@ package org.apache.qpid.server.protocol.
import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.framing.ProtocolVersion;
-import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.logging.EventLoggerProvider;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.ContextProvider;
@@ -58,9 +57,10 @@ public interface AMQPConnection_0_8<C ex
void writeFrame(AMQDataBlock frame);
- void sendConnectionClose(AMQConstant errorCode,
+ void sendConnectionClose(int errorCode,
String message, int channelId);
+
boolean isCloseWhenNoRoute();
ContextProvider getContextProvider();
@@ -83,7 +83,7 @@ public interface AMQPConnection_0_8<C ex
boolean isSendQueueDeleteOkRegardless();
- void closeChannelAndWriteFrame(AMQChannel amqChannel, AMQConstant cause, String message);
+ void closeChannelAndWriteFrame(AMQChannel amqChannel, int code, String message);
ProtocolOutputConverter getProtocolOutputConverter();
Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java?rev=1770514&r1=1770513&r2=1770514&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java Sat Nov 19 21:19:11 2016
@@ -53,12 +53,13 @@ import org.slf4j.LoggerFactory;
import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.QpidException;
import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.codec.AMQDecoder;
import org.apache.qpid.codec.ServerDecoder;
import org.apache.qpid.common.ServerPropertyNames;
import org.apache.qpid.configuration.CommonProperties;
import org.apache.qpid.framing.*;
import org.apache.qpid.properties.ConnectionStartProperties;
-import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.protocol.ErrorCodes;
import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.logging.messages.ConnectionMessages;
import org.apache.qpid.server.message.InstanceProperties;
@@ -446,20 +447,20 @@ public class AMQPConnection_0_8Impl
public void closeChannel(AMQChannel channel)
{
- closeChannel(channel, null, null, false);
+ closeChannel(channel, 0, null, false);
}
- public void closeChannelAndWriteFrame(AMQChannel channel, AMQConstant cause, String message)
+ public void closeChannelAndWriteFrame(AMQChannel channel, int cause, String message)
{
writeFrame(new AMQFrame(channel.getChannelId(),
- getMethodRegistry().createChannelCloseBody(cause.getCode(),
+ getMethodRegistry().createChannelCloseBody(cause,
AMQShortString.validValueOf(message),
_currentClassId,
_currentMethodId)));
closeChannel(channel, cause, message, true);
}
- public void closeChannel(int channelId, AMQConstant cause, String message)
+ public void closeChannel(int channelId, int cause, String message)
{
final AMQChannel channel = getChannel(channelId);
if (channel == null)
@@ -469,7 +470,7 @@ public class AMQPConnection_0_8Impl
closeChannel(channel, cause, message, true);
}
- void closeChannel(AMQChannel channel, AMQConstant cause, String message, boolean mark)
+ void closeChannel(AMQChannel channel, int cause, String message, boolean mark)
{
int channelId = channel.getChannelId();
try
@@ -544,10 +545,10 @@ public class AMQPConnection_0_8Impl
}
}
- public void sendConnectionClose(AMQConstant errorCode,
+ public void sendConnectionClose(int errorCode,
String message, int channelId)
{
- sendConnectionClose(channelId, new AMQFrame(0, new ConnectionCloseBody(getProtocolVersion(), errorCode.getCode(), AMQShortString.validValueOf(message), _currentClassId, _currentMethodId)));
+ sendConnectionClose(channelId, new AMQFrame(0, new ConnectionCloseBody(getProtocolVersion(), errorCode, AMQShortString.validValueOf(message), _currentClassId, _currentMethodId)));
}
private void sendConnectionClose(int channelId, AMQFrame frame)
@@ -774,8 +775,20 @@ public class AMQPConnection_0_8Impl
return String.valueOf(getNetwork().getRemoteAddress());
}
- public void closeSessionAsync(final AMQSessionModel<?> session, final AMQConstant cause, final String message)
+ public void closeSessionAsync(final AMQSessionModel<?> session, final CloseReason reason, final String message)
{
+ final int cause;
+ switch (reason)
+ {
+ case MANAGEMENT:
+ cause = ErrorCodes.CONNECTION_FORCED;
+ break;
+ case TRANSACTION_TIMEOUT:
+ cause = ErrorCodes.RESOURCE_ERROR;
+ break;
+ default:
+ cause = ErrorCodes.INTERNAL_ERROR;
+ }
addAsyncTask(new Action<AMQPConnection_0_8Impl>()
{
@@ -788,7 +801,7 @@ public class AMQPConnection_0_8Impl
MethodRegistry methodRegistry = getMethodRegistry();
ChannelCloseBody responseBody =
methodRegistry.createChannelCloseBody(
- cause.getCode(),
+ cause,
AMQShortString.validValueOf(message),
0, 0);
@@ -799,20 +812,20 @@ public class AMQPConnection_0_8Impl
}
@Override
- public void sendConnectionCloseAsync(final ConnectionCloseReason reason, final String description)
+ public void sendConnectionCloseAsync(final CloseReason reason, final String description)
{
stopConnection();
- final AMQConstant cause;
+ final int cause;
switch(reason)
{
case MANAGEMENT:
- cause = AMQConstant.CONNECTION_FORCED;
+ cause = ErrorCodes.CONNECTION_FORCED;
break;
case TRANSACTION_TIMEOUT:
- cause = AMQConstant.RESOURCE_ERROR;
+ cause = ErrorCodes.RESOURCE_ERROR;
break;
default:
- cause = AMQConstant.INTERNAL_ERROR;
+ cause = ErrorCodes.INTERNAL_ERROR;
}
Action<AMQPConnection_0_8Impl> action = new Action<AMQPConnection_0_8Impl>()
{
@@ -903,19 +916,19 @@ public class AMQPConnection_0_8Impl
final NamedAddressSpace virtualHost = getAddressSpace();
if (virtualHost == null)
{
- sendConnectionClose(AMQConstant.COMMAND_INVALID,
- "Virtualhost has not yet been set. ConnectionOpen has not been called.", channelId);
+ sendConnectionClose(ErrorCodes.COMMAND_INVALID,
+ "Virtualhost has not yet been set. ConnectionOpen has not been called.", channelId);
}
else if(getChannel(channelId) != null || channelAwaitingClosure(channelId))
{
- sendConnectionClose(AMQConstant.CHANNEL_ERROR, "Channel " + channelId + " already exists", channelId);
+ sendConnectionClose(ErrorCodes.CHANNEL_ERROR, "Channel " + channelId + " already exists", channelId);
}
else if(channelId > getMaximumNumberOfChannels())
{
- sendConnectionClose(AMQConstant.CHANNEL_ERROR,
+ sendConnectionClose(ErrorCodes.CHANNEL_ERROR,
"Channel " + channelId + " cannot be created as the max allowed channel id is "
+ getMaximumNumberOfChannels(),
- channelId);
+ channelId);
}
else
{
@@ -940,7 +953,7 @@ public class AMQPConnection_0_8Impl
if(_state != requiredState)
{
String replyText = "Command Invalid, expected " + requiredState + " but was " + _state;
- sendConnectionClose(AMQConstant.COMMAND_INVALID, replyText, 0);
+ sendConnectionClose(ErrorCodes.COMMAND_INVALID, replyText, 0);
throw new ConnectionScopedRuntimeException(replyText);
}
}
@@ -967,7 +980,7 @@ public class AMQPConnection_0_8Impl
if (addressSpace == null)
{
- sendConnectionClose(AMQConstant.NOT_FOUND,
+ sendConnectionClose(ErrorCodes.NOT_FOUND,
"Unknown virtual host: '" + virtualHostName + "'", 0);
}
@@ -983,7 +996,7 @@ public class AMQPConnection_0_8Impl
}
else
{
- sendConnectionClose(AMQConstant.CONNECTION_FORCED,
+ sendConnectionClose(ErrorCodes.CONNECTION_FORCED,
"Virtual host '" + addressSpace.getName() + "' is not active", 0);
}
@@ -1004,12 +1017,12 @@ public class AMQPConnection_0_8Impl
}
else
{
- sendConnectionClose(AMQConstant.ACCESS_REFUSED, "Connection refused", 0);
+ sendConnectionClose(ErrorCodes.ACCESS_REFUSED, "Connection refused", 0);
}
}
catch (AccessControlException | VirtualHostUnavailableException e)
{
- sendConnectionClose(AMQConstant.ACCESS_REFUSED, e.getMessage(), 0);
+ sendConnectionClose(ErrorCodes.ACCESS_REFUSED, e.getMessage(), 0);
}
}
}
@@ -1073,7 +1086,7 @@ public class AMQPConnection_0_8Impl
SaslServer ss = getSaslServer();
if (ss == null)
{
- sendConnectionClose(AMQConstant.INTERNAL_ERROR, "No SASL context set up in connection", 0);
+ sendConnectionClose(ErrorCodes.INTERNAL_ERROR, "No SASL context set up in connection", 0);
}
processSaslResponse(response, subjectCreator, ss);
@@ -1130,7 +1143,7 @@ public class AMQPConnection_0_8Impl
if (ss == null)
{
- sendConnectionClose(AMQConstant.RESOURCE_ERROR, "Unable to create SASL Server:" + mechanism, 0);
+ sendConnectionClose(ErrorCodes.RESOURCE_ERROR, "Unable to create SASL Server:" + mechanism, 0);
}
else
@@ -1146,7 +1159,7 @@ public class AMQPConnection_0_8Impl
catch (SaslException e)
{
disposeSaslServer();
- sendConnectionClose(AMQConstant.INTERNAL_ERROR, "SASL error: " + e, 0);
+ sendConnectionClose(ErrorCodes.INTERNAL_ERROR, "SASL error: " + e, 0);
}
}
@@ -1170,7 +1183,7 @@ public class AMQPConnection_0_8Impl
_logger.debug("Authentication failed: {}", (cause == null ? "" : cause.getMessage()));
- sendConnectionClose(AMQConstant.NOT_ALLOWED, "Authentication failed", 0);
+ sendConnectionClose(ErrorCodes.NOT_ALLOWED, "Authentication failed", 0);
disposeSaslServer();
break;
@@ -1243,17 +1256,17 @@ public class AMQPConnection_0_8Impl
if (frameMax > (long) brokerFrameMax)
{
- sendConnectionClose(AMQConstant.SYNTAX_ERROR,
+ sendConnectionClose(ErrorCodes.SYNTAX_ERROR,
"Attempt to set max frame size to " + frameMax
+ " greater than the broker will allow: "
+ brokerFrameMax, 0);
}
- else if (frameMax > 0 && frameMax < AMQConstant.FRAME_MIN_SIZE.getCode())
+ else if (frameMax > 0 && frameMax < AMQDecoder.FRAME_MIN_SIZE)
{
- sendConnectionClose(AMQConstant.SYNTAX_ERROR,
- "Attempt to set max frame size to " + frameMax
- + " which is smaller than the specification defined minimum: "
- + AMQConstant.FRAME_MIN_SIZE.getCode(), 0);
+ sendConnectionClose(ErrorCodes.SYNTAX_ERROR,
+ "Attempt to set max frame size to " + frameMax
+ + " which is smaller than the specification defined minimum: "
+ + AMQDecoder.FRAME_MIN_SIZE, 0);
}
else
{
@@ -1337,9 +1350,9 @@ public class AMQPConnection_0_8Impl
{
if(method.getName().startsWith("receive"))
{
- sendConnectionClose(AMQConstant.CHANNEL_ERROR,
+ sendConnectionClose(ErrorCodes.CHANNEL_ERROR,
"Unknown channel id: " + channelId,
- channelId);
+ channelId);
return null;
}
else if(method.getName().equals("ignoreAllButCloseOk"))
Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java?rev=1770514&r1=1770513&r2=1770514&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java Sat Nov 19 21:19:11 2016
@@ -41,7 +41,7 @@ import org.apache.qpid.framing.AMQShortS
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.framing.ProtocolVersion;
-import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.protocol.ErrorCodes;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.message.InstanceProperties;
@@ -139,7 +139,7 @@ public class AMQChannelTest extends Qpid
channel.receiveExchangeDelete(AMQShortString.valueOf(testExchangeName), true, false);
verify(_amqConnection).closeChannelAndWriteFrame(eq(channel),
- eq(AMQConstant.IN_USE),
+ eq(ErrorCodes.IN_USE),
eq("Exchange has bindings"));
}
@@ -168,7 +168,7 @@ public class AMQChannelTest extends Qpid
channel.receiveMessageHeader(properties, maximumMessageSize + 1);
verify(_amqConnection).closeChannelAndWriteFrame(eq(channel),
- eq(AMQConstant.MESSAGE_TOO_LARGE),
+ eq(ErrorCodes.MESSAGE_TOO_LARGE),
eq("Message size of 1025 greater than allowed maximum of 1024"));
}
@@ -197,7 +197,7 @@ public class AMQChannelTest extends Qpid
channel.receiveBasicPublish(AMQShortString.EMPTY_STRING, AMQShortString.EMPTY_STRING, false, false);
channel.receiveMessageHeader(properties, 0);
- verify(_amqConnection).sendConnectionClose(eq(AMQConstant.ACCESS_REFUSED), anyString(), eq(channelId));
+ verify(_amqConnection).sendConnectionClose(eq(ErrorCodes.ACCESS_REFUSED), anyString(), eq(channelId));
verifyZeroInteractions(_messageDestination);
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java?rev=1770514&r1=1770513&r2=1770514&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java Sat Nov 19 21:19:11 2016
@@ -52,7 +52,6 @@ import org.slf4j.LoggerFactory;
import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.common.ServerPropertyNames;
import org.apache.qpid.configuration.CommonProperties;
-import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.logging.messages.ConnectionMessages;
import org.apache.qpid.server.model.AuthenticationProvider;
import org.apache.qpid.server.model.Broker;
@@ -407,18 +406,6 @@ public class AMQPConnection_1_0 extends
return _describedTypeRegistry;
}
- private void closeSessionAsync(final Session_1_0 session, final AMQConstant cause, final String message)
- {
- addAsyncTask(new Action<ConnectionHandler>()
- {
- @Override
- public void performAction(final ConnectionHandler object)
- {
- session.close(cause, message);
- }
- });
- }
-
private boolean closedForOutput()
{
@@ -1388,7 +1375,7 @@ public class AMQPConnection_1_0 extends
}
@Override
- public void sendConnectionCloseAsync(final ConnectionCloseReason reason, final String description)
+ public void sendConnectionCloseAsync(final CloseReason reason, final String description)
{
stopConnection();
@@ -1417,9 +1404,29 @@ public class AMQPConnection_1_0 extends
}
public void closeSessionAsync(final AMQSessionModel<?> session,
- final AMQConstant cause, final String message)
+ final CloseReason reason, final String message)
{
- closeSessionAsync((Session_1_0) session, cause, message);
+ final ErrorCondition cause;
+ switch(reason)
+ {
+ case MANAGEMENT:
+ cause = ConnectionError.CONNECTION_FORCED;
+ break;
+ case TRANSACTION_TIMEOUT:
+ cause = AmqpError.RESOURCE_LIMIT_EXCEEDED;
+ break;
+ default:
+ cause = AmqpError.INTERNAL_ERROR;
+ }
+ addAsyncTask(new Action<ConnectionHandler>()
+ {
+ @Override
+ public void performAction(final ConnectionHandler object)
+ {
+ ((Session_1_0)session).close(cause, message);
+ }
+ });
+
}
public void block()
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1770514&r1=1770513&r2=1770514&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java Sat Nov 19 21:19:11 2016
@@ -46,7 +46,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.connection.SessionPrincipal;
import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.consumer.ConsumerTarget;
@@ -73,6 +72,7 @@ import org.apache.qpid.server.protocol.v
import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
import org.apache.qpid.server.protocol.v1_0.type.Binary;
import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
+import org.apache.qpid.server.protocol.v1_0.type.ErrorCondition;
import org.apache.qpid.server.protocol.v1_0.type.FrameBody;
import org.apache.qpid.server.protocol.v1_0.type.LifetimePolicy;
import org.apache.qpid.server.protocol.v1_0.type.Symbol;
@@ -1253,14 +1253,13 @@ public class Session_1_0 implements AMQS
}
- @Override
- public void close(AMQConstant cause, String message)
+ public void close(ErrorCondition condition, String message)
{
performCloseTasks();
final End end = new End();
final Error theError = new Error();
theError.setDescription(message);
- theError.setCondition(ConnectionError.CONNECTION_FORCED);
+ theError.setCondition(condition);
end.setError(theError);
end(end);
}
@@ -1445,31 +1444,24 @@ public class Session_1_0 implements AMQS
}
@Override
- public Long getTxnCount()
+ public long getTxnStart()
{
// TODO
return 0l;
}
@Override
- public Long getTxnStart()
+ public long getTxnCommits()
{
// TODO
return 0l;
}
@Override
- public Long getTxnCommits()
+ public long getTxnRejects()
{
// TODO
- return 0l;
- }
-
- @Override
- public Long getTxnRejects()
- {
- // TODO
- return 0l;
+ return 0L;
}
@Override
@@ -1640,7 +1632,7 @@ public class Session_1_0 implements AMQS
@Override
public void doTimeoutAction(final String reason)
{
- getAMQPConnection().closeSessionAsync(this, AMQConstant.RESOURCE_ERROR, reason);
+ getAMQPConnection().closeSessionAsync(this, AMQPConnection.CloseReason.TRANSACTION_TIMEOUT, reason);
}
private void consumerAdded(Consumer<?> consumer)
Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQAuthenticationException.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQAuthenticationException.java?rev=1770514&r1=1770513&r2=1770514&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQAuthenticationException.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQAuthenticationException.java Sat Nov 19 21:19:11 2016
@@ -21,17 +21,19 @@
package org.apache.qpid.client;
import org.apache.qpid.AMQException;
-import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.protocol.ErrorCodes;
/**
* AMQAuthenticationException represents all failures to authenticate access to a broker.
- * <p>
- * TODO Will this alwyas have the same status code, NOT_ALLOWED 530? Might set this up to always use that code.
*/
public class AMQAuthenticationException extends AMQException
{
- public AMQAuthenticationException(AMQConstant error, String msg, Throwable cause)
+ public AMQAuthenticationException(int code, String msg, Throwable cause)
{
- super(error, msg, cause);
+ super(code, msg, cause);
+ }
+ public AMQAuthenticationException(String msg, Throwable cause)
+ {
+ super(ErrorCodes.NOT_ALLOWED, msg, cause);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org