You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2017/01/10 15:59:15 UTC
[54/55] qpid-proton-j git commit: PROTON-1362: remove
previously-deprecated Messenger
http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2afe2ec0/proton-j/src/main/java/org/apache/qpid/proton/messenger/Tracker.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/messenger/Tracker.java b/proton-j/src/main/java/org/apache/qpid/proton/messenger/Tracker.java
deleted file mode 100644
index 974b1b6..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/messenger/Tracker.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.qpid.proton.messenger;
-
-/**
- * @deprecated Messenger will be removed from upcoming proton-j releases.
- */
-@Deprecated
-public interface Tracker { }
-
http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2afe2ec0/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/Address.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/Address.java b/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/Address.java
deleted file mode 100644
index 27b0d39..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/Address.java
+++ /dev/null
@@ -1,220 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.proton.messenger.impl;
-
-
-/**
- * Address
- *
- * @deprecated Messenger will be removed from upcoming proton-j releases.
- */
-public class Address
-{
-
- private String _address;
- private boolean _passive;
- private String _scheme;
- private String _user;
- private String _pass;
- private String _host;
- private String _port;
- private String _name;
-
- public void clear()
- {
- _passive = false;
- _scheme = null;
- _user = null;
- _pass = null;
- _host = null;
- _port = null;
- _name = null;
- }
-
- /**
- * @deprecated Messenger will be removed from upcoming proton-j releases.
- */
- public Address()
- {
- clear();
- }
-
- /**
- * @deprecated Messenger will be removed from upcoming proton-j releases.
- */
- public Address(String address)
- {
- clear();
- int start = 0;
- int schemeEnd = address.indexOf("://", start);
- if (schemeEnd >= 0) {
- _scheme = address.substring(start, schemeEnd);
- start = schemeEnd + 3;
- }
-
- String uphp;
- int slash = address.indexOf("/", start);
- if (slash >= 0) {
- uphp = address.substring(start, slash);
- _name = address.substring(slash + 1);
- } else {
- uphp = address.substring(start);
- }
-
- String hp;
- int at = uphp.indexOf('@');
- if (at >= 0) {
- String up = uphp.substring(0, at);
- hp = uphp.substring(at + 1);
-
- int colon = up.indexOf(':');
- if (colon >= 0) {
- _user = up.substring(0, colon);
- _pass = up.substring(colon + 1);
- } else {
- _user = up;
- }
- } else {
- hp = uphp;
- }
-
- if (hp.startsWith("[")) {
- int close = hp.indexOf(']');
- if (close >= 0) {
- _host = hp.substring(1, close);
- if (hp.substring(close + 1).startsWith(":")) {
- _port = hp.substring(close + 2);
- }
- }
- }
-
- if (_host == null) {
- int colon = hp.indexOf(':');
- if (colon >= 0) {
- _host = hp.substring(0, colon);
- _port = hp.substring(colon + 1);
- } else {
- _host = hp;
- }
- }
-
- if (_host.startsWith("~")) {
- _host = _host.substring(1);
- _passive = true;
- }
- }
-
- public String toString()
- {
- String str = new String();
- if (_scheme != null) str += _scheme + "://";
- if (_user != null) str += _user;
- if (_pass != null) str += ":" + _pass;
- if (_user != null || _pass != null) str += "@";
- if (_host != null) {
- if (_host.contains(":")) str += "[" + _host + "]";
- else str += _host;
- }
- if (_port != null) str += ":" + _port;
- if (_name != null) str += "/" + _name;
- return str;
- }
-
- public boolean isPassive()
- {
- return _passive;
- }
-
- public String getScheme()
- {
- return _scheme;
- }
-
- public String getUser()
- {
- return _user;
- }
-
- public String getPass()
- {
- return _pass;
- }
-
- public String getHost()
- {
- return _host;
- }
-
- public String getPort()
- {
- return _port;
- }
-
- public String getImpliedPort()
- {
- if (_port == null) {
- return getDefaultPort();
- } else {
- return getPort();
- }
- }
-
- public String getDefaultPort()
- {
- if ("amqps".equals(_scheme)) return "5671";
- else return "5672";
- }
-
- public String getName()
- {
- return _name;
- }
-
- public void setScheme(String scheme)
- {
- _scheme= scheme;
- }
-
- public void setUser(String user)
- {
- _user= user;
- }
-
- public void setPass(String pass)
- {
- _pass= pass;
- }
-
- public void setHost(String host)
- {
- _host= host;
- }
-
- public void setPort(String port)
- {
- _port= port;
- }
-
- public void setName(String name)
- {
- _name= name;
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2afe2ec0/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
deleted file mode 100644
index e7c9d9e..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
+++ /dev/null
@@ -1,1555 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.qpid.proton.messenger.impl;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.EnumSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.qpid.proton.Proton;
-import org.apache.qpid.proton.InterruptException;
-import org.apache.qpid.proton.TimeoutException;
-import org.apache.qpid.proton.driver.Connector;
-import org.apache.qpid.proton.driver.Driver;
-import org.apache.qpid.proton.driver.Listener;
-import org.apache.qpid.proton.engine.Connection;
-import org.apache.qpid.proton.engine.Delivery;
-import org.apache.qpid.proton.engine.EndpointState;
-import org.apache.qpid.proton.engine.Link;
-import org.apache.qpid.proton.engine.Receiver;
-import org.apache.qpid.proton.engine.Sasl;
-import org.apache.qpid.proton.engine.Sender;
-import org.apache.qpid.proton.engine.Session;
-import org.apache.qpid.proton.engine.SslDomain;
-import org.apache.qpid.proton.engine.Ssl;
-import org.apache.qpid.proton.engine.Transport;
-import org.apache.qpid.proton.message.Message;
-import org.apache.qpid.proton.messenger.Messenger;
-import org.apache.qpid.proton.messenger.MessengerException;
-import org.apache.qpid.proton.messenger.Status;
-import org.apache.qpid.proton.messenger.Tracker;
-import org.apache.qpid.proton.amqp.messaging.Source;
-import org.apache.qpid.proton.amqp.messaging.Target;
-import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
-import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
-import org.apache.qpid.proton.amqp.Binary;
-
-/**
- * @deprecated Messenger will be removed from upcoming proton-j releases.
- */
-@Deprecated
-public class MessengerImpl implements Messenger
-{
- private enum LinkCreditMode
- {
- // method for replenishing credit
- LINK_CREDIT_EXPLICIT, // recv(N)
- LINK_CREDIT_AUTO; // recv()
- }
-
- private static final EnumSet<EndpointState> UNINIT = EnumSet.of(EndpointState.UNINITIALIZED);
- private static final EnumSet<EndpointState> ACTIVE = EnumSet.of(EndpointState.ACTIVE);
- private static final EnumSet<EndpointState> CLOSED = EnumSet.of(EndpointState.CLOSED);
- private static final EnumSet<EndpointState> ANY = EnumSet.allOf(EndpointState.class);
-
- private final Logger _logger = Logger.getLogger("proton.messenger");
- private final String _name;
- private long _timeout = -1;
- private boolean _blocking = true;
- private long _nextTag = 1;
- private Driver _driver;
- private LinkCreditMode _credit_mode = LinkCreditMode.LINK_CREDIT_EXPLICIT;
- private final int _credit_batch = 1024; // credit_mode == LINK_CREDIT_AUTO
- private int _credit; // available
- private int _distributed; // outstanding credit
- private int _receivers; // total # receiver Links
- private int _draining; // # Links in drain state
- private List<Receiver> _credited = new ArrayList<Receiver>();
- private List<Receiver> _blocked = new ArrayList<Receiver>();
- private long _next_drain;
- private TrackerImpl _incomingTracker;
- private TrackerImpl _outgoingTracker;
- private Store _incomingStore = new Store();
- private Store _outgoingStore = new Store();
- private List<Connector> _awaitingDestruction = new ArrayList<Connector>();
- private int _sendThreshold;
-
- private Transform _routes = new Transform();
- private Transform _rewrites = new Transform();
-
- private String _certificate;
- private String _privateKey;
- private String _password;
- private String _trustedDb;
-
-
- /**
- * @deprecated Messenger will be removed from upcoming proton-j releases.
- */
- @Deprecated public MessengerImpl()
- {
- this(java.util.UUID.randomUUID().toString());
- }
-
- /**
- * @deprecated Messenger will be removed from upcoming proton-j releases.
- */
- @Deprecated public MessengerImpl(String name)
- {
- _name = name;
- }
-
- public void setTimeout(long timeInMillis)
- {
- _timeout = timeInMillis;
- }
-
- public long getTimeout()
- {
- return _timeout;
- }
-
- public boolean isBlocking()
- {
- return _blocking;
- }
-
- public void setBlocking(boolean b)
- {
- _blocking = b;
- }
-
- public void setCertificate(String certificate)
- {
- _certificate = certificate;
- }
-
- public String getCertificate()
- {
- return _certificate;
- }
-
- public void setPrivateKey(String privateKey)
- {
- _privateKey = privateKey;
- }
-
- public String getPrivateKey()
- {
- return _privateKey;
- }
-
- public void setPassword(String password)
- {
- _password = password;
- }
-
- public String getPassword()
- {
- return _password;
- }
-
- public void setTrustedCertificates(String trusted)
- {
- _trustedDb = trusted;
- }
-
- public String getTrustedCertificates()
- {
- return _trustedDb;
- }
-
- public void start() throws IOException
- {
- _driver = Proton.driver();
- }
-
- public void stop()
- {
- if (_driver != null) {
- if(_logger.isLoggable(Level.FINE))
- {
- _logger.fine(this + " about to stop");
- }
- //close all connections
- for (Connector<?> c : _driver.connectors())
- {
- Connection connection = c.getConnection();
- connection.close();
- }
- //stop listeners
- for (Listener<?> l : _driver.listeners())
- {
- try
- {
- l.close();
- }
- catch (IOException e)
- {
- _logger.log(Level.WARNING, "Error while closing listener", e);
- }
- }
- waitUntil(_allClosed);
- }
- }
-
- public boolean stopped()
- {
- return _allClosed.test();
- }
-
- public boolean work(long timeout) throws TimeoutException
- {
- if (_driver == null) { return false; }
- _worked = false;
- return waitUntil(_workPred, timeout);
- }
-
- public void interrupt()
- {
- if (_driver != null) {
- _driver.wakeup();
- }
- }
-
- private String defaultRewrite(String address) {
- if (address != null && address.contains("@")) {
- Address addr = new Address(address);
- String scheme = addr.getScheme();
- String host = addr.getHost();
- String port = addr.getPort();
- String name = addr.getName();
-
- StringBuilder sb = new StringBuilder();
- if (scheme != null) {
- sb.append(scheme).append("://");
- }
- if (host != null) {
- sb.append(host);
- }
- if (port != null) {
- sb.append(":").append(port);
- }
- if (name != null) {
- sb.append("/").append(name);
- }
- return sb.toString();
- } else {
- return address;
- }
- }
-
-
- private String _original;
-
- private void rewriteMessage(Message m)
- {
- _original = m.getAddress();
- if (_rewrites.apply(_original)) {
- m.setAddress(_rewrites.result());
- } else {
- m.setAddress(defaultRewrite(_original));
- }
- }
-
- private void restoreMessage(Message m)
- {
- m.setAddress(_original);
- }
-
- private String routeAddress(String addr)
- {
- if (_routes.apply(addr)) {
- return _routes.result();
- } else {
- return addr;
- }
- }
-
- public void put(Message m) throws MessengerException
- {
- if (_driver == null) {
- throw new IllegalStateException("cannot put while messenger is stopped");
- }
-
- if(_logger.isLoggable(Level.FINE))
- {
- _logger.fine(this + " about to put message: " + m);
- }
-
- StoreEntry entry = _outgoingStore.put( m.getAddress() );
- _outgoingTracker = new TrackerImpl(TrackerImpl.Type.OUTGOING,
- _outgoingStore.trackEntry(entry));
-
- String routedAddress = routeAddress(m.getAddress());
- Address address = new Address(routedAddress);
- if (address.getHost() == null)
- {
- throw new MessengerException("unable to send to address: " + routedAddress);
- }
-
- rewriteMessage(m);
-
- try {
- adjustReplyTo(m);
-
- int encoded;
- byte[] buffer = new byte[5*1024];
- while (true)
- {
- try
- {
- encoded = m.encode(buffer, 0, buffer.length);
- break;
- } catch (java.nio.BufferOverflowException e) {
- buffer = new byte[buffer.length*2];
- }
- }
- entry.setEncodedMsg( buffer, encoded );
- }
- finally
- {
- restoreMessage(m);
- }
-
- Sender sender = getLink(address, new SenderFinder(address.getName()));
- pumpOut(m.getAddress(), sender);
- }
-
- private void reclaimLink(Link link)
- {
- if (link instanceof Receiver)
- {
- int credit = link.getCredit();
- if (credit > 0)
- {
- _credit += credit;
- _distributed -= credit;
- }
- }
-
- Delivery delivery = link.head();
- while (delivery != null)
- {
- StoreEntry entry = (StoreEntry) delivery.getContext();
- if (entry != null)
- {
- entry.setDelivery(null);
- if (delivery.isBuffered()) {
- entry.setStatus(Status.ABORTED);
- }
- }
- delivery = delivery.next();
- }
- linkRemoved(link);
- }
-
- private int pumpOut( String address, Sender sender )
- {
- StoreEntry entry = _outgoingStore.get( address );
- if (entry == null) {
- sender.drained();
- return 0;
- }
-
- byte[] tag = String.valueOf(_nextTag++).getBytes();
- Delivery delivery = sender.delivery(tag);
- entry.setDelivery( delivery );
- _logger.log(Level.FINE, "Sending on delivery: " + delivery);
- int n = sender.send( entry.getEncodedMsg(), 0, entry.getEncodedLength());
- if (n < 0) {
- _outgoingStore.freeEntry( entry );
- _logger.log(Level.WARNING, "Send error: " + n);
- return n;
- } else {
- sender.advance();
- _outgoingStore.freeEntry( entry );
- return 0;
- }
- }
-
- public void send() throws TimeoutException
- {
- send(-1);
- }
-
- public void send(int n) throws TimeoutException
- {
- if (_driver == null) {
- throw new IllegalStateException("cannot send while messenger is stopped");
- }
-
- if(_logger.isLoggable(Level.FINE))
- {
- _logger.fine(this + " about to send");
- }
-
- if (n == -1)
- _sendThreshold = 0;
- else
- {
- _sendThreshold = outgoing() - n;
- if (_sendThreshold < 0)
- _sendThreshold = 0;
- }
-
- waitUntil(_sentSettled);
- }
-
- public void recv(int n) throws TimeoutException
- {
- if (_driver == null) {
- throw new IllegalStateException("cannot recv while messenger is stopped");
- }
-
- if (_logger.isLoggable(Level.FINE) && n != -1)
- {
- _logger.fine(this + " about to wait for up to " + n + " messages to be received");
- }
-
- if (n == -1)
- {
- _credit_mode = LinkCreditMode.LINK_CREDIT_AUTO;
- }
- else
- {
- _credit_mode = LinkCreditMode.LINK_CREDIT_EXPLICIT;
- if (n > _distributed)
- _credit = n - _distributed;
- else // cancel unallocated
- _credit = 0;
- }
-
- distributeCredit();
-
- waitUntil(_messageAvailable);
- }
-
- public void recv() throws TimeoutException
- {
- recv(-1);
- }
-
- public int receiving()
- {
- return _credit + _distributed;
- }
-
- public Message get()
- {
- StoreEntry entry = _incomingStore.get( null );
- if (entry != null)
- {
- Message message = Proton.message();
- message.decode( entry.getEncodedMsg(), 0, entry.getEncodedLength() );
-
- _incomingTracker = new TrackerImpl(TrackerImpl.Type.INCOMING,
- _incomingStore.trackEntry(entry));
-
- _incomingStore.freeEntry( entry );
- return message;
- }
- return null;
- }
-
- private int pumpIn(String address, Receiver receiver)
- {
- Delivery delivery = receiver.current();
- if (delivery.isReadable() && !delivery.isPartial())
- {
- StoreEntry entry = _incomingStore.put( address );
- entry.setDelivery( delivery );
-
- _logger.log(Level.FINE, "Readable delivery found: " + delivery);
-
- int size = delivery.pending();
- byte[] buffer = new byte[size];
- int read = receiver.recv( buffer, 0, buffer.length );
- if (read != size) {
- throw new IllegalStateException();
- }
- entry.setEncodedMsg( buffer, size );
- receiver.advance();
-
- // account for the used credit, replenish if
- // low (< 20% maximum per-link batch) and
- // extra credit available
- assert(_distributed > 0);
- _distributed--;
- if (!receiver.getDrain() && _blocked.isEmpty() && _credit > 0)
- {
- final int max = perLinkCredit();
- final int lo_thresh = (int)(max * 0.2 + 0.5);
- if (receiver.getRemoteCredit() < lo_thresh)
- {
- final int more = Math.min(_credit, max - receiver.getRemoteCredit());
- _credit -= more;
- _distributed += more;
- receiver.flow(more);
- }
- }
- // check if blocked
- if (receiver.getRemoteCredit() == 0 && _credited.contains(receiver))
- {
- _credited.remove(receiver);
- if (receiver.getDrain())
- {
- receiver.setDrain(false);
- assert( _draining > 0 );
- _draining--;
- }
- _blocked.add(receiver);
- }
- }
- return 0;
- }
-
- public void subscribe(String source) throws MessengerException
- {
- if (_driver == null) {
- throw new IllegalStateException("messenger is stopped");
- }
-
- String routed = routeAddress(source);
- Address address = new Address(routed);
-
- String hostName = address.getHost();
- if (hostName == null) throw new MessengerException("Invalid address (hostname cannot be null): " + routed);
- int port = Integer.valueOf(address.getImpliedPort());
- if (address.isPassive())
- {
- if(_logger.isLoggable(Level.FINE))
- {
- _logger.fine(this + " about to subscribe to source " + source + " using address " + hostName + ":" + port);
- }
- ListenerContext ctx = new ListenerContext(address);
- _driver.createListener(hostName, port, ctx);
- }
- else
- {
- if(_logger.isLoggable(Level.FINE))
- {
- _logger.fine(this + " about to subscribe to source " + source);
- }
- getLink(address, new ReceiverFinder(address.getName()));
- }
- }
-
- public int outgoing()
- {
- return _outgoingStore.size() + queued(true);
- }
-
- public int incoming()
- {
- return _incomingStore.size() + queued(false);
- }
-
- public int getIncomingWindow()
- {
- return _incomingStore.getWindow();
- }
-
- public void setIncomingWindow(int window)
- {
- _incomingStore.setWindow(window);
- }
-
- public int getOutgoingWindow()
- {
- return _outgoingStore.getWindow();
- }
-
- public void setOutgoingWindow(int window)
- {
- _outgoingStore.setWindow(window);
- }
-
- public Tracker incomingTracker()
- {
- return _incomingTracker;
- }
- public Tracker outgoingTracker()
- {
- return _outgoingTracker;
- }
-
- private Store getTrackerStore(Tracker tracker)
- {
- return ((TrackerImpl)tracker).isOutgoing() ? _outgoingStore : _incomingStore;
- }
-
- @Override
- public void reject(Tracker tracker, int flags)
- {
- int id = ((TrackerImpl)tracker).getSequence();
- getTrackerStore(tracker).update(id, Status.REJECTED, flags, false, false);
- }
-
- @Override
- public void accept(Tracker tracker, int flags)
- {
- int id = ((TrackerImpl)tracker).getSequence();
- getTrackerStore(tracker).update(id, Status.ACCEPTED, flags, false, false);
- }
-
- @Override
- public void settle(Tracker tracker, int flags)
- {
- int id = ((TrackerImpl)tracker).getSequence();
- getTrackerStore(tracker).update(id, Status.UNKNOWN, flags, true, true);
- }
-
- public Status getStatus(Tracker tracker)
- {
- int id = ((TrackerImpl)tracker).getSequence();
- StoreEntry e = getTrackerStore(tracker).getEntry(id);
- if (e != null)
- {
- return e.getStatus();
- }
- return Status.UNKNOWN;
- }
-
- @Override
- public void route(String pattern, String address)
- {
- _routes.rule(pattern, address);
- }
-
- @Override
- public void rewrite(String pattern, String address)
- {
- _rewrites.rule(pattern, address);
- }
-
- private int queued(boolean outgoing)
- {
- int count = 0;
- if (_driver != null) {
- for (Connector<?> c : _driver.connectors())
- {
- Connection connection = c.getConnection();
- for (Link link : new Links(connection, ACTIVE, ANY))
- {
- if (outgoing)
- {
- if (link instanceof Sender) count += link.getQueued();
- }
- else
- {
- if (link instanceof Receiver) count += link.getQueued();
- }
- }
- }
- }
- return count;
- }
-
- private void bringDestruction()
- {
- for (Connector<?> c : _awaitingDestruction)
- {
- c.destroy();
- }
- _awaitingDestruction.clear();
- }
-
- private void processAllConnectors()
- {
- distributeCredit();
- for (Connector<?> c : _driver.connectors())
- {
- processEndpoints(c);
- try
- {
- if (c.process()) {
- _worked = true;
- }
- }
- catch (IOException e)
- {
- _logger.log(Level.SEVERE, "Error processing connection", e);
- }
- }
- bringDestruction();
- distributeCredit();
- }
-
- private void processActive()
- {
- //process active listeners
- for (Listener<?> l = _driver.listener(); l != null; l = _driver.listener())
- {
- _worked = true;
- Connector<?> c = l.accept();
- Connection connection = Proton.connection();
- connection.setContainer(_name);
- ListenerContext ctx = (ListenerContext) l.getContext();
- connection.setContext(new ConnectionContext(ctx.getAddress(), c));
- c.setConnection(connection);
- Transport transport = c.getTransport();
- //TODO: full SASL
- Sasl sasl = c.sasl();
- if (sasl != null)
- {
- sasl.server();
- sasl.setMechanisms(new String[]{"ANONYMOUS"});
- sasl.done(Sasl.SaslOutcome.PN_SASL_OK);
- }
- transport.ssl(ctx.getDomain());
- connection.open();
- }
- // process connectors, reclaiming credit on closed connectors
- for (Connector<?> c = _driver.connector(); c != null; c = _driver.connector())
- {
- _worked = true;
- if (c.isClosed())
- {
- _awaitingDestruction.add(c);
- reclaimCredit(c.getConnection());
- }
- else
- {
- _logger.log(Level.FINE, "Processing active connector " + c);
- try
- {
- c.process();
- processEndpoints(c);
- c.process();
- }
- catch (IOException e)
- {
- _logger.log(Level.SEVERE, "Error processing connection", e);
- }
- }
- }
- bringDestruction();
- distributeCredit();
- }
-
- private void processEndpoints(Connector c)
- {
- Connection connection = c.getConnection();
-
- if (connection.getLocalState() == EndpointState.UNINITIALIZED)
- {
- connection.open();
- }
-
- Delivery delivery = connection.getWorkHead();
- while (delivery != null)
- {
- Link link = delivery.getLink();
- if (delivery.isUpdated())
- {
- if (link instanceof Sender)
- {
- delivery.disposition(delivery.getRemoteState());
- }
- StoreEntry e = (StoreEntry) delivery.getContext();
- if (e != null) e.updated();
- }
-
- if (delivery.isReadable())
- {
- pumpIn( link.getSource().getAddress(), (Receiver)link );
- }
-
- Delivery next = delivery.getWorkNext();
- delivery.clear();
- delivery = next;
- }
-
- for (Session session : new Sessions(connection, UNINIT, ANY))
- {
- session.open();
- _logger.log(Level.FINE, "Opened session " + session);
- }
- for (Link link : new Links(connection, UNINIT, ANY))
- {
- //TODO: the following is not correct; should only copy those properties that we understand
- //TODO: is this any better:
- if (link.getRemoteSource() != null) {
- link.setSource(link.getRemoteSource().copy());
- }
- if (link.getRemoteTarget() != null) {
- link.setTarget(link.getRemoteTarget().copy());
- }
- linkAdded(link);
- link.open();
- _logger.log(Level.FINE, "Opened link " + link);
- }
-
- distributeCredit();
-
- for (Link link : new Links(connection, ACTIVE, ACTIVE))
- {
- if (link instanceof Sender)
- {
- pumpOut(link.getTarget().getAddress(), (Sender)link);
- }
- }
-
- for (Session session : new Sessions(connection, ACTIVE, CLOSED))
- {
- session.close();
- }
-
- for (Link link : new Links(connection, ANY, CLOSED))
- {
- if (link.getLocalState() == EndpointState.ACTIVE)
- {
- link.close();
- }
- else
- {
- reclaimLink(link);
- }
- }
-
- if (connection.getRemoteState() == EndpointState.CLOSED)
- {
- if (connection.getLocalState() == EndpointState.ACTIVE)
- {
- connection.close();
- }
- }
- }
-
- private boolean waitUntil(Predicate condition) throws TimeoutException
- {
- if (_blocking) {
- boolean done = waitUntil(condition, _timeout);
- if (!done) {
- _logger.log(Level.SEVERE, String.format
- ("Timeout when waiting for condition %s after %s ms",
- condition, _timeout));
- throw new TimeoutException();
- }
- return done;
- } else {
- return waitUntil(condition, 0);
- }
- }
-
- private boolean waitUntil(Predicate condition, long timeout)
- {
- if (_driver == null) {
- throw new IllegalStateException("cannot wait while messenger is stopped");
- }
-
- processAllConnectors();
-
- // wait until timeout expires or until test is true
- long now = System.currentTimeMillis();
- final long deadline = timeout < 0 ? Long.MAX_VALUE : now + timeout;
- boolean done = false;
-
- while (true)
- {
- done = condition.test();
- if (done) break;
-
- long remaining;
- if (timeout < 0)
- remaining = -1;
- else {
- remaining = deadline - now;
- if (remaining < 0) break;
- }
-
- // Update the credit scheduler. If the scheduler detects
- // credit imbalance on the links, wake up in time to
- // service credit drain
- distributeCredit();
- if (_next_drain != 0)
- {
- long wakeup = (_next_drain > now) ? _next_drain - now : 0;
- remaining = (remaining == -1) ? wakeup : Math.min(remaining, wakeup);
- }
-
- boolean woken;
- woken = _driver.doWait(remaining);
- processActive();
- if (woken) {
- throw new InterruptException();
- }
- now = System.currentTimeMillis();
- }
-
- return done;
- }
-
- private Connection lookup(Address address)
- {
- for (Connector<?> c : _driver.connectors())
- {
- Connection connection = c.getConnection();
- ConnectionContext ctx = (ConnectionContext) connection.getContext();
- if (ctx.matches(address))
- {
- return connection;
- }
- }
- return null;
- }
-
- private void reclaimCredit(Connection connection)
- {
- for (Link link : new Links(connection, ANY, ANY))
- {
- reclaimLink(link);
- }
- }
-
- private void distributeCredit()
- {
- if (_receivers == 0) return;
-
- if (_credit_mode == LinkCreditMode.LINK_CREDIT_AUTO)
- {
- // replenish, but limit the max total messages buffered
- final int max = _receivers * _credit_batch;
- final int used = _distributed + incoming();
- if (max > used)
- _credit = max - used;
- }
-
- // reclaim any credit left over after draining links has completed
- if (_draining > 0)
- {
- Iterator<Receiver> itr = _credited.iterator();
- while (itr.hasNext())
- {
- Receiver link = (Receiver) itr.next();
- if (link.getDrain())
- {
- if (!link.draining())
- {
- // drain completed for this link
- int drained = link.drained();
- assert(_distributed >= drained);
- _distributed -= drained;
- _credit += drained;
- link.setDrain(false);
- _draining--;
- itr.remove();
- _blocked.add(link);
- }
- }
- }
- }
-
- // distribute available credit to blocked links
- final int batch = perLinkCredit();
- while (_credit > 0 && !_blocked.isEmpty())
- {
- Receiver link = _blocked.get(0);
- _blocked.remove(0);
-
- final int more = Math.min(_credit, batch);
- _distributed += more;
- _credit -= more;
-
- link.flow(more);
- _credited.add(link);
-
- // flow changed, must process it
- ConnectionContext ctx = (ConnectionContext) link.getSession().getConnection().getContext();
- try
- {
- ctx.getConnector().process();
- } catch (IOException e) {
- _logger.log(Level.SEVERE, "Error processing connection", e);
- }
- }
-
- if (_blocked.isEmpty())
- {
- _next_drain = 0;
- }
- else
- {
- // not enough credit for all links - start draining granted credit
- if (_draining == 0)
- {
- // don't do it too often - pace ourselves (it's expensive)
- if (_next_drain == 0)
- {
- _next_drain = System.currentTimeMillis() + 250;
- }
- else if (_next_drain <= System.currentTimeMillis())
- {
- // initiate drain, free up at most enough to satisfy blocked
- _next_drain = 0;
- int needed = _blocked.size() * batch;
-
- for (Receiver link : _credited)
- {
- if (!link.getDrain()) {
- link.setDrain(true);
- needed -= link.getRemoteCredit();
- _draining++;
- // drain requested on link, must process it
- ConnectionContext ctx = (ConnectionContext) link.getSession().getConnection().getContext();
- try
- {
- ctx.getConnector().process();
- } catch (IOException e) {
- _logger.log(Level.SEVERE, "Error processing connection", e);
- }
- if (needed <= 0) break;
- }
- }
- }
- }
- }
- }
-
- private interface Predicate
- {
- boolean test();
- }
-
- private class SentSettled implements Predicate
- {
- public boolean test()
- {
- //are all sent messages settled?
- int total = _outgoingStore.size();
-
- for (Connector<?> c : _driver.connectors())
- {
- // TBD
- // check if transport is done generating output
- // pn_transport_t *transport = pn_connector_transport(ctor);
- // if (transport) {
- // if (!pn_transport_quiesced(transport)) {
- // pn_connector_process(ctor);
- // return false;
- // }
- // }
-
- Connection connection = c.getConnection();
- for (Link link : new Links(connection, ACTIVE, ANY))
- {
- if (link instanceof Sender)
- {
- total += link.getQueued();
- }
- }
-
- // TBD: there is no per-link unsettled
- // deliveries iterator, so for now get the
- // deliveries by walking the outgoing trackers
- Iterator<StoreEntry> entries = _outgoingStore.trackedEntries();
- while (entries.hasNext() && total <= _sendThreshold)
- {
- StoreEntry e = (StoreEntry) entries.next();
- if (e != null )
- {
- Delivery d = e.getDelivery();
- if (d != null)
- {
- if (d.getRemoteState() == null && !d.remotelySettled())
- {
- total++;
- }
- }
- }
- }
- }
- return total <= _sendThreshold;
- }
- }
-
- private class MessageAvailable implements Predicate
- {
- public boolean test()
- {
- //do we have at least one pending message?
- if (_incomingStore.size() > 0) return true;
- for (Connector<?> c : _driver.connectors())
- {
- Connection connection = c.getConnection();
- Delivery delivery = connection.getWorkHead();
- while (delivery != null)
- {
- if (delivery.isReadable() && !delivery.isPartial())
- {
- return true;
- }
- else
- {
- delivery = delivery.getWorkNext();
- }
- }
- }
- // if no connections, or not listening, exit as there won't ever be a message
- if (!_driver.listeners().iterator().hasNext() && !_driver.connectors().iterator().hasNext())
- return true;
-
- return false;
- }
- }
-
- private class AllClosed implements Predicate
- {
- public boolean test()
- {
- if (_driver == null) {
- return true;
- }
-
- for (Connector<?> c : _driver.connectors()) {
- if (!c.isClosed()) {
- return false;
- }
- }
-
- _driver.destroy();
- _driver = null;
-
- return true;
- }
- }
-
- private boolean _worked = false;
-
- private class WorkPred implements Predicate
- {
- public boolean test()
- {
- return _worked;
- }
- }
-
- private final SentSettled _sentSettled = new SentSettled();
- private final MessageAvailable _messageAvailable = new MessageAvailable();
- private final AllClosed _allClosed = new AllClosed();
- private final WorkPred _workPred = new WorkPred();
-
- private interface LinkFinder<C extends Link>
- {
- C test(Link link);
- C create(Session session);
- }
-
- private class SenderFinder implements LinkFinder<Sender>
- {
- private final String _path;
-
- SenderFinder(String path)
- {
- _path = path == null ? "" : path;
- }
-
- public Sender test(Link link)
- {
- if (link instanceof Sender && matchTarget((Target) link.getTarget(), _path))
- {
- return (Sender) link;
- }
- else
- {
- return null;
- }
- }
-
- public Sender create(Session session)
- {
- Sender sender = session.sender(_path);
- Target target = new Target();
- target.setAddress(_path);
- sender.setTarget(target);
- // the C implemenation does this:
- Source source = new Source();
- source.setAddress(_path);
- sender.setSource(source);
- if (getOutgoingWindow() > 0)
- {
- // use explicit settlement via dispositions (not pre-settled)
- sender.setSenderSettleMode(SenderSettleMode.UNSETTLED);
- sender.setReceiverSettleMode(ReceiverSettleMode.SECOND); // desired
- }
- return sender;
- }
- }
-
- private class ReceiverFinder implements LinkFinder<Receiver>
- {
- private final String _path;
-
- ReceiverFinder(String path)
- {
- _path = path == null ? "" : path;
- }
-
- public Receiver test(Link link)
- {
- if (link instanceof Receiver && matchSource((Source) link.getSource(), _path))
- {
- return (Receiver) link;
- }
- else
- {
- return null;
- }
- }
-
- public Receiver create(Session session)
- {
- Receiver receiver = session.receiver(_path);
- Source source = new Source();
- source.setAddress(_path);
- receiver.setSource(source);
- // the C implemenation does this:
- Target target = new Target();
- target.setAddress(_path);
- receiver.setTarget(target);
- if (getIncomingWindow() > 0)
- {
- // use explicit settlement via dispositions (not pre-settled)
- receiver.setSenderSettleMode(SenderSettleMode.UNSETTLED); // desired
- receiver.setReceiverSettleMode(ReceiverSettleMode.SECOND);
- }
- return receiver;
- }
- }
-
- private <C extends Link> C getLink(Address address, LinkFinder<C> finder)
- {
- Connection connection = lookup(address);
- if (connection == null)
- {
- String host = address.getHost();
- int port = Integer.valueOf(address.getImpliedPort());
- Connector<?> connector = _driver.createConnector(host, port, null);
- _logger.log(Level.FINE, "Connecting to " + host + ":" + port);
- connection = Proton.connection();
- connection.setContainer(_name);
- connection.setHostname(host);
- connection.setContext(new ConnectionContext(address, connector));
- connector.setConnection(connection);
- Sasl sasl = connector.sasl();
- if (sasl != null)
- {
- sasl.client();
- sasl.setMechanisms(new String[]{"ANONYMOUS"});
- }
- if ("amqps".equalsIgnoreCase(address.getScheme())) {
- Transport transport = connector.getTransport();
- SslDomain domain = makeDomain(address, SslDomain.Mode.CLIENT);
- if (_trustedDb != null) {
- domain.setPeerAuthentication(SslDomain.VerifyMode.VERIFY_PEER);
- //domain.setPeerAuthentication(SslDomain.VerifyMode.VERIFY_PEER_NAME);
- } else {
- domain.setPeerAuthentication(SslDomain.VerifyMode.ANONYMOUS_PEER);
- }
- Ssl ssl = transport.ssl(domain);
- //ssl.setPeerHostname(host);
- }
- connection.open();
- }
-
- for (Link link : new Links(connection, ACTIVE, ANY))
- {
- C result = finder.test(link);
- if (result != null) return result;
- }
- Session session = connection.session();
- session.open();
- C link = finder.create(session);
- linkAdded(link);
- link.open();
- return link;
- }
-
- private static class Links implements Iterable<Link>
- {
- private final Connection _connection;
- private final EnumSet<EndpointState> _local;
- private final EnumSet<EndpointState> _remote;
-
- Links(Connection connection, EnumSet<EndpointState> local, EnumSet<EndpointState> remote)
- {
- _connection = connection;
- _local = local;
- _remote = remote;
- }
-
- public java.util.Iterator<Link> iterator()
- {
- return new LinkIterator(_connection, _local, _remote);
- }
- }
-
- private static class LinkIterator implements java.util.Iterator<Link>
- {
- private final EnumSet<EndpointState> _local;
- private final EnumSet<EndpointState> _remote;
- private Link _next;
-
- LinkIterator(Connection connection, EnumSet<EndpointState> local, EnumSet<EndpointState> remote)
- {
- _local = local;
- _remote = remote;
- _next = connection.linkHead(_local, _remote);
- }
-
- public boolean hasNext()
- {
- return _next != null;
- }
-
- public Link next()
- {
- try
- {
- return _next;
- }
- finally
- {
- _next = _next.next(_local, _remote);
- }
- }
-
- public void remove()
- {
- throw new UnsupportedOperationException();
- }
- }
-
- private static class Sessions implements Iterable<Session>
- {
- private final Connection _connection;
- private final EnumSet<EndpointState> _local;
- private final EnumSet<EndpointState> _remote;
-
- Sessions(Connection connection, EnumSet<EndpointState> local, EnumSet<EndpointState> remote)
- {
- _connection = connection;
- _local = local;
- _remote = remote;
- }
-
- public java.util.Iterator<Session> iterator()
- {
- return new SessionIterator(_connection, _local, _remote);
- }
- }
-
- private static class SessionIterator implements java.util.Iterator<Session>
- {
- private final EnumSet<EndpointState> _local;
- private final EnumSet<EndpointState> _remote;
- private Session _next;
-
- SessionIterator(Connection connection, EnumSet<EndpointState> local, EnumSet<EndpointState> remote)
- {
- _local = local;
- _remote = remote;
- _next = connection.sessionHead(_local, _remote);
- }
-
- public boolean hasNext()
- {
- return _next != null;
- }
-
- public Session next()
- {
- try
- {
- return _next;
- }
- finally
- {
- _next = _next.next(_local, _remote);
- }
- }
-
- public void remove()
- {
- throw new UnsupportedOperationException();
- }
- }
-
- private void adjustReplyTo(Message m)
- {
- String original = m.getReplyTo();
- if (original != null) {
- if (original.startsWith("~/"))
- {
- m.setReplyTo("amqp://" + _name + "/" + original.substring(2));
- }
- else if (original.equals("~"))
- {
- m.setReplyTo("amqp://" + _name);
- }
- }
- }
-
- private static boolean matchTarget(Target target, String path)
- {
- if (target == null) return path.isEmpty();
- else return path.equals(target.getAddress());
- }
-
- private static boolean matchSource(Source source, String path)
- {
- if (source == null) return path.isEmpty();
- else return path.equals(source.getAddress());
- }
-
- @Override
- public String toString()
- {
- StringBuilder builder = new StringBuilder();
- builder.append("MessengerImpl [_name=").append(_name).append("]");
- return builder.toString();
- }
-
- // compute the maximum amount of credit each receiving link is
- // entitled to. The actual credit given to the link depends on
- // what amount of credit is actually available.
- private int perLinkCredit()
- {
- if (_receivers == 0) return 0;
- int total = _credit + _distributed;
- return Math.max(total/_receivers, 1);
- }
-
- // a new link has been created, account for it.
- private void linkAdded(Link link)
- {
- if (link instanceof Receiver)
- {
- _receivers++;
- _blocked.add((Receiver)link);
- link.setContext(Boolean.TRUE);
- }
- }
-
- // a link is being removed, account for it.
- private void linkRemoved(Link _link)
- {
- if (_link instanceof Receiver && (Boolean) _link.getContext())
- {
- _link.setContext(Boolean.FALSE);
- Receiver link = (Receiver)_link;
- assert _receivers > 0;
- _receivers--;
- if (link.getDrain())
- {
- link.setDrain(false);
- assert _draining > 0;
- _draining--;
- }
- if (_blocked.contains(link))
- _blocked.remove(link);
- else if (_credited.contains(link))
- _credited.remove(link);
- else
- assert(false);
- }
- }
-
- private static class ConnectionContext
- {
- private Address _address;
- private Connector _connector;
-
- public ConnectionContext(Address address, Connector connector)
- {
- _address = address;
- _connector = connector;
- }
-
- public Address getAddress()
- {
- return _address;
- }
-
- public boolean matches(Address address)
- {
- String host = address.getHost();
- String port = address.getImpliedPort();
- Connection conn = _connector.getConnection();
- return host.equals(conn.getRemoteContainer()) ||
- (_address.getHost().equals(host) && _address.getImpliedPort().equals(port));
- }
-
- public Connector getConnector()
- {
- return _connector;
- }
- }
-
- private SslDomain makeDomain(Address address, SslDomain.Mode mode)
- {
- SslDomain domain = Proton.sslDomain();
- domain.init(mode);
- if (_certificate != null) {
- domain.setCredentials(_certificate, _privateKey, _password);
- }
- if (_trustedDb != null) {
- domain.setTrustedCaDb(_trustedDb);
- }
-
- if ("amqps".equalsIgnoreCase(address.getScheme())) {
- domain.allowUnsecuredClient(false);
- } else {
- domain.allowUnsecuredClient(true);
- }
-
- return domain;
- }
-
-
- private class ListenerContext
- {
- private Address _address;
- private SslDomain _domain;
-
- public ListenerContext(Address address)
- {
- _address = address;
- _domain = makeDomain(address, SslDomain.Mode.SERVER);
- }
-
- public SslDomain getDomain()
- {
- return _domain;
- }
-
- public Address getAddress()
- {
- return _address;
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2afe2ec0/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/Store.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/Store.java b/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/Store.java
deleted file mode 100644
index b60e8ed..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/Store.java
+++ /dev/null
@@ -1,213 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.qpid.proton.messenger.impl;
-
-import java.util.List;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.HashMap;
-import java.util.Collection;
-import java.util.Iterator;
-
-import org.apache.qpid.proton.messenger.Status;
-import org.apache.qpid.proton.messenger.Messenger;
-import org.apache.qpid.proton.engine.Delivery;
-import org.apache.qpid.proton.amqp.messaging.Accepted;
-import org.apache.qpid.proton.amqp.messaging.Rejected;
-
-/**
- * @deprecated Messenger will be removed from upcoming proton-j releases.
- */
-class Store
-{
- private static final Accepted ACCEPTED = Accepted.getInstance();
- private static final Rejected REJECTED = new Rejected();
-
- private LinkedList<StoreEntry> _store = new LinkedList<StoreEntry>();
- private HashMap<String, LinkedList<StoreEntry>> _stream = new HashMap<String, LinkedList<StoreEntry>>();
-
- // for incoming/outgoing window tracking
- int _window;
- int _lwm;
- int _hwm;
- private HashMap<Integer, StoreEntry> _tracked = new HashMap<Integer, StoreEntry>();
-
- Store()
- {
- }
-
- private boolean isTracking( Integer id )
- {
- return id != null && (id.intValue() - _lwm >= 0) && (_hwm - id.intValue() > 0);
- }
-
- int size()
- {
- return _store.size();
- }
-
- int getWindow()
- {
- return _window;
- }
-
- void setWindow(int window)
- {
- _window = window;
- }
-
- StoreEntry put(String address)
- {
- if (address == null) address = "";
- StoreEntry entry = new StoreEntry(this, address);
- _store.add( entry );
- LinkedList<StoreEntry> list = _stream.get( address );
- if (list != null) {
- list.add( entry );
- } else {
- list = new LinkedList<StoreEntry>();
- list.add( entry );
- _stream.put( address, list );
- }
- entry.stored();
- return entry;
- }
-
- StoreEntry get(String address)
- {
- if (address != null) {
- LinkedList<StoreEntry> list = _stream.get( address );
- if (list != null) return list.peekFirst();
- } else {
- return _store.peekFirst();
- }
- return null;
- }
-
- StoreEntry getEntry(int id)
- {
- return _tracked.get(id);
- }
-
- Iterator<StoreEntry> trackedEntries()
- {
- return _tracked.values().iterator();
- }
-
- void freeEntry(StoreEntry entry)
- {
- if (entry.isStored()) {
- _store.remove( entry );
- LinkedList<StoreEntry> list = _stream.get( entry.getAddress() );
- if (list != null) list.remove( entry );
- entry.notStored();
- }
- // note well: may still be in _tracked map if still in window!
- }
-
- public int trackEntry(StoreEntry entry)
- {
- assert( entry.getStore() == this );
- entry.setId(_hwm++);
- _tracked.put(entry.getId(), entry);
- slideWindow();
- return entry.getId();
- }
-
- private void slideWindow()
- {
- if (_window >= 0)
- {
- while (_hwm - _lwm > _window)
- {
- StoreEntry old = getEntry(_lwm);
- if (old != null)
- {
- _tracked.remove( old.getId() );
- Delivery d = old.getDelivery();
- if (d != null) {
- if (d.getLocalState() == null)
- d.disposition(ACCEPTED);
- d.settle();
- }
- }
- _lwm++;
- }
- }
- }
-
- int update(int id, Status status, int flags, boolean settle, boolean match )
- {
- if (!isTracking(id)) return 0;
-
- int start = (Messenger.CUMULATIVE & flags) != 0 ? _lwm : id;
- for (int i = start; (id - i) >= 0; i++)
- {
- StoreEntry e = getEntry(i);
- if (e != null)
- {
- Delivery d = e.getDelivery();
- if (d != null)
- {
- if (d.getLocalState() == null)
- {
- if (match)
- {
- d.disposition(d.getRemoteState());
- }
- else
- {
- switch (status)
- {
- case ACCEPTED:
- d.disposition(ACCEPTED);
- break;
- case REJECTED:
- d.disposition(REJECTED);
- break;
- default:
- break;
- }
- }
- e.updated();
- }
- }
- if (settle)
- {
- if (d != null)
- {
- d.settle();
- }
- _tracked.remove(e.getId());
- }
- }
- }
-
- while (_hwm - _lwm > 0 && !_tracked.containsKey(_lwm))
- {
- _lwm++;
- }
-
- return 0;
- }
-}
-
-
http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2afe2ec0/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/StoreEntry.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/StoreEntry.java b/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/StoreEntry.java
deleted file mode 100644
index 1687b94..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/StoreEntry.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.qpid.proton.messenger.impl;
-
-import org.apache.qpid.proton.messenger.Tracker;
-import org.apache.qpid.proton.messenger.Status;
-import org.apache.qpid.proton.engine.Delivery;
-import org.apache.qpid.proton.amqp.messaging.Accepted;
-import org.apache.qpid.proton.amqp.messaging.Modified;
-import org.apache.qpid.proton.amqp.messaging.Rejected;
-import org.apache.qpid.proton.amqp.messaging.Released;
-import org.apache.qpid.proton.amqp.messaging.Received;
-import org.apache.qpid.proton.amqp.transport.DeliveryState;
-
-/**
- * @deprecated Messenger will be removed from upcoming proton-j releases.
- */
-class StoreEntry
-{
- private Store _store;
- private Integer _id;
- private String _address;
- private byte[] _encodedMsg;
- private int _encodedLength;
- private Delivery _delivery;
- private Status _status = Status.UNKNOWN;
- private Object _context;
- private boolean _inStore = false;
-
- public StoreEntry(Store store, String address)
- {
- _store = store;
- _address = address;
- }
-
- public Store getStore()
- {
- return _store;
- }
-
- public boolean isStored()
- {
- return _inStore;
- }
-
- public void stored()
- {
- _inStore = true;
- }
-
- public void notStored()
- {
- _inStore = false;
- }
-
- public String getAddress()
- {
- return _address;
- }
-
- public byte[] getEncodedMsg()
- {
- return _encodedMsg;
- }
-
- public int getEncodedLength()
- {
- return _encodedLength;
- }
-
- public void setEncodedMsg( byte[] encodedMsg, int length )
- {
- _encodedMsg = encodedMsg;
- _encodedLength = length;
- }
-
- public void setId(int id)
- {
- _id = new Integer(id);
- }
-
- public Integer getId()
- {
- return _id;
- }
-
- public void setDelivery( Delivery d )
- {
- if (_delivery != null)
- {
- _delivery.setContext(null);
- }
- _delivery = d;
- if (_delivery != null)
- {
- _delivery.setContext(this);
- }
- updated();
- }
-
- public Delivery getDelivery()
- {
- return _delivery;
- }
-
- public Status getStatus()
- {
- return _status;
- }
-
- public void setStatus(Status status)
- {
- _status = status;
- }
-
- private static Status _disp2status(DeliveryState disp)
- {
- if (disp == null) return Status.PENDING;
-
- if (disp instanceof Received)
- return Status.PENDING;
- if (disp instanceof Accepted)
- return Status.ACCEPTED;
- if (disp instanceof Rejected)
- return Status.REJECTED;
- if (disp instanceof Released)
- return Status.RELEASED;
- if (disp instanceof Modified)
- return Status.MODIFIED;
- assert(false);
- return null;
- }
-
- public void updated()
- {
- if (_delivery != null)
- {
- if (_delivery.getRemoteState() != null)
- {
- _status = _disp2status(_delivery.getRemoteState());
- }
- else if (_delivery.remotelySettled())
- {
- DeliveryState disp = _delivery.getLocalState();
- if (disp == null) {
- _status = Status.SETTLED;
- } else {
- _status = _disp2status(_delivery.getLocalState());
- }
- }
- else
- {
- _status = Status.PENDING;
- }
- }
- }
-
- public void setContext(Object context)
- {
- _context = context;
- }
-
- public Object getContext()
- {
- return _context;
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2afe2ec0/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/TrackerImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/TrackerImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/TrackerImpl.java
deleted file mode 100644
index 2d8b584..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/TrackerImpl.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.qpid.proton.messenger.impl;
-
-import org.apache.qpid.proton.messenger.Tracker;
-
-/**
- * @deprecated Messenger will be removed from upcoming proton-j releases.
- */
-class TrackerImpl implements Tracker
-{
- public enum Type {
- OUTGOING,
- INCOMING
- }
-
- private Type _type;
- private int _sequence;
-
- TrackerImpl(Type type, int sequence)
- {
- _type = type;
- _sequence = sequence;
- }
-
- boolean isOutgoing()
- {
- return _type == Type.OUTGOING;
- }
-
- int getSequence()
- {
- return _sequence;
- }
-
- public String toString()
- {
- return (isOutgoing() ? "O:" : "I:") + Integer.toString(_sequence);
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2afe2ec0/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/Transform.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/Transform.java b/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/Transform.java
deleted file mode 100644
index c3a08ea..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/Transform.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.proton.messenger.impl;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-
-/**
- * Transform
- *
- * @deprecated Messenger will be removed from upcoming proton-j releases.
- */
-class Transform
-{
-
- private static class Rule {
-
- String _pattern;
- String _substitution;
-
- Pattern _compiled;
- StringBuilder _sb = new StringBuilder();
- boolean _matched = false;
- String _result = null;
-
- Rule(String pattern, String substitution)
- {
- _pattern = pattern;
- _substitution = substitution;
- _compiled = Pattern.compile(_pattern.replace("*", "(.*)").replace("%", "([^/]*)"));
- }
-
- boolean apply(String src) {
- _matched = false;
- _result = null;
- Matcher m = _compiled.matcher(src);
- if (m.matches()) {
- _matched = true;
- if (_substitution != null) {
- _sb.setLength(0);
- int limit = _substitution.length();
- int idx = 0;
- while (idx < limit) {
- char c = _substitution.charAt(idx);
- switch (c) {
- case '$':
- idx++;
- if (idx < limit) {
- c = _substitution.charAt(idx);
- } else {
- throw new IllegalStateException("substition index truncated");
- }
-
- if (c == '$') {
- _sb.append(c);
- idx++;
- } else {
- int num = 0;
- while (Character.isDigit(c)) {
- num *= 10;
- num += c - '0';
- idx++;
- c = idx < limit ? _substitution.charAt(idx) : '\0';
- }
- if (num > 0) {
- _sb.append(m.group(num));
- } else {
- throw new IllegalStateException
- ("bad substitution index at character[" +
- idx + "]: " + _substitution);
- }
- }
- break;
- default:
- _sb.append(c);
- idx++;
- break;
- }
- }
- _result = _sb.toString();
- }
- }
-
- return _matched;
- }
-
- boolean matched() {
- return _matched;
- }
-
- String result() {
- return _result;
- }
-
- }
-
- private List<Rule> _rules = new ArrayList<Rule>();
- private Rule _matched = null;
-
- public void rule(String pattern, String substitution)
- {
- _rules.add(new Rule(pattern, substitution));
- }
-
- public boolean apply(String src)
- {
- _matched = null;
-
- for (Rule rule: _rules) {
- if (rule.apply(src)) {
- _matched = rule;
- break;
- }
- }
-
- return _matched != null;
- }
-
- public boolean matched()
- {
- return _matched != null;
- }
-
- public String result()
- {
- return _matched != null ? _matched.result() : null;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2afe2ec0/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/AcceptorImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/AcceptorImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/AcceptorImpl.java
index c5abbd8..b9fd1de 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/AcceptorImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/AcceptorImpl.java
@@ -40,7 +40,6 @@ import org.apache.qpid.proton.reactor.Reactor;
import org.apache.qpid.proton.reactor.impl.ReactorImpl;
import org.apache.qpid.proton.reactor.Selectable;
import org.apache.qpid.proton.reactor.Selectable.Callback;
-import org.apache.qpid.proton.messenger.impl.Address;
@SuppressWarnings("deprecation")
public class AcceptorImpl implements Acceptor {
http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2afe2ec0/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/Address.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/Address.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/Address.java
new file mode 100644
index 0000000..619912f
--- /dev/null
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/Address.java
@@ -0,0 +1,219 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.proton.reactor.impl;
+
+
+/**
+ * Address
+ *
+ */
+public class Address
+{
+
+ private String _address;
+ private boolean _passive;
+ private String _scheme;
+ private String _user;
+ private String _pass;
+ private String _host;
+ private String _port;
+ private String _name;
+
+ public void clear()
+ {
+ _passive = false;
+ _scheme = null;
+ _user = null;
+ _pass = null;
+ _host = null;
+ _port = null;
+ _name = null;
+ }
+
+ /**
+ * @deprecated Messenger will be removed from upcoming proton-j releases.
+ */
+ public Address()
+ {
+ clear();
+ }
+
+ /**
+ * @deprecated Messenger will be removed from upcoming proton-j releases.
+ */
+ public Address(String address)
+ {
+ clear();
+ int start = 0;
+ int schemeEnd = address.indexOf("://", start);
+ if (schemeEnd >= 0) {
+ _scheme = address.substring(start, schemeEnd);
+ start = schemeEnd + 3;
+ }
+
+ String uphp;
+ int slash = address.indexOf("/", start);
+ if (slash >= 0) {
+ uphp = address.substring(start, slash);
+ _name = address.substring(slash + 1);
+ } else {
+ uphp = address.substring(start);
+ }
+
+ String hp;
+ int at = uphp.indexOf('@');
+ if (at >= 0) {
+ String up = uphp.substring(0, at);
+ hp = uphp.substring(at + 1);
+
+ int colon = up.indexOf(':');
+ if (colon >= 0) {
+ _user = up.substring(0, colon);
+ _pass = up.substring(colon + 1);
+ } else {
+ _user = up;
+ }
+ } else {
+ hp = uphp;
+ }
+
+ if (hp.startsWith("[")) {
+ int close = hp.indexOf(']');
+ if (close >= 0) {
+ _host = hp.substring(1, close);
+ if (hp.substring(close + 1).startsWith(":")) {
+ _port = hp.substring(close + 2);
+ }
+ }
+ }
+
+ if (_host == null) {
+ int colon = hp.indexOf(':');
+ if (colon >= 0) {
+ _host = hp.substring(0, colon);
+ _port = hp.substring(colon + 1);
+ } else {
+ _host = hp;
+ }
+ }
+
+ if (_host.startsWith("~")) {
+ _host = _host.substring(1);
+ _passive = true;
+ }
+ }
+
+ public String toString()
+ {
+ String str = new String();
+ if (_scheme != null) str += _scheme + "://";
+ if (_user != null) str += _user;
+ if (_pass != null) str += ":" + _pass;
+ if (_user != null || _pass != null) str += "@";
+ if (_host != null) {
+ if (_host.contains(":")) str += "[" + _host + "]";
+ else str += _host;
+ }
+ if (_port != null) str += ":" + _port;
+ if (_name != null) str += "/" + _name;
+ return str;
+ }
+
+ public boolean isPassive()
+ {
+ return _passive;
+ }
+
+ public String getScheme()
+ {
+ return _scheme;
+ }
+
+ public String getUser()
+ {
+ return _user;
+ }
+
+ public String getPass()
+ {
+ return _pass;
+ }
+
+ public String getHost()
+ {
+ return _host;
+ }
+
+ public String getPort()
+ {
+ return _port;
+ }
+
+ public String getImpliedPort()
+ {
+ if (_port == null) {
+ return getDefaultPort();
+ } else {
+ return getPort();
+ }
+ }
+
+ public String getDefaultPort()
+ {
+ if ("amqps".equals(_scheme)) return "5671";
+ else return "5672";
+ }
+
+ public String getName()
+ {
+ return _name;
+ }
+
+ public void setScheme(String scheme)
+ {
+ _scheme= scheme;
+ }
+
+ public void setUser(String user)
+ {
+ _user= user;
+ }
+
+ public void setPass(String pass)
+ {
+ _pass= pass;
+ }
+
+ public void setHost(String host)
+ {
+ _host= host;
+ }
+
+ public void setPort(String port)
+ {
+ _port= port;
+ }
+
+ public void setName(String name)
+ {
+ _name= name;
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2afe2ec0/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java
index 2dd7e1a..1282083 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java
@@ -45,7 +45,6 @@ import org.apache.qpid.proton.reactor.Selectable.Callback;
import org.apache.qpid.proton.reactor.Selector;
import org.apache.qpid.proton.reactor.Acceptor;
import org.apache.qpid.proton.reactor.impl.AcceptorImpl;
-import org.apache.qpid.proton.messenger.impl.Address;
@SuppressWarnings("deprecation")
public class IOHandler extends BaseHandler {
http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2afe2ec0/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java
index 30c8df9..9d38b85 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java
@@ -51,7 +51,6 @@ import org.apache.qpid.proton.reactor.Selectable;
import org.apache.qpid.proton.reactor.Selectable.Callback;
import org.apache.qpid.proton.reactor.Selector;
import org.apache.qpid.proton.reactor.Task;
-import org.apache.qpid.proton.messenger.impl.Address;
@SuppressWarnings("deprecation")
public class ReactorImpl implements Reactor, Extendable {
http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2afe2ec0/proton-j/src/test/java/org/apache/qpid/proton/messenger/impl/AddressTest.java
----------------------------------------------------------------------
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/messenger/impl/AddressTest.java b/proton-j/src/test/java/org/apache/qpid/proton/messenger/impl/AddressTest.java
deleted file mode 100644
index 77154b6..0000000
--- a/proton-j/src/test/java/org/apache/qpid/proton/messenger/impl/AddressTest.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.qpid.proton.messenger.impl;
-
-import static org.junit.Assert.*;
-
-import org.junit.Test;
-
-public class AddressTest {
-
- @SuppressWarnings("deprecation")
- private void testParse(String url, String scheme, String user, String pass, String host, String port, String name)
- {
- Address address = new Address(url);
- assertEquals(scheme, address.getScheme());
- assertEquals(user, address.getUser());
- assertEquals(pass, address.getPass());
- assertEquals(host, address.getHost());
- assertEquals(port, address.getPort());
- assertEquals(url, address.toString());
- }
-
- @Test
- public void addressTests()
- {
- testParse("host", null, null, null, "host", null, null);
- testParse("host:423", null, null, null, "host", "423", null);
- testParse("user@host", null, "user", null, "host", null, null);
- testParse("user:1243^&^:pw@host:423", null, "user", "1243^&^:pw", "host", "423", null);
- testParse("user:1243^&^:pw@host:423/Foo.bar:90087", null, "user", "1243^&^:pw", "host", "423", "Foo.bar:90087");
- testParse("user:1243^&^:pw@host:423/Foo.bar:90087@somewhere", null, "user", "1243^&^:pw", "host", "423", "Foo.bar:90087@somewhere");
- testParse("[::1]", null, null, null, "::1", null, null);
- testParse("[::1]:amqp", null, null, null, "::1", "amqp", null);
- testParse("user@[::1]", null, "user", null, "::1", null, null);
- testParse("user@[::1]:amqp", null, "user", null, "::1", "amqp", null);
- testParse("user:1243^&^:pw@[::1]:amqp", null, "user", "1243^&^:pw", "::1", "amqp", null);
- testParse("user:1243^&^:pw@[::1]:amqp/Foo.bar:90087", null, "user", "1243^&^:pw", "::1", "amqp", "Foo.bar:90087");
- testParse("user:1243^&^:pw@[::1:amqp/Foo.bar:90087", null, "user", "1243^&^:pw", "[", ":1:amqp", "Foo.bar:90087");
- testParse("user:1243^&^:pw@::1]:amqp/Foo.bar:90087", null, "user", "1243^&^:pw", "", ":1]:amqp", "Foo.bar:90087");
- testParse("amqp://user@[::1]", "amqp", "user", null, "::1", null, null);
- testParse("amqp://user@[::1]:amqp", "amqp", "user", null, "::1", "amqp", null);
- testParse("amqp://user@[1234:52:0:1260:f2de:f1ff:fe59:8f87]:amqp", "amqp", "user", null, "1234:52:0:1260:f2de:f1ff:fe59:8f87", "amqp", null);
- testParse("amqp://user:1243^&^:pw@[::1]:amqp", "amqp", "user", "1243^&^:pw", "::1", "amqp", null);
- testParse("amqp://user:1243^&^:pw@[::1]:amqp/Foo.bar:90087", "amqp", "user", "1243^&^:pw", "::1", "amqp", "Foo.bar:90087");
- testParse("amqp://host", "amqp", null, null, "host", null, null);
- testParse("amqp://user@host", "amqp", "user", null, "host", null, null);
- testParse("amqp://user@host/path:%", "amqp", "user", null, "host", null, "path:%");
- testParse("amqp://user@host:5674/path:%", "amqp", "user", null, "host", "5674", "path:%");
- testParse("amqp://user@host/path:%", "amqp", "user", null, "host", null, "path:%");
- testParse("amqp://bigbird@host/queue@host", "amqp", "bigbird", null, "host", null, "queue@host");
- testParse("amqp://host/queue@host", "amqp", null, null, "host", null, "queue@host");
- testParse("amqp://host:9765/queue@host", "amqp", null, null, "host", "9765", "queue@host");
- }
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org