You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2017/01/09 15:07:22 UTC
[09/30] qpid-proton git commit: PROTON-1385: remove proton-j from the
existing repo, it now has its own repo at:
https://git-wip-us.apache.org/repos/asf/qpid-proton-j.git
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
deleted file mode 100644
index e7c9d9e..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
+++ /dev/null
@@ -1,1555 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.qpid.proton.messenger.impl;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.EnumSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.qpid.proton.Proton;
-import org.apache.qpid.proton.InterruptException;
-import org.apache.qpid.proton.TimeoutException;
-import org.apache.qpid.proton.driver.Connector;
-import org.apache.qpid.proton.driver.Driver;
-import org.apache.qpid.proton.driver.Listener;
-import org.apache.qpid.proton.engine.Connection;
-import org.apache.qpid.proton.engine.Delivery;
-import org.apache.qpid.proton.engine.EndpointState;
-import org.apache.qpid.proton.engine.Link;
-import org.apache.qpid.proton.engine.Receiver;
-import org.apache.qpid.proton.engine.Sasl;
-import org.apache.qpid.proton.engine.Sender;
-import org.apache.qpid.proton.engine.Session;
-import org.apache.qpid.proton.engine.SslDomain;
-import org.apache.qpid.proton.engine.Ssl;
-import org.apache.qpid.proton.engine.Transport;
-import org.apache.qpid.proton.message.Message;
-import org.apache.qpid.proton.messenger.Messenger;
-import org.apache.qpid.proton.messenger.MessengerException;
-import org.apache.qpid.proton.messenger.Status;
-import org.apache.qpid.proton.messenger.Tracker;
-import org.apache.qpid.proton.amqp.messaging.Source;
-import org.apache.qpid.proton.amqp.messaging.Target;
-import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
-import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
-import org.apache.qpid.proton.amqp.Binary;
-
-/**
- * @deprecated Messenger will be removed from upcoming proton-j releases.
- */
-@Deprecated
-public class MessengerImpl implements Messenger
-{
- private enum LinkCreditMode
- {
- // method for replenishing credit
- LINK_CREDIT_EXPLICIT, // recv(N)
- LINK_CREDIT_AUTO; // recv()
- }
-
- private static final EnumSet<EndpointState> UNINIT = EnumSet.of(EndpointState.UNINITIALIZED);
- private static final EnumSet<EndpointState> ACTIVE = EnumSet.of(EndpointState.ACTIVE);
- private static final EnumSet<EndpointState> CLOSED = EnumSet.of(EndpointState.CLOSED);
- private static final EnumSet<EndpointState> ANY = EnumSet.allOf(EndpointState.class);
-
- private final Logger _logger = Logger.getLogger("proton.messenger");
- private final String _name;
- private long _timeout = -1;
- private boolean _blocking = true;
- private long _nextTag = 1;
- private Driver _driver;
- private LinkCreditMode _credit_mode = LinkCreditMode.LINK_CREDIT_EXPLICIT;
- private final int _credit_batch = 1024; // credit_mode == LINK_CREDIT_AUTO
- private int _credit; // available
- private int _distributed; // outstanding credit
- private int _receivers; // total # receiver Links
- private int _draining; // # Links in drain state
- private List<Receiver> _credited = new ArrayList<Receiver>();
- private List<Receiver> _blocked = new ArrayList<Receiver>();
- private long _next_drain;
- private TrackerImpl _incomingTracker;
- private TrackerImpl _outgoingTracker;
- private Store _incomingStore = new Store();
- private Store _outgoingStore = new Store();
- private List<Connector> _awaitingDestruction = new ArrayList<Connector>();
- private int _sendThreshold;
-
- private Transform _routes = new Transform();
- private Transform _rewrites = new Transform();
-
- private String _certificate;
- private String _privateKey;
- private String _password;
- private String _trustedDb;
-
-
- /**
- * @deprecated Messenger will be removed from upcoming proton-j releases.
- */
- @Deprecated public MessengerImpl()
- {
- this(java.util.UUID.randomUUID().toString());
- }
-
- /**
- * @deprecated Messenger will be removed from upcoming proton-j releases.
- */
- @Deprecated public MessengerImpl(String name)
- {
- _name = name;
- }
-
- public void setTimeout(long timeInMillis)
- {
- _timeout = timeInMillis;
- }
-
- public long getTimeout()
- {
- return _timeout;
- }
-
- public boolean isBlocking()
- {
- return _blocking;
- }
-
- public void setBlocking(boolean b)
- {
- _blocking = b;
- }
-
- public void setCertificate(String certificate)
- {
- _certificate = certificate;
- }
-
- public String getCertificate()
- {
- return _certificate;
- }
-
- public void setPrivateKey(String privateKey)
- {
- _privateKey = privateKey;
- }
-
- public String getPrivateKey()
- {
- return _privateKey;
- }
-
- public void setPassword(String password)
- {
- _password = password;
- }
-
- public String getPassword()
- {
- return _password;
- }
-
- public void setTrustedCertificates(String trusted)
- {
- _trustedDb = trusted;
- }
-
- public String getTrustedCertificates()
- {
- return _trustedDb;
- }
-
- public void start() throws IOException
- {
- _driver = Proton.driver();
- }
-
- public void stop()
- {
- if (_driver != null) {
- if(_logger.isLoggable(Level.FINE))
- {
- _logger.fine(this + " about to stop");
- }
- //close all connections
- for (Connector<?> c : _driver.connectors())
- {
- Connection connection = c.getConnection();
- connection.close();
- }
- //stop listeners
- for (Listener<?> l : _driver.listeners())
- {
- try
- {
- l.close();
- }
- catch (IOException e)
- {
- _logger.log(Level.WARNING, "Error while closing listener", e);
- }
- }
- waitUntil(_allClosed);
- }
- }
-
- public boolean stopped()
- {
- return _allClosed.test();
- }
-
- public boolean work(long timeout) throws TimeoutException
- {
- if (_driver == null) { return false; }
- _worked = false;
- return waitUntil(_workPred, timeout);
- }
-
- public void interrupt()
- {
- if (_driver != null) {
- _driver.wakeup();
- }
- }
-
- private String defaultRewrite(String address) {
- if (address != null && address.contains("@")) {
- Address addr = new Address(address);
- String scheme = addr.getScheme();
- String host = addr.getHost();
- String port = addr.getPort();
- String name = addr.getName();
-
- StringBuilder sb = new StringBuilder();
- if (scheme != null) {
- sb.append(scheme).append("://");
- }
- if (host != null) {
- sb.append(host);
- }
- if (port != null) {
- sb.append(":").append(port);
- }
- if (name != null) {
- sb.append("/").append(name);
- }
- return sb.toString();
- } else {
- return address;
- }
- }
-
-
- private String _original;
-
- private void rewriteMessage(Message m)
- {
- _original = m.getAddress();
- if (_rewrites.apply(_original)) {
- m.setAddress(_rewrites.result());
- } else {
- m.setAddress(defaultRewrite(_original));
- }
- }
-
- private void restoreMessage(Message m)
- {
- m.setAddress(_original);
- }
-
- private String routeAddress(String addr)
- {
- if (_routes.apply(addr)) {
- return _routes.result();
- } else {
- return addr;
- }
- }
-
- public void put(Message m) throws MessengerException
- {
- if (_driver == null) {
- throw new IllegalStateException("cannot put while messenger is stopped");
- }
-
- if(_logger.isLoggable(Level.FINE))
- {
- _logger.fine(this + " about to put message: " + m);
- }
-
- StoreEntry entry = _outgoingStore.put( m.getAddress() );
- _outgoingTracker = new TrackerImpl(TrackerImpl.Type.OUTGOING,
- _outgoingStore.trackEntry(entry));
-
- String routedAddress = routeAddress(m.getAddress());
- Address address = new Address(routedAddress);
- if (address.getHost() == null)
- {
- throw new MessengerException("unable to send to address: " + routedAddress);
- }
-
- rewriteMessage(m);
-
- try {
- adjustReplyTo(m);
-
- int encoded;
- byte[] buffer = new byte[5*1024];
- while (true)
- {
- try
- {
- encoded = m.encode(buffer, 0, buffer.length);
- break;
- } catch (java.nio.BufferOverflowException e) {
- buffer = new byte[buffer.length*2];
- }
- }
- entry.setEncodedMsg( buffer, encoded );
- }
- finally
- {
- restoreMessage(m);
- }
-
- Sender sender = getLink(address, new SenderFinder(address.getName()));
- pumpOut(m.getAddress(), sender);
- }
-
- private void reclaimLink(Link link)
- {
- if (link instanceof Receiver)
- {
- int credit = link.getCredit();
- if (credit > 0)
- {
- _credit += credit;
- _distributed -= credit;
- }
- }
-
- Delivery delivery = link.head();
- while (delivery != null)
- {
- StoreEntry entry = (StoreEntry) delivery.getContext();
- if (entry != null)
- {
- entry.setDelivery(null);
- if (delivery.isBuffered()) {
- entry.setStatus(Status.ABORTED);
- }
- }
- delivery = delivery.next();
- }
- linkRemoved(link);
- }
-
- private int pumpOut( String address, Sender sender )
- {
- StoreEntry entry = _outgoingStore.get( address );
- if (entry == null) {
- sender.drained();
- return 0;
- }
-
- byte[] tag = String.valueOf(_nextTag++).getBytes();
- Delivery delivery = sender.delivery(tag);
- entry.setDelivery( delivery );
- _logger.log(Level.FINE, "Sending on delivery: " + delivery);
- int n = sender.send( entry.getEncodedMsg(), 0, entry.getEncodedLength());
- if (n < 0) {
- _outgoingStore.freeEntry( entry );
- _logger.log(Level.WARNING, "Send error: " + n);
- return n;
- } else {
- sender.advance();
- _outgoingStore.freeEntry( entry );
- return 0;
- }
- }
-
- public void send() throws TimeoutException
- {
- send(-1);
- }
-
- public void send(int n) throws TimeoutException
- {
- if (_driver == null) {
- throw new IllegalStateException("cannot send while messenger is stopped");
- }
-
- if(_logger.isLoggable(Level.FINE))
- {
- _logger.fine(this + " about to send");
- }
-
- if (n == -1)
- _sendThreshold = 0;
- else
- {
- _sendThreshold = outgoing() - n;
- if (_sendThreshold < 0)
- _sendThreshold = 0;
- }
-
- waitUntil(_sentSettled);
- }
-
- public void recv(int n) throws TimeoutException
- {
- if (_driver == null) {
- throw new IllegalStateException("cannot recv while messenger is stopped");
- }
-
- if (_logger.isLoggable(Level.FINE) && n != -1)
- {
- _logger.fine(this + " about to wait for up to " + n + " messages to be received");
- }
-
- if (n == -1)
- {
- _credit_mode = LinkCreditMode.LINK_CREDIT_AUTO;
- }
- else
- {
- _credit_mode = LinkCreditMode.LINK_CREDIT_EXPLICIT;
- if (n > _distributed)
- _credit = n - _distributed;
- else // cancel unallocated
- _credit = 0;
- }
-
- distributeCredit();
-
- waitUntil(_messageAvailable);
- }
-
- public void recv() throws TimeoutException
- {
- recv(-1);
- }
-
- public int receiving()
- {
- return _credit + _distributed;
- }
-
- public Message get()
- {
- StoreEntry entry = _incomingStore.get( null );
- if (entry != null)
- {
- Message message = Proton.message();
- message.decode( entry.getEncodedMsg(), 0, entry.getEncodedLength() );
-
- _incomingTracker = new TrackerImpl(TrackerImpl.Type.INCOMING,
- _incomingStore.trackEntry(entry));
-
- _incomingStore.freeEntry( entry );
- return message;
- }
- return null;
- }
-
- private int pumpIn(String address, Receiver receiver)
- {
- Delivery delivery = receiver.current();
- if (delivery.isReadable() && !delivery.isPartial())
- {
- StoreEntry entry = _incomingStore.put( address );
- entry.setDelivery( delivery );
-
- _logger.log(Level.FINE, "Readable delivery found: " + delivery);
-
- int size = delivery.pending();
- byte[] buffer = new byte[size];
- int read = receiver.recv( buffer, 0, buffer.length );
- if (read != size) {
- throw new IllegalStateException();
- }
- entry.setEncodedMsg( buffer, size );
- receiver.advance();
-
- // account for the used credit, replenish if
- // low (< 20% maximum per-link batch) and
- // extra credit available
- assert(_distributed > 0);
- _distributed--;
- if (!receiver.getDrain() && _blocked.isEmpty() && _credit > 0)
- {
- final int max = perLinkCredit();
- final int lo_thresh = (int)(max * 0.2 + 0.5);
- if (receiver.getRemoteCredit() < lo_thresh)
- {
- final int more = Math.min(_credit, max - receiver.getRemoteCredit());
- _credit -= more;
- _distributed += more;
- receiver.flow(more);
- }
- }
- // check if blocked
- if (receiver.getRemoteCredit() == 0 && _credited.contains(receiver))
- {
- _credited.remove(receiver);
- if (receiver.getDrain())
- {
- receiver.setDrain(false);
- assert( _draining > 0 );
- _draining--;
- }
- _blocked.add(receiver);
- }
- }
- return 0;
- }
-
- public void subscribe(String source) throws MessengerException
- {
- if (_driver == null) {
- throw new IllegalStateException("messenger is stopped");
- }
-
- String routed = routeAddress(source);
- Address address = new Address(routed);
-
- String hostName = address.getHost();
- if (hostName == null) throw new MessengerException("Invalid address (hostname cannot be null): " + routed);
- int port = Integer.valueOf(address.getImpliedPort());
- if (address.isPassive())
- {
- if(_logger.isLoggable(Level.FINE))
- {
- _logger.fine(this + " about to subscribe to source " + source + " using address " + hostName + ":" + port);
- }
- ListenerContext ctx = new ListenerContext(address);
- _driver.createListener(hostName, port, ctx);
- }
- else
- {
- if(_logger.isLoggable(Level.FINE))
- {
- _logger.fine(this + " about to subscribe to source " + source);
- }
- getLink(address, new ReceiverFinder(address.getName()));
- }
- }
-
- public int outgoing()
- {
- return _outgoingStore.size() + queued(true);
- }
-
- public int incoming()
- {
- return _incomingStore.size() + queued(false);
- }
-
- public int getIncomingWindow()
- {
- return _incomingStore.getWindow();
- }
-
- public void setIncomingWindow(int window)
- {
- _incomingStore.setWindow(window);
- }
-
- public int getOutgoingWindow()
- {
- return _outgoingStore.getWindow();
- }
-
- public void setOutgoingWindow(int window)
- {
- _outgoingStore.setWindow(window);
- }
-
- public Tracker incomingTracker()
- {
- return _incomingTracker;
- }
- public Tracker outgoingTracker()
- {
- return _outgoingTracker;
- }
-
- private Store getTrackerStore(Tracker tracker)
- {
- return ((TrackerImpl)tracker).isOutgoing() ? _outgoingStore : _incomingStore;
- }
-
- @Override
- public void reject(Tracker tracker, int flags)
- {
- int id = ((TrackerImpl)tracker).getSequence();
- getTrackerStore(tracker).update(id, Status.REJECTED, flags, false, false);
- }
-
- @Override
- public void accept(Tracker tracker, int flags)
- {
- int id = ((TrackerImpl)tracker).getSequence();
- getTrackerStore(tracker).update(id, Status.ACCEPTED, flags, false, false);
- }
-
- @Override
- public void settle(Tracker tracker, int flags)
- {
- int id = ((TrackerImpl)tracker).getSequence();
- getTrackerStore(tracker).update(id, Status.UNKNOWN, flags, true, true);
- }
-
- public Status getStatus(Tracker tracker)
- {
- int id = ((TrackerImpl)tracker).getSequence();
- StoreEntry e = getTrackerStore(tracker).getEntry(id);
- if (e != null)
- {
- return e.getStatus();
- }
- return Status.UNKNOWN;
- }
-
- @Override
- public void route(String pattern, String address)
- {
- _routes.rule(pattern, address);
- }
-
- @Override
- public void rewrite(String pattern, String address)
- {
- _rewrites.rule(pattern, address);
- }
-
- private int queued(boolean outgoing)
- {
- int count = 0;
- if (_driver != null) {
- for (Connector<?> c : _driver.connectors())
- {
- Connection connection = c.getConnection();
- for (Link link : new Links(connection, ACTIVE, ANY))
- {
- if (outgoing)
- {
- if (link instanceof Sender) count += link.getQueued();
- }
- else
- {
- if (link instanceof Receiver) count += link.getQueued();
- }
- }
- }
- }
- return count;
- }
-
- private void bringDestruction()
- {
- for (Connector<?> c : _awaitingDestruction)
- {
- c.destroy();
- }
- _awaitingDestruction.clear();
- }
-
- private void processAllConnectors()
- {
- distributeCredit();
- for (Connector<?> c : _driver.connectors())
- {
- processEndpoints(c);
- try
- {
- if (c.process()) {
- _worked = true;
- }
- }
- catch (IOException e)
- {
- _logger.log(Level.SEVERE, "Error processing connection", e);
- }
- }
- bringDestruction();
- distributeCredit();
- }
-
- private void processActive()
- {
- //process active listeners
- for (Listener<?> l = _driver.listener(); l != null; l = _driver.listener())
- {
- _worked = true;
- Connector<?> c = l.accept();
- Connection connection = Proton.connection();
- connection.setContainer(_name);
- ListenerContext ctx = (ListenerContext) l.getContext();
- connection.setContext(new ConnectionContext(ctx.getAddress(), c));
- c.setConnection(connection);
- Transport transport = c.getTransport();
- //TODO: full SASL
- Sasl sasl = c.sasl();
- if (sasl != null)
- {
- sasl.server();
- sasl.setMechanisms(new String[]{"ANONYMOUS"});
- sasl.done(Sasl.SaslOutcome.PN_SASL_OK);
- }
- transport.ssl(ctx.getDomain());
- connection.open();
- }
- // process connectors, reclaiming credit on closed connectors
- for (Connector<?> c = _driver.connector(); c != null; c = _driver.connector())
- {
- _worked = true;
- if (c.isClosed())
- {
- _awaitingDestruction.add(c);
- reclaimCredit(c.getConnection());
- }
- else
- {
- _logger.log(Level.FINE, "Processing active connector " + c);
- try
- {
- c.process();
- processEndpoints(c);
- c.process();
- }
- catch (IOException e)
- {
- _logger.log(Level.SEVERE, "Error processing connection", e);
- }
- }
- }
- bringDestruction();
- distributeCredit();
- }
-
- private void processEndpoints(Connector c)
- {
- Connection connection = c.getConnection();
-
- if (connection.getLocalState() == EndpointState.UNINITIALIZED)
- {
- connection.open();
- }
-
- Delivery delivery = connection.getWorkHead();
- while (delivery != null)
- {
- Link link = delivery.getLink();
- if (delivery.isUpdated())
- {
- if (link instanceof Sender)
- {
- delivery.disposition(delivery.getRemoteState());
- }
- StoreEntry e = (StoreEntry) delivery.getContext();
- if (e != null) e.updated();
- }
-
- if (delivery.isReadable())
- {
- pumpIn( link.getSource().getAddress(), (Receiver)link );
- }
-
- Delivery next = delivery.getWorkNext();
- delivery.clear();
- delivery = next;
- }
-
- for (Session session : new Sessions(connection, UNINIT, ANY))
- {
- session.open();
- _logger.log(Level.FINE, "Opened session " + session);
- }
- for (Link link : new Links(connection, UNINIT, ANY))
- {
- //TODO: the following is not correct; should only copy those properties that we understand
- //TODO: is this any better:
- if (link.getRemoteSource() != null) {
- link.setSource(link.getRemoteSource().copy());
- }
- if (link.getRemoteTarget() != null) {
- link.setTarget(link.getRemoteTarget().copy());
- }
- linkAdded(link);
- link.open();
- _logger.log(Level.FINE, "Opened link " + link);
- }
-
- distributeCredit();
-
- for (Link link : new Links(connection, ACTIVE, ACTIVE))
- {
- if (link instanceof Sender)
- {
- pumpOut(link.getTarget().getAddress(), (Sender)link);
- }
- }
-
- for (Session session : new Sessions(connection, ACTIVE, CLOSED))
- {
- session.close();
- }
-
- for (Link link : new Links(connection, ANY, CLOSED))
- {
- if (link.getLocalState() == EndpointState.ACTIVE)
- {
- link.close();
- }
- else
- {
- reclaimLink(link);
- }
- }
-
- if (connection.getRemoteState() == EndpointState.CLOSED)
- {
- if (connection.getLocalState() == EndpointState.ACTIVE)
- {
- connection.close();
- }
- }
- }
-
- private boolean waitUntil(Predicate condition) throws TimeoutException
- {
- if (_blocking) {
- boolean done = waitUntil(condition, _timeout);
- if (!done) {
- _logger.log(Level.SEVERE, String.format
- ("Timeout when waiting for condition %s after %s ms",
- condition, _timeout));
- throw new TimeoutException();
- }
- return done;
- } else {
- return waitUntil(condition, 0);
- }
- }
-
- private boolean waitUntil(Predicate condition, long timeout)
- {
- if (_driver == null) {
- throw new IllegalStateException("cannot wait while messenger is stopped");
- }
-
- processAllConnectors();
-
- // wait until timeout expires or until test is true
- long now = System.currentTimeMillis();
- final long deadline = timeout < 0 ? Long.MAX_VALUE : now + timeout;
- boolean done = false;
-
- while (true)
- {
- done = condition.test();
- if (done) break;
-
- long remaining;
- if (timeout < 0)
- remaining = -1;
- else {
- remaining = deadline - now;
- if (remaining < 0) break;
- }
-
- // Update the credit scheduler. If the scheduler detects
- // credit imbalance on the links, wake up in time to
- // service credit drain
- distributeCredit();
- if (_next_drain != 0)
- {
- long wakeup = (_next_drain > now) ? _next_drain - now : 0;
- remaining = (remaining == -1) ? wakeup : Math.min(remaining, wakeup);
- }
-
- boolean woken;
- woken = _driver.doWait(remaining);
- processActive();
- if (woken) {
- throw new InterruptException();
- }
- now = System.currentTimeMillis();
- }
-
- return done;
- }
-
- private Connection lookup(Address address)
- {
- for (Connector<?> c : _driver.connectors())
- {
- Connection connection = c.getConnection();
- ConnectionContext ctx = (ConnectionContext) connection.getContext();
- if (ctx.matches(address))
- {
- return connection;
- }
- }
- return null;
- }
-
- private void reclaimCredit(Connection connection)
- {
- for (Link link : new Links(connection, ANY, ANY))
- {
- reclaimLink(link);
- }
- }
-
- private void distributeCredit()
- {
- if (_receivers == 0) return;
-
- if (_credit_mode == LinkCreditMode.LINK_CREDIT_AUTO)
- {
- // replenish, but limit the max total messages buffered
- final int max = _receivers * _credit_batch;
- final int used = _distributed + incoming();
- if (max > used)
- _credit = max - used;
- }
-
- // reclaim any credit left over after draining links has completed
- if (_draining > 0)
- {
- Iterator<Receiver> itr = _credited.iterator();
- while (itr.hasNext())
- {
- Receiver link = (Receiver) itr.next();
- if (link.getDrain())
- {
- if (!link.draining())
- {
- // drain completed for this link
- int drained = link.drained();
- assert(_distributed >= drained);
- _distributed -= drained;
- _credit += drained;
- link.setDrain(false);
- _draining--;
- itr.remove();
- _blocked.add(link);
- }
- }
- }
- }
-
- // distribute available credit to blocked links
- final int batch = perLinkCredit();
- while (_credit > 0 && !_blocked.isEmpty())
- {
- Receiver link = _blocked.get(0);
- _blocked.remove(0);
-
- final int more = Math.min(_credit, batch);
- _distributed += more;
- _credit -= more;
-
- link.flow(more);
- _credited.add(link);
-
- // flow changed, must process it
- ConnectionContext ctx = (ConnectionContext) link.getSession().getConnection().getContext();
- try
- {
- ctx.getConnector().process();
- } catch (IOException e) {
- _logger.log(Level.SEVERE, "Error processing connection", e);
- }
- }
-
- if (_blocked.isEmpty())
- {
- _next_drain = 0;
- }
- else
- {
- // not enough credit for all links - start draining granted credit
- if (_draining == 0)
- {
- // don't do it too often - pace ourselves (it's expensive)
- if (_next_drain == 0)
- {
- _next_drain = System.currentTimeMillis() + 250;
- }
- else if (_next_drain <= System.currentTimeMillis())
- {
- // initiate drain, free up at most enough to satisfy blocked
- _next_drain = 0;
- int needed = _blocked.size() * batch;
-
- for (Receiver link : _credited)
- {
- if (!link.getDrain()) {
- link.setDrain(true);
- needed -= link.getRemoteCredit();
- _draining++;
- // drain requested on link, must process it
- ConnectionContext ctx = (ConnectionContext) link.getSession().getConnection().getContext();
- try
- {
- ctx.getConnector().process();
- } catch (IOException e) {
- _logger.log(Level.SEVERE, "Error processing connection", e);
- }
- if (needed <= 0) break;
- }
- }
- }
- }
- }
- }
-
- private interface Predicate
- {
- boolean test();
- }
-
- private class SentSettled implements Predicate
- {
- public boolean test()
- {
- //are all sent messages settled?
- int total = _outgoingStore.size();
-
- for (Connector<?> c : _driver.connectors())
- {
- // TBD
- // check if transport is done generating output
- // pn_transport_t *transport = pn_connector_transport(ctor);
- // if (transport) {
- // if (!pn_transport_quiesced(transport)) {
- // pn_connector_process(ctor);
- // return false;
- // }
- // }
-
- Connection connection = c.getConnection();
- for (Link link : new Links(connection, ACTIVE, ANY))
- {
- if (link instanceof Sender)
- {
- total += link.getQueued();
- }
- }
-
- // TBD: there is no per-link unsettled
- // deliveries iterator, so for now get the
- // deliveries by walking the outgoing trackers
- Iterator<StoreEntry> entries = _outgoingStore.trackedEntries();
- while (entries.hasNext() && total <= _sendThreshold)
- {
- StoreEntry e = (StoreEntry) entries.next();
- if (e != null )
- {
- Delivery d = e.getDelivery();
- if (d != null)
- {
- if (d.getRemoteState() == null && !d.remotelySettled())
- {
- total++;
- }
- }
- }
- }
- }
- return total <= _sendThreshold;
- }
- }
-
- private class MessageAvailable implements Predicate
- {
- public boolean test()
- {
- //do we have at least one pending message?
- if (_incomingStore.size() > 0) return true;
- for (Connector<?> c : _driver.connectors())
- {
- Connection connection = c.getConnection();
- Delivery delivery = connection.getWorkHead();
- while (delivery != null)
- {
- if (delivery.isReadable() && !delivery.isPartial())
- {
- return true;
- }
- else
- {
- delivery = delivery.getWorkNext();
- }
- }
- }
- // if no connections, or not listening, exit as there won't ever be a message
- if (!_driver.listeners().iterator().hasNext() && !_driver.connectors().iterator().hasNext())
- return true;
-
- return false;
- }
- }
-
- private class AllClosed implements Predicate
- {
- public boolean test()
- {
- if (_driver == null) {
- return true;
- }
-
- for (Connector<?> c : _driver.connectors()) {
- if (!c.isClosed()) {
- return false;
- }
- }
-
- _driver.destroy();
- _driver = null;
-
- return true;
- }
- }
-
- private boolean _worked = false;
-
- private class WorkPred implements Predicate
- {
- public boolean test()
- {
- return _worked;
- }
- }
-
- private final SentSettled _sentSettled = new SentSettled();
- private final MessageAvailable _messageAvailable = new MessageAvailable();
- private final AllClosed _allClosed = new AllClosed();
- private final WorkPred _workPred = new WorkPred();
-
- private interface LinkFinder<C extends Link>
- {
- C test(Link link);
- C create(Session session);
- }
-
- private class SenderFinder implements LinkFinder<Sender>
- {
- private final String _path;
-
- SenderFinder(String path)
- {
- _path = path == null ? "" : path;
- }
-
- public Sender test(Link link)
- {
- if (link instanceof Sender && matchTarget((Target) link.getTarget(), _path))
- {
- return (Sender) link;
- }
- else
- {
- return null;
- }
- }
-
- public Sender create(Session session)
- {
- Sender sender = session.sender(_path);
- Target target = new Target();
- target.setAddress(_path);
- sender.setTarget(target);
- // the C implemenation does this:
- Source source = new Source();
- source.setAddress(_path);
- sender.setSource(source);
- if (getOutgoingWindow() > 0)
- {
- // use explicit settlement via dispositions (not pre-settled)
- sender.setSenderSettleMode(SenderSettleMode.UNSETTLED);
- sender.setReceiverSettleMode(ReceiverSettleMode.SECOND); // desired
- }
- return sender;
- }
- }
-
- private class ReceiverFinder implements LinkFinder<Receiver>
- {
- private final String _path;
-
- ReceiverFinder(String path)
- {
- _path = path == null ? "" : path;
- }
-
- public Receiver test(Link link)
- {
- if (link instanceof Receiver && matchSource((Source) link.getSource(), _path))
- {
- return (Receiver) link;
- }
- else
- {
- return null;
- }
- }
-
- public Receiver create(Session session)
- {
- Receiver receiver = session.receiver(_path);
- Source source = new Source();
- source.setAddress(_path);
- receiver.setSource(source);
- // the C implemenation does this:
- Target target = new Target();
- target.setAddress(_path);
- receiver.setTarget(target);
- if (getIncomingWindow() > 0)
- {
- // use explicit settlement via dispositions (not pre-settled)
- receiver.setSenderSettleMode(SenderSettleMode.UNSETTLED); // desired
- receiver.setReceiverSettleMode(ReceiverSettleMode.SECOND);
- }
- return receiver;
- }
- }
-
- private <C extends Link> C getLink(Address address, LinkFinder<C> finder)
- {
- Connection connection = lookup(address);
- if (connection == null)
- {
- String host = address.getHost();
- int port = Integer.valueOf(address.getImpliedPort());
- Connector<?> connector = _driver.createConnector(host, port, null);
- _logger.log(Level.FINE, "Connecting to " + host + ":" + port);
- connection = Proton.connection();
- connection.setContainer(_name);
- connection.setHostname(host);
- connection.setContext(new ConnectionContext(address, connector));
- connector.setConnection(connection);
- Sasl sasl = connector.sasl();
- if (sasl != null)
- {
- sasl.client();
- sasl.setMechanisms(new String[]{"ANONYMOUS"});
- }
- if ("amqps".equalsIgnoreCase(address.getScheme())) {
- Transport transport = connector.getTransport();
- SslDomain domain = makeDomain(address, SslDomain.Mode.CLIENT);
- if (_trustedDb != null) {
- domain.setPeerAuthentication(SslDomain.VerifyMode.VERIFY_PEER);
- //domain.setPeerAuthentication(SslDomain.VerifyMode.VERIFY_PEER_NAME);
- } else {
- domain.setPeerAuthentication(SslDomain.VerifyMode.ANONYMOUS_PEER);
- }
- Ssl ssl = transport.ssl(domain);
- //ssl.setPeerHostname(host);
- }
- connection.open();
- }
-
- for (Link link : new Links(connection, ACTIVE, ANY))
- {
- C result = finder.test(link);
- if (result != null) return result;
- }
- Session session = connection.session();
- session.open();
- C link = finder.create(session);
- linkAdded(link);
- link.open();
- return link;
- }
-
- private static class Links implements Iterable<Link>
- {
- private final Connection _connection;
- private final EnumSet<EndpointState> _local;
- private final EnumSet<EndpointState> _remote;
-
- Links(Connection connection, EnumSet<EndpointState> local, EnumSet<EndpointState> remote)
- {
- _connection = connection;
- _local = local;
- _remote = remote;
- }
-
- public java.util.Iterator<Link> iterator()
- {
- return new LinkIterator(_connection, _local, _remote);
- }
- }
-
- private static class LinkIterator implements java.util.Iterator<Link>
- {
- private final EnumSet<EndpointState> _local;
- private final EnumSet<EndpointState> _remote;
- private Link _next;
-
- LinkIterator(Connection connection, EnumSet<EndpointState> local, EnumSet<EndpointState> remote)
- {
- _local = local;
- _remote = remote;
- _next = connection.linkHead(_local, _remote);
- }
-
- public boolean hasNext()
- {
- return _next != null;
- }
-
- public Link next()
- {
- try
- {
- return _next;
- }
- finally
- {
- _next = _next.next(_local, _remote);
- }
- }
-
- public void remove()
- {
- throw new UnsupportedOperationException();
- }
- }
-
- private static class Sessions implements Iterable<Session>
- {
- private final Connection _connection;
- private final EnumSet<EndpointState> _local;
- private final EnumSet<EndpointState> _remote;
-
- Sessions(Connection connection, EnumSet<EndpointState> local, EnumSet<EndpointState> remote)
- {
- _connection = connection;
- _local = local;
- _remote = remote;
- }
-
- public java.util.Iterator<Session> iterator()
- {
- return new SessionIterator(_connection, _local, _remote);
- }
- }
-
- private static class SessionIterator implements java.util.Iterator<Session>
- {
- private final EnumSet<EndpointState> _local;
- private final EnumSet<EndpointState> _remote;
- private Session _next;
-
- SessionIterator(Connection connection, EnumSet<EndpointState> local, EnumSet<EndpointState> remote)
- {
- _local = local;
- _remote = remote;
- _next = connection.sessionHead(_local, _remote);
- }
-
- public boolean hasNext()
- {
- return _next != null;
- }
-
- public Session next()
- {
- try
- {
- return _next;
- }
- finally
- {
- _next = _next.next(_local, _remote);
- }
- }
-
- public void remove()
- {
- throw new UnsupportedOperationException();
- }
- }
-
- private void adjustReplyTo(Message m)
- {
- String original = m.getReplyTo();
- if (original != null) {
- if (original.startsWith("~/"))
- {
- m.setReplyTo("amqp://" + _name + "/" + original.substring(2));
- }
- else if (original.equals("~"))
- {
- m.setReplyTo("amqp://" + _name);
- }
- }
- }
-
- private static boolean matchTarget(Target target, String path)
- {
- if (target == null) return path.isEmpty();
- else return path.equals(target.getAddress());
- }
-
- private static boolean matchSource(Source source, String path)
- {
- if (source == null) return path.isEmpty();
- else return path.equals(source.getAddress());
- }
-
- @Override
- public String toString()
- {
- StringBuilder builder = new StringBuilder();
- builder.append("MessengerImpl [_name=").append(_name).append("]");
- return builder.toString();
- }
-
- // compute the maximum amount of credit each receiving link is
- // entitled to. The actual credit given to the link depends on
- // what amount of credit is actually available.
- private int perLinkCredit()
- {
- if (_receivers == 0) return 0;
- int total = _credit + _distributed;
- return Math.max(total/_receivers, 1);
- }
-
- // a new link has been created, account for it.
- private void linkAdded(Link link)
- {
- if (link instanceof Receiver)
- {
- _receivers++;
- _blocked.add((Receiver)link);
- link.setContext(Boolean.TRUE);
- }
- }
-
- // a link is being removed, account for it.
- private void linkRemoved(Link _link)
- {
- if (_link instanceof Receiver && (Boolean) _link.getContext())
- {
- _link.setContext(Boolean.FALSE);
- Receiver link = (Receiver)_link;
- assert _receivers > 0;
- _receivers--;
- if (link.getDrain())
- {
- link.setDrain(false);
- assert _draining > 0;
- _draining--;
- }
- if (_blocked.contains(link))
- _blocked.remove(link);
- else if (_credited.contains(link))
- _credited.remove(link);
- else
- assert(false);
- }
- }
-
- private static class ConnectionContext
- {
- private Address _address;
- private Connector _connector;
-
- public ConnectionContext(Address address, Connector connector)
- {
- _address = address;
- _connector = connector;
- }
-
- public Address getAddress()
- {
- return _address;
- }
-
- public boolean matches(Address address)
- {
- String host = address.getHost();
- String port = address.getImpliedPort();
- Connection conn = _connector.getConnection();
- return host.equals(conn.getRemoteContainer()) ||
- (_address.getHost().equals(host) && _address.getImpliedPort().equals(port));
- }
-
- public Connector getConnector()
- {
- return _connector;
- }
- }
-
- private SslDomain makeDomain(Address address, SslDomain.Mode mode)
- {
- SslDomain domain = Proton.sslDomain();
- domain.init(mode);
- if (_certificate != null) {
- domain.setCredentials(_certificate, _privateKey, _password);
- }
- if (_trustedDb != null) {
- domain.setTrustedCaDb(_trustedDb);
- }
-
- if ("amqps".equalsIgnoreCase(address.getScheme())) {
- domain.allowUnsecuredClient(false);
- } else {
- domain.allowUnsecuredClient(true);
- }
-
- return domain;
- }
-
-
- private class ListenerContext
- {
- private Address _address;
- private SslDomain _domain;
-
- public ListenerContext(Address address)
- {
- _address = address;
- _domain = makeDomain(address, SslDomain.Mode.SERVER);
- }
-
- public SslDomain getDomain()
- {
- return _domain;
- }
-
- public Address getAddress()
- {
- return _address;
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/Store.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/Store.java b/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/Store.java
deleted file mode 100644
index b60e8ed..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/Store.java
+++ /dev/null
@@ -1,213 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.qpid.proton.messenger.impl;
-
-import java.util.List;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.HashMap;
-import java.util.Collection;
-import java.util.Iterator;
-
-import org.apache.qpid.proton.messenger.Status;
-import org.apache.qpid.proton.messenger.Messenger;
-import org.apache.qpid.proton.engine.Delivery;
-import org.apache.qpid.proton.amqp.messaging.Accepted;
-import org.apache.qpid.proton.amqp.messaging.Rejected;
-
-/**
- * @deprecated Messenger will be removed from upcoming proton-j releases.
- */
-class Store
-{
- private static final Accepted ACCEPTED = Accepted.getInstance();
- private static final Rejected REJECTED = new Rejected();
-
- private LinkedList<StoreEntry> _store = new LinkedList<StoreEntry>();
- private HashMap<String, LinkedList<StoreEntry>> _stream = new HashMap<String, LinkedList<StoreEntry>>();
-
- // for incoming/outgoing window tracking
- int _window;
- int _lwm;
- int _hwm;
- private HashMap<Integer, StoreEntry> _tracked = new HashMap<Integer, StoreEntry>();
-
- Store()
- {
- }
-
- private boolean isTracking( Integer id )
- {
- return id != null && (id.intValue() - _lwm >= 0) && (_hwm - id.intValue() > 0);
- }
-
- int size()
- {
- return _store.size();
- }
-
- int getWindow()
- {
- return _window;
- }
-
- void setWindow(int window)
- {
- _window = window;
- }
-
- StoreEntry put(String address)
- {
- if (address == null) address = "";
- StoreEntry entry = new StoreEntry(this, address);
- _store.add( entry );
- LinkedList<StoreEntry> list = _stream.get( address );
- if (list != null) {
- list.add( entry );
- } else {
- list = new LinkedList<StoreEntry>();
- list.add( entry );
- _stream.put( address, list );
- }
- entry.stored();
- return entry;
- }
-
- StoreEntry get(String address)
- {
- if (address != null) {
- LinkedList<StoreEntry> list = _stream.get( address );
- if (list != null) return list.peekFirst();
- } else {
- return _store.peekFirst();
- }
- return null;
- }
-
- StoreEntry getEntry(int id)
- {
- return _tracked.get(id);
- }
-
- Iterator<StoreEntry> trackedEntries()
- {
- return _tracked.values().iterator();
- }
-
- void freeEntry(StoreEntry entry)
- {
- if (entry.isStored()) {
- _store.remove( entry );
- LinkedList<StoreEntry> list = _stream.get( entry.getAddress() );
- if (list != null) list.remove( entry );
- entry.notStored();
- }
- // note well: may still be in _tracked map if still in window!
- }
-
- public int trackEntry(StoreEntry entry)
- {
- assert( entry.getStore() == this );
- entry.setId(_hwm++);
- _tracked.put(entry.getId(), entry);
- slideWindow();
- return entry.getId();
- }
-
- private void slideWindow()
- {
- if (_window >= 0)
- {
- while (_hwm - _lwm > _window)
- {
- StoreEntry old = getEntry(_lwm);
- if (old != null)
- {
- _tracked.remove( old.getId() );
- Delivery d = old.getDelivery();
- if (d != null) {
- if (d.getLocalState() == null)
- d.disposition(ACCEPTED);
- d.settle();
- }
- }
- _lwm++;
- }
- }
- }
-
- int update(int id, Status status, int flags, boolean settle, boolean match )
- {
- if (!isTracking(id)) return 0;
-
- int start = (Messenger.CUMULATIVE & flags) != 0 ? _lwm : id;
- for (int i = start; (id - i) >= 0; i++)
- {
- StoreEntry e = getEntry(i);
- if (e != null)
- {
- Delivery d = e.getDelivery();
- if (d != null)
- {
- if (d.getLocalState() == null)
- {
- if (match)
- {
- d.disposition(d.getRemoteState());
- }
- else
- {
- switch (status)
- {
- case ACCEPTED:
- d.disposition(ACCEPTED);
- break;
- case REJECTED:
- d.disposition(REJECTED);
- break;
- default:
- break;
- }
- }
- e.updated();
- }
- }
- if (settle)
- {
- if (d != null)
- {
- d.settle();
- }
- _tracked.remove(e.getId());
- }
- }
- }
-
- while (_hwm - _lwm > 0 && !_tracked.containsKey(_lwm))
- {
- _lwm++;
- }
-
- return 0;
- }
-}
-
-
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/StoreEntry.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/StoreEntry.java b/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/StoreEntry.java
deleted file mode 100644
index 1687b94..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/StoreEntry.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.qpid.proton.messenger.impl;
-
-import org.apache.qpid.proton.messenger.Tracker;
-import org.apache.qpid.proton.messenger.Status;
-import org.apache.qpid.proton.engine.Delivery;
-import org.apache.qpid.proton.amqp.messaging.Accepted;
-import org.apache.qpid.proton.amqp.messaging.Modified;
-import org.apache.qpid.proton.amqp.messaging.Rejected;
-import org.apache.qpid.proton.amqp.messaging.Released;
-import org.apache.qpid.proton.amqp.messaging.Received;
-import org.apache.qpid.proton.amqp.transport.DeliveryState;
-
-/**
- * @deprecated Messenger will be removed from upcoming proton-j releases.
- */
-class StoreEntry
-{
- private Store _store;
- private Integer _id;
- private String _address;
- private byte[] _encodedMsg;
- private int _encodedLength;
- private Delivery _delivery;
- private Status _status = Status.UNKNOWN;
- private Object _context;
- private boolean _inStore = false;
-
- public StoreEntry(Store store, String address)
- {
- _store = store;
- _address = address;
- }
-
- public Store getStore()
- {
- return _store;
- }
-
- public boolean isStored()
- {
- return _inStore;
- }
-
- public void stored()
- {
- _inStore = true;
- }
-
- public void notStored()
- {
- _inStore = false;
- }
-
- public String getAddress()
- {
- return _address;
- }
-
- public byte[] getEncodedMsg()
- {
- return _encodedMsg;
- }
-
- public int getEncodedLength()
- {
- return _encodedLength;
- }
-
- public void setEncodedMsg( byte[] encodedMsg, int length )
- {
- _encodedMsg = encodedMsg;
- _encodedLength = length;
- }
-
- public void setId(int id)
- {
- _id = new Integer(id);
- }
-
- public Integer getId()
- {
- return _id;
- }
-
- public void setDelivery( Delivery d )
- {
- if (_delivery != null)
- {
- _delivery.setContext(null);
- }
- _delivery = d;
- if (_delivery != null)
- {
- _delivery.setContext(this);
- }
- updated();
- }
-
- public Delivery getDelivery()
- {
- return _delivery;
- }
-
- public Status getStatus()
- {
- return _status;
- }
-
- public void setStatus(Status status)
- {
- _status = status;
- }
-
- private static Status _disp2status(DeliveryState disp)
- {
- if (disp == null) return Status.PENDING;
-
- if (disp instanceof Received)
- return Status.PENDING;
- if (disp instanceof Accepted)
- return Status.ACCEPTED;
- if (disp instanceof Rejected)
- return Status.REJECTED;
- if (disp instanceof Released)
- return Status.RELEASED;
- if (disp instanceof Modified)
- return Status.MODIFIED;
- assert(false);
- return null;
- }
-
- public void updated()
- {
- if (_delivery != null)
- {
- if (_delivery.getRemoteState() != null)
- {
- _status = _disp2status(_delivery.getRemoteState());
- }
- else if (_delivery.remotelySettled())
- {
- DeliveryState disp = _delivery.getLocalState();
- if (disp == null) {
- _status = Status.SETTLED;
- } else {
- _status = _disp2status(_delivery.getLocalState());
- }
- }
- else
- {
- _status = Status.PENDING;
- }
- }
- }
-
- public void setContext(Object context)
- {
- _context = context;
- }
-
- public Object getContext()
- {
- return _context;
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/TrackerImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/TrackerImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/TrackerImpl.java
deleted file mode 100644
index 2d8b584..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/TrackerImpl.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.qpid.proton.messenger.impl;
-
-import org.apache.qpid.proton.messenger.Tracker;
-
-/**
- * @deprecated Messenger will be removed from upcoming proton-j releases.
- */
-class TrackerImpl implements Tracker
-{
- public enum Type {
- OUTGOING,
- INCOMING
- }
-
- private Type _type;
- private int _sequence;
-
- TrackerImpl(Type type, int sequence)
- {
- _type = type;
- _sequence = sequence;
- }
-
- boolean isOutgoing()
- {
- return _type == Type.OUTGOING;
- }
-
- int getSequence()
- {
- return _sequence;
- }
-
- public String toString()
- {
- return (isOutgoing() ? "O:" : "I:") + Integer.toString(_sequence);
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/Transform.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/Transform.java b/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/Transform.java
deleted file mode 100644
index c3a08ea..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/Transform.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.proton.messenger.impl;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-
-/**
- * Transform
- *
- * @deprecated Messenger will be removed from upcoming proton-j releases.
- */
-class Transform
-{
-
- private static class Rule {
-
- String _pattern;
- String _substitution;
-
- Pattern _compiled;
- StringBuilder _sb = new StringBuilder();
- boolean _matched = false;
- String _result = null;
-
- Rule(String pattern, String substitution)
- {
- _pattern = pattern;
- _substitution = substitution;
- _compiled = Pattern.compile(_pattern.replace("*", "(.*)").replace("%", "([^/]*)"));
- }
-
- boolean apply(String src) {
- _matched = false;
- _result = null;
- Matcher m = _compiled.matcher(src);
- if (m.matches()) {
- _matched = true;
- if (_substitution != null) {
- _sb.setLength(0);
- int limit = _substitution.length();
- int idx = 0;
- while (idx < limit) {
- char c = _substitution.charAt(idx);
- switch (c) {
- case '$':
- idx++;
- if (idx < limit) {
- c = _substitution.charAt(idx);
- } else {
- throw new IllegalStateException("substition index truncated");
- }
-
- if (c == '$') {
- _sb.append(c);
- idx++;
- } else {
- int num = 0;
- while (Character.isDigit(c)) {
- num *= 10;
- num += c - '0';
- idx++;
- c = idx < limit ? _substitution.charAt(idx) : '\0';
- }
- if (num > 0) {
- _sb.append(m.group(num));
- } else {
- throw new IllegalStateException
- ("bad substitution index at character[" +
- idx + "]: " + _substitution);
- }
- }
- break;
- default:
- _sb.append(c);
- idx++;
- break;
- }
- }
- _result = _sb.toString();
- }
- }
-
- return _matched;
- }
-
- boolean matched() {
- return _matched;
- }
-
- String result() {
- return _result;
- }
-
- }
-
- private List<Rule> _rules = new ArrayList<Rule>();
- private Rule _matched = null;
-
- public void rule(String pattern, String substitution)
- {
- _rules.add(new Rule(pattern, substitution));
- }
-
- public boolean apply(String src)
- {
- _matched = null;
-
- for (Rule rule: _rules) {
- if (rule.apply(src)) {
- _matched = rule;
- break;
- }
- }
-
- return _matched != null;
- }
-
- public boolean matched()
- {
- return _matched != null;
- }
-
- public String result()
- {
- return _matched != null ? _matched.result() : null;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/reactor/Acceptor.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Acceptor.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Acceptor.java
deleted file mode 100644
index 222ce40..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Acceptor.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.proton.reactor;
-
-import org.apache.qpid.proton.engine.Extendable;
-
-/**
- * Acceptors are children of a {@link Reactor} that accept in-bound network
- * connections.
- */
-public interface Acceptor extends ReactorChild, Extendable {
-
- /**
- * Closes the acceptor, stopping it accepting any further in-bound
- * connections. Already accepted connections continue to be processed by
- * the associated reactor.
- */
- void close();
-
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/reactor/FlowController.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/FlowController.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/FlowController.java
deleted file mode 100644
index 716b2a7..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/FlowController.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.proton.reactor;
-
-import org.apache.qpid.proton.engine.BaseHandler;
-import org.apache.qpid.proton.engine.Event;
-import org.apache.qpid.proton.engine.Link;
-import org.apache.qpid.proton.engine.Receiver;
-
-/**
- * A handler that applies flow control to a connection. This handler tops-up
- * link credit each time credit is expended by the receipt of messages.
- */
-public class FlowController extends BaseHandler {
-
- private int drained;
- private int window;
-
- public FlowController(int window) {
- // XXX: a window of 1 doesn't work because we won't necessarily get
- // notified when the one allowed delivery is settled
- if (window <= 1) throw new IllegalArgumentException();
- this.window = window;
- this.drained = 0;
- }
-
- public FlowController() {
- this(1024);
- }
-
- private void topup(Receiver link, int window) {
- int delta = window - link.getCredit();
- link.flow(delta);
- }
-
- @Override
- public void onUnhandled(Event event) {
- int window = this.window;
- Link link = event.getLink();
-
- switch(event.getType()) {
- case LINK_LOCAL_OPEN:
- case LINK_REMOTE_OPEN:
- case LINK_FLOW:
- case DELIVERY:
- if (link instanceof Receiver) {
- this.drained += link.drained();
- if (this.drained == 0) {
- topup((Receiver)link, window);
- }
- }
- break;
- default:
- break;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/reactor/Handshaker.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Handshaker.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Handshaker.java
deleted file mode 100644
index 1423f32..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Handshaker.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.proton.reactor;
-
-import org.apache.qpid.proton.engine.BaseHandler;
-import org.apache.qpid.proton.engine.Endpoint;
-import org.apache.qpid.proton.engine.EndpointState;
-import org.apache.qpid.proton.engine.Event;
-import org.apache.qpid.proton.engine.Link;
-
-/**
- * A handler that mirrors the actions of the remote end of a connection. This
- * handler responds in kind when the remote end of the connection is opened and
- * closed. Likewise if the remote end of the connection opens or closes
- * sessions and links, this handler responds by opening or closing the local end
- * of the session or link.
- */
-public class Handshaker extends BaseHandler {
-
- private void open(Endpoint endpoint) {
- if (endpoint.getLocalState() == EndpointState.UNINITIALIZED) {
- endpoint.open();
- }
- }
-
- private void close(Endpoint endpoint) {
- if (endpoint.getLocalState() != EndpointState.CLOSED) {
- endpoint.close();
- }
- }
-
- @Override
- public void onConnectionRemoteOpen(Event event) {
- open(event.getConnection());
- }
-
- @Override
- public void onSessionRemoteOpen(Event event) {
- open(event.getSession());
- }
-
- @Override
- public void onLinkRemoteOpen(Event event) {
- Link link = event.getLink();
- if (link.getLocalState() == EndpointState.UNINITIALIZED) {
- if (link.getRemoteSource() != null) {
- link.setSource(link.getRemoteSource().copy());
- }
- if (link.getRemoteTarget() != null) {
- link.setTarget(link.getRemoteTarget().copy());
- }
- }
- open(link);
- }
-
- @Override
- public void onConnectionRemoteClose(Event event) {
- close(event.getConnection());
- }
-
- @Override
- public void onSessionRemoteClose(Event event) {
- close(event.getSession());
- }
-
- @Override
- public void onLinkRemoteClose(Event event) {
- close(event.getLink());
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java
deleted file mode 100644
index f687bb3..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java
+++ /dev/null
@@ -1,323 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.proton.reactor;
-
-import java.io.IOException;
-import java.util.Set;
-
-import org.apache.qpid.proton.engine.BaseHandler;
-import org.apache.qpid.proton.engine.Collector;
-import org.apache.qpid.proton.engine.Connection;
-import org.apache.qpid.proton.engine.Event.Type;
-import org.apache.qpid.proton.engine.Handler;
-import org.apache.qpid.proton.engine.HandlerException;
-import org.apache.qpid.proton.engine.Record;
-import org.apache.qpid.proton.reactor.impl.ReactorImpl;
-
-/**
- * The proton reactor provides a general purpose event processing
- * library for writing reactive programs. A reactive program is defined
- * by a set of event handlers. An event handler is just any class or
- * object that extends the Handler interface. For convenience, a class
- * can extend {@link BaseHandler} and only handle the events that it cares to
- * implement methods for.
- * <p>
- * This class is not thread safe (with the exception of the {@link #wakeup()}
- * method) and should only be used by a single thread at any given time.
- */
-public interface Reactor {
-
- public static final class Factory
- {
- public static Reactor create() throws IOException {
- return new ReactorImpl();
- }
- }
-
- /**
- * Updates the last time that the reactor's state has changed, potentially
- * resulting in events being generated.
- * @return the current time in milliseconds
- * {@link System#currentTimeMillis()}.
- */
- long mark();
-
- /** @return the last time that {@link #mark()} was called. */
- long now();
-
- /** @return an instance of {@link Record} that can be used to associate
- * other objects (attachments) with this instance of the
- * Reactor class.
- */
- Record attachments();
-
- /**
- * The value the reactor will use for {@link Selector#select(long)} that is called as part of {@link #process()}.
- *
- * @param timeout a timeout value in milliseconds, to associate with this instance of
- * the reactor. This can be retrieved using the
- * {@link #getTimeout()} method
- */
- void setTimeout(long timeout);
-
- /**
- * @return the value previously set using {@link #setTimeout(long)} or
- * 0 if no previous value has been set.
- */
- long getTimeout();
-
- /**
- * @return the global handler for this reactor. Every event the reactor
- * sees is dispatched to the global handler. To receive every
- * event generated by the reactor, associate a child handler
- * with the global handler. For example:
- * <pre>
- * getGlobalHandler().add(yourHandler);
- * </pre>
- */
- Handler getGlobalHandler();
-
- /**
- * Sets a new global handler. You probably don't want to do this and
- * would be better adding a handler to the value returned by the
- * {{@link #getGlobalHandler()} method.
- * @param handler the new global handler.
- */
- void setGlobalHandler(Handler handler);
-
- /**
- * @return the handler for this reactor. Every event the reactor sees,
- * which is not handled by a child of the reactor (such as a
- * timer, connection, acceptor, or selector) is passed to this
- * handler. To receive these events, it is recommend that you
- * associate a child handler with the handler returned by this
- * method. For example:
- * <pre>
- * getHandler().add(yourHandler);
- * </pre>
- */
- Handler getHandler();
-
- /**
- * Sets a new handler, that will receive any events not handled by a child
- * of the reactor. Note that setting a handler via this method replaces
- * the previous handler, and will result in no further events being
- * dispatched to the child handlers associated with the previous handler.
- * For this reason it is recommended that you do not use this method and
- * instead add child handlers to the value returned by the
- * {@link #getHandler()} method.
- * @param handler the new handler for this reactor.
- */
- void setHandler(Handler handler);
-
- /**
- * @return a set containing the child objects associated with this reactor.
- * This will contain any active instances of: {@link Task} -
- * created using the {@link #schedule(int, Handler)} method,
- * {@link Connection} - created using the
- * {@link #connectionToHost(String, int, Handler)} method,
- * {@link Acceptor} - created using the
- * {@link #acceptor(String, int)} method,
- * {@link #acceptor(String, int, Handler)} method, or
- * {@link Selectable} - created using the
- * {@link #selectable()} method.
- */
- Set<ReactorChild> children();
-
- /**
- * @return the Collector used to gather events generated by this reactor.
- */
- Collector collector();
-
- /**
- * Creates a new <code>Selectable</code> as a child of this reactor.
- * @return the newly created <code>Selectable</code>.
- */
- Selectable selectable();
-
- /**
- * Updates the specified <code>Selectable</code> either emitting a
- * {@link Type#SELECTABLE_UPDATED} event if the selectable is not terminal,
- * or {@link Type#SELECTABLE_FINAL} if the selectable is terminal and has
- * not already emitted a {@link Type#SELECTABLE_FINAL} event.
- * @param selectable
- */
- void update(Selectable selectable);
-
- /**
- * Yields, causing the next call to {@link #process()} to return
- * successfully - without processing any events. If multiple calls
- * can be made to <code>yield</code> and only the next invocation of
- * {@link #process()} will be affected.
- */
- void yield() ;
-
- /**
- * @return <code>true</code> if the reactor is in quiesced state (e.g. has
- * no events to process). <code>false</code> is returned otherwise.
- */
- boolean quiesced();
-
- /**
- * Process any events pending for this reactor. Events are dispatched to
- * the handlers registered with the reactor, or child objects associated
- * with the reactor. This method blocks until the reactor has no more work
- * to do (and no more work pending, in terms of scheduled tasks or open
- * selectors to process).
- * @return <code>true</code> if the reactor may have more events in the
- * future. For example: if there are scheduled tasks, or open
- * selectors. <code>false</code> is returned if the reactor has
- * (and will have) no more events to process.
- * @throws HandlerException if an unchecked exception is thrown by one of
- * the handlers - it will be re-thrown attached to an instance of
- * <code>HandlerException</code>.
- */
- boolean process() throws HandlerException;
-
- /**
- * Wakes up the thread (if any) blocked in the {@link #process()} method.
- * This is the only method of this class that is thread safe, in that it
- * can be used at the same time as another thread is using the reactor.
- */
- void wakeup();
-
- /**
- * Starts the reactor. This method should be invoked before the first call
- * to {@link #process()}.
- */
- void start();
-
- /**
- * Stops the reactor. This method should be invoked after the last call to
- * {@link #process()}.
- * @throws HandlerException
- */
- void stop() throws HandlerException;
-
- /**
- * Simplifies the use of the reactor by wrapping the use of
- * <code>start</code>, <code>run</code>, and <code>stop</code> method
- * calls.
- * <p>
- * Logically the implementation of this method is:
- * <pre>
- * start();
- * while(process()) {}
- * stop();
- * </pre>
- * @throws HandlerException if an unchecked exception is thrown by one of
- * the handlers - it will be re-thrown attached to an instance of
- * <code>HandlerException</code>.
- */
- void run() throws HandlerException;
-
- /**
- * Schedules execution of a task to take place at some point in the future.
- * @param delay the number of milliseconds, in the future, to schedule the
- * task for.
- * @param handler a handler to associate with the task. This is notified
- * when the deadline for the task is reached.
- * @return an object representing the task that has been scheduled.
- */
- Task schedule(int delay, Handler handler);
-
- /**
- * Creates a new out-bound connection.
- * @param handler a handler that is notified when events occur for the
- * connection. Typically the host and port to connect to
- * would be supplied to the connection object inside the
- * logic which handles the {@link Type#CONNECTION_INIT}
- * event via
- * {@link #setConnectionHost(Connection, String, int)}
- * @return the newly created connection object.
- * @deprecated Use {@link #connectionToHost(String, int, Handler)} instead.
- */
- @Deprecated
- Connection connection(Handler handler);
-
- /**
- * Creates a new out-bound connection to the given host and port.
- * <p>
- * This method will cause Reactor to set up a network connection to the
- * host and create a Connection for it.
- * @param host the host to connect to (e.g. "localhost")
- * @param port the port used for the connection.
- * @param handler a handler that is notified when events occur for the
- * connection.
- * @return the newly created connection object.
- */
- Connection connectionToHost(String host, int port, Handler handler);
-
- /**
- * Set the host address used by the connection
- * <p>
- * This method will set/change the host address used by the Reactor to
- * create an outbound network connection for the given Connection
- * @param c the Connection to assign the address to
- * @param host the address of the host to connect to (e.g. "localhost")
- * @param port the port to use for the connection.
- */
- void setConnectionHost(Connection c, String host, int port);
-
- /**
- * Get the address used by the connection
- * <p>
- * This may be used to retrieve the remote peer address.
- * Note that the returned address may be in numeric IP format.
- * @param c the Connection
- * @return a string containing the address in the following format:
- * <pre>
- * host[:port]
- * </pre>
- */
- String getConnectionAddress(Connection c);
-
- /**
- * Creates a new acceptor. This is equivalent to calling:
- * <pre>
- * acceptor(host, port, null);
- * </pre>
- * @param host
- * @param port
- * @return the newly created acceptor object.
- * @throws IOException
- */
- Acceptor acceptor(String host, int port) throws IOException;
-
- /**
- * Creates a new acceptor. This acceptor listens for in-bound connections.
- * @param host the host name or address of the NIC to listen on.
- * @param port the port number to listen on.
- * @param handler if non-<code>null</code> this handler is registered with
- * each new connection accepted by the acceptor.
- * @return the newly created acceptor object.
- * @throws IOException
- */
- Acceptor acceptor(String host, int port, Handler handler)
- throws IOException;
-
- /**
- * Frees any resources (such as sockets and selectors) held by the reactor
- * or its children.
- */
- void free();
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org