You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2013/11/08 17:17:28 UTC
svn commit: r1540104 - in /qpid/proton/trunk: proton-c/src/messenger/
proton-j/proton-api/src/main/java/org/apache/qpid/proton/amqp/transaction/
proton-j/proton-api/src/main/java/org/apache/qpid/proton/amqp/transport/
proton-j/proton-api/src/main/java/...
Author: kgiusti
Date: Fri Nov 8 16:17:27 2013
New Revision: 1540104
URL: http://svn.apache.org/r1540104
Log:
PROTON-444: port internal store from C implementation
Added:
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/Store.java (with props)
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/StoreEntry.java (with props)
Removed:
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/TrackerQueue.java
Modified:
qpid/proton/trunk/proton-c/src/messenger/store.c
qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/amqp/transaction/Coordinator.java
qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/amqp/transport/Source.java
qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/amqp/transport/Target.java
qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/messenger/Status.java
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/TrackerImpl.java
Modified: qpid/proton/trunk/proton-c/src/messenger/store.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/messenger/store.c?rev=1540104&r1=1540103&r2=1540104&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/messenger/store.c (original)
+++ qpid/proton/trunk/proton-c/src/messenger/store.c Fri Nov 8 16:17:27 2013
@@ -220,6 +220,7 @@ pni_entry_t *pni_store_put(pni_store_t *
entry->store_prev = NULL;
entry->delivery = NULL;
entry->bytes = pn_buffer(64);
+ entry->status = PN_STATUS_UNKNOWN;
LL_ADD(stream, stream, entry);
LL_ADD(store, store, entry);
store->size++;
Modified: qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/amqp/transaction/Coordinator.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/amqp/transaction/Coordinator.java?rev=1540104&r1=1540103&r2=1540104&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/amqp/transaction/Coordinator.java (original)
+++ qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/amqp/transaction/Coordinator.java Fri Nov 8 16:17:27 2013
@@ -50,5 +50,10 @@ public final class Coordinator
"capabilities=" + (_capabilities == null ? null : Arrays.asList(_capabilities)) +
'}';
}
+
+ public String getAddress()
+ {
+ return null;
+ }
}
-
\ No newline at end of file
+
Modified: qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/amqp/transport/Source.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/amqp/transport/Source.java?rev=1540104&r1=1540103&r2=1540104&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/amqp/transport/Source.java (original)
+++ qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/amqp/transport/Source.java Fri Nov 8 16:17:27 2013
@@ -22,4 +22,5 @@ package org.apache.qpid.proton.amqp.tran
public interface Source
{
+ public String getAddress();
}
Modified: qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/amqp/transport/Target.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/amqp/transport/Target.java?rev=1540104&r1=1540103&r2=1540104&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/amqp/transport/Target.java (original)
+++ qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/amqp/transport/Target.java Fri Nov 8 16:17:27 2013
@@ -23,4 +23,5 @@ package org.apache.qpid.proton.amqp.tran
public interface Target
{
+ public String getAddress();
}
Modified: qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/messenger/Status.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/messenger/Status.java?rev=1540104&r1=1540103&r2=1540104&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/messenger/Status.java (original)
+++ qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/messenger/Status.java Fri Nov 8 16:17:27 2013
@@ -25,5 +25,6 @@ public enum Status
UNKNOWN,
PENDING,
ACCEPTED,
- REJECTED
+ REJECTED,
+ MODIFIED
}
Modified: qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java?rev=1540104&r1=1540103&r2=1540104&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java (original)
+++ qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java Fri Nov 8 16:17:27 2013
@@ -169,7 +169,12 @@ public class DeliveryImpl implements Del
public DeliveryImpl getWorkNext()
{
- return _workNext;
+ if (_workNext != null)
+ return _workNext;
+ // the following hack is brought to you by the C implementation!
+ if (!_work) // not on the work list
+ return _link.getConnectionImpl().getWorkHead();
+ return null;
}
DeliveryImpl getWorkPrev()
Modified: qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java?rev=1540104&r1=1540103&r2=1540104&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java (original)
+++ qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java Fri Nov 8 16:17:27 2013
@@ -54,6 +54,8 @@ import org.apache.qpid.proton.messenger.
import org.apache.qpid.proton.messenger.Tracker;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.Target;
+import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
+import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.amqp.Binary;
@@ -70,15 +72,17 @@ public class MessengerImpl implements Me
private long _timeout = -1;
private boolean _blocking = true;
private long _nextTag = 1;
- private byte[] _buffer = new byte[5*1024];
private Driver _driver;
private int _receiving = 0;
private static final int _creditBatch = 1024;
private int _credit;
private int _distributed;
- private TrackerQueue _incoming = new TrackerQueue();
- private TrackerQueue _outgoing = new TrackerQueue();
+ private TrackerImpl _incomingTracker;
+ private TrackerImpl _outgoingTracker;
+ private Store _incomingStore = new Store();
+ private Store _outgoingStore = new Store();
private List<Connector> _awaitingDestruction = new ArrayList<Connector>();
+ private int _sendThreshold;
private Transform _routes = new Transform();
private Transform _rewrites = new Transform();
@@ -240,42 +244,68 @@ public class MessengerImpl implements Me
_logger.fine(this + " about to put message: " + m);
}
- String addr = routeAddress(m.getAddress());
+ StoreEntry entry = _outgoingStore.put( m.getAddress() );
+ _outgoingTracker = new TrackerImpl(TrackerImpl.Type.OUTGOING,
+ _outgoingStore.trackEntry(entry));
+
+ String routedAddress = routeAddress(m.getAddress());
+ Address address = new Address(routedAddress);
+ if (address.getHost() == null)
+ {
+ throw new MessengerException("unable to send to address: " + routedAddress);
+ }
+
rewriteMessage(m);
try {
- Address address = new Address(addr);
- if (address.getHost() == null)
- {
- throw new MessengerException("unable to send to address: " + addr);
- }
- String ports = address.getPort() == null ? defaultPort(address.getScheme()) : address.getPort();
- int port = Integer.valueOf(ports);
- Sender sender = getLink(address.getHost(), port, new SenderFinder(address.getName()));
-
adjustReplyTo(m);
- byte[] tag = String.valueOf(_nextTag++).getBytes();
- Delivery delivery = sender.delivery(tag);
int encoded;
+ byte[] buffer = new byte[5*1024];
while (true)
{
try
{
- encoded = m.encode(_buffer, 0, _buffer.length);
+ encoded = m.encode(buffer, 0, buffer.length);
break;
} catch (java.nio.BufferOverflowException e) {
- _buffer = new byte[_buffer.length*2];
+ buffer = new byte[buffer.length*2];
}
}
- sender.send(_buffer, 0, encoded);
- _outgoing.add(delivery);
- sender.advance();
+ entry.setEncodedMsg( buffer, encoded );
}
finally
{
restoreMessage(m);
}
+
+ String ports = address.getPort() == null ? defaultPort(address.getScheme()) : address.getPort();
+ int port = Integer.valueOf(ports);
+ Sender sender = getLink(address.getHost(), port, new SenderFinder(address.getName()));
+ pumpOut(m.getAddress(), sender);
+ }
+
+ private int pumpOut( String address, Sender sender )
+ {
+ StoreEntry entry = _outgoingStore.get( address );
+ if (entry == null) {
+ return 0;
+ }
+
+ byte[] tag = String.valueOf(_nextTag++).getBytes();
+ Delivery delivery = sender.delivery(tag);
+ entry.setDelivery( delivery );
+ _logger.log(Level.FINE, "Sending on delivery: " + delivery);
+ int n = sender.send( entry.getEncodedMsg(), 0, entry.getEncodedLength());
+ if (n < 0) {
+ _outgoingStore.freeEntry( entry );
+ _logger.log(Level.WARNING, "Send error: " + n);
+ return n;
+ } else {
+ sender.advance();
+ _outgoingStore.freeEntry( entry );
+ return 0;
+ }
}
public void send() throws TimeoutException
@@ -293,6 +323,16 @@ public class MessengerImpl implements Me
{
_logger.fine(this + " about to send");
}
+
+ if (n == -1)
+ _sendThreshold = 0;
+ else
+ {
+ _sendThreshold = outgoing() - n;
+ if (_sendThreshold < 0)
+ _sendThreshold = 0;
+ }
+
waitUntil(_sentSettled);
}
@@ -325,44 +365,51 @@ public class MessengerImpl implements Me
public Message get()
{
- if (_driver != null) {
- for (Connector<?> c : _driver.connectors())
- {
- Connection connection = c.getConnection();
- _logger.log(Level.FINE, "Attempting to get message from " + connection);
- Delivery delivery = connection.getWorkHead();
- while (delivery != null)
- {
- if (delivery.isReadable() && !delivery.isPartial())
- {
- _logger.log(Level.FINE, "Readable delivery found: " + delivery);
- int size = read((Receiver) delivery.getLink());
- Message message = Proton.message();
- message.decode(_buffer, 0, size);
- delivery.getLink().advance();
- _incoming.add(delivery);
- _distributed--;
- return message;
- }
- else
- {
- _logger.log(Level.FINE, "Delivery not readable: " + delivery);
- delivery = delivery.getWorkNext();
- }
- }
- }
+ StoreEntry entry = _incomingStore.get( null );
+ if (entry != null)
+ {
+ Message message = Proton.message();
+ message.decode( entry.getEncodedMsg(), 0, entry.getEncodedLength() );
+
+ _incomingTracker = new TrackerImpl(TrackerImpl.Type.INCOMING,
+ _incomingStore.trackEntry(entry));
+
+ _incomingStore.freeEntry( entry );
+ _distributed--;
+ return message;
}
return null;
}
+ private int pumpIn(String address, Receiver receiver)
+ {
+ Delivery delivery = receiver.current();
+ if (delivery.isReadable() && !delivery.isPartial())
+ {
+ StoreEntry entry = _incomingStore.put( address );
+ entry.setDelivery( delivery );
+
+ _logger.log(Level.FINE, "Readable delivery found: " + delivery);
+
+ int size = delivery.pending();
+ byte[] buffer = new byte[size];
+ int read = receiver.recv( buffer, 0, buffer.length );
+ if (read != size) {
+ throw new IllegalStateException();
+ }
+ entry.setEncodedMsg( buffer, size );
+ receiver.advance();
+ }
+ return 0;
+ }
+
public void subscribe(String source) throws MessengerException
{
if (_driver == null) {
throw new IllegalStateException("messenger is stopped");
}
-
String routed = routeAddress(source);
Address address = new Address(routed);
@@ -390,68 +437,78 @@ public class MessengerImpl implements Me
public int outgoing()
{
- return queued(true);
+ return _outgoingStore.size() + queued(true);
}
public int incoming()
{
- return queued(false);
+ return _incomingStore.size() + queued(false);
}
-
public int getIncomingWindow()
{
- return _incoming.getWindow();
+ return _incomingStore.getWindow();
}
+
public void setIncomingWindow(int window)
{
- _incoming.setWindow(window);
+ _incomingStore.setWindow(window);
}
public int getOutgoingWindow()
{
- return _outgoing.getWindow();
+ return _outgoingStore.getWindow();
}
+
public void setOutgoingWindow(int window)
{
- _outgoing.setWindow(window);
+ _outgoingStore.setWindow(window);
}
public Tracker incomingTracker()
{
- return new TrackerImpl(false, _incoming.getHighWaterMark() - 1);
+ return _incomingTracker;
}
public Tracker outgoingTracker()
{
- return new TrackerImpl(true, _outgoing.getHighWaterMark() - 1);
+ return _outgoingTracker;
}
- private TrackerQueue getTrackerQueue(Tracker tracker)
+ private Store getTrackerStore(Tracker tracker)
{
- return TrackerQueue.isOutgoing(tracker) ? _outgoing : _incoming;
+ return ((TrackerImpl)tracker).isOutgoing() ? _outgoingStore : _incomingStore;
}
@Override
public void reject(Tracker tracker, int flags)
{
- getTrackerQueue(tracker).reject(tracker, flags);
+ int id = ((TrackerImpl)tracker).getSequence();
+ getTrackerStore(tracker).update(id, Status.REJECTED, flags, false, false);
}
@Override
public void accept(Tracker tracker, int flags)
{
- getTrackerQueue(tracker).accept(tracker, flags);
+ int id = ((TrackerImpl)tracker).getSequence();
+ getTrackerStore(tracker).update(id, Status.ACCEPTED, flags, false, false);
}
@Override
public void settle(Tracker tracker, int flags)
{
- getTrackerQueue(tracker).settle(tracker, flags);
+ int id = ((TrackerImpl)tracker).getSequence();
+ getTrackerStore(tracker).update(id, Status.UNKNOWN, flags, true, true);
}
public Status getStatus(Tracker tracker)
{
- return getTrackerQueue(tracker).getStatus(tracker);
+ int id = ((TrackerImpl)tracker).getSequence();
+ StoreEntry e = getTrackerStore(tracker).getEntry(id);
+ if (e != null)
+ {
+ return e.getStatus();
+ }
+ return Status.UNKNOWN;
}
@Override
@@ -489,28 +546,6 @@ public class MessengerImpl implements Me
return count;
}
- private int read(Receiver receiver)
- {
- Delivery dlv = receiver.current();
-
- if (dlv.isPartial()) {
- throw new IllegalStateException();
- }
-
- int size = dlv.pending();
-
- while (_buffer.length < size) {
- _buffer = new byte[_buffer.length * 2];
- }
-
- int read = receiver.recv(_buffer, 0, _buffer.length);
- if (read != size) {
- throw new IllegalStateException();
- }
-
- return size;
- }
-
private void bringDestruction()
{
for (Connector<?> c : _awaitingDestruction)
@@ -606,15 +641,26 @@ public class MessengerImpl implements Me
Delivery delivery = connection.getWorkHead();
while (delivery != null)
{
- if (delivery.getLink() instanceof Sender && delivery.isUpdated())
+ Link link = delivery.getLink();
+ if (delivery.isUpdated())
{
- delivery.disposition(delivery.getRemoteState());
+ if (link instanceof Sender)
+ {
+ delivery.disposition(delivery.getRemoteState());
+ }
+ StoreEntry e = (StoreEntry) delivery.getContext();
+ if (e != null) e.updated();
}
+
+ if (delivery.isReadable())
+ {
+ pumpIn( link.getSource().getAddress(), (Receiver)link );
+ }
+
Delivery next = delivery.getWorkNext();
delivery.clear();
delivery = next;
}
- _outgoing.slide();
for (Session session : new Sessions(connection, UNINIT, ANY))
{
@@ -632,6 +678,14 @@ public class MessengerImpl implements Me
distributeCredit();
+ for (Link link : new Links(connection, ACTIVE, ACTIVE))
+ {
+ if (link instanceof Sender)
+ {
+ pumpOut(link.getTarget().getAddress(), (Sender)link);
+ }
+ }
+
for (Link link : new Links(connection, ACTIVE, CLOSED))
{
link.close();
@@ -792,56 +846,50 @@ public class MessengerImpl implements Me
public boolean test()
{
//are all sent messages settled?
+ int total = _outgoingStore.size();
+
for (Connector<?> c : _driver.connectors())
{
+ // TBD
+ // check if transport is done generating output
+ // pn_transport_t *transport = pn_connector_transport(ctor);
+ // if (transport) {
+ // if (!pn_transport_quiesced(transport)) {
+ // pn_connector_process(ctor);
+ // return false;
+ // }
+ // }
+
Connection connection = c.getConnection();
for (Link link : new Links(connection, ACTIVE, ANY))
{
if (link instanceof Sender)
{
- if (link.getQueued() > 0)
- {
- return false;
- }
- //TODO: Sender.unsettled() not yet implemented, when it is change to the following
- //if (checkSettled(link.unsettled())
- //{
- // return false;
- //}
+ total += link.getQueued();
}
}
- }
- //TODO: Sender.unsettled() not yet implemented, when it is change to the following
- //return true;
- return checkSettled(_outgoing.deliveries());
- }
- boolean checkSettled(Iterator<Delivery> unsettled)
- {
- if (unsettled != null)
- {
- while (unsettled.hasNext())
+ // TBD: there is no per-link unsettled
+ // deliveries iterator, so for now get the
+ // deliveries by walking the outgoing trackers
+ Iterator<StoreEntry> entries = _outgoingStore.trackedEntries();
+ while (entries.hasNext() && total <= _sendThreshold)
{
- Delivery d = unsettled.next();
- if (d == null)
- {
- break;
- }
- if (d.getRemoteState() != null || d.remotelySettled())
- {
- d.settle();
- }
- else if (d.getLink().getSession().getConnection().getRemoteState() == EndpointState.CLOSED)
- {
- continue;
- }
- else
+ StoreEntry e = (StoreEntry) entries.next();
+ if (e != null )
{
- return false;
+ Delivery d = e.getDelivery();
+ if (d != null)
+ {
+ if (d.getRemoteState() == null && !d.remotelySettled())
+ {
+ total++;
+ }
+ }
}
}
}
- return true;
+ return total <= _sendThreshold;
}
}
@@ -849,7 +897,8 @@ public class MessengerImpl implements Me
{
public boolean test()
{
- //do we have at least one message?
+ //do we have at least one pending message?
+ if (_incomingStore.size() > 0) return true;
for (Connector<?> c : _driver.connectors())
{
Connection connection = c.getConnection();
@@ -866,6 +915,10 @@ public class MessengerImpl implements Me
}
}
}
+ // if there are no connections and no listeners, exit as there won't ever be a message
+ if (!_driver.listeners().iterator().hasNext() && !_driver.connectors().iterator().hasNext())
+ return true;
+
return false;
}
}
@@ -939,6 +992,16 @@ public class MessengerImpl implements Me
Target target = new Target();
target.setAddress(_path);
sender.setTarget(target);
+ // the C implementation does this:
+ Source source = new Source();
+ source.setAddress(_path);
+ sender.setSource(source);
+ if (getOutgoingWindow() > 0)
+ {
+ // use explicit settlement via dispositions (not pre-settled)
+ sender.setSenderSettleMode(SenderSettleMode.UNSETTLED);
+ sender.setReceiverSettleMode(ReceiverSettleMode.SECOND); // desired
+ }
return sender;
}
}
@@ -970,6 +1033,16 @@ public class MessengerImpl implements Me
Source source = new Source();
source.setAddress(_path);
receiver.setSource(source);
+ // the C implementation does this:
+ Target target = new Target();
+ target.setAddress(_path);
+ receiver.setTarget(target);
+ if (getIncomingWindow() > 0)
+ {
+ // use explicit settlement via dispositions (not pre-settled)
+ receiver.setSenderSettleMode(SenderSettleMode.UNSETTLED); // desired
+ receiver.setReceiverSettleMode(ReceiverSettleMode.SECOND);
+ }
return receiver;
}
}
Added: qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/Store.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/Store.java?rev=1540104&view=auto
==============================================================================
--- qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/Store.java (added)
+++ qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/Store.java Fri Nov 8 16:17:27 2013
@@ -0,0 +1,210 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.qpid.proton.messenger.impl;
+
+import java.util.List;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Collection;
+import java.util.Iterator;
+
+import org.apache.qpid.proton.messenger.Status;
+import org.apache.qpid.proton.messenger.Messenger;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.amqp.messaging.Accepted;
+import org.apache.qpid.proton.amqp.messaging.Rejected;
+
+class Store
+{
+ private static final Accepted ACCEPTED = Accepted.getInstance();
+ private static final Rejected REJECTED = new Rejected();
+
+ private LinkedList<StoreEntry> _store = new LinkedList<StoreEntry>();
+ private HashMap<String, LinkedList<StoreEntry>> _stream = new HashMap<String, LinkedList<StoreEntry>>();
+
+ // for incoming/outgoing window tracking
+ int _window;
+ int _lwm;
+ int _hwm;
+ private HashMap<Integer, StoreEntry> _tracked = new HashMap<Integer, StoreEntry>();
+
+ Store()
+ {
+ }
+
+ private boolean isTracking( Integer id )
+ {
+ return id != null && (id.intValue() - _lwm >= 0) && (_hwm - id.intValue() > 0);
+ }
+
+ int size()
+ {
+ return _store.size();
+ }
+
+ int getWindow()
+ {
+ return _window;
+ }
+
+ void setWindow(int window)
+ {
+ _window = window;
+ }
+
+ StoreEntry put(String address)
+ {
+ if (address == null) address = "";
+ StoreEntry entry = new StoreEntry(this, address);
+ _store.add( entry );
+ LinkedList<StoreEntry> list = _stream.get( address );
+ if (list != null) {
+ list.add( entry );
+ } else {
+ list = new LinkedList<StoreEntry>();
+ list.add( entry );
+ _stream.put( address, list );
+ }
+ entry.stored();
+ return entry;
+ }
+
+ StoreEntry get(String address)
+ {
+ if (address != null) {
+ LinkedList<StoreEntry> list = _stream.get( address );
+ if (list != null) return list.peekFirst();
+ } else {
+ return _store.peekFirst();
+ }
+ return null;
+ }
+
+ StoreEntry getEntry(int id)
+ {
+ return _tracked.get(id);
+ }
+
+ Iterator<StoreEntry> trackedEntries()
+ {
+ return _tracked.values().iterator();
+ }
+
+ void freeEntry(StoreEntry entry)
+ {
+ if (entry.isStored()) {
+ _store.remove( entry );
+ LinkedList<StoreEntry> list = _stream.get( entry.getAddress() );
+ if (list != null) list.remove( entry );
+ entry.notStored();
+ }
+ // note well: may still be in _tracked map if still in window!
+ }
+
+ public int trackEntry(StoreEntry entry)
+ {
+ assert( entry.getStore() == this );
+ entry.setId(_hwm++);
+ _tracked.put(entry.getId(), entry);
+ slideWindow();
+ return entry.getId();
+ }
+
+ private void slideWindow()
+ {
+ if (_window >= 0)
+ {
+ while (_hwm - _lwm > _window)
+ {
+ StoreEntry old = getEntry(_lwm);
+ if (old != null)
+ {
+ _tracked.remove( old.getId() );
+ Delivery d = old.getDelivery();
+ if (d != null) {
+ if (d.getLocalState() == null)
+ d.disposition(ACCEPTED);
+ d.settle();
+ }
+ }
+ _lwm++;
+ }
+ }
+ }
+
+ int update(int id, Status status, int flags, boolean settle, boolean match )
+ {
+ if (!isTracking(id)) return 0;
+
+ int start = (Messenger.CUMULATIVE & flags) != 0 ? _lwm : id;
+ for (int i = start; (id - i) >= 0; i++)
+ {
+ StoreEntry e = getEntry(i);
+ if (e != null)
+ {
+ Delivery d = e.getDelivery();
+ if (d != null)
+ {
+ if (d.getLocalState() == null)
+ {
+ if (match)
+ {
+ d.disposition(d.getRemoteState());
+ }
+ else
+ {
+ switch (status)
+ {
+ case ACCEPTED:
+ d.disposition(ACCEPTED);
+ break;
+ case REJECTED:
+ d.disposition(REJECTED);
+ break;
+ default:
+ break;
+ }
+ }
+ e.updated();
+ }
+ }
+ if (settle)
+ {
+ if (d != null)
+ {
+ d.settle();
+ }
+ _tracked.remove(e.getId());
+ }
+ }
+ }
+
+ while (_hwm - _lwm > 0 && !_tracked.containsKey(_lwm))
+ {
+ _lwm++;
+ }
+
+ return 0;
+ }
+}
+
+
Propchange: qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/Store.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/StoreEntry.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/StoreEntry.java?rev=1540104&view=auto
==============================================================================
--- qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/StoreEntry.java (added)
+++ qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/StoreEntry.java Fri Nov 8 16:17:27 2013
@@ -0,0 +1,173 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.qpid.proton.messenger.impl;
+
+import org.apache.qpid.proton.messenger.Tracker;
+import org.apache.qpid.proton.messenger.Status;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.amqp.messaging.Accepted;
+import org.apache.qpid.proton.amqp.messaging.Modified;
+import org.apache.qpid.proton.amqp.messaging.Rejected;
+import org.apache.qpid.proton.amqp.messaging.Released;
+import org.apache.qpid.proton.amqp.messaging.Received;
+import org.apache.qpid.proton.amqp.transport.DeliveryState;
+
+class StoreEntry
+{
+ private Store _store;
+ private Integer _id;
+ private String _address;
+ private byte[] _encodedMsg;
+ private int _encodedLength;
+ private Delivery _delivery;
+ private Status _status = Status.UNKNOWN;
+ private Object _context;
+ private boolean _inStore = false;
+
+ public StoreEntry(Store store, String address)
+ {
+ _store = store;
+ _address = address;
+ }
+
+ public Store getStore()
+ {
+ return _store;
+ }
+
+ public boolean isStored()
+ {
+ return _inStore;
+ }
+
+ public void stored()
+ {
+ _inStore = true;
+ }
+
+ public void notStored()
+ {
+ _inStore = false;
+ }
+
+ public String getAddress()
+ {
+ return _address;
+ }
+
+ public byte[] getEncodedMsg()
+ {
+ return _encodedMsg;
+ }
+
+ public int getEncodedLength()
+ {
+ return _encodedLength;
+ }
+
+ public void setEncodedMsg( byte[] encodedMsg, int length )
+ {
+ _encodedMsg = encodedMsg;
+ _encodedLength = length;
+ }
+
+ public void setId(int id)
+ {
+ _id = new Integer(id);
+ }
+
+ public Integer getId()
+ {
+ return _id;
+ }
+
+ public void setDelivery( Delivery d )
+ {
+ if (_delivery != null)
+ {
+ _delivery.setContext(null);
+ }
+ _delivery = d;
+ if (_delivery != null)
+ {
+ _delivery.setContext(this);
+ }
+ updated();
+ }
+
+ public Delivery getDelivery()
+ {
+ return _delivery;
+ }
+
+ public Status getStatus()
+ {
+ return _status;
+ }
+
+
+ private static Status _disp2status(DeliveryState disp)
+ {
+ if (disp == null) return Status.PENDING;
+
+ if (disp instanceof Received)
+ return Status.PENDING;
+ if (disp instanceof Accepted)
+ return Status.ACCEPTED;
+ if (disp instanceof Rejected)
+ return Status.REJECTED;
+ if (disp instanceof Released)
+ return Status.PENDING;
+ if (disp instanceof Modified)
+ return Status.MODIFIED;
+ assert(false);
+ return null;
+ }
+
+ public void updated()
+ {
+ if (_delivery != null)
+ {
+ if (_delivery.getRemoteState() != null)
+ {
+ _status = _disp2status(_delivery.getRemoteState());
+ }
+ else if (_delivery.remotelySettled())
+ {
+ _status = _disp2status(_delivery.getLocalState());
+ }
+ else
+ {
+ _status = Status.PENDING;
+ }
+ }
+ }
+
+ public void setContext(Object context)
+ {
+ _context = context;
+ }
+
+ public Object getContext()
+ {
+ return _context;
+ }
+}
Propchange: qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/StoreEntry.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/TrackerImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/TrackerImpl.java?rev=1540104&r1=1540103&r2=1540104&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/TrackerImpl.java (original)
+++ qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/TrackerImpl.java Fri Nov 8 16:17:27 2013
@@ -24,18 +24,23 @@ import org.apache.qpid.proton.messenger.
class TrackerImpl implements Tracker
{
- private boolean _outgoing;
+ public enum Type {
+ OUTGOING,
+ INCOMING
+ }
+
+ private Type _type;
private int _sequence;
- TrackerImpl(boolean outgoing, int sequence)
+ TrackerImpl(Type type, int sequence)
{
- _outgoing = outgoing;
+ _type = type;
_sequence = sequence;
}
boolean isOutgoing()
{
- return _outgoing;
+ return _type == Type.OUTGOING;
}
int getSequence()
@@ -45,6 +50,6 @@ class TrackerImpl implements Tracker
public String toString()
{
- return (_outgoing ? "O:" : "I:") + Integer.toString(_sequence);
+ return (isOutgoing() ? "O:" : "I:") + Integer.toString(_sequence);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org