You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2013/11/08 17:17:28 UTC

svn commit: r1540104 - in /qpid/proton/trunk: proton-c/src/messenger/ proton-j/proton-api/src/main/java/org/apache/qpid/proton/amqp/transaction/ proton-j/proton-api/src/main/java/org/apache/qpid/proton/amqp/transport/ proton-j/proton-api/src/main/java/...

Author: kgiusti
Date: Fri Nov  8 16:17:27 2013
New Revision: 1540104

URL: http://svn.apache.org/r1540104
Log:
PROTON-444: port internal store from C implementation

Added:
    qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/Store.java   (with props)
    qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/StoreEntry.java   (with props)
Removed:
    qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/TrackerQueue.java
Modified:
    qpid/proton/trunk/proton-c/src/messenger/store.c
    qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/amqp/transaction/Coordinator.java
    qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/amqp/transport/Source.java
    qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/amqp/transport/Target.java
    qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/messenger/Status.java
    qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
    qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
    qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/TrackerImpl.java

Modified: qpid/proton/trunk/proton-c/src/messenger/store.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/messenger/store.c?rev=1540104&r1=1540103&r2=1540104&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/messenger/store.c (original)
+++ qpid/proton/trunk/proton-c/src/messenger/store.c Fri Nov  8 16:17:27 2013
@@ -220,6 +220,7 @@ pni_entry_t *pni_store_put(pni_store_t *
   entry->store_prev = NULL;
   entry->delivery = NULL;
   entry->bytes = pn_buffer(64);
+  entry->status = PN_STATUS_UNKNOWN;
   LL_ADD(stream, stream, entry);
   LL_ADD(store, store, entry);
   store->size++;

Modified: qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/amqp/transaction/Coordinator.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/amqp/transaction/Coordinator.java?rev=1540104&r1=1540103&r2=1540104&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/amqp/transaction/Coordinator.java (original)
+++ qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/amqp/transaction/Coordinator.java Fri Nov  8 16:17:27 2013
@@ -50,5 +50,10 @@ public final class Coordinator
                "capabilities=" + (_capabilities == null ? null : Arrays.asList(_capabilities)) +
                '}';
     }
+
+    public String getAddress()
+    {
+        return null;
+    }
 }
-  
\ No newline at end of file
+

Modified: qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/amqp/transport/Source.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/amqp/transport/Source.java?rev=1540104&r1=1540103&r2=1540104&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/amqp/transport/Source.java (original)
+++ qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/amqp/transport/Source.java Fri Nov  8 16:17:27 2013
@@ -22,4 +22,5 @@ package org.apache.qpid.proton.amqp.tran
 
 public interface Source
 {
+    public String getAddress();
 }

Modified: qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/amqp/transport/Target.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/amqp/transport/Target.java?rev=1540104&r1=1540103&r2=1540104&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/amqp/transport/Target.java (original)
+++ qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/amqp/transport/Target.java Fri Nov  8 16:17:27 2013
@@ -23,4 +23,5 @@ package org.apache.qpid.proton.amqp.tran
 
 public interface Target
 {
+    public String getAddress();
 }

Modified: qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/messenger/Status.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/messenger/Status.java?rev=1540104&r1=1540103&r2=1540104&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/messenger/Status.java (original)
+++ qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/messenger/Status.java Fri Nov  8 16:17:27 2013
@@ -25,5 +25,6 @@ public enum Status
     UNKNOWN,
     PENDING,
     ACCEPTED,
-    REJECTED
+    REJECTED,
+    MODIFIED
 }

Modified: qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java?rev=1540104&r1=1540103&r2=1540104&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java (original)
+++ qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java Fri Nov  8 16:17:27 2013
@@ -169,7 +169,12 @@ public class DeliveryImpl implements Del
 
     public DeliveryImpl getWorkNext()
     {
-        return _workNext;
+        if (_workNext != null)
+            return _workNext;
+        // the following hack is brought to you by the C implementation!
+        if (!_work)  // not on the work list
+            return _link.getConnectionImpl().getWorkHead();
+        return null;
     }
 
     DeliveryImpl getWorkPrev()

Modified: qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java?rev=1540104&r1=1540103&r2=1540104&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java (original)
+++ qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java Fri Nov  8 16:17:27 2013
@@ -54,6 +54,8 @@ import org.apache.qpid.proton.messenger.
 import org.apache.qpid.proton.messenger.Tracker;
 import org.apache.qpid.proton.amqp.messaging.Source;
 import org.apache.qpid.proton.amqp.messaging.Target;
+import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
+import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
 
 import org.apache.qpid.proton.amqp.Binary;
 
@@ -70,15 +72,17 @@ public class MessengerImpl implements Me
     private long _timeout = -1;
     private boolean _blocking = true;
     private long _nextTag = 1;
-    private byte[] _buffer = new byte[5*1024];
     private Driver _driver;
     private int _receiving = 0;
     private static final int _creditBatch = 1024;
     private int _credit;
     private int _distributed;
-    private TrackerQueue _incoming = new TrackerQueue();
-    private TrackerQueue _outgoing = new TrackerQueue();
+    private TrackerImpl _incomingTracker;
+    private TrackerImpl _outgoingTracker;
+    private Store _incomingStore = new Store();
+    private Store _outgoingStore = new Store();
     private List<Connector> _awaitingDestruction = new ArrayList<Connector>();
+    private int _sendThreshold;
 
     private Transform _routes = new Transform();
     private Transform _rewrites = new Transform();
@@ -240,42 +244,68 @@ public class MessengerImpl implements Me
             _logger.fine(this + " about to put message: " + m);
         }
 
-        String addr = routeAddress(m.getAddress());
+        StoreEntry entry = _outgoingStore.put( m.getAddress() );
+        _outgoingTracker = new TrackerImpl(TrackerImpl.Type.OUTGOING,
+                                           _outgoingStore.trackEntry(entry));
+
+        String routedAddress = routeAddress(m.getAddress());
+        Address address = new Address(routedAddress);
+        if (address.getHost() == null)
+        {
+            throw new MessengerException("unable to send to address: " + routedAddress);
+        }
+
         rewriteMessage(m);
 
         try {
-            Address address = new Address(addr);
-            if (address.getHost() == null)
-            {
-                throw new MessengerException("unable to send to address: " + addr);
-            }
-            String ports = address.getPort() == null ? defaultPort(address.getScheme()) : address.getPort();
-            int port = Integer.valueOf(ports);
-            Sender sender = getLink(address.getHost(), port, new SenderFinder(address.getName()));
-
             adjustReplyTo(m);
 
-            byte[] tag = String.valueOf(_nextTag++).getBytes();
-            Delivery delivery = sender.delivery(tag);
             int encoded;
+            byte[] buffer = new byte[5*1024];
             while (true)
             {
                 try
                 {
-                    encoded = m.encode(_buffer, 0, _buffer.length);
+                    encoded = m.encode(buffer, 0, buffer.length);
                     break;
                 } catch (java.nio.BufferOverflowException e) {
-                    _buffer = new byte[_buffer.length*2];
+                    buffer = new byte[buffer.length*2];
                 }
             }
-            sender.send(_buffer, 0, encoded);
-            _outgoing.add(delivery);
-            sender.advance();
+            entry.setEncodedMsg( buffer, encoded );
         }
         finally
         {
             restoreMessage(m);
         }
+
+        String ports = address.getPort() == null ? defaultPort(address.getScheme()) : address.getPort();
+        int port = Integer.valueOf(ports);
+        Sender sender = getLink(address.getHost(), port, new SenderFinder(address.getName()));
+        pumpOut(m.getAddress(), sender);
+    }
+
+    private int pumpOut( String address, Sender sender )
+    {
+        StoreEntry entry = _outgoingStore.get( address );
+        if (entry == null) {
+            return 0;
+        }
+
+        byte[] tag = String.valueOf(_nextTag++).getBytes();
+        Delivery delivery = sender.delivery(tag);
+        entry.setDelivery( delivery );
+        _logger.log(Level.FINE, "Sending on delivery: " + delivery);
+        int n = sender.send( entry.getEncodedMsg(), 0, entry.getEncodedLength());
+        if (n < 0) {
+            _outgoingStore.freeEntry( entry );
+            _logger.log(Level.WARNING, "Send error: " + n);
+            return n;
+        } else {
+            sender.advance();
+            _outgoingStore.freeEntry( entry );
+            return 0;
+        }
     }
 
     public void send() throws TimeoutException
@@ -293,6 +323,16 @@ public class MessengerImpl implements Me
         {
             _logger.fine(this + " about to send");
         }
+
+        if (n == -1)
+            _sendThreshold = 0;
+        else
+        {
+            _sendThreshold = outgoing() - n;
+            if (_sendThreshold < 0)
+                _sendThreshold = 0;
+        }
+
         waitUntil(_sentSettled);
     }
 
@@ -325,44 +365,51 @@ public class MessengerImpl implements Me
 
     public Message get()
     {
-        if (_driver != null) {
-            for (Connector<?> c : _driver.connectors())
-            {
-                Connection connection = c.getConnection();
-                _logger.log(Level.FINE, "Attempting to get message from " + connection);
-                Delivery delivery = connection.getWorkHead();
-                while (delivery != null)
-                {
-                    if (delivery.isReadable() && !delivery.isPartial())
-                    {
-                        _logger.log(Level.FINE, "Readable delivery found: " + delivery);
-                        int size = read((Receiver) delivery.getLink());
-                        Message message = Proton.message();
-                        message.decode(_buffer, 0, size);
-                        delivery.getLink().advance();
-                        _incoming.add(delivery);
-                        _distributed--;
-                        return message;
-                    }
-                    else
-                    {
-                        _logger.log(Level.FINE, "Delivery not readable: " + delivery);
-                        delivery = delivery.getWorkNext();
-                    }
-                }
-            }
+        StoreEntry entry = _incomingStore.get( null );
+        if (entry != null)
+        {
+            Message message = Proton.message();
+            message.decode( entry.getEncodedMsg(), 0, entry.getEncodedLength() );
+
+            _incomingTracker = new TrackerImpl(TrackerImpl.Type.INCOMING,
+                                               _incomingStore.trackEntry(entry));
+
+            _incomingStore.freeEntry( entry );
+            _distributed--;
+            return message;
         }
 
         return null;
     }
 
+    private int pumpIn(String address, Receiver receiver)
+    {
+        Delivery delivery = receiver.current();
+        if (delivery.isReadable() && !delivery.isPartial())
+        {
+            StoreEntry entry = _incomingStore.put( address );
+            entry.setDelivery( delivery );
+
+            _logger.log(Level.FINE, "Readable delivery found: " + delivery);
+
+            int size = delivery.pending();
+            byte[] buffer = new byte[size];
+            int read = receiver.recv( buffer, 0, buffer.length );
+            if (read != size) {
+                throw new IllegalStateException();
+            }
+            entry.setEncodedMsg( buffer, size );
+            receiver.advance();
+        }
+        return 0;
+    }
+
     public void subscribe(String source) throws MessengerException
     {
         if (_driver == null) {
             throw new IllegalStateException("messenger is stopped");
         }
 
-
         String routed = routeAddress(source);
         Address address = new Address(routed);
 
@@ -390,68 +437,78 @@ public class MessengerImpl implements Me
 
     public int outgoing()
     {
-        return queued(true);
+        return _outgoingStore.size() + queued(true);
     }
 
     public int incoming()
     {
-        return queued(false);
+        return _incomingStore.size() + queued(false);
     }
 
-
     public int getIncomingWindow()
     {
-        return _incoming.getWindow();
+        return _incomingStore.getWindow();
     }
+
     public void setIncomingWindow(int window)
     {
-        _incoming.setWindow(window);
+        _incomingStore.setWindow(window);
     }
 
     public int getOutgoingWindow()
     {
-        return _outgoing.getWindow();
+        return _outgoingStore.getWindow();
     }
+
     public void setOutgoingWindow(int window)
     {
-        _outgoing.setWindow(window);
+        _outgoingStore.setWindow(window);
     }
 
     public Tracker incomingTracker()
     {
-        return new TrackerImpl(false, _incoming.getHighWaterMark() - 1);
+        return _incomingTracker;
     }
     public Tracker outgoingTracker()
     {
-        return new TrackerImpl(true, _outgoing.getHighWaterMark() - 1);
+        return _outgoingTracker;
     }
 
-    private TrackerQueue getTrackerQueue(Tracker tracker)
+    private Store getTrackerStore(Tracker tracker)
     {
-        return TrackerQueue.isOutgoing(tracker) ? _outgoing : _incoming;
+        return ((TrackerImpl)tracker).isOutgoing() ? _outgoingStore : _incomingStore;
     }
 
     @Override
     public void reject(Tracker tracker, int flags)
     {
-        getTrackerQueue(tracker).reject(tracker, flags);
+        int id = ((TrackerImpl)tracker).getSequence();
+        getTrackerStore(tracker).update(id, Status.REJECTED, flags, false, false);
     }
 
     @Override
     public void accept(Tracker tracker, int flags)
     {
-        getTrackerQueue(tracker).accept(tracker, flags);
+        int id = ((TrackerImpl)tracker).getSequence();
+        getTrackerStore(tracker).update(id, Status.ACCEPTED, flags, false, false);
     }
 
     @Override
     public void settle(Tracker tracker, int flags)
     {
-        getTrackerQueue(tracker).settle(tracker, flags);
+        int id = ((TrackerImpl)tracker).getSequence();
+        getTrackerStore(tracker).update(id, Status.UNKNOWN, flags, true, true);
     }
 
     public Status getStatus(Tracker tracker)
     {
-        return getTrackerQueue(tracker).getStatus(tracker);
+        int id = ((TrackerImpl)tracker).getSequence();
+        StoreEntry e = getTrackerStore(tracker).getEntry(id);
+        if (e != null)
+        {
+            return e.getStatus();
+        }
+        return Status.UNKNOWN;
     }
 
     @Override
@@ -489,28 +546,6 @@ public class MessengerImpl implements Me
         return count;
     }
 
-    private int read(Receiver receiver)
-    {
-        Delivery dlv = receiver.current();
-
-        if (dlv.isPartial()) {
-            throw new IllegalStateException();
-        }
-
-        int size = dlv.pending();
-
-        while (_buffer.length < size) {
-            _buffer = new byte[_buffer.length * 2];
-        }
-
-        int read = receiver.recv(_buffer, 0, _buffer.length);
-        if (read != size) {
-            throw new IllegalStateException();
-        }
-
-        return size;
-    }
-
     private void bringDestruction()
     {
         for (Connector<?> c : _awaitingDestruction)
@@ -606,15 +641,26 @@ public class MessengerImpl implements Me
         Delivery delivery = connection.getWorkHead();
         while (delivery != null)
         {
-            if (delivery.getLink() instanceof Sender && delivery.isUpdated())
+            Link link = delivery.getLink();
+            if (delivery.isUpdated())
             {
-                delivery.disposition(delivery.getRemoteState());
+                if (link instanceof Sender)
+                {
+                    delivery.disposition(delivery.getRemoteState());
+                }
+                StoreEntry e = (StoreEntry) delivery.getContext();
+                if (e != null) e.updated();
             }
+
+            if (delivery.isReadable())
+            {
+                pumpIn( link.getSource().getAddress(), (Receiver)link );
+            }
+
             Delivery next = delivery.getWorkNext();
             delivery.clear();
             delivery = next;
         }
-        _outgoing.slide();
 
         for (Session session : new Sessions(connection, UNINIT, ANY))
         {
@@ -632,6 +678,14 @@ public class MessengerImpl implements Me
 
         distributeCredit();
 
+        for (Link link : new Links(connection, ACTIVE, ACTIVE))
+        {
+            if (link instanceof Sender)
+            {
+                pumpOut(link.getTarget().getAddress(), (Sender)link);
+            }
+        }
+
         for (Link link : new Links(connection, ACTIVE, CLOSED))
         {
             link.close();
@@ -792,56 +846,50 @@ public class MessengerImpl implements Me
         public boolean test()
         {
             //are all sent messages settled?
+            int total = _outgoingStore.size();
+
             for (Connector<?> c : _driver.connectors())
             {
+                // TBD
+                // check if transport is done generating output
+                // pn_transport_t *transport = pn_connector_transport(ctor);
+                // if (transport) {
+                //    if (!pn_transport_quiesced(transport)) {
+                //        pn_connector_process(ctor);
+                //        return false;
+                //    }
+                // }
+
                 Connection connection = c.getConnection();
                 for (Link link : new Links(connection, ACTIVE, ANY))
                 {
                     if (link instanceof Sender)
                     {
-                        if (link.getQueued() > 0)
-                        {
-                            return false;
-                        }
-                        //TODO: Sender.unsettled() not yet implemented, when it is change to the following
-                        //if (checkSettled(link.unsettled())
-                        //{
-                        //    return false;
-                        //}
+                        total += link.getQueued();
                     }
                 }
-            }
-            //TODO: Sender.unsettled() not yet implemented, when it is change to the following
-            //return true;
-            return checkSettled(_outgoing.deliveries());
-        }
 
-        boolean checkSettled(Iterator<Delivery> unsettled)
-        {
-            if (unsettled != null)
-            {
-                while (unsettled.hasNext())
+                // TBD: there is no per-link unsettled
+                // deliveries iterator, so for now get the
+                // deliveries by walking the outgoing trackers
+                Iterator<StoreEntry> entries = _outgoingStore.trackedEntries();
+                while (entries.hasNext() && total <= _sendThreshold)
                 {
-                    Delivery d = unsettled.next();
-                    if (d == null)
-                    {
-                        break;
-                    }
-                    if (d.getRemoteState() != null || d.remotelySettled())
-                    {
-                        d.settle();
-                    }
-                    else if (d.getLink().getSession().getConnection().getRemoteState() == EndpointState.CLOSED)
-                    {
-                        continue;
-                    }
-                    else
+                    StoreEntry e = (StoreEntry) entries.next();
+                    if (e != null )
                     {
-                        return false;
+                        Delivery d = e.getDelivery();
+                        if (d != null)
+                        {
+                            if (d.getRemoteState() == null && !d.remotelySettled())
+                            {
+                                total++;
+                            }
+                        }
                     }
                 }
             }
-            return true;
+            return total <= _sendThreshold;
         }
     }
 
@@ -849,7 +897,8 @@ public class MessengerImpl implements Me
     {
         public boolean test()
         {
-            //do we have at least one message?
+            //do we have at least one pending message?
+            if (_incomingStore.size() > 0) return true;
             for (Connector<?> c : _driver.connectors())
             {
                 Connection connection = c.getConnection();
@@ -866,6 +915,10 @@ public class MessengerImpl implements Me
                     }
                 }
             }
+            // if no connections, or not listening, exit as there won't ever be a message
+            if (!_driver.listeners().iterator().hasNext() && !_driver.connectors().iterator().hasNext())
+                return true;
+
             return false;
         }
     }
@@ -939,6 +992,16 @@ public class MessengerImpl implements Me
             Target target = new Target();
             target.setAddress(_path);
             sender.setTarget(target);
+            // the C implementation does this:
+            Source source = new Source();
+            source.setAddress(_path);
+            sender.setSource(source);
+            if (getOutgoingWindow() > 0)
+            {
+                // use explicit settlement via dispositions (not pre-settled)
+                sender.setSenderSettleMode(SenderSettleMode.UNSETTLED);
+                sender.setReceiverSettleMode(ReceiverSettleMode.SECOND);  // desired
+            }
             return sender;
         }
     }
@@ -970,6 +1033,16 @@ public class MessengerImpl implements Me
             Source source = new Source();
             source.setAddress(_path);
             receiver.setSource(source);
+            // the C implementation does this:
+            Target target = new Target();
+            target.setAddress(_path);
+            receiver.setTarget(target);
+            if (getIncomingWindow() > 0)
+            {
+                // use explicit settlement via dispositions (not pre-settled)
+                receiver.setSenderSettleMode(SenderSettleMode.UNSETTLED);  // desired
+                receiver.setReceiverSettleMode(ReceiverSettleMode.SECOND);
+            }
             return receiver;
         }
     }

Added: qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/Store.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/Store.java?rev=1540104&view=auto
==============================================================================
--- qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/Store.java (added)
+++ qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/Store.java Fri Nov  8 16:17:27 2013
@@ -0,0 +1,210 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.qpid.proton.messenger.impl;
+
+import java.util.List;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Collection;
+import java.util.Iterator;
+
+import org.apache.qpid.proton.messenger.Status;
+import org.apache.qpid.proton.messenger.Messenger;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.amqp.messaging.Accepted;
+import org.apache.qpid.proton.amqp.messaging.Rejected;
+
+class Store
+{
+    private static final Accepted ACCEPTED = Accepted.getInstance();
+    private static final Rejected REJECTED = new Rejected();
+
+    private LinkedList<StoreEntry> _store = new LinkedList<StoreEntry>();
+    private HashMap<String, LinkedList<StoreEntry>> _stream = new HashMap<String, LinkedList<StoreEntry>>();
+
+    // for incoming/outgoing window tracking
+    int _window;
+    int _lwm;
+    int _hwm;
+    private HashMap<Integer, StoreEntry> _tracked = new HashMap<Integer, StoreEntry>();
+
+    Store()
+    {
+    }
+
+    private boolean isTracking( Integer id )
+    {
+        return id != null && (id.intValue() - _lwm >= 0) && (_hwm - id.intValue() > 0);
+    }
+
+    int size()
+    {
+        return _store.size();
+    }
+
+    int getWindow()
+    {
+        return _window;
+    }
+
+    void setWindow(int window)
+    {
+        _window = window;
+    }
+
+    StoreEntry put(String address)
+    {
+        if (address == null) address = "";
+        StoreEntry entry = new StoreEntry(this, address);
+        _store.add( entry );
+        LinkedList<StoreEntry> list = _stream.get( address );
+        if (list != null) {
+            list.add( entry );
+        } else {
+            list = new LinkedList<StoreEntry>();
+            list.add( entry );
+            _stream.put( address, list );
+        }
+        entry.stored();
+        return entry;
+    }
+
+    StoreEntry get(String address)
+    {
+        if (address != null) {
+            LinkedList<StoreEntry> list = _stream.get( address );
+            if (list != null) return list.peekFirst();
+        } else {
+            return _store.peekFirst();
+        }
+        return null;
+    }
+
+    StoreEntry getEntry(int id)
+    {
+        return _tracked.get(id);
+    }
+
+    Iterator<StoreEntry> trackedEntries()
+    {
+        return _tracked.values().iterator();
+    }
+
+    void freeEntry(StoreEntry entry)
+    {
+        if (entry.isStored()) {
+            _store.remove( entry );
+            LinkedList<StoreEntry> list = _stream.get( entry.getAddress() );
+            if (list != null) list.remove( entry );
+            entry.notStored();
+        }
+        // note well: may still be in _tracked map if still in window!
+    }
+
+    public int trackEntry(StoreEntry entry)
+    {
+        assert( entry.getStore() == this );
+        entry.setId(_hwm++);
+        _tracked.put(entry.getId(), entry);
+        slideWindow();
+        return entry.getId();
+    }
+
+    private void slideWindow()
+    {
+        if (_window >= 0)
+        {
+            while (_hwm - _lwm > _window)
+            {
+                StoreEntry old = getEntry(_lwm);
+                if (old != null)
+                {
+                    _tracked.remove( old.getId() );
+                    Delivery d = old.getDelivery();
+                    if (d != null) {
+                        if (d.getLocalState() == null)
+                            d.disposition(ACCEPTED);
+                        d.settle();
+                    }
+                }
+                _lwm++;
+            }
+        }
+    }
+
+    int update(int id, Status status, int flags, boolean settle, boolean match )
+    {
+        if (!isTracking(id)) return 0;
+
+        int start = (Messenger.CUMULATIVE & flags) != 0 ? _lwm : id;
+        for (int i = start; (id - i) >= 0; i++)
+        {
+            StoreEntry e = getEntry(i);
+            if (e != null)
+            {
+                Delivery d = e.getDelivery();
+                if (d != null)
+                {
+                    if (d.getLocalState() == null)
+                    {
+                        if (match)
+                        {
+                            d.disposition(d.getRemoteState());
+                        }
+                        else
+                        {
+                            switch (status)
+                            {
+                            case ACCEPTED:
+                                d.disposition(ACCEPTED);
+                                break;
+                            case REJECTED:
+                                d.disposition(REJECTED);
+                                break;
+                            default:
+                                break;
+                            }
+                        }
+                        e.updated();
+                    }
+                }
+                if (settle)
+                {
+                    if (d != null)
+                    {
+                        d.settle();
+                    }
+                    _tracked.remove(e.getId());
+                }
+            }
+        }
+
+        while (_hwm - _lwm > 0 && !_tracked.containsKey(_lwm))
+        {
+            _lwm++;
+        }
+
+        return 0;
+    }
+}
+
+

Propchange: qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/Store.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/StoreEntry.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/StoreEntry.java?rev=1540104&view=auto
==============================================================================
--- qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/StoreEntry.java (added)
+++ qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/StoreEntry.java Fri Nov  8 16:17:27 2013
@@ -0,0 +1,173 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.qpid.proton.messenger.impl;
+
+import org.apache.qpid.proton.messenger.Tracker;
+import org.apache.qpid.proton.messenger.Status;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.amqp.messaging.Accepted;
+import org.apache.qpid.proton.amqp.messaging.Modified;
+import org.apache.qpid.proton.amqp.messaging.Rejected;
+import org.apache.qpid.proton.amqp.messaging.Released;
+import org.apache.qpid.proton.amqp.messaging.Received;
+import org.apache.qpid.proton.amqp.transport.DeliveryState;
+
+/**
+ * A single message record associated with a {@link Store}.
+ *
+ * An entry carries the destination address, the encoded message bytes, an
+ * optional {@link Delivery} and a {@link Status} that is derived from that
+ * delivery's state each time {@link #updated()} runs.
+ */
+class StoreEntry
+{
+    private final Store _store;
+    private Integer _id;
+    private final String _address;
+    private byte[] _encodedMsg;
+    private int _encodedLength;
+    private Delivery _delivery;
+    private Status _status = Status.UNKNOWN;
+    private Object _context;
+    private boolean _inStore = false;
+
+    /**
+     * Creates an entry with no id, no message bytes and no delivery; its
+     * status starts as {@link Status#UNKNOWN}.
+     *
+     * @param store the store this entry refers back to
+     * @param address the address recorded for this entry
+     */
+    public StoreEntry(Store store, String address)
+    {
+        _store = store;
+        _address = address;
+    }
+
+    /** @return the store passed to the constructor */
+    public Store getStore()
+    {
+        return _store;
+    }
+
+    /** @return the flag last set by {@link #stored()} / {@link #notStored()} */
+    public boolean isStored()
+    {
+        return _inStore;
+    }
+
+    /** Marks this entry as being held in a store. */
+    public void stored()
+    {
+        _inStore = true;
+    }
+
+    /** Marks this entry as no longer being held in a store. */
+    public void notStored()
+    {
+        _inStore = false;
+    }
+
+    /** @return the address passed to the constructor */
+    public String getAddress()
+    {
+        return _address;
+    }
+
+    /**
+     * @return the array passed to {@link #setEncodedMsg}; this is the same
+     *         array instance (no copy is made), or null if none was set
+     */
+    public byte[] getEncodedMsg()
+    {
+        return _encodedMsg;
+    }
+
+    /** @return the length passed to {@link #setEncodedMsg}, 0 if never set */
+    public int getEncodedLength()
+    {
+        return _encodedLength;
+    }
+
+    /**
+     * Records the encoded message. The array is stored by reference and the
+     * length is not checked against the array size.
+     *
+     * @param encodedMsg buffer holding the encoded message
+     * @param length value later reported by {@link #getEncodedLength()};
+     *               presumably the number of valid bytes in the buffer
+     */
+    public void setEncodedMsg( byte[] encodedMsg, int length )
+    {
+        _encodedMsg = encodedMsg;
+        _encodedLength = length;
+    }
+
+    /**
+     * Assigns the identifier of this entry.
+     *
+     * @param id the identifier to box and keep
+     */
+    public void setId(int id)
+    {
+        // valueOf may reuse cached instances instead of always allocating
+        _id = Integer.valueOf(id);
+    }
+
+    /** @return the boxed id, or null if {@link #setId} was never called */
+    public Integer getId()
+    {
+        return _id;
+    }
+
+    /**
+     * Replaces the delivery linked to this entry and refreshes the status.
+     *
+     * The context of the previously linked delivery (if any) is cleared, and
+     * the context of the new delivery (if any) is pointed at this entry.
+     *
+     * @param d the new delivery, or null to unlink
+     */
+    public void setDelivery( Delivery d )
+    {
+        if (_delivery != null)
+        {
+            _delivery.setContext(null);
+        }
+        _delivery = d;
+        if (_delivery != null)
+        {
+            _delivery.setContext(this);
+        }
+        updated();
+    }
+
+    /** @return the currently linked delivery, possibly null */
+    public Delivery getDelivery()
+    {
+        return _delivery;
+    }
+
+    /** @return the status computed by the most recent {@link #updated()} */
+    public Status getStatus()
+    {
+        return _status;
+    }
+
+
+    /**
+     * Maps a delivery state onto a tracker status.
+     *
+     * A null state and {@link Received} both map to PENDING.
+     * NOTE(review): {@link Released} is also reported as PENDING - confirm
+     * that this is intended.
+     *
+     * @param disp the delivery state to translate, may be null
+     * @return the matching status; for an unrecognised state this raises an
+     *         AssertionError when assertions are enabled and returns null
+     *         when they are not
+     */
+    private static Status _disp2status(DeliveryState disp)
+    {
+        if (disp == null) return Status.PENDING;
+
+        if (disp instanceof Received)
+            return Status.PENDING;
+        if (disp instanceof Accepted)
+            return Status.ACCEPTED;
+        if (disp instanceof Rejected)
+            return Status.REJECTED;
+        if (disp instanceof Released)
+            return Status.PENDING;
+        if (disp instanceof Modified)
+            return Status.MODIFIED;
+        assert false : "unexpected delivery state: " + disp;
+        return null;
+    }
+
+    /**
+     * Recomputes the status from the linked delivery.
+     *
+     * The remote state wins when present; otherwise, if the delivery has
+     * been remotely settled, the local state is used; otherwise the status
+     * is PENDING. Without a linked delivery the status is left untouched.
+     */
+    public void updated()
+    {
+        if (_delivery != null)
+        {
+            if (_delivery.getRemoteState() != null)
+            {
+                _status = _disp2status(_delivery.getRemoteState());
+            }
+            else if (_delivery.remotelySettled())
+            {
+                _status = _disp2status(_delivery.getLocalState());
+            }
+            else
+            {
+                _status = Status.PENDING;
+            }
+        }
+    }
+
+    /**
+     * Attaches an arbitrary object to this entry.
+     *
+     * @param context the object to keep, may be null
+     */
+    public void setContext(Object context)
+    {
+        _context = context;
+    }
+
+    /** @return the object given to {@link #setContext}, or null */
+    public Object getContext()
+    {
+        return _context;
+    }
+}

Propchange: qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/StoreEntry.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/TrackerImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/TrackerImpl.java?rev=1540104&r1=1540103&r2=1540104&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/TrackerImpl.java (original)
+++ qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/TrackerImpl.java Fri Nov  8 16:17:27 2013
@@ -24,18 +24,23 @@ import org.apache.qpid.proton.messenger.
 
 class TrackerImpl implements Tracker
 {
-    private boolean _outgoing;
+    public enum Type {
+        OUTGOING,
+        INCOMING
+    }
+
+    private Type _type;
     private int _sequence;
 
-    TrackerImpl(boolean outgoing, int sequence)
+    TrackerImpl(Type type, int sequence)
     {
-        _outgoing = outgoing;
+        _type = type;
         _sequence = sequence;
     }
 
     boolean isOutgoing()
     {
-        return _outgoing;
+        return _type == Type.OUTGOING;
     }
 
     int getSequence()
@@ -45,6 +50,6 @@ class TrackerImpl implements Tracker
 
     public String toString()
     {
-        return (_outgoing ? "O:" : "I:") + Integer.toString(_sequence);
+        return (isOutgoing() ? "O:" : "I:") + Integer.toString(_sequence);
     }
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org