You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2016/12/10 22:09:29 UTC
svn commit: r1773542 - in /qpid/java/trunk:
broker-core/src/main/java/org/apache/qpid/server/txn/
broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/
broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server...
Author: rgodfrey
Date: Sat Dec 10 22:09:28 2016
New Revision: 1773542
URL: http://svn.apache.org/viewvc?rev=1773542&view=rev
Log:
QPID-7585 : Improve handling of transaction failure
Added:
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/BaseSource.java
- copied, changed from r1773541, qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/Source.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/BaseTarget.java
- copied, changed from r1773541, qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/Target.java
Removed:
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/Source.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/Target.java
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkAttachment.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkEndpoint.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkAttachment.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLink_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLink_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/Outcome.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Accepted.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Modified.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Rejected.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Released.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Source.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Target.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/codec/AcceptedConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/codec/ModifiedConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/codec/RejectedConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/codec/ReleasedConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transaction/Coordinator.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transaction/Declared.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transaction/codec/DeclareConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transaction/codec/DeclaredConstructor.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/Attach.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/codec/AttachConstructor.java
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java?rev=1773542&r1=1773541&r2=1773542&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java Sat Dec 10 22:09:28 2016
@@ -56,6 +56,7 @@ public class LocalTransaction implements
private volatile long _txnStartTime = 0L;
private volatile long _txnUpdateTime = 0l;
private ListenableFuture<Runnable> _asyncTran;
+ private volatile boolean _isRollbackOnly;
public LocalTransaction(MessageStore transactionLog)
{
@@ -335,12 +336,17 @@ public class LocalTransaction implements
public void commit()
{
- sync();
commit(null);
}
public void commit(Runnable immediateAction)
{
+ if(_isRollbackOnly)
+ {
+ throw new IllegalStateException("Transaction has been marked as rollback only");
+ }
+
+
sync();
try
{
@@ -372,6 +378,10 @@ public class LocalTransaction implements
public void commitAsync(final Runnable deferred)
{
+ if(_isRollbackOnly)
+ {
+ throw new IllegalStateException("Transaction has been marked as rollback only");
+ }
sync();
if(_transaction != null)
{
@@ -508,6 +518,7 @@ public class LocalTransaction implements
_postTransactionActions.clear();
_txnStartTime = 0L;
_txnUpdateTime = 0;
+ _isRollbackOnly = false;
}
public boolean isTransactional()
@@ -520,4 +531,13 @@ public class LocalTransaction implements
long getActivityTime();
}
+ public void setRollbackOnly()
+ {
+ _isRollbackOnly = true;
+ }
+
+ public boolean isRollbackOnly()
+ {
+ return _isRollbackOnly;
+ }
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java?rev=1773542&r1=1773541&r2=1773542&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java Sat Dec 10 22:09:28 2016
@@ -41,7 +41,7 @@ import org.apache.qpid.server.protocol.v
import org.apache.qpid.server.protocol.v1_0.type.Binary;
import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
import org.apache.qpid.server.protocol.v1_0.type.Outcome;
-import org.apache.qpid.server.protocol.v1_0.type.Target;
+import org.apache.qpid.server.protocol.v1_0.type.BaseTarget;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
@@ -507,7 +507,7 @@ class ConsumerTarget_1_0 extends Abstrac
@Override
public String getTargetAddress()
{
- Target target = _link.getEndpoint().getTarget();
+ BaseTarget target = _link.getEndpoint().getTarget();
return target instanceof org.apache.qpid.server.protocol.v1_0.type.messaging.Target ? ((org.apache.qpid.server.protocol.v1_0.type.messaging.Target) target).getAddress() : _link.getEndpoint().getName();
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java?rev=1773542&r1=1773541&r2=1773542&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java Sat Dec 10 22:09:28 2016
@@ -27,9 +27,9 @@ import java.util.Map;
import org.apache.qpid.server.protocol.v1_0.type.Binary;
import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
-import org.apache.qpid.server.protocol.v1_0.type.Source;
+import org.apache.qpid.server.protocol.v1_0.type.BaseSource;
import org.apache.qpid.server.protocol.v1_0.type.Symbol;
-import org.apache.qpid.server.protocol.v1_0.type.Target;
+import org.apache.qpid.server.protocol.v1_0.type.BaseTarget;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedLong;
import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
@@ -70,8 +70,8 @@ public abstract class LinkEndpoint<T ext
private volatile State _state = State.DETACHED;
- private Source _source;
- private Target _target;
+ private BaseSource _source;
+ private BaseTarget _target;
private UnsignedInteger _deliveryCount;
private UnsignedInteger _linkCredit;
private UnsignedInteger _available;
@@ -116,22 +116,22 @@ public abstract class LinkEndpoint<T ext
public abstract Role getRole();
- public Source getSource()
+ public BaseSource getSource()
{
return _source;
}
- public void setSource(final Source source)
+ public void setSource(final BaseSource source)
{
_source = source;
}
- public Target getTarget()
+ public BaseTarget getTarget()
{
return _target;
}
- public void setTarget(final Target target)
+ public void setTarget(final BaseTarget target)
{
_target = target;
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkAttachment.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkAttachment.java?rev=1773542&r1=1773541&r2=1773542&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkAttachment.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkAttachment.java Sat Dec 10 22:09:28 2016
@@ -20,8 +20,8 @@
*/
package org.apache.qpid.server.protocol.v1_0;
-import org.apache.qpid.server.protocol.v1_0.type.Source;
-import org.apache.qpid.server.protocol.v1_0.type.Target;
+import org.apache.qpid.server.protocol.v1_0.type.BaseSource;
+import org.apache.qpid.server.protocol.v1_0.type.BaseTarget;
public class ReceivingLinkAttachment
{
@@ -44,12 +44,12 @@ public class ReceivingLinkAttachment
return _endpoint;
}
- public Source getSource()
+ public BaseSource getSource()
{
return getEndpoint().getSource();
}
- public Target getTarget()
+ public BaseTarget getTarget()
{
return getEndpoint().getTarget();
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkEndpoint.java?rev=1773542&r1=1773541&r2=1773542&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkEndpoint.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkEndpoint.java Sat Dec 10 22:09:28 2016
@@ -37,6 +37,7 @@ import org.apache.qpid.server.protocol.v
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
import org.apache.qpid.server.protocol.v1_0.type.transaction.TransactionalState;
import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
@@ -113,43 +114,49 @@ public class ReceivingLinkEndpoint exten
return Role.RECEIVER;
}
- void receiveTransfer(final Transfer transfer, final Delivery delivery)
+ Error receiveTransfer(final Transfer transfer, final Delivery delivery)
{
- TransientState transientState;
- final Binary deliveryTag = delivery.getDeliveryTag();
- boolean existingState = _unsettledMap.containsKey(deliveryTag);
- if (!existingState || transfer.getState() != null)
+ if(isAttached())
{
- _unsettledMap.put(deliveryTag, transfer.getState());
- }
- if (!existingState)
- {
- transientState = new TransientState(transfer.getDeliveryId());
- if (delivery.isSettled())
+ TransientState transientState;
+ final Binary deliveryTag = delivery.getDeliveryTag();
+ boolean existingState = _unsettledMap.containsKey(deliveryTag);
+ if (!existingState || transfer.getState() != null)
+ {
+ _unsettledMap.put(deliveryTag, transfer.getState());
+ }
+ if (!existingState)
+ {
+ transientState = new TransientState(transfer.getDeliveryId());
+ if (delivery.isSettled())
+ {
+ transientState.setSettled(true);
+ }
+ _unsettledIds.put(deliveryTag, transientState);
+ setLinkCredit(getLinkCredit().subtract(UnsignedInteger.ONE));
+ setDeliveryCount(getDeliveryCount().add(UnsignedInteger.ONE));
+ }
+ else
{
- transientState.setSettled(true);
+ transientState = _unsettledIds.get(deliveryTag);
+ transientState.incrementCredit();
+ if (delivery.isSettled())
+ {
+ transientState.setSettled(true);
+ }
}
- _unsettledIds.put(deliveryTag, transientState);
- setLinkCredit(getLinkCredit().subtract(UnsignedInteger.ONE));
- setDeliveryCount(getDeliveryCount().add(UnsignedInteger.ONE));
- }
- else
- {
- transientState = _unsettledIds.get(deliveryTag);
- transientState.incrementCredit();
- if (delivery.isSettled())
+ if (transientState.isSettled() && delivery.isComplete())
{
- transientState.setSettled(true);
+ _unsettledMap.remove(deliveryTag);
}
+ return getLink().messageTransfer(transfer);
}
-
- if (transientState.isSettled() && delivery.isComplete())
+ else
{
- _unsettledMap.remove(deliveryTag);
+ getSession().updateDisposition(Role.RECEIVER, transfer.getDeliveryId(), transfer.getDeliveryId(),null, true);
+ return null;
}
- getLink().messageTransfer(transfer);
-
}
@Override public void receiveFlow(final Flow flow)
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java?rev=1773542&r1=1773541&r2=1773542&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java Sat Dec 10 22:09:28 2016
@@ -20,9 +20,10 @@
*/
package org.apache.qpid.server.protocol.v1_0;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
public interface ReceivingLink_1_0 extends Link_1_0
{
- void messageTransfer(Transfer xfr);
+ Error messageTransfer(Transfer xfr);
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkAttachment.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkAttachment.java?rev=1773542&r1=1773541&r2=1773542&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkAttachment.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkAttachment.java Sat Dec 10 22:09:28 2016
@@ -22,7 +22,7 @@ package org.apache.qpid.server.protocol.
import org.apache.qpid.server.protocol.v1_0.type.Binary;
import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
-import org.apache.qpid.server.protocol.v1_0.type.Source;
+import org.apache.qpid.server.protocol.v1_0.type.BaseSource;
public class SendingLinkAttachment
{
@@ -45,7 +45,7 @@ public class SendingLinkAttachment
return _endpoint;
}
- public Source getSource()
+ public BaseSource getSource()
{
return getEndpoint().getSource();
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1773542&r1=1773541&r2=1773542&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java Sat Dec 10 22:09:28 2016
@@ -741,8 +741,11 @@ public class Session_1_0 implements AMQS
}
}
- endpoint.receiveTransfer(transfer, delivery);
-
+ Error error = endpoint.receiveTransfer(transfer, delivery);
+ if(error != null)
+ {
+ endpoint.close(error);
+ }
if ((delivery.isComplete() && delivery.isSettled() || Boolean.TRUE.equals(transfer.getAborted())))
{
_incomingUnsettled.remove(deliveryId);
@@ -1175,9 +1178,9 @@ public class Session_1_0 implements AMQS
ServerTransaction getTransaction(Binary transactionId)
{
- // TODO should treat invalid id differently to null
+
ServerTransaction transaction = _openTransactions.get(binaryToInteger(transactionId));
- if(transaction == null)
+ if(transactionId == null)
{
if(_transaction == null)
{
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLink_1_0.java?rev=1773542&r1=1773541&r2=1773542&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLink_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLink_1_0.java Sat Dec 10 22:09:28 2016
@@ -22,6 +22,7 @@ package org.apache.qpid.server.protocol.
import java.security.AccessControlException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
@@ -50,6 +51,7 @@ import org.apache.qpid.server.protocol.v
import org.apache.qpid.server.protocol.v1_0.type.messaging.HeaderSection;
import org.apache.qpid.server.protocol.v1_0.type.messaging.MessageAnnotationsSection;
import org.apache.qpid.server.protocol.v1_0.type.messaging.PropertiesSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusDurability;
import org.apache.qpid.server.protocol.v1_0.type.transaction.TransactionalState;
@@ -61,8 +63,9 @@ import org.apache.qpid.server.protocol.v
import org.apache.qpid.server.store.MessageHandle;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.txn.AutoCommitTransaction;
+import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
-import org.apache.qpid.server.util.ServerScopedRuntimeException;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
public class StandardReceivingLink_1_0 implements ReceivingLink_1_0
{
@@ -97,7 +100,7 @@ public class StandardReceivingLink_1_0 i
}
- public void messageTransfer(Transfer xfr)
+ public Error messageTransfer(Transfer xfr)
{
List<QpidByteBuffer> fragments = null;
@@ -110,7 +113,7 @@ public class StandardReceivingLink_1_0 i
_incompleteMessage.add(xfr);
_resumedMessage = Boolean.TRUE.equals(xfr.getResume());
_messageDeliveryTag = deliveryTag;
- return;
+ return null;
}
else if(_incompleteMessage != null)
{
@@ -118,7 +121,7 @@ public class StandardReceivingLink_1_0 i
if(Boolean.TRUE.equals(xfr.getMore()))
{
- return;
+ return null;
}
fragments = new ArrayList<>(_incompleteMessage.size());
@@ -153,7 +156,7 @@ public class StandardReceivingLink_1_0 i
}
else
{
- throw new ServerScopedRuntimeException("Unexpected delivery Tag: " + _messageDeliveryTag + "_unsettledMap: " + _unsettledMap);
+ throw new ConnectionScopedRuntimeException("Unexpected delivery Tag: " + _messageDeliveryTag + "_unsettledMap: " + _unsettledMap);
}
}
else
@@ -214,20 +217,37 @@ public class StandardReceivingLink_1_0 i
_destination.authorizePublish(session.getSecurityToken(), message);
Outcome outcome = _destination.send(message, transaction, session.getCapacityCheckAction());
+ Source source = (Source) getEndpoint().getSource();
DeliveryState resultantState;
- if (transactionId == null)
+ if(source.getOutcomes() == null || Arrays.asList(source.getOutcomes()).contains(outcome.getSymbol()))
{
- resultantState = (DeliveryState) outcome;
+ if (transactionId == null)
+ {
+ resultantState = (DeliveryState) outcome;
+ }
+ else
+ {
+ TransactionalState transactionalState = new TransactionalState();
+ transactionalState.setOutcome(outcome);
+ transactionalState.setTxnId(transactionId);
+ resultantState = transactionalState;
+ }
+ }
+ else if(transactionId != null)
+ {
+ // cause the txn to fail
+ if(transaction instanceof LocalTransaction)
+ {
+ ((LocalTransaction) transaction).setRollbackOnly();
+ }
+ resultantState = null;
}
else
{
- TransactionalState transactionalState = new TransactionalState();
- transactionalState.setOutcome(outcome);
- transactionalState.setTxnId(transactionId);
- resultantState = transactionalState;
-
+ // we should just use the default outcome
+ resultantState = null;
}
@@ -274,6 +294,7 @@ public class StandardReceivingLink_1_0 i
reference.release();
}
}
+ return null;
}
private MessageMetaData_1_0 createMessageMetaData(final List<QpidByteBuffer> fragments,
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLink_1_0.java?rev=1773542&r1=1773541&r2=1773542&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLink_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLink_1_0.java Sat Dec 10 22:09:28 2016
@@ -46,6 +46,7 @@ import org.apache.qpid.server.protocol.v
import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
+import org.apache.qpid.server.protocol.v1_0.type.transport.LinkError;
import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
@@ -73,7 +74,7 @@ public class TxnCoordinatorReceivingLink
_openTransactions = openTransactions;
}
- public void messageTransfer(Transfer xfr)
+ public Error messageTransfer(Transfer xfr)
{
List<QpidByteBuffer> payload = new ArrayList<>();
@@ -83,14 +84,14 @@ public class TxnCoordinatorReceivingLink
{
_incompleteMessage = new ArrayList<Transfer>();
_incompleteMessage.add(xfr);
- return;
+ return null;
}
else if(_incompleteMessage != null)
{
_incompleteMessage.add(xfr);
if(Boolean.TRUE.equals(xfr.getMore()))
{
- return;
+ return null;
}
int size = 0;
@@ -148,9 +149,10 @@ public class TxnCoordinatorReceivingLink
{
Discharge discharge = (Discharge) command;
- discharge(_session.binaryToInteger(discharge.getTxnId()), Boolean.TRUE.equals(discharge.getFail()));
- _endpoint.updateDisposition(deliveryTag, new Accepted(), true);
-
+ final Error error = discharge(_session.binaryToInteger(discharge.getTxnId()),
+ Boolean.TRUE.equals(discharge.getFail()));
+ _endpoint.updateDisposition(deliveryTag, error == null ? new Accepted() : null, true);
+ return error;
}
else
{
@@ -164,8 +166,7 @@ public class TxnCoordinatorReceivingLink
}
catch (AmqpErrorException e)
{
- //TODO
- _logger.error("AMQP error", e);
+ return e.getError();
}
finally
{
@@ -174,7 +175,7 @@ public class TxnCoordinatorReceivingLink
buf.dispose();
}
}
-
+ return null;
}
public void remoteDetached(LinkEndpoint endpoint, Detach detach)
@@ -199,11 +200,22 @@ public class TxnCoordinatorReceivingLink
txn.rollback();
_session.incrementRolledBackTransactions();
}
- else
+ else if(!(txn instanceof LocalTransaction && ((LocalTransaction)txn).isRollbackOnly()))
{
txn.commit();
_session.incrementCommittedTransactions();
}
+ else
+ {
+ txn.rollback();
+ _session.incrementRolledBackTransactions();
+ error = new Error();
+ error.setCondition(LinkError.DETACH_FORCED);
+ error.setDescription("The transaction was marked as rollback only due to an earlier issue (e.g. a published message was sent settled but could not be enqueued)");
+ _openTransactions.remove(transactionId);
+
+ return error;
+ }
_openTransactions.remove(transactionId);
}
else
Copied: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/BaseSource.java (from r1773541, qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/Source.java)
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/BaseSource.java?p2=qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/BaseSource.java&p1=qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/Source.java&r1=1773541&r2=1773542&rev=1773542&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/Source.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/BaseSource.java Sat Dec 10 22:09:28 2016
@@ -21,6 +21,6 @@
package org.apache.qpid.server.protocol.v1_0.type;
-public interface Source
+public interface BaseSource
{
}
Copied: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/BaseTarget.java (from r1773541, qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/Target.java)
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/BaseTarget.java?p2=qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/BaseTarget.java&p1=qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/Target.java&r1=1773541&r2=1773542&rev=1773542&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/Target.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/BaseTarget.java Sat Dec 10 22:09:28 2016
@@ -21,6 +21,6 @@
package org.apache.qpid.server.protocol.v1_0.type;
-public interface Target
+public interface BaseTarget
{
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/Outcome.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/Outcome.java?rev=1773542&r1=1773541&r2=1773542&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/Outcome.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/Outcome.java Sat Dec 10 22:09:28 2016
@@ -23,4 +23,5 @@ package org.apache.qpid.server.protocol.
public interface Outcome
{
+ Symbol getSymbol();
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Accepted.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Accepted.java?rev=1773542&r1=1773541&r2=1773542&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Accepted.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Accepted.java Sat Dec 10 22:09:28 2016
@@ -31,8 +31,15 @@ public class Accepted
implements org.apache.qpid.server.protocol.v1_0.type.DeliveryState, Outcome
{
+ public static final Symbol ACCEPTED_SYMBOL = Symbol.valueOf("amqp:accepted:list");
- @Override
+ @Override
+ public Symbol getSymbol()
+ {
+ return ACCEPTED_SYMBOL;
+ }
+
+ @Override
public String toString()
{
StringBuilder builder = new StringBuilder("Accepted{");
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Modified.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Modified.java?rev=1773542&r1=1773541&r2=1773542&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Modified.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Modified.java Sat Dec 10 22:09:28 2016
@@ -35,7 +35,8 @@ public class Modified
{
- private Boolean _deliveryFailed;
+ public static final Symbol MODIFIED_SYMBOL = Symbol.valueOf("amqp:modified:list");
+ private Boolean _deliveryFailed;
private Boolean _undeliverableHere;
@@ -71,7 +72,13 @@ public class Modified
_messageAnnotations = messageAnnotations;
}
- @Override
+ @Override
+ public Symbol getSymbol()
+ {
+ return MODIFIED_SYMBOL;
+ }
+
+ @Override
public String toString()
{
StringBuilder builder = new StringBuilder("Modified{");
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Rejected.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Rejected.java?rev=1773542&r1=1773541&r2=1773542&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Rejected.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Rejected.java Sat Dec 10 22:09:28 2016
@@ -35,7 +35,8 @@ public class Rejected
{
- private Error _error;
+ public static final Symbol REJECTED_SYMBOL = Symbol.valueOf("amqp:rejected:list");
+ private Error _error;
public Error getError()
{
@@ -47,7 +48,13 @@ public class Rejected
_error = error;
}
- @Override
+ @Override
+ public Symbol getSymbol()
+ {
+ return REJECTED_SYMBOL;
+ }
+
+ @Override
public String toString()
{
StringBuilder builder = new StringBuilder("Rejected{");
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Released.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Released.java?rev=1773542&r1=1773541&r2=1773542&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Released.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Released.java Sat Dec 10 22:09:28 2016
@@ -32,7 +32,15 @@ public class Released
{
- @Override
+ public static final Symbol RELEASED_SYMBOL = Symbol.valueOf("amqp:released:list");
+
+ @Override
+ public Symbol getSymbol()
+ {
+ return RELEASED_SYMBOL;
+ }
+
+ @Override
public String toString()
{
StringBuilder builder = new StringBuilder("Released{");
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Source.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Source.java?rev=1773542&r1=1773541&r2=1773542&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Source.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Source.java Sat Dec 10 22:09:28 2016
@@ -31,8 +31,7 @@ import java.util.Map;
import org.apache.qpid.server.protocol.v1_0.type.*;
-public class Source
- implements org.apache.qpid.server.protocol.v1_0.type.Source
+public class Source implements BaseSource
{
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Target.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Target.java?rev=1773542&r1=1773541&r2=1773542&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Target.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Target.java Sat Dec 10 22:09:28 2016
@@ -32,7 +32,7 @@ import java.util.Map;
import org.apache.qpid.server.protocol.v1_0.type.*;
public class Target
- implements org.apache.qpid.server.protocol.v1_0.type.Target
+ implements BaseTarget
{
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/codec/AcceptedConstructor.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/codec/AcceptedConstructor.java?rev=1773542&r1=1773541&r2=1773542&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/codec/AcceptedConstructor.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/codec/AcceptedConstructor.java Sat Dec 10 22:09:28 2016
@@ -33,10 +33,9 @@ import java.util.List;
public class AcceptedConstructor extends AbstractDescribedTypeConstructor<Accepted>
{
- public static final Symbol SYMBOL_CONSTRUCTOR = Symbol.valueOf("amqp:accepted:list");
private static final Object[] DESCRIPTORS =
{
- SYMBOL_CONSTRUCTOR,UnsignedLong.valueOf(0x0000000000000024L),
+ Accepted.ACCEPTED_SYMBOL,UnsignedLong.valueOf(0x0000000000000024L),
};
private static final AcceptedConstructor INSTANCE = new AcceptedConstructor();
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/codec/ModifiedConstructor.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/codec/ModifiedConstructor.java?rev=1773542&r1=1773541&r2=1773542&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/codec/ModifiedConstructor.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/codec/ModifiedConstructor.java Sat Dec 10 22:09:28 2016
@@ -23,20 +23,19 @@
package org.apache.qpid.server.protocol.v1_0.type.messaging.codec;
-import org.apache.qpid.server.protocol.v1_0.codec.AbstractDescribedTypeConstructor;
-import org.apache.qpid.server.protocol.v1_0.codec.DescribedTypeConstructorRegistry;
-import org.apache.qpid.server.protocol.v1_0.type.*;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.*;
-
-
import java.util.List;
import java.util.Map;
+import org.apache.qpid.server.protocol.v1_0.codec.AbstractDescribedTypeConstructor;
+import org.apache.qpid.server.protocol.v1_0.codec.DescribedTypeConstructorRegistry;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedLong;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Modified;
+
public class ModifiedConstructor extends AbstractDescribedTypeConstructor<Modified>
{
private static final Object[] DESCRIPTORS =
{
- Symbol.valueOf("amqp:modified:list"),UnsignedLong.valueOf(0x0000000000000027L),
+ Modified.MODIFIED_SYMBOL,UnsignedLong.valueOf(0x0000000000000027L),
};
private static final ModifiedConstructor INSTANCE = new ModifiedConstructor();
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/codec/RejectedConstructor.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/codec/RejectedConstructor.java?rev=1773542&r1=1773541&r2=1773542&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/codec/RejectedConstructor.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/codec/RejectedConstructor.java Sat Dec 10 22:09:28 2016
@@ -23,20 +23,18 @@
package org.apache.qpid.server.protocol.v1_0.type.messaging.codec;
+import java.util.List;
+
import org.apache.qpid.server.protocol.v1_0.codec.AbstractDescribedTypeConstructor;
import org.apache.qpid.server.protocol.v1_0.codec.DescribedTypeConstructorRegistry;
-import org.apache.qpid.server.protocol.v1_0.type.*;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.*;
-
-
-import java.util.List;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedLong;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Rejected;
public class RejectedConstructor extends AbstractDescribedTypeConstructor<Rejected>
{
- public static final Symbol SYMBOL_CONSTRUCTOR = Symbol.valueOf("amqp:rejected:list");
private static final Object[] DESCRIPTORS =
{
- SYMBOL_CONSTRUCTOR,UnsignedLong.valueOf(0x0000000000000025L),
+ Rejected.REJECTED_SYMBOL,UnsignedLong.valueOf(0x0000000000000025L),
};
private static final RejectedConstructor INSTANCE = new RejectedConstructor();
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/codec/ReleasedConstructor.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/codec/ReleasedConstructor.java?rev=1773542&r1=1773541&r2=1773542&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/codec/ReleasedConstructor.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/codec/ReleasedConstructor.java Sat Dec 10 22:09:28 2016
@@ -23,19 +23,18 @@
package org.apache.qpid.server.protocol.v1_0.type.messaging.codec;
+import java.util.List;
+
import org.apache.qpid.server.protocol.v1_0.codec.AbstractDescribedTypeConstructor;
import org.apache.qpid.server.protocol.v1_0.codec.DescribedTypeConstructorRegistry;
-import org.apache.qpid.server.protocol.v1_0.type.*;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.*;
-
-
-import java.util.List;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedLong;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Released;
public class ReleasedConstructor extends AbstractDescribedTypeConstructor<Released>
{
private static final Object[] DESCRIPTORS =
{
- Symbol.valueOf("amqp:released:list"),UnsignedLong.valueOf(0x0000000000000026L),
+ Released.RELEASED_SYMBOL,UnsignedLong.valueOf(0x0000000000000026L),
};
private static final ReleasedConstructor INSTANCE = new ReleasedConstructor();
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transaction/Coordinator.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transaction/Coordinator.java?rev=1773542&r1=1773541&r2=1773542&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transaction/Coordinator.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transaction/Coordinator.java Sat Dec 10 22:09:28 2016
@@ -31,7 +31,7 @@ import java.util.Arrays;
import org.apache.qpid.server.protocol.v1_0.type.*;
public class Coordinator
- implements Target
+ implements BaseTarget
{
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transaction/Declared.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transaction/Declared.java?rev=1773542&r1=1773541&r2=1773542&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transaction/Declared.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transaction/Declared.java Sat Dec 10 22:09:28 2016
@@ -25,14 +25,18 @@ package org.apache.qpid.server.protocol.
-import org.apache.qpid.server.protocol.v1_0.type.*;
+import org.apache.qpid.server.protocol.v1_0.type.Binary;
+import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
+import org.apache.qpid.server.protocol.v1_0.type.Outcome;
+import org.apache.qpid.server.protocol.v1_0.type.Symbol;
public class Declared
implements DeliveryState, Outcome
{
- private Binary _txnId;
+ public static final Symbol DECLARED_SYMBOL = Symbol.valueOf("amqp:declared:list");
+ private Binary _txnId;
public Binary getTxnId()
{
@@ -64,4 +68,9 @@ public class Declared
}
+ @Override
+ public Symbol getSymbol()
+ {
+ return DECLARED_SYMBOL;
+ }
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transaction/codec/DeclareConstructor.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transaction/codec/DeclareConstructor.java?rev=1773542&r1=1773541&r2=1773542&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transaction/codec/DeclareConstructor.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transaction/codec/DeclareConstructor.java Sat Dec 10 22:09:28 2016
@@ -23,13 +23,14 @@
package org.apache.qpid.server.protocol.v1_0.type.transaction.codec;
+import java.util.List;
+
import org.apache.qpid.server.protocol.v1_0.codec.AbstractDescribedTypeConstructor;
import org.apache.qpid.server.protocol.v1_0.codec.DescribedTypeConstructorRegistry;
-import org.apache.qpid.server.protocol.v1_0.type.*;
-import org.apache.qpid.server.protocol.v1_0.type.transaction.*;
-
-
-import java.util.List;
+import org.apache.qpid.server.protocol.v1_0.type.GlobalTxId;
+import org.apache.qpid.server.protocol.v1_0.type.Symbol;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedLong;
+import org.apache.qpid.server.protocol.v1_0.type.transaction.Declare;
public class DeclareConstructor extends AbstractDescribedTypeConstructor<Declare>
{
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transaction/codec/DeclaredConstructor.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transaction/codec/DeclaredConstructor.java?rev=1773542&r1=1773541&r2=1773542&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transaction/codec/DeclaredConstructor.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transaction/codec/DeclaredConstructor.java Sat Dec 10 22:09:28 2016
@@ -23,19 +23,19 @@
package org.apache.qpid.server.protocol.v1_0.type.transaction.codec;
+import java.util.List;
+
import org.apache.qpid.server.protocol.v1_0.codec.AbstractDescribedTypeConstructor;
import org.apache.qpid.server.protocol.v1_0.codec.DescribedTypeConstructorRegistry;
-import org.apache.qpid.server.protocol.v1_0.type.*;
-import org.apache.qpid.server.protocol.v1_0.type.transaction.*;
-
-
-import java.util.List;
+import org.apache.qpid.server.protocol.v1_0.type.Binary;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedLong;
+import org.apache.qpid.server.protocol.v1_0.type.transaction.Declared;
public class DeclaredConstructor extends AbstractDescribedTypeConstructor<Declared>
{
private static final Object[] DESCRIPTORS =
{
- Symbol.valueOf("amqp:declared:list"),UnsignedLong.valueOf(0x0000000000000033L),
+ Declared.DECLARED_SYMBOL,UnsignedLong.valueOf(0x0000000000000033L),
};
private static final DeclaredConstructor INSTANCE = new DeclaredConstructor();
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/Attach.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/Attach.java?rev=1773542&r1=1773541&r2=1773542&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/Attach.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/Attach.java Sat Dec 10 22:09:28 2016
@@ -51,9 +51,9 @@ public class Attach
private ReceiverSettleMode _rcvSettleMode;
- private Source _source;
+ private BaseSource _source;
- private Target _target;
+ private BaseTarget _target;
private Map _unsettled;
@@ -119,22 +119,22 @@ public class Attach
_rcvSettleMode = rcvSettleMode;
}
- public Source getSource()
+ public BaseSource getSource()
{
return _source;
}
- public void setSource(Source source)
+ public void setSource(BaseSource source)
{
_source = source;
}
- public Target getTarget()
+ public BaseTarget getTarget()
{
return _target;
}
- public void setTarget(Target target)
+ public void setTarget(BaseTarget target)
{
_target = target;
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/codec/AttachConstructor.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/codec/AttachConstructor.java?rev=1773542&r1=1773541&r2=1773542&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/codec/AttachConstructor.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/codec/AttachConstructor.java Sat Dec 10 22:09:28 2016
@@ -203,7 +203,7 @@ public class AttachConstructor extends A
try
{
- obj.setSource( (Source) val );
+ obj.setSource( (BaseSource) val);
}
catch(ClassCastException e)
{
@@ -230,7 +230,7 @@ public class AttachConstructor extends A
try
{
- obj.setTarget( (Target) val );
+ obj.setTarget( (BaseTarget) val);
}
catch(ClassCastException e)
{
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org