You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by lq...@apache.org on 2017/02/16 16:11:27 UTC
svn commit: r1783244 [2/2] -
/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/
Copied: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java (from r1783242, qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLink_1_0.java)
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java?p2=qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java&p1=qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLink_1_0.java&r1=1783242&r2=1783244&rev=1783244&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLink_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java Thu Feb 16 16:11:27 2017
@@ -1,5 +1,4 @@
/*
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -16,8 +15,8 @@
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
- *
*/
+
package org.apache.qpid.server.protocol.v1_0;
import java.security.AccessControlException;
@@ -35,7 +34,6 @@ import org.slf4j.LoggerFactory;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.model.NamedAddressSpace;
import org.apache.qpid.server.plugin.MessageFormat;
import org.apache.qpid.server.protocol.MessageFormatRegistry;
import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoderImpl;
@@ -59,6 +57,7 @@ import org.apache.qpid.server.protocol.v
import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusDurability;
import org.apache.qpid.server.protocol.v1_0.type.transaction.TransactionalState;
import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
@@ -70,40 +69,37 @@ import org.apache.qpid.server.txn.LocalT
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
-public class StandardReceivingLink_1_0 implements ReceivingLink_1_0
+public class StandardReceivingLinkEndpoint extends ReceivingLinkEndpoint
{
- private static final Logger LOGGER = LoggerFactory.getLogger(StandardReceivingLink_1_0.class);
- private NamedAddressSpace _addressSpace;
-
- private ReceivingDestination _destination;
- private SectionDecoderImpl _sectionDecoder;
- private volatile ReceivingLinkAttachment _attachment;
-
+ private static final Logger LOGGER = LoggerFactory.getLogger(StandardReceivingLinkEndpoint.class);
+ private final SectionDecoderImpl _sectionDecoder;
private ArrayList<Transfer> _incompleteMessage;
- private TerminusDurability _durability;
-
- private Map<Binary, Outcome> _unsettledMap = Collections.synchronizedMap(new HashMap<Binary, Outcome>());
private boolean _resumedMessage;
private Binary _messageDeliveryTag;
- private ReceiverSettleMode _receivingSettlementMode;
-
+ private Map<Binary, Outcome> _unsettledMap = Collections.synchronizedMap(new HashMap<Binary, Outcome>());
- public StandardReceivingLink_1_0(ReceivingLinkAttachment receivingLinkAttachment, NamedAddressSpace addressSpace,
- ReceivingDestination destination)
+ public StandardReceivingLinkEndpoint(final Session_1_0 session,
+ final Attach attach)
{
- _addressSpace = addressSpace;
- _destination = destination;
- _attachment = receivingLinkAttachment;
- _receivingSettlementMode = receivingLinkAttachment.getEndpoint().getReceivingSettlementMode();
- _durability = ((Target)receivingLinkAttachment.getTarget()).getDurable();
-
- _sectionDecoder = new SectionDecoderImpl(receivingLinkAttachment.getEndpoint().getSession().getConnection().getSectionDecoderRegistry());
+ super(session, attach);
+ _sectionDecoder = new SectionDecoderImpl(getSession().getConnection().getSectionDecoderRegistry());
+ }
+ @Override
+ public void start()
+ {
+ setLinkCredit(UnsignedInteger.valueOf(getReceivingDestination().getCredit()));
+ setCreditWindow();
+ }
+ private TerminusDurability getDurability()
+ {
+ return ((Target) getTarget()).getDurable();
}
- public Error messageTransfer(Transfer xfr)
+ @Override
+ protected Error messageTransfer(Transfer xfr)
{
List<QpidByteBuffer> fragments = null;
@@ -121,7 +117,6 @@ public class StandardReceivingLink_1_0 i
else if(_incompleteMessage != null)
{
_incompleteMessage.add(xfr);
-
if(Boolean.TRUE.equals(xfr.getMore()))
{
return null;
@@ -157,7 +152,7 @@ public class StandardReceivingLink_1_0 i
{
Outcome outcome = _unsettledMap.get(_messageDeliveryTag);
boolean settled = ReceiverSettleMode.FIRST.equals(getReceivingSettlementMode());
- getEndpoint().updateDisposition(_messageDeliveryTag, (DeliveryState) outcome, settled);
+ updateDisposition(_messageDeliveryTag, (DeliveryState) outcome, settled);
if(settled)
{
_unsettledMap.remove(_messageDeliveryTag);
@@ -178,7 +173,7 @@ public class StandardReceivingLink_1_0 i
List<EncodingRetainingSection<?>> dataSections = new ArrayList<>();
MessageMetaData_1_0 mmd = createMessageMetaData(fragments, dataSections);
- MessageHandle<MessageMetaData_1_0> handle = _addressSpace.getMessageStore().addMessage(mmd);
+ MessageHandle<MessageMetaData_1_0> handle = getAddressSpace().getMessageStore().addMessage(mmd);
for (EncodingRetainingSection<?> dataSection : dataSections)
{
@@ -190,7 +185,8 @@ public class StandardReceivingLink_1_0 i
}
final StoredMessage<MessageMetaData_1_0> storedMessage = handle.allContentAdded();
Message_1_0 message = new Message_1_0(storedMessage, getSession().getConnection().getReference());
- routingAddress = _destination.getRoutingAddress(message);
+
+ routingAddress = getReceivingDestination().getRoutingAddress(message);
serverMessage = message;
}
else
@@ -198,8 +194,8 @@ public class StandardReceivingLink_1_0 i
MessageFormat format = MessageFormatRegistry.getFormat(messageFormat.intValue());
if(format != null)
{
- serverMessage = format.createMessage(fragments, _addressSpace.getMessageStore(), getSession().getConnection().getReference());
- routingAddress = format.getRoutingAddress(serverMessage, _destination.getAddress());
+ serverMessage = format.createMessage(fragments, getAddressSpace().getMessageStore(), getSession().getConnection().getReference());
+ routingAddress = format.getRoutingAddress(serverMessage, getReceivingDestination().getAddress());
}
else
{
@@ -235,7 +231,7 @@ public class StandardReceivingLink_1_0 i
}
else
{
- transaction = new AutoCommitTransaction(_addressSpace.getMessageStore());
+ transaction = new AutoCommitTransaction(getAddressSpace().getMessageStore());
}
try
@@ -243,11 +239,11 @@ public class StandardReceivingLink_1_0 i
Session_1_0 session = getSession();
session.getAMQPConnection()
- .checkAuthorizedMessagePrincipal(serverMessage.getMessageHeader().getUserId());
- _destination.authorizePublish(session.getSecurityToken(), routingAddress);
+ .checkAuthorizedMessagePrincipal(serverMessage.getMessageHeader().getUserId());
+ getReceivingDestination().authorizePublish(session.getSecurityToken(), routingAddress);
- Outcome outcome = _destination.send(serverMessage, routingAddress, transaction, session.getCapacityCheckAction());
- Source source = (Source) getEndpoint().getSource();
+ Outcome outcome = getReceivingDestination().send(serverMessage, routingAddress, transaction, session.getCapacityCheckAction());
+ Source source = (Source) getSource();
DeliveryState resultantState;
@@ -288,10 +284,10 @@ public class StandardReceivingLink_1_0 i
_unsettledMap.put(deliveryTag, outcome);
}
- getEndpoint().updateDisposition(deliveryTag, resultantState, settled);
+ updateDisposition(deliveryTag, resultantState, settled);
getSession().getAMQPConnection()
- .registerMessageReceived(serverMessage.getSize(), serverMessage.getArrivalTime());
+ .registerMessageReceived(serverMessage.getSize(), serverMessage.getArrivalTime());
if (!(transaction instanceof AutoCommitTransaction))
{
@@ -300,12 +296,12 @@ public class StandardReceivingLink_1_0 i
{
public void postCommit()
{
- getEndpoint().updateDisposition(deliveryTag, null, true);
+ updateDisposition(deliveryTag, null, true);
}
public void onRollback()
{
- getEndpoint().updateDisposition(deliveryTag, null, true);
+ updateDisposition(deliveryTag, null, true);
}
});
}
@@ -315,7 +311,7 @@ public class StandardReceivingLink_1_0 i
final Error err = new Error();
err.setCondition(AmqpError.NOT_ALLOWED);
err.setDescription(e.getMessage());
- _attachment.getEndpoint().close(err);
+ close(err);
}
}
@@ -327,6 +323,36 @@ public class StandardReceivingLink_1_0 i
return null;
}
+ @Override
+ protected void remoteDetachedPerformDetach(Detach detach)
+ {
+ if(!TerminusDurability.UNSETTLED_STATE.equals(getDurability()) ||
+ (detach != null && Boolean.TRUE.equals(detach.getClosed())))
+ {
+ close();
+ }
+ else if(detach == null || detach.getError() != null)
+ {
+ getLink().setLinkAttachmentToNull();
+ // TODO do we have to set an error?
+ detach();
+ }
+ else
+ {
+ detach();
+ }
+ }
+
+ @Override
+ protected void handle(Binary deliveryTag, DeliveryState state, Boolean settled)
+ {
+ if(Boolean.TRUE.equals(settled))
+ {
+ _unsettledMap.remove(deliveryTag);
+ }
+ }
+
+
private MessageMetaData_1_0 createMessageMetaData(final List<QpidByteBuffer> fragments,
final List<EncodingRetainingSection<?>> dataSections)
{
@@ -334,7 +360,7 @@ public class StandardReceivingLink_1_0 i
List<EncodingRetainingSection<?>> sections;
try
{
- sections = _sectionDecoder.parseAll(fragments);
+ sections = _sectionDecoder.parseAll(fragments);
}
catch (AmqpErrorException e)
{
@@ -396,7 +422,8 @@ public class StandardReceivingLink_1_0 i
contentSize += s.getEncodedSize();
dataSections.add(s);
s = iter.hasNext() ? iter.next() : null;
- } while (s instanceof DataSection);
+ }
+ while (s instanceof DataSection);
}
else if (s instanceof AmqpSequenceSection)
{
@@ -416,7 +443,7 @@ public class StandardReceivingLink_1_0 i
}
if (s != null)
{
- // TODO error
+ throw new ConnectionScopedRuntimeException(String.format("Encountered unexpected section '%s'", s.getClass().getSimpleName()));
}
return new MessageMetaData_1_0(headerSection,
deliveryAnnotationsSection,
@@ -428,58 +455,9 @@ public class StandardReceivingLink_1_0 i
contentSize);
}
- private ReceiverSettleMode getReceivingSettlementMode()
- {
- return _receivingSettlementMode;
- }
-
- public void remoteDetached(LinkEndpoint endpoint, Detach detach)
- {
- //TODO
- // if not durable or close
- if(!TerminusDurability.UNSETTLED_STATE.equals(_durability) ||
- (detach != null && Boolean.TRUE.equals(detach.getClosed())))
- {
- endpoint.close();
- }
- else if(detach == null || detach.getError() != null)
- {
- _attachment = null;
- }
- }
-
- public void start()
- {
- getEndpoint().setLinkCredit(UnsignedInteger.valueOf(_destination.getCredit()));
- getEndpoint().setCreditWindow();
- }
-
- public ReceivingLinkEndpoint getEndpoint()
+ public void doLinkAttachment()
{
- return _attachment.getEndpoint();
- }
-
-
- public Session_1_0 getSession()
- {
- ReceivingLinkAttachment attachment = _attachment;
- return attachment == null ? null : attachment.getSession();
- }
-
- public void handle(Binary deliveryTag, DeliveryState state, Boolean settled)
- {
- if(Boolean.TRUE.equals(settled))
- {
- _unsettledMap.remove(deliveryTag);
- }
- }
-
- public void setLinkAttachment(ReceivingLinkAttachment linkAttachment)
- {
- _attachment = linkAttachment;
- _receivingSettlementMode = linkAttachment.getEndpoint().getReceivingSettlementMode();
- ReceivingLinkEndpoint endpoint = linkAttachment.getEndpoint();
- Map initialUnsettledMap = endpoint.getInitialUnsettledMap();
+ Map initialUnsettledMap = getInitialUnsettledMap();
Map<Binary, Outcome> unsettledCopy = new HashMap<Binary, Outcome>(_unsettledMap);
for(Map.Entry<Binary, Outcome> entry : unsettledCopy.entrySet())
@@ -490,16 +468,12 @@ public class StandardReceivingLink_1_0 i
_unsettledMap.remove(deliveryTag);
}
}
-
}
- public Map getUnsettledOutcomeMap()
+ public Map<Binary, Outcome> getUnsettledOutcomeMap()
{
return _unsettledMap;
}
- public ReceivingDestination getDestination()
- {
- return _destination;
- }
+
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLink_1_0.java?rev=1783244&r1=1783243&r2=1783244&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLink_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLink_1_0.java Thu Feb 16 16:11:27 2017
@@ -20,486 +20,30 @@
*/
package org.apache.qpid.server.protocol.v1_0;
-import java.security.AccessControlException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.server.message.MessageReference;
-import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.model.NamedAddressSpace;
-import org.apache.qpid.server.plugin.MessageFormat;
-import org.apache.qpid.server.protocol.MessageFormatRegistry;
-import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoderImpl;
-import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
-import org.apache.qpid.server.protocol.v1_0.type.Binary;
-import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
-import org.apache.qpid.server.protocol.v1_0.type.Outcome;
-import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpSequenceSection;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValueSection;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.ApplicationPropertiesSection;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.DataSection;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.DeliveryAnnotationsSection;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.EncodingRetainingSection;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.FooterSection;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.HeaderSection;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.MessageAnnotationsSection;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.PropertiesSection;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusDurability;
-import org.apache.qpid.server.protocol.v1_0.type.transaction.TransactionalState;
-import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
-import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
-import org.apache.qpid.server.store.MessageHandle;
-import org.apache.qpid.server.store.StoredMessage;
-import org.apache.qpid.server.txn.AutoCommitTransaction;
-import org.apache.qpid.server.txn.LocalTransaction;
-import org.apache.qpid.server.txn.ServerTransaction;
-import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
-
public class StandardReceivingLink_1_0 implements ReceivingLink_1_0
{
- private static final Logger LOGGER = LoggerFactory.getLogger(StandardReceivingLink_1_0.class);
- private NamedAddressSpace _addressSpace;
-
- private ReceivingDestination _destination;
- private SectionDecoderImpl _sectionDecoder;
- private volatile ReceivingLinkAttachment _attachment;
-
-
- private ArrayList<Transfer> _incompleteMessage;
- private TerminusDurability _durability;
-
- private Map<Binary, Outcome> _unsettledMap = Collections.synchronizedMap(new HashMap<Binary, Outcome>());
- private boolean _resumedMessage;
- private Binary _messageDeliveryTag;
- private ReceiverSettleMode _receivingSettlementMode;
-
-
- public StandardReceivingLink_1_0(ReceivingLinkAttachment receivingLinkAttachment, NamedAddressSpace addressSpace,
- ReceivingDestination destination)
- {
- _addressSpace = addressSpace;
- _destination = destination;
- _attachment = receivingLinkAttachment;
- _receivingSettlementMode = receivingLinkAttachment.getEndpoint().getReceivingSettlementMode();
- _durability = ((Target)receivingLinkAttachment.getTarget()).getDurable();
-
- _sectionDecoder = new SectionDecoderImpl(receivingLinkAttachment.getEndpoint().getSession().getConnection().getSectionDecoderRegistry());
-
-
- }
-
- public Error messageTransfer(Transfer xfr)
- {
- List<QpidByteBuffer> fragments = null;
-
- org.apache.qpid.server.protocol.v1_0.type.DeliveryState xfrState = xfr.getState();
- final Binary deliveryTag = xfr.getDeliveryTag();
- UnsignedInteger messageFormat = null;
- if(Boolean.TRUE.equals(xfr.getMore()) && _incompleteMessage == null)
- {
- _incompleteMessage = new ArrayList<>();
- _incompleteMessage.add(xfr);
- _resumedMessage = Boolean.TRUE.equals(xfr.getResume());
- _messageDeliveryTag = deliveryTag;
- return null;
- }
- else if(_incompleteMessage != null)
- {
- _incompleteMessage.add(xfr);
-
- if(Boolean.TRUE.equals(xfr.getMore()))
- {
- return null;
- }
-
- fragments = new ArrayList<>(_incompleteMessage.size());
-
- for(Transfer t : _incompleteMessage)
- {
- if(t.getMessageFormat() != null && messageFormat == null)
- {
- messageFormat = t.getMessageFormat();
- }
- fragments.addAll(t.getPayload());
- t.dispose();
- }
- _incompleteMessage=null;
-
- }
- else
- {
- _resumedMessage = Boolean.TRUE.equals(xfr.getResume());
- _messageDeliveryTag = deliveryTag;
- fragments = xfr.getPayload();
- messageFormat = xfr.getMessageFormat();
-
- xfr.dispose();
- }
-
- if(_resumedMessage)
- {
- if(_unsettledMap.containsKey(_messageDeliveryTag))
- {
- Outcome outcome = _unsettledMap.get(_messageDeliveryTag);
- boolean settled = ReceiverSettleMode.FIRST.equals(getReceivingSettlementMode());
- getEndpoint().updateDisposition(_messageDeliveryTag, (DeliveryState) outcome, settled);
- if(settled)
- {
- _unsettledMap.remove(_messageDeliveryTag);
- }
- }
- else
- {
- throw new ConnectionScopedRuntimeException("Unexpected delivery Tag: " + _messageDeliveryTag + "_unsettledMap: " + _unsettledMap);
- }
- }
- else
- {
- ServerMessage<?> serverMessage;
- String routingAddress;
-
- if(messageFormat == null || UnsignedInteger.ZERO.equals(messageFormat))
- {
- List<EncodingRetainingSection<?>> dataSections = new ArrayList<>();
-
- MessageMetaData_1_0 mmd = createMessageMetaData(fragments, dataSections);
- MessageHandle<MessageMetaData_1_0> handle = _addressSpace.getMessageStore().addMessage(mmd);
-
- for (EncodingRetainingSection<?> dataSection : dataSections)
- {
- for (QpidByteBuffer buf : dataSection.getEncodedForm())
- {
- handle.addContent(buf);
- buf.dispose();
- }
- }
- final StoredMessage<MessageMetaData_1_0> storedMessage = handle.allContentAdded();
- Message_1_0 message = new Message_1_0(storedMessage, getSession().getConnection().getReference());
- routingAddress = _destination.getRoutingAddress(message);
- serverMessage = message;
- }
- else
- {
- MessageFormat format = MessageFormatRegistry.getFormat(messageFormat.intValue());
- if(format != null)
- {
- serverMessage = format.createMessage(fragments, _addressSpace.getMessageStore(), getSession().getConnection().getReference());
- routingAddress = format.getRoutingAddress(serverMessage, _destination.getAddress());
- }
- else
- {
- final Error err = new Error();
- err.setCondition(AmqpError.NOT_IMPLEMENTED);
- err.setDescription("Unknown message format: " + messageFormat);
- return err;
- }
- }
-
- for(QpidByteBuffer fragment: fragments)
- {
- fragment.dispose();
- }
- fragments = null;
-
- MessageReference<?> reference = serverMessage.newReference();
- try
- {
- Binary transactionId = null;
- if (xfrState != null)
- {
- if (xfrState instanceof TransactionalState)
- {
- transactionId = ((TransactionalState) xfrState).getTxnId();
- }
- }
-
- ServerTransaction transaction = null;
- if (transactionId != null)
- {
- transaction = getSession().getTransaction(transactionId);
- }
- else
- {
- transaction = new AutoCommitTransaction(_addressSpace.getMessageStore());
- }
-
- try
- {
- Session_1_0 session = getSession();
-
- session.getAMQPConnection()
- .checkAuthorizedMessagePrincipal(serverMessage.getMessageHeader().getUserId());
- _destination.authorizePublish(session.getSecurityToken(), routingAddress);
-
- Outcome outcome = _destination.send(serverMessage, routingAddress, transaction, session.getCapacityCheckAction());
- Source source = (Source) getEndpoint().getSource();
-
- DeliveryState resultantState;
-
- if(source.getOutcomes() == null || Arrays.asList(source.getOutcomes()).contains(outcome.getSymbol()))
- {
- if (transactionId == null)
- {
- resultantState = (DeliveryState) outcome;
- }
- else
- {
- TransactionalState transactionalState = new TransactionalState();
- transactionalState.setOutcome(outcome);
- transactionalState.setTxnId(transactionId);
- resultantState = transactionalState;
- }
- }
- else if(transactionId != null)
- {
- // cause the txn to fail
- if(transaction instanceof LocalTransaction)
- {
- ((LocalTransaction) transaction).setRollbackOnly();
- }
- resultantState = null;
- }
- else
- {
- // we should just use the default outcome
- resultantState = null;
- }
-
-
- boolean settled = ReceiverSettleMode.FIRST.equals(getReceivingSettlementMode() );
+ private volatile ReceivingLinkEndpoint _linkEndpoint;
- if (!settled)
- {
- _unsettledMap.put(deliveryTag, outcome);
- }
-
- getEndpoint().updateDisposition(deliveryTag, resultantState, settled);
-
- getSession().getAMQPConnection()
- .registerMessageReceived(serverMessage.getSize(), serverMessage.getArrivalTime());
-
- if (!(transaction instanceof AutoCommitTransaction))
- {
- ServerTransaction.Action a;
- transaction.addPostTransactionAction(new ServerTransaction.Action()
- {
- public void postCommit()
- {
- getEndpoint().updateDisposition(deliveryTag, null, true);
- }
-
- public void onRollback()
- {
- getEndpoint().updateDisposition(deliveryTag, null, true);
- }
- });
- }
- }
- catch (AccessControlException e)
- {
- final Error err = new Error();
- err.setCondition(AmqpError.NOT_ALLOWED);
- err.setDescription(e.getMessage());
- _attachment.getEndpoint().close(err);
-
- }
- }
- finally
- {
- reference.release();
- }
- }
- return null;
- }
-
- private MessageMetaData_1_0 createMessageMetaData(final List<QpidByteBuffer> fragments,
- final List<EncodingRetainingSection<?>> dataSections)
- {
-
- List<EncodingRetainingSection<?>> sections;
- try
- {
- sections = _sectionDecoder.parseAll(fragments);
- }
- catch (AmqpErrorException e)
- {
- LOGGER.error("Decoding read section error", e);
- // TODO - fix error handling
- throw new IllegalArgumentException(e);
- }
-
- long contentSize = 0L;
-
- HeaderSection headerSection = null;
- PropertiesSection propertiesSection = null;
- DeliveryAnnotationsSection deliveryAnnotationsSection = null;
- MessageAnnotationsSection messageAnnotationsSection = null;
- ApplicationPropertiesSection applicationPropertiesSection = null;
- FooterSection footerSection = null;
- Iterator<EncodingRetainingSection<?>> iter = sections.iterator();
- EncodingRetainingSection<?> s = iter.hasNext() ? iter.next() : null;
- if (s instanceof HeaderSection)
- {
- headerSection = (HeaderSection) s;
- s = iter.hasNext() ? iter.next() : null;
- }
-
- if (s instanceof DeliveryAnnotationsSection)
- {
- deliveryAnnotationsSection = (DeliveryAnnotationsSection) s;
- s = iter.hasNext() ? iter.next() : null;
- }
-
- if (s instanceof MessageAnnotationsSection)
- {
- messageAnnotationsSection = (MessageAnnotationsSection) s;
- s = iter.hasNext() ? iter.next() : null;
- }
-
- if (s instanceof PropertiesSection)
- {
- propertiesSection = (PropertiesSection) s;
- s = iter.hasNext() ? iter.next() : null;
- }
-
- if (s instanceof ApplicationPropertiesSection)
- {
- applicationPropertiesSection = (ApplicationPropertiesSection) s;
- s = iter.hasNext() ? iter.next() : null;
- }
-
- if (s instanceof AmqpValueSection)
- {
- contentSize = s.getEncodedSize();
- dataSections.add(s);
- s = iter.hasNext() ? iter.next() : null;
- }
- else if (s instanceof DataSection)
- {
- do
- {
- contentSize += s.getEncodedSize();
- dataSections.add(s);
- s = iter.hasNext() ? iter.next() : null;
- } while (s instanceof DataSection);
- }
- else if (s instanceof AmqpSequenceSection)
- {
- do
- {
- contentSize += s.getEncodedSize();
- dataSections.add(s);
- s = iter.hasNext() ? iter.next() : null;
- }
- while (s instanceof AmqpSequenceSection);
- }
-
- if (s instanceof FooterSection)
- {
- footerSection = (FooterSection) s;
- s = iter.hasNext() ? iter.next() : null;
- }
- if (s != null)
- {
- // TODO error
- }
- return new MessageMetaData_1_0(headerSection,
- deliveryAnnotationsSection,
- messageAnnotationsSection,
- propertiesSection,
- applicationPropertiesSection,
- footerSection,
- System.currentTimeMillis(),
- contentSize);
- }
-
- private ReceiverSettleMode getReceivingSettlementMode()
+ public StandardReceivingLink_1_0(final ReceivingLinkEndpoint endpoint)
{
- return _receivingSettlementMode;
- }
-
- public void remoteDetached(LinkEndpoint endpoint, Detach detach)
- {
- //TODO
- // if not durable or close
- if(!TerminusDurability.UNSETTLED_STATE.equals(_durability) ||
- (detach != null && Boolean.TRUE.equals(detach.getClosed())))
- {
- endpoint.close();
- }
- else if(detach == null || detach.getError() != null)
- {
- _attachment = null;
- }
- }
-
- public void start()
- {
- getEndpoint().setLinkCredit(UnsignedInteger.valueOf(_destination.getCredit()));
- getEndpoint().setCreditWindow();
+ _linkEndpoint = endpoint;
}
public ReceivingLinkEndpoint getEndpoint()
{
- return _attachment.getEndpoint();
- }
-
-
- public Session_1_0 getSession()
- {
- ReceivingLinkAttachment attachment = _attachment;
- return attachment == null ? null : attachment.getSession();
+ return _linkEndpoint;
}
- public void handle(Binary deliveryTag, DeliveryState state, Boolean settled)
+ public void setLinkAttachment(final ReceivingLinkEndpoint linkEndpoint)
{
- if(Boolean.TRUE.equals(settled))
- {
- _unsettledMap.remove(deliveryTag);
- }
+ _linkEndpoint = linkEndpoint;
+ ((StandardReceivingLinkEndpoint)getEndpoint()).doLinkAttachment();
}
- public void setLinkAttachment(ReceivingLinkAttachment linkAttachment)
+ @Override
+ public void setLinkAttachmentToNull()
{
- _attachment = linkAttachment;
- _receivingSettlementMode = linkAttachment.getEndpoint().getReceivingSettlementMode();
- ReceivingLinkEndpoint endpoint = linkAttachment.getEndpoint();
- Map initialUnsettledMap = endpoint.getInitialUnsettledMap();
-
- Map<Binary, Outcome> unsettledCopy = new HashMap<Binary, Outcome>(_unsettledMap);
- for(Map.Entry<Binary, Outcome> entry : unsettledCopy.entrySet())
- {
- Binary deliveryTag = entry.getKey();
- if(!initialUnsettledMap.containsKey(deliveryTag))
- {
- _unsettledMap.remove(deliveryTag);
- }
- }
-
+ _linkEndpoint = null;
}
- public Map getUnsettledOutcomeMap()
- {
- return _unsettledMap;
- }
-
- public ReceivingDestination getDestination()
- {
- return _destination;
- }
}
Copied: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java (from r1783242, qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLink_1_0.java)
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java?p2=qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java&p1=qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLink_1_0.java&r1=1783242&r2=1783244&rev=1783244&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLink_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java Thu Feb 16 16:11:27 2017
@@ -1,5 +1,4 @@
/*
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -16,8 +15,8 @@
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
- *
*/
+
package org.apache.qpid.server.protocol.v1_0;
import java.util.ArrayList;
@@ -25,13 +24,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.server.model.NamedAddressSpace;
-import org.apache.qpid.server.protocol.v1_0.codec.QpidByteBufferUtils;
-import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoder;
import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoderImpl;
import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
import org.apache.qpid.server.protocol.v1_0.type.Binary;
@@ -40,43 +33,42 @@ import org.apache.qpid.server.protocol.v
import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValueSection;
import org.apache.qpid.server.protocol.v1_0.type.messaging.EncodingRetainingSection;
-import org.apache.qpid.server.protocol.v1_0.type.transaction.Coordinator;
import org.apache.qpid.server.protocol.v1_0.type.transaction.Declare;
import org.apache.qpid.server.protocol.v1_0.type.transaction.Declared;
import org.apache.qpid.server.protocol.v1_0.type.transaction.Discharge;
-import org.apache.qpid.server.protocol.v1_0.type.transaction.TxnCapability;
import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
import org.apache.qpid.server.protocol.v1_0.type.transport.LinkError;
import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
-public class TxnCoordinatorReceivingLink_1_0 implements ReceivingLink_1_0
+public class TxnCoordinatorReceivingLinkEndpoint extends ReceivingLinkEndpoint
{
- private static final Logger _logger = LoggerFactory.getLogger(TxnCoordinatorReceivingLink_1_0.class);
- private NamedAddressSpace _namedAddressSpace;
- private ReceivingLinkEndpoint _endpoint;
-
- private ArrayList<Transfer> _incompleteMessage;
- private SectionDecoder _sectionDecoder;
+ private final SectionDecoderImpl _sectionDecoder;
private final LinkedHashMap<Integer, ServerTransaction> _createdTransactions = new LinkedHashMap<>();
- private Session_1_0 _session;
+ private ArrayList<Transfer> _incompleteMessage;
+
+ public TxnCoordinatorReceivingLinkEndpoint(final Session_1_0 session,
+ final Attach attach)
+ {
+ super(session, attach);
+ _sectionDecoder = new SectionDecoderImpl(getSession().getConnection().getDescribedTypeRegistry().getSectionDecoderRegistry());
+ }
- public TxnCoordinatorReceivingLink_1_0(NamedAddressSpace namedAddressSpace,
- Session_1_0 session_1_0,
- ReceivingLinkEndpoint endpoint)
+ @Override
+ public void start()
{
- _namedAddressSpace = namedAddressSpace;
- _session = session_1_0;
- _endpoint = endpoint;
- _sectionDecoder = new SectionDecoderImpl(endpoint.getSession().getConnection().getDescribedTypeRegistry().getSectionDecoderRegistry());
- ((Coordinator)endpoint.getTarget()).setCapabilities(TxnCapability.LOCAL_TXN, TxnCapability.MULTI_SSNS_PER_TXN, TxnCapability.MULTI_TXNS_PER_SSN);
+ setLinkCredit(UnsignedInteger.ONE);
+ setCreditWindow();
}
- public Error messageTransfer(Transfer xfr)
+ @Override
+ protected Error messageTransfer(Transfer xfr)
{
List<QpidByteBuffer> payload = new ArrayList<>();
@@ -96,13 +88,11 @@ public class TxnCoordinatorReceivingLink
return null;
}
- int size = 0;
for(Transfer t : _incompleteMessage)
{
final List<QpidByteBuffer> bufs = t.getPayload();
if(bufs != null)
{
- size += QpidByteBufferUtils.remaining(bufs);
payload.addAll(bufs);
}
t.dispose();
@@ -120,44 +110,51 @@ public class TxnCoordinatorReceivingLink
try
{
List<EncodingRetainingSection<?>> sections = _sectionDecoder.parseAll(payload);
+ boolean amqpValueSectionFound = false;
for(EncodingRetainingSection section : sections)
{
if(section instanceof AmqpValueSection)
{
+ if (amqpValueSectionFound)
+ {
+ throw new ConnectionScopedRuntimeException("Received more than one AmqpValue sections");
+ }
+ amqpValueSectionFound = true;
Object command = section.getValue();
-
if(command instanceof Declare)
{
- final IdentifiedTransaction txn = _session.getConnection().createLocalTransaction();
+ final IdentifiedTransaction txn = getSession().getConnection().createLocalTransaction();
_createdTransactions.put(txn.getId(), txn.getServerTransaction());
Declared state = new Declared();
- _session.incrementStartedTransactions();
+ getSession().incrementStartedTransactions();
- state.setTxnId(_session.integerToBinary(txn.getId()));
- _endpoint.updateDisposition(deliveryTag, state, true);
+ state.setTxnId(getSession().integerToBinary(txn.getId()));
+ updateDisposition(deliveryTag, state, true);
}
else if(command instanceof Discharge)
{
Discharge discharge = (Discharge) command;
- final Error error = discharge(_session.binaryToInteger(discharge.getTxnId()),
+ final Error error = discharge(getSession().binaryToInteger(discharge.getTxnId()),
Boolean.TRUE.equals(discharge.getFail()));
- _endpoint.updateDisposition(deliveryTag, error == null ? new Accepted() : null, true);
+ updateDisposition(deliveryTag, error == null ? new Accepted() : null, true);
return error;
}
else
{
- // TODO error handling
-
- // also should panic if we receive more than one AmqpValue, or no AmqpValue section
+ throw new ConnectionScopedRuntimeException(String.format("Received unknown command '%s'",
+ command.getClass().getSimpleName()));
}
}
}
-
+ if (!amqpValueSectionFound)
+ {
+ throw new ConnectionScopedRuntimeException("Received no AmqpValue section");
+ }
}
catch (AmqpErrorException e)
{
@@ -173,25 +170,6 @@ public class TxnCoordinatorReceivingLink
return null;
}
- public void remoteDetached(LinkEndpoint endpoint, Detach detach)
- {
- // force rollback of open transactions
- for(Map.Entry<Integer, ServerTransaction> entry : _createdTransactions.entrySet())
- {
- entry.getValue().rollback();
- _session.incrementRolledBackTransactions();
- _session.getConnection().removeTransaction(entry.getKey());
- }
- endpoint.detach();
- }
-
-
- @Override
- public void handle(final Binary deliveryTag, final DeliveryState state, final Boolean settled)
- {
-
- }
-
private Error discharge(Integer transactionId, boolean fail)
{
Error error = null;
@@ -201,23 +179,23 @@ public class TxnCoordinatorReceivingLink
if(fail)
{
txn.rollback();
- _session.incrementRolledBackTransactions();
+ getSession().incrementRolledBackTransactions();
}
else if(!(txn instanceof LocalTransaction && ((LocalTransaction)txn).isRollbackOnly()))
{
txn.commit();
- _session.incrementCommittedTransactions();
+ getSession().incrementCommittedTransactions();
}
else
{
txn.rollback();
- _session.incrementRolledBackTransactions();
+ getSession().incrementRolledBackTransactions();
error = new Error();
error.setCondition(LinkError.DETACH_FORCED);
error.setDescription("The transaction was marked as rollback only due to an earlier issue (e.g. a published message was sent settled but could not be enqueued)");
}
_createdTransactions.remove(transactionId);
- _session.getConnection().removeTransaction(transactionId);
+ getSession().getConnection().removeTransaction(transactionId);
}
else
{
@@ -228,11 +206,24 @@ public class TxnCoordinatorReceivingLink
return error;
}
+ @Override
+ protected void remoteDetachedPerformDetach(Detach detach)
+ {
+ // force rollback of open transactions
+ for(Map.Entry<Integer, ServerTransaction> entry : _createdTransactions.entrySet())
+ {
+ entry.getValue().rollback();
+ getSession().incrementRolledBackTransactions();
+ getSession().getConnection().removeTransaction(entry.getKey());
+ }
+ detach();
+ }
-
- public void start()
+ @Override
+ protected void handle(final Binary deliveryTag, final DeliveryState state, final Boolean settled)
{
- _endpoint.setLinkCredit(UnsignedInteger.ONE);
- _endpoint.setCreditWindow();
+
}
+
+
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLink_1_0.java?rev=1783244&r1=1783243&r2=1783244&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLink_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLink_1_0.java Thu Feb 16 16:11:27 2017
@@ -20,219 +20,20 @@
*/
package org.apache.qpid.server.protocol.v1_0;
-import java.util.ArrayList;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.server.model.NamedAddressSpace;
-import org.apache.qpid.server.protocol.v1_0.codec.QpidByteBufferUtils;
-import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoder;
-import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoderImpl;
-import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
-import org.apache.qpid.server.protocol.v1_0.type.Binary;
-import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
-import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValueSection;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.EncodingRetainingSection;
import org.apache.qpid.server.protocol.v1_0.type.transaction.Coordinator;
-import org.apache.qpid.server.protocol.v1_0.type.transaction.Declare;
-import org.apache.qpid.server.protocol.v1_0.type.transaction.Declared;
-import org.apache.qpid.server.protocol.v1_0.type.transaction.Discharge;
import org.apache.qpid.server.protocol.v1_0.type.transaction.TxnCapability;
-import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
-import org.apache.qpid.server.protocol.v1_0.type.transport.LinkError;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
-import org.apache.qpid.server.txn.LocalTransaction;
-import org.apache.qpid.server.txn.ServerTransaction;
public class TxnCoordinatorReceivingLink_1_0 implements ReceivingLink_1_0
{
- private static final Logger _logger = LoggerFactory.getLogger(TxnCoordinatorReceivingLink_1_0.class);
- private NamedAddressSpace _namedAddressSpace;
- private ReceivingLinkEndpoint _endpoint;
-
- private ArrayList<Transfer> _incompleteMessage;
- private SectionDecoder _sectionDecoder;
- private final LinkedHashMap<Integer, ServerTransaction> _createdTransactions = new LinkedHashMap<>();
- private Session_1_0 _session;
-
-
- public TxnCoordinatorReceivingLink_1_0(NamedAddressSpace namedAddressSpace,
- Session_1_0 session_1_0,
- ReceivingLinkEndpoint endpoint)
+ public TxnCoordinatorReceivingLink_1_0(ReceivingLinkEndpoint endpoint)
{
- _namedAddressSpace = namedAddressSpace;
- _session = session_1_0;
- _endpoint = endpoint;
- _sectionDecoder = new SectionDecoderImpl(endpoint.getSession().getConnection().getDescribedTypeRegistry().getSectionDecoderRegistry());
((Coordinator)endpoint.getTarget()).setCapabilities(TxnCapability.LOCAL_TXN, TxnCapability.MULTI_SSNS_PER_TXN, TxnCapability.MULTI_TXNS_PER_SSN);
}
- public Error messageTransfer(Transfer xfr)
- {
- List<QpidByteBuffer> payload = new ArrayList<>();
-
- final Binary deliveryTag = xfr.getDeliveryTag();
-
- if(Boolean.TRUE.equals(xfr.getMore()) && _incompleteMessage == null)
- {
- _incompleteMessage = new ArrayList<Transfer>();
- _incompleteMessage.add(xfr);
- return null;
- }
- else if(_incompleteMessage != null)
- {
- _incompleteMessage.add(xfr);
- if(Boolean.TRUE.equals(xfr.getMore()))
- {
- return null;
- }
-
- int size = 0;
- for(Transfer t : _incompleteMessage)
- {
- final List<QpidByteBuffer> bufs = t.getPayload();
- if(bufs != null)
- {
- size += QpidByteBufferUtils.remaining(bufs);
- payload.addAll(bufs);
- }
- t.dispose();
- }
- _incompleteMessage=null;
-
- }
- else
- {
- payload.addAll(xfr.getPayload());
- xfr.dispose();
- }
-
- // Only interested in the amqp-value section that holds the message to the coordinator
- try
- {
- List<EncodingRetainingSection<?>> sections = _sectionDecoder.parseAll(payload);
- for(EncodingRetainingSection section : sections)
- {
- if(section instanceof AmqpValueSection)
- {
- Object command = section.getValue();
-
-
- if(command instanceof Declare)
- {
- final IdentifiedTransaction txn = _session.getConnection().createLocalTransaction();
- _createdTransactions.put(txn.getId(), txn.getServerTransaction());
-
- Declared state = new Declared();
-
- _session.incrementStartedTransactions();
-
- state.setTxnId(_session.integerToBinary(txn.getId()));
- _endpoint.updateDisposition(deliveryTag, state, true);
-
- }
- else if(command instanceof Discharge)
- {
- Discharge discharge = (Discharge) command;
-
- final Error error = discharge(_session.binaryToInteger(discharge.getTxnId()),
- Boolean.TRUE.equals(discharge.getFail()));
- _endpoint.updateDisposition(deliveryTag, error == null ? new Accepted() : null, true);
- return error;
- }
- else
- {
- // TODO error handling
-
- // also should panic if we receive more than one AmqpValue, or no AmqpValue section
- }
- }
- }
-
- }
- catch (AmqpErrorException e)
- {
- return e.getError();
- }
- finally
- {
- for(QpidByteBuffer buf : payload)
- {
- buf.dispose();
- }
- }
- return null;
- }
-
- public void remoteDetached(LinkEndpoint endpoint, Detach detach)
- {
- // force rollback of open transactions
- for(Map.Entry<Integer, ServerTransaction> entry : _createdTransactions.entrySet())
- {
- entry.getValue().rollback();
- _session.incrementRolledBackTransactions();
- _session.getConnection().removeTransaction(entry.getKey());
- }
- endpoint.detach();
- }
-
@Override
- public void handle(final Binary deliveryTag, final DeliveryState state, final Boolean settled)
+ public void setLinkAttachmentToNull()
{
-
- }
-
- private Error discharge(Integer transactionId, boolean fail)
- {
- Error error = null;
- ServerTransaction txn = _createdTransactions.get(transactionId);
- if(txn != null)
- {
- if(fail)
- {
- txn.rollback();
- _session.incrementRolledBackTransactions();
- }
- else if(!(txn instanceof LocalTransaction && ((LocalTransaction)txn).isRollbackOnly()))
- {
- txn.commit();
- _session.incrementCommittedTransactions();
- }
- else
- {
- txn.rollback();
- _session.incrementRolledBackTransactions();
- error = new Error();
- error.setCondition(LinkError.DETACH_FORCED);
- error.setDescription("The transaction was marked as rollback only due to an earlier issue (e.g. a published message was sent settled but could not be enqueued)");
- }
- _createdTransactions.remove(transactionId);
- _session.getConnection().removeTransaction(transactionId);
- }
- else
- {
- error = new Error();
- error.setCondition(AmqpError.NOT_FOUND);
- error.setDescription("Unknown transactionId" + transactionId);
- }
- return error;
}
-
-
- public void start()
- {
- _endpoint.setLinkCredit(UnsignedInteger.ONE);
- _endpoint.setCreditWindow();
- }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org