You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2016/12/20 18:50:40 UTC

svn commit: r1775336 - in /qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0: Delivery.java ReceivingLinkEndpoint.java Session_1_0.java

Author: kwall
Date: Tue Dec 20 18:50:40 2016
New Revision: 1775336

URL: http://svn.apache.org/viewvc?rev=1775336&view=rev
Log:
QPID-7592: [Java Broker] Remove settled entries from the outgoing unsettled map.

Also removed dead code from ReceivingLinkEndpoint.

Modified:
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Delivery.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkEndpoint.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Delivery.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Delivery.java?rev=1775336&r1=1775335&r2=1775336&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Delivery.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Delivery.java Tue Dec 20 18:50:40 2016
@@ -25,11 +25,12 @@ import org.apache.qpid.server.protocol.v
 
 public class Delivery
 {
-    private boolean _complete;
-    private boolean _settled;
     private final UnsignedInteger _deliveryId;
     private final Binary _deliveryTag;
     private final LinkEndpoint _linkEndpoint;
+    private boolean _complete;
+    private boolean _settled;
+    private int _numberOfTransfers = 0;
 
     public Delivery(Transfer transfer, final LinkEndpoint endpoint)
     {
@@ -60,8 +61,9 @@ public class Delivery
         _settled = settled;
     }
 
-    public void addTransfer(Transfer transfer)
+    public final void addTransfer(Transfer transfer)
     {
+        _numberOfTransfers++;
         if(Boolean.TRUE.equals(transfer.getAborted()) || !Boolean.TRUE.equals(transfer.getMore()))
         {
             setComplete(true);
@@ -86,4 +88,9 @@ public class Delivery
     {
         return _deliveryTag;
     }
+
+    public int getNumberOfTransfers()
+    {
+        return _numberOfTransfers;
+    }
 }

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkEndpoint.java?rev=1775336&r1=1775335&r2=1775336&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkEndpoint.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkEndpoint.java Tue Dec 20 18:50:40 2016
@@ -21,14 +21,9 @@
 
 package org.apache.qpid.server.protocol.v1_0;
 
-import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Iterator;
 import java.util.LinkedHashMap;
-import java.util.List;
 import java.util.Map;
-import java.util.SortedMap;
-import java.util.TreeMap;
 
 import org.apache.qpid.server.protocol.v1_0.type.Binary;
 import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
@@ -90,7 +85,6 @@ public class ReceivingLinkEndpoint exten
     private Map<Binary, TransientState> _unsettledIds = new LinkedHashMap<Binary, TransientState>();
     private boolean _creditWindow;
     private boolean _remoteDrain;
-    private UnsignedInteger _remoteTransferCount;
     private UnsignedInteger _drainLimit;
 
 
@@ -262,14 +256,6 @@ public class ReceivingLinkEndpoint exten
         sendFlowConditional();
     }
 
-    public void drain()
-    {
-        setDrain(true);
-        _creditWindow = false;
-        _drainLimit = getDeliveryCount().add(getLinkCredit());
-        sendFlowWithEcho();
-    }
-
     @Override
     public void receiveDeliveryState(final Delivery unsettled, final DeliveryState state, final Boolean settled)
     {
@@ -284,102 +270,9 @@ public class ReceivingLinkEndpoint exten
         }
     }
 
-    public void requestTransactionalSend(Object txnId)
-    {
-        setDrain(true);
-        _creditWindow = false;
-        setTransactionId(txnId);
-        sendFlow();
-    }
-
-    private void sendFlow(final Object transactionId)
-    {
-        sendFlow();
-    }
-
-
-    public void clearDrain()
-    {
-        setDrain(false);
-        sendFlow();
-    }
-
-    public void updateAllDisposition(Binary deliveryTag, DeliveryState deliveryState, boolean settled)
-    {
-        if(!_unsettledIds.isEmpty())
-        {
-            Binary firstTag = _unsettledIds.keySet().iterator().next();
-            Binary lastTag = deliveryTag;
-            updateDispositions(firstTag, lastTag, deliveryState, settled);
-        }
-    }
-
-    private void updateDispositions(Binary firstTag, Binary lastTag, DeliveryState state, boolean settled)
-    {
-        SortedMap<UnsignedInteger, UnsignedInteger> ranges = new TreeMap<UnsignedInteger, UnsignedInteger>();
-
-        Iterator<Binary> iter = _unsettledIds.keySet().iterator();
-        List<Binary> tagsToUpdate = new ArrayList<Binary>();
-        Binary tag = null;
 
-        while (iter.hasNext() && !(tag = iter.next()).equals(firstTag)) ;
 
-        if (firstTag.equals(tag))
-        {
-            tagsToUpdate.add(tag);
-
-            UnsignedInteger deliveryId = _unsettledIds.get(firstTag).getDeliveryId();
-
-            UnsignedInteger first = deliveryId;
-            UnsignedInteger last = first;
-
-            if (!firstTag.equals(lastTag))
-            {
-                while (iter.hasNext())
-                {
-                    tag = iter.next();
-                    tagsToUpdate.add(tag);
 
-                    deliveryId = _unsettledIds.get(tag).getDeliveryId();
-
-                    if (deliveryId.equals(last.add(UnsignedInteger.ONE)))
-                    {
-                        last = deliveryId;
-                    }
-                    else
-                    {
-                        ranges.put(first, last);
-                        first = last = deliveryId;
-                    }
-
-                    if (tag.equals(lastTag))
-                    {
-                        break;
-                    }
-                }
-            }
-            ranges.put(first, last);
-        }
-
-        if (settled)
-        {
-
-            for (Binary deliveryTag : tagsToUpdate)
-            {
-                if (settled(deliveryTag) && _creditWindow)
-                {
-                    setLinkCredit(getLinkCredit().add(UnsignedInteger.ONE));
-                }
-            }
-            sendFlowConditional();
-        }
-
-
-        for (Map.Entry<UnsignedInteger, UnsignedInteger> range : ranges.entrySet())
-        {
-            getSession().updateDisposition(getRole(), range.getKey(), range.getValue(), state, settled);
-        }
-    }
 
     @Override
     public void settle(Binary deliveryTag)

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1775336&r1=1775335&r2=1775336&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java Tue Dec 20 18:50:40 2016
@@ -142,7 +142,7 @@ public class Session_1_0 implements AMQS
     private final Set<ConsumerTarget_1_0> _consumersWithPendingWork = new ScheduledConsumerTargetSet<>();
     private Iterator<ConsumerTarget_1_0> _processPendingIterator;
 
-    private SessionState _state ;
+    private SessionState _state;
 
     private final Map<String, SendingLinkEndpoint> _sendingLinkMap = new HashMap<>();
     private final Map<String, ReceivingLinkEndpoint> _receivingLinkMap = new HashMap<>();
@@ -315,23 +315,16 @@ public class Session_1_0 implements AMQS
 
         disposition.setState(state);
 
-
-        if(settled)
+        if (settled)
         {
-            if(role == Role.RECEIVER)
+            final LinkedHashMap<UnsignedInteger, Delivery> unsettled =
+                    role == Role.RECEIVER ? _incomingUnsettled : _outgoingUnsettled;
+            SequenceNumber pos = new SequenceNumber(first.intValue());
+            SequenceNumber end = new SequenceNumber(last.intValue());
+            while (pos.compareTo(end) <= 0)
             {
-                SequenceNumber pos = new SequenceNumber(first.intValue());
-                SequenceNumber end = new SequenceNumber(last.intValue());
-                while(pos.compareTo(end)<=0)
-                {
-                    Delivery d = _incomingUnsettled.remove(new UnsignedInteger(pos.intValue()));
-
-/*
-                    _availableIncomingCredit += d.getTransfers().size();
-*/
-
-                    pos.incr();
-                }
+                unsettled.remove(new UnsignedInteger(pos.intValue()));
+                pos.incr();
             }
         }
 
@@ -355,35 +348,39 @@ public class Session_1_0 implements AMQS
     {
         _nextOutgoingTransferId.incr();
         UnsignedInteger deliveryId;
-        if(newDelivery)
+        final boolean settled = Boolean.TRUE.equals(xfr.getSettled());
+        if (newDelivery)
         {
             deliveryId = UnsignedInteger.valueOf(_nextOutgoingDeliveryId++);
             endpoint.setLastDeliveryId(deliveryId);
+            if (!settled)
+            {
+                final Delivery delivery = new Delivery(xfr, endpoint);
+                _outgoingUnsettled.put(deliveryId, delivery);
+                _outgoingSessionCredit = _outgoingSessionCredit.subtract(UnsignedInteger.ONE);
+                endpoint.addUnsettled(delivery);
+            }
         }
         else
         {
             deliveryId = endpoint.getLastDeliveryId();
-        }
-        xfr.setDeliveryId(deliveryId);
-
-        if(!Boolean.TRUE.equals(xfr.getSettled()))
-        {
-            Delivery delivery;
-            if((delivery = _outgoingUnsettled.get(deliveryId))== null)
+            final Delivery delivery = _outgoingUnsettled.get(deliveryId);
+            if (delivery != null)
             {
-                delivery = new Delivery(xfr, endpoint);
-                _outgoingUnsettled.put(deliveryId, delivery);
-
-            }
-            else
-            {
-                delivery.addTransfer(xfr);
+                if (!settled)
+                {
+                    delivery.addTransfer(xfr);
+                    _outgoingSessionCredit = _outgoingSessionCredit.subtract(UnsignedInteger.ONE);
+                }
+                else
+                {
+                    _outgoingSessionCredit = _outgoingSessionCredit.add(new UnsignedInteger(delivery.getNumberOfTransfers()));
+                    endpoint.settle(delivery.getDeliveryTag());
+                    _outgoingUnsettled.remove(deliveryId);
+                }
             }
-            _outgoingSessionCredit = _outgoingSessionCredit.subtract(UnsignedInteger.ONE);
-            endpoint.addUnsettled(delivery);
-
         }
-
+        xfr.setDeliveryId(deliveryId);
 
         try
         {
@@ -395,31 +392,29 @@ public class Session_1_0 implements AMQS
             {
                 // TODO - should make this iterative and not recursive
 
+                Transfer secondTransfer = new Transfer();
 
-                    Transfer secondTransfer = new Transfer();
-
-                    secondTransfer.setDeliveryTag(xfr.getDeliveryTag());
-                    secondTransfer.setHandle(xfr.getHandle());
-                    secondTransfer.setSettled(xfr.getSettled());
-                    secondTransfer.setState(xfr.getState());
-                    secondTransfer.setMessageFormat(xfr.getMessageFormat());
-                    secondTransfer.setPayload(payload);
-
-                    sendTransfer(secondTransfer, endpoint, false);
+                secondTransfer.setDeliveryTag(xfr.getDeliveryTag());
+                secondTransfer.setHandle(xfr.getHandle());
+                secondTransfer.setSettled(xfr.getSettled());
+                secondTransfer.setState(xfr.getState());
+                secondTransfer.setMessageFormat(xfr.getMessageFormat());
+                secondTransfer.setPayload(payload);
 
-                    secondTransfer.dispose();
+                sendTransfer(secondTransfer, endpoint, false);
 
+                secondTransfer.dispose();
             }
 
-            if(payload != null)
+            if (payload != null)
             {
-                for(QpidByteBuffer buf : payload)
+                for (QpidByteBuffer buf : payload)
                 {
                     buf.dispose();
                 }
             }
         }
-        catch(OversizeFrameException e)
+        catch (OversizeFrameException e)
         {
             throw new ConnectionScopedRuntimeException(e);
         }
@@ -453,10 +448,7 @@ public class Session_1_0 implements AMQS
                 reply.setError(error);
                 _connection.sendEnd(sendChannel, reply, true);
                 break;
-
-
         }
-
     }
 
     public UnsignedInteger getNextOutgoingId()
@@ -545,8 +537,12 @@ public class Session_1_0 implements AMQS
             if(delivery != null)
             {
                 delivery.getLinkEndpoint().receiveDeliveryState(delivery,
-                                                           disposition.getState(),
-                                                           disposition.getSettled());
+                                                                disposition.getState(),
+                                                                disposition.getSettled());
+                if (Boolean.TRUE.equals(disposition.getSettled()))
+                {
+                    unsettledTransfers.remove(deliveryId);
+                }
             }
             deliveryId = deliveryId.add(UnsignedInteger.ONE);
         }
@@ -1179,7 +1175,7 @@ public class Session_1_0 implements AMQS
         LifetimePolicy lifetimePolicy = properties == null
                                         ? null
                                         : (LifetimePolicy) properties.get(LIFETIME_POLICY);
-        Map<String,Object> attributes = new HashMap<String,Object>();
+        Map<String,Object> attributes = new HashMap<>();
         attributes.put(Queue.ID, UUID.randomUUID());
         attributes.put(Queue.NAME, queueName);
         attributes.put(Queue.DURABLE, false);
@@ -1262,7 +1258,9 @@ public class Session_1_0 implements AMQS
 
         byte[] data = txnId.getArray();
         if(data.length > 4)
+        {
             throw new IllegalArgumentException();
+        }
 
         int id = 0;
         for(int i = 0; i < data.length; i++)
@@ -1283,7 +1281,6 @@ public class Session_1_0 implements AMQS
         data[1] = (byte) ((txnId & 0xff0000) >> 16);
         data[0] = (byte) ((txnId & 0xff000000) >> 24);
         return new Binary(data);
-
     }
 
     @Override
@@ -1394,9 +1391,9 @@ public class Session_1_0 implements AMQS
 
     private boolean isQueueDestinationForLink(final Queue<?> queue, final ReceivingDestination recvDest)
     {
-        return (recvDest instanceof NodeReceivingDestination && queue == ((NodeReceivingDestination) recvDest).getDestination())
-                || recvDest instanceof QueueDestination && queue == ((QueueDestination) recvDest).getQueue();
-
+        return (recvDest instanceof NodeReceivingDestination
+                && queue == ((NodeReceivingDestination) recvDest).getDestination())
+               || recvDest instanceof QueueDestination && queue == ((QueueDestination) recvDest).getQueue();
     }
 
     @Override



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org