You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2008/10/09 19:08:01 UTC
svn commit: r703208 [2/2] - in /incubator/qpid/trunk/qpid/java:
client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/
client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/
client/example/src/main/java/org/apache/qp...
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java?rev=703208&r1=703207&r2=703208&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java Thu Oct 9 10:07:59 2008
@@ -20,14 +20,22 @@
*/
package org.apache.qpid.transport;
+import org.apache.qpid.transport.network.ConnectionBinding;
+import org.apache.qpid.transport.network.io.IoTransport;
import org.apache.qpid.transport.util.Logger;
+import org.apache.qpid.transport.util.Waiter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.nio.ByteBuffer;
+import java.util.UUID;
+
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslServer;
+
+import static org.apache.qpid.transport.Connection.State.*;
/**
@@ -44,23 +52,164 @@
implements Receiver<ProtocolEvent>, Sender<ProtocolEvent>
{
+ enum State { NEW, CLOSED, OPENING, OPEN, CLOSING, CLOSE_RCVD }
+
private static final Logger log = Logger.get(Connection.class);
- final private Sender<ProtocolEvent> sender;
- final private ConnectionDelegate delegate;
+ class DefaultConnectionListener implements ConnectionListener
+ {
+ public void opened(Connection conn) {}
+ public void exception(Connection conn, ConnectionException exception)
+ {
+ throw exception;
+ }
+ public void closed(Connection conn) {}
+ }
+
+ private ConnectionDelegate delegate;
+ private Sender<ProtocolEvent> sender;
+
+ final private Map<Integer,Channel> channels = new HashMap<Integer,Channel>();
+
+ private State state = NEW;
+ private Object lock = new Object();
+ private long timeout = 60000;
+ private ConnectionListener listener = new DefaultConnectionListener();
+ private Throwable error = null;
+
private int channelMax = 1;
+ private String locale;
+ private SaslServer saslServer;
+ private SaslClient saslClient;
+
// want to make this final
private int _connectionId;
- final private Map<Integer,Channel> channels = new HashMap<Integer,Channel>();
+ public Connection() {}
- public Connection(Sender<ProtocolEvent> sender,
- ConnectionDelegate delegate)
+ public void setConnectionDelegate(ConnectionDelegate delegate)
{
- this.sender = sender;
this.delegate = delegate;
}
+ public void setConnectionListener(ConnectionListener listener)
+ {
+ if (listener == null)
+ {
+ this.listener = new DefaultConnectionListener();
+ }
+ else
+ {
+ this.listener = listener;
+ }
+ }
+
+ public Sender<ProtocolEvent> getSender()
+ {
+ return sender;
+ }
+
+ public void setSender(Sender<ProtocolEvent> sender)
+ {
+ this.sender = sender;
+ }
+
+ void setState(State state)
+ {
+ synchronized (lock)
+ {
+ this.state = state;
+ lock.notifyAll();
+ }
+ }
+
+ void setLocale(String locale)
+ {
+ this.locale = locale;
+ }
+
+ String getLocale()
+ {
+ return locale;
+ }
+
+ void setSaslServer(SaslServer saslServer)
+ {
+ this.saslServer = saslServer;
+ }
+
+ SaslServer getSaslServer()
+ {
+ return saslServer;
+ }
+
+ void setSaslClient(SaslClient saslClient)
+ {
+ this.saslClient = saslClient;
+ }
+
+ SaslClient getSaslClient()
+ {
+ return saslClient;
+ }
+
+ public void connect(String host, int port, String vhost, String username, String password)
+ {
+ synchronized (lock)
+ {
+ state = OPENING;
+
+ delegate = new ClientDelegate(vhost, username, password);
+
+ IoTransport.connect(host, port, ConnectionBinding.get(this));
+ send(new ProtocolHeader(1, 0, 10));
+
+ Waiter w = new Waiter(lock, timeout);
+ while (w.hasTime() && state == OPENING && error == null)
+ {
+ w.await();
+ }
+
+ if (error != null)
+ {
+ Throwable t = error;
+ error = null;
+ close();
+ throw new ConnectionException(t);
+ }
+
+ switch (state)
+ {
+ case OPENING:
+ close();
+ throw new ConnectionException("connect() timed out");
+ case OPEN:
+ break;
+ case CLOSED:
+ throw new ConnectionException("connect() aborted");
+ default:
+ throw new IllegalStateException(String.valueOf(state));
+ }
+ }
+
+ listener.opened(this);
+ }
+
+ public Session createSession()
+ {
+ return createSession(0);
+ }
+
+ public Session createSession(long expiryInSeconds)
+ {
+ Channel ch = getChannel();
+ Session ssn = new Session(UUID.randomUUID().toString().getBytes());
+ ssn.attach(ch);
+ ssn.sessionAttach(ssn.getName());
+ ssn.sessionRequestTimeout(expiryInSeconds);
+ return ssn;
+ }
+
public void setConnectionId(int id)
{
_connectionId = id;
@@ -86,7 +235,12 @@
public void send(ProtocolEvent event)
{
log.debug("SEND: [%s] %s", this, event);
- sender.send(event);
+ Sender s = sender;
+ if (s == null)
+ {
+ throw new ConnectionException("connection closed");
+ }
+ s.send(event);
}
public void flush()
@@ -107,7 +261,7 @@
public Channel getChannel()
{
- synchronized (channels)
+ synchronized (lock)
{
for (int i = 0; i < getChannelMax(); i++)
{
@@ -123,7 +277,7 @@
public Channel getChannel(int number)
{
- synchronized (channels)
+ synchronized (lock)
{
Channel channel = channels.get(number);
if (channel == null)
@@ -137,45 +291,146 @@
void removeChannel(int number)
{
- synchronized (channels)
+ synchronized (lock)
{
channels.remove(number);
}
}
+ public void exception(ConnectionException e)
+ {
+ synchronized (lock)
+ {
+ switch (state)
+ {
+ case OPENING:
+ case CLOSING:
+ error = e;
+ lock.notifyAll();
+ break;
+ default:
+ listener.exception(this, e);
+ break;
+ }
+ }
+ }
+
public void exception(Throwable t)
{
- delegate.exception(t);
+ synchronized (lock)
+ {
+ switch (state)
+ {
+ case OPENING:
+ case CLOSING:
+ error = t;
+ lock.notifyAll();
+ break;
+ default:
+ listener.exception(this, new ConnectionException(t));
+ break;
+ }
+ }
}
void closeCode(ConnectionClose close)
{
- synchronized (channels)
+ synchronized (lock)
{
for (Channel ch : channels.values())
{
ch.closeCode(close);
}
+ ConnectionCloseCode code = close.getReplyCode();
+ if (code != ConnectionCloseCode.NORMAL)
+ {
+ exception(new ConnectionException(close));
+ }
}
}
public void closed()
{
log.debug("connection closed: %s", this);
- synchronized (channels)
+
+ if (state == OPEN)
+ {
+ exception(new ConnectionException("connection aborted"));
+ }
+
+ synchronized (lock)
{
List<Channel> values = new ArrayList<Channel>(channels.values());
for (Channel ch : values)
{
ch.closed();
}
+
+ sender = null;
+ setState(CLOSED);
}
- delegate.closed();
+
+ listener.closed(this);
}
public void close()
{
- sender.close();
+ synchronized (lock)
+ {
+ switch (state)
+ {
+ case OPEN:
+ Channel ch = getChannel(0);
+ state = CLOSING;
+ ch.connectionClose(ConnectionCloseCode.NORMAL, null);
+ Waiter w = new Waiter(lock, timeout);
+ while (w.hasTime() && state == CLOSING && error == null)
+ {
+ w.await();
+ }
+
+ if (error != null)
+ {
+ close();
+ throw new ConnectionException(error);
+ }
+
+ switch (state)
+ {
+ case CLOSING:
+ close();
+ throw new ConnectionException("close() timed out");
+ case CLOSED:
+ break;
+ default:
+ throw new IllegalStateException(String.valueOf(state));
+ }
+ break;
+ case CLOSED:
+ break;
+ default:
+ if (sender != null)
+ {
+ sender.close();
+ w = new Waiter(lock, timeout);
+ while (w.hasTime() && sender != null && error == null)
+ {
+ w.await();
+ }
+
+ if (error != null)
+ {
+ throw new ConnectionException(error);
+ }
+
+ if (sender != null)
+ {
+ throw new ConnectionException("close() timed out");
+ }
+ }
+ break;
+ }
+ }
}
public String toString()
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java?rev=703208&r1=703207&r2=703208&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java Thu Oct 9 10:07:59 2008
@@ -22,22 +22,7 @@
import org.apache.qpid.transport.util.Logger;
-import org.apache.qpid.SecurityHelper;
-import org.apache.qpid.QpidException;
-
-import java.io.UnsupportedEncodingException;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-
-import javax.security.sasl.Sasl;
-import javax.security.sasl.SaslClient;
-import javax.security.sasl.SaslException;
-import javax.security.sasl.SaslServer;
+import static org.apache.qpid.transport.Connection.State.*;
/**
@@ -57,231 +42,26 @@
private static final Logger log = Logger.get(ConnectionDelegate.class);
- private String _username = "guest";
- private String _password = "guest";;
- private String _mechanism;
- private String _virtualHost;
- private SaslClient saslClient;
- private SaslServer saslServer;
- private String _locale = "utf8";
- private int maxFrame = 64*1024;
- private Condition _negotiationComplete;
- private Lock _negotiationCompleteLock;
-
- public abstract SessionDelegate getSessionDelegate();
-
- public abstract void exception(Throwable t);
-
- public abstract void closed();
-
- public void setCondition(Lock negotiationCompleteLock,Condition negotiationComplete)
- {
- _negotiationComplete = negotiationComplete;
- _negotiationCompleteLock = negotiationCompleteLock;
- }
-
- public void init(Channel ch, ProtocolHeader hdr)
- {
- ch.getConnection().send(new ProtocolHeader (1, hdr.getMajor(), hdr.getMinor()));
- List<Object> plain = new ArrayList<Object>();
- plain.add("PLAIN");
- List<Object> utf8 = new ArrayList<Object>();
- utf8.add("utf8");
- ch.connectionStart(null, plain, utf8);
- }
-
- // ----------------------------------------------
- // Client side
- //-----------------------------------------------
- @Override public void connectionStart(Channel context, ConnectionStart struct)
- {
- String mechanism = null;
- byte[] response = null;
- try
- {
- mechanism = SecurityHelper.chooseMechanism(struct.getMechanisms());
- saslClient = Sasl.createSaslClient(new String[]{ mechanism },null, "AMQP", "localhost", null,
- SecurityHelper.createCallbackHandler(mechanism,_username,_password ));
- response = saslClient.evaluateChallenge(new byte[0]);
- }
- catch (UnsupportedEncodingException e)
- {
- // need error handling
- }
- catch (SaslException e)
- {
- // need error handling
- }
- catch (QpidException e)
- {
- // need error handling
- }
-
- Map<String,Object> props = new HashMap<String,Object>();
- context.connectionStartOk(props, mechanism, response, _locale);
- }
-
- @Override public void connectionSecure(Channel context, ConnectionSecure struct)
- {
- try
- {
- byte[] response = saslClient.evaluateChallenge(struct.getChallenge());
- context.connectionSecureOk(response);
- }
- catch (SaslException e)
- {
- // need error handling
- }
- }
-
- @Override public void connectionTune(Channel context, ConnectionTune struct)
- {
- context.getConnection().setChannelMax(struct.getChannelMax());
- context.connectionTuneOk(struct.getChannelMax(), struct.getMaxFrameSize(), struct.getHeartbeatMax());
- context.connectionOpen(_virtualHost, null, Option.INSIST);
- }
-
-
- @Override public void connectionOpenOk(Channel context, ConnectionOpenOk struct)
- {
- List<Object> knownHosts = struct.getKnownHosts();
- if(_negotiationCompleteLock != null)
- {
- _negotiationCompleteLock.lock();
- try
- {
- _negotiationComplete.signalAll();
- }
- finally
- {
- _negotiationCompleteLock.unlock();
- }
- }
- }
-
- public void connectionRedirect(Channel context, ConnectionRedirect struct)
- {
- // not going to bother at the moment
- }
-
- // ----------------------------------------------
- // Server side
- //-----------------------------------------------
- @Override public void connectionStartOk(Channel context, ConnectionStartOk struct)
- {
- //set the client side locale on the server side
- _locale = struct.getLocale();
- _mechanism = struct.getMechanism();
-
- //try
- //{
- //saslServer = Sasl.createSaslServer(_mechanism, "AMQP", "ABC",null,SecurityHelper.createCallbackHandler(_mechanism,_username,_password));
- //byte[] challenge = saslServer.evaluateResponse(struct.getResponse().getBytes());
- byte[] challenge = null;
- if ( challenge == null)
- {
- context.connectionTune(Integer.MAX_VALUE, maxFrame, 0, Integer.MAX_VALUE);
- }
- else
- {
- try
- {
- context.connectionSecure(challenge);
- }
- catch(Exception e)
- {
-
- }
- }
-
-
- /*}
- catch (SaslException e)
- {
- // need error handling
- }
- catch (QpidException e)
- {
- // need error handling
- }*/
- }
-
- @Override public void connectionSecureOk(Channel context, ConnectionSecureOk struct)
+ public SessionDelegate getSessionDelegate()
{
- try
- {
- saslServer = Sasl.createSaslServer(_mechanism, "AMQP", "ABC",new HashMap(),SecurityHelper.createCallbackHandler(_mechanism,_username,_password));
- byte[] challenge = saslServer.evaluateResponse(struct.getResponse());
- if ( challenge == null)
- {
- context.connectionTune(Integer.MAX_VALUE, maxFrame, 0, Integer.MAX_VALUE);
- }
- else
- {
- try
- {
- context.connectionSecure(challenge);
- }
- catch(Exception e)
- {
-
- }
- }
-
-
- }
- catch (SaslException e)
- {
- // need error handling
- }
- catch (QpidException e)
- {
- // need error handling
- }
+ return new SessionDelegate();
}
-
- @Override public void connectionOpen(Channel context, ConnectionOpen struct)
- {
- List<Object> hosts = new ArrayList<Object>();
- hosts.add("amqp:1223243232325");
- context.connectionOpenOk(hosts);
- }
+ public abstract void init(Channel ch, ProtocolHeader hdr);
@Override public void connectionClose(Channel ch, ConnectionClose close)
{
- ch.getConnection().closeCode(close);
+ Connection conn = ch.getConnection();
ch.connectionCloseOk();
+ conn.getSender().close();
+ conn.closeCode(close);
+ conn.setState(CLOSE_RCVD);
}
- public String getPassword()
- {
- return _password;
- }
-
- public void setPassword(String password)
- {
- _password = password;
- }
-
- public String getUsername()
- {
- return _username;
- }
-
- public void setUsername(String username)
- {
- _username = username;
- }
-
- public String getVirtualHost()
- {
- return _virtualHost;
- }
-
- public void setVirtualHost(String host)
+ @Override public void connectionCloseOk(Channel ch, ConnectionCloseOk ok)
{
- _virtualHost = host;
+ Connection conn = ch.getConnection();
+ conn.getSender().close();
}
}
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionException.java?rev=703208&r1=703207&r2=703208&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionException.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionException.java Thu Oct 9 10:07:59 2008
@@ -26,17 +26,37 @@
*
*/
-public class ConnectionException extends RuntimeException
+public class ConnectionException extends TransportException
{
private ConnectionClose close;
- public ConnectionException(ConnectionClose close)
+ public ConnectionException(String message, ConnectionClose close, Throwable cause)
{
- super(close.getReplyText());
+ super(message, cause);
this.close = close;
}
+ public ConnectionException(String message)
+ {
+ this(message, null, null);
+ }
+
+ public ConnectionException(String message, Throwable cause)
+ {
+ this(message, null, cause);
+ }
+
+ public ConnectionException(Throwable cause)
+ {
+ this(cause.getMessage(), null, cause);
+ }
+
+ public ConnectionException(ConnectionClose close)
+ {
+ this(close.getReplyText(), close, null);
+ }
+
public ConnectionClose getClose()
{
return close;
Added: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionListener.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionListener.java?rev=703208&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionListener.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionListener.java Thu Oct 9 10:07:59 2008
@@ -0,0 +1,38 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport;
+
+
+/**
+ * ConnectionListener
+ *
+ */
+
+public interface ConnectionListener
+{
+
+ void opened(Connection connection);
+
+ void exception(Connection connection, ConnectionException exception);
+
+ void closed(Connection connection);
+
+}
Propchange: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionListener.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Echo.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Echo.java?rev=703208&r1=703207&r2=703208&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Echo.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Echo.java Thu Oct 9 10:07:59 2008
@@ -43,25 +43,16 @@
public static final void main(String[] args) throws IOException
{
- ConnectionDelegate delegate = new ConnectionDelegate()
+ ConnectionDelegate delegate = new ServerDelegate()
{
public SessionDelegate getSessionDelegate()
{
return new Echo();
}
- public void exception(Throwable t)
- {
- t.printStackTrace();
- }
- public void closed() {}
};
- //hack
- delegate.setUsername("guest");
- delegate.setPassword("guest");
-
IoAcceptor ioa = new IoAcceptor
- ("0.0.0.0", 5672, new ConnectionBinding(delegate));
+ ("0.0.0.0", 5672, ConnectionBinding.get(delegate));
ioa.start();
}
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolVersionException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolVersionException.java?rev=703208&r1=703207&r2=703208&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolVersionException.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolVersionException.java Thu Oct 9 10:07:59 2008
@@ -26,7 +26,7 @@
*
*/
-public final class ProtocolVersionException extends TransportException
+public final class ProtocolVersionException extends ConnectionException
{
private final byte major;
Added: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java?rev=703208&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java Thu Oct 9 10:07:59 2008
@@ -0,0 +1,145 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport;
+
+import java.util.Collections;
+
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+
+import java.io.UnsupportedEncodingException;
+
+import org.apache.qpid.QpidException;
+
+import org.apache.qpid.SecurityHelper;
+
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+
+
+import static org.apache.qpid.transport.Connection.State.*;
+
+
+/**
+ * ServerDelegate
+ *
+ */
+
+public class ServerDelegate extends ConnectionDelegate
+{
+
+ private SaslServer saslServer;
+
+ public void init(Channel ch, ProtocolHeader hdr)
+ {
+ Connection conn = ch.getConnection();
+ conn.send(new ProtocolHeader(1, 0, 10));
+ List<Object> utf8 = new ArrayList<Object>();
+ utf8.add("utf8");
+ ch.connectionStart(null, Collections.EMPTY_LIST, utf8);
+ }
+
+ @Override public void connectionStartOk(Channel ch, ConnectionStartOk ok)
+ {
+ Connection conn = ch.getConnection();
+ conn.setLocale(ok.getLocale());
+ String mechanism = ok.getMechanism();
+
+ if (mechanism == null || mechanism.length() == 0)
+ {
+ ch.connectionTune
+ (Integer.MAX_VALUE,
+ org.apache.qpid.transport.network.ConnectionBinding.MAX_FRAME_SIZE,
+ 0, Integer.MAX_VALUE);
+ return;
+ }
+
+ try
+ {
+ SaslServer ss = Sasl.createSaslServer
+ (mechanism, "AMQP", "localhost", null, null);
+ if (ss == null)
+ {
+ ch.connectionClose
+ (ConnectionCloseCode.CONNECTION_FORCED,
+ "null SASL mechanism: " + mechanism);
+ return;
+ }
+ conn.setSaslServer(ss);
+ secure(ch, ok.getResponse());
+ }
+ catch (SaslException e)
+ {
+ conn.exception(e);
+ }
+ }
+
+ private void secure(Channel ch, byte[] response)
+ {
+ Connection conn = ch.getConnection();
+ SaslServer ss = conn.getSaslServer();
+ try
+ {
+ byte[] challenge = ss.evaluateResponse(response);
+ if (ss.isComplete())
+ {
+ ss.dispose();
+ ch.connectionTune
+ (Integer.MAX_VALUE,
+ org.apache.qpid.transport.network.ConnectionBinding.MAX_FRAME_SIZE,
+ 0, Integer.MAX_VALUE);
+ }
+ else
+ {
+ ch.connectionSecure(challenge);
+ }
+ }
+ catch (SaslException e)
+ {
+ conn.exception(e);
+ }
+ }
+
+ @Override public void connectionSecureOk(Channel ch, ConnectionSecureOk ok)
+ {
+ secure(ch, ok.getResponse());
+ }
+
+ @Override public void connectionTuneOk(Channel ch, ConnectionTuneOk ok)
+ {
+
+ }
+
+ @Override public void connectionOpen(Channel ch, ConnectionOpen open)
+ {
+ Connection conn = ch.getConnection();
+ ch.connectionOpenOk(Collections.EMPTY_LIST);
+ conn.setState(OPEN);
+ }
+
+}
Propchange: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java?rev=703208&r1=703207&r2=703208&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java Thu Oct 9 10:07:59 2008
@@ -49,6 +49,26 @@
private static final Logger log = Logger.get(Session.class);
+ class DefaultSessionListener implements SessionListener
+ {
+
+ public void opened(Session ssn) {}
+
+ public void message(Session ssn, MessageTransfer xfr)
+ {
+ log.info("message: %s", xfr);
+ }
+
+ public void exception(Session ssn, SessionException exc)
+ {
+ throw exc;
+ }
+
+ public void closed(Session ssn) {}
+ }
+
+ public static final int UNLIMITED_CREDIT = 0xFFFFFFFF;
+
private static boolean ENABLE_REPLAY = false;
static
@@ -65,6 +85,7 @@
}
private byte[] name;
+ private SessionListener listener = new DefaultSessionListener();
private long timeout = 60000;
private boolean autoSync = false;
@@ -97,6 +118,23 @@
return name;
}
+ public void setSessionListener(SessionListener listener)
+ {
+ if (listener == null)
+ {
+ this.listener = new DefaultSessionListener();
+ }
+ else
+ {
+ this.listener = listener;
+ }
+ }
+
+ public SessionListener getSessionListener()
+ {
+ return listener;
+ }
+
public void setAutoSync(boolean value)
{
synchronized (commands)
@@ -270,8 +308,8 @@
{
if (closed.get())
{
- List<ExecutionException> exc = getExceptions();
- if (!exc.isEmpty())
+ ExecutionException exc = getException();
+ if (exc != null)
{
throw new SessionException(exc);
}
@@ -361,7 +399,7 @@
{
if (closed.get())
{
- throw new SessionException(getExceptions());
+ throw new SessionException(getException());
}
else
{
@@ -375,8 +413,7 @@
private Map<Integer,ResultFuture<?>> results =
new HashMap<Integer,ResultFuture<?>>();
- private List<ExecutionException> exceptions =
- new ArrayList<ExecutionException>();
+ private ExecutionException exception = null;
void result(int command, Struct result)
{
@@ -388,11 +425,17 @@
future.set(result);
}
- void addException(ExecutionException exc)
+ void setException(ExecutionException exc)
{
- synchronized (exceptions)
+ synchronized (results)
{
- exceptions.add(exc);
+ if (exception != null)
+ {
+ throw new IllegalStateException
+ (String.format
+ ("too many exceptions: %s, %s", exception, exc));
+ }
+ exception = exc;
}
}
@@ -403,11 +446,11 @@
this.close = close;
}
- List<ExecutionException> getExceptions()
+ ExecutionException getException()
{
- synchronized (exceptions)
+ synchronized (results)
{
- return new ArrayList<ExecutionException>(exceptions);
+ return exception;
}
}
@@ -473,7 +516,7 @@
}
else if (closed.get())
{
- throw new SessionException(getExceptions());
+ throw new SessionException(getException());
}
else
{
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionClosedException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionClosedException.java?rev=703208&r1=703207&r2=703208&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionClosedException.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionClosedException.java Thu Oct 9 10:07:59 2008
@@ -33,7 +33,7 @@
public SessionClosedException()
{
- super(Collections.EMPTY_LIST);
+ super(null);
}
}
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java?rev=703208&r1=703207&r2=703208&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java Thu Oct 9 10:07:59 2008
@@ -29,7 +29,7 @@
* @author Rafael H. Schloming
*/
-public abstract class SessionDelegate
+public class SessionDelegate
extends MethodDelegate<Session>
implements ProtocolDelegate<Session>
{
@@ -57,7 +57,7 @@
@Override public void executionException(Session ssn, ExecutionException exc)
{
- ssn.addException(exc);
+ ssn.setException(exc);
}
@Override public void sessionCompleted(Session ssn, SessionCompleted cmp)
@@ -122,4 +122,9 @@
ssn.syncPoint();
}
+ @Override public void messageTransfer(Session ssn, MessageTransfer xfr)
+ {
+ ssn.getSessionListener().message(ssn, xfr);
+ }
+
}
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionException.java?rev=703208&r1=703207&r2=703208&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionException.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionException.java Thu Oct 9 10:07:59 2008
@@ -27,20 +27,20 @@
*
*/
-public class SessionException extends RuntimeException
+public class SessionException extends TransportException
{
- private List<ExecutionException> exceptions;
+ private ExecutionException exception;
- public SessionException(List<ExecutionException> exceptions)
+ public SessionException(ExecutionException exception)
{
- super(exceptions.isEmpty() ? "" : exceptions.toString());
- this.exceptions = exceptions;
+ super(String.valueOf(exception));
+ this.exception = exception;
}
- public List<ExecutionException> getExceptions()
+ public ExecutionException getException()
{
- return exceptions;
+ return exception;
}
}
Added: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionListener.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionListener.java?rev=703208&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionListener.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionListener.java Thu Oct 9 10:07:59 2008
@@ -0,0 +1,40 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport;
+
+
+/**
+ * SessionListener
+ *
+ */
+
+public interface SessionListener
+{
+
+ void opened(Session session);
+
+ void message(Session ssn, MessageTransfer xfr);
+
+ void exception(Session session, SessionException exception);
+
+ void closed(Session session);
+
+}
Propchange: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionListener.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Sink.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Sink.java?rev=703208&r1=703207&r2=703208&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Sink.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Sink.java Thu Oct 9 10:07:59 2008
@@ -103,28 +103,17 @@
public static final void main(String[] args) throws IOException
{
- ConnectionDelegate delegate = new ConnectionDelegate()
+ ConnectionDelegate delegate = new ServerDelegate()
{
public SessionDelegate getSessionDelegate()
{
return new Sink();
}
-
- public void exception(Throwable t)
- {
- t.printStackTrace();
- }
-
- public void closed() {}
};
- //hack
- delegate.setUsername("guest");
- delegate.setPassword("guest");
-
IoAcceptor ioa = new IoAcceptor
- ("0.0.0.0", 5672, new ConnectionBinding(delegate));
+ ("0.0.0.0", 5672, ConnectionBinding.get(delegate));
System.out.println
(String.format
(FORMAT_HDR, "Session", "Count/MBytes", "Cumulative Rate", "Interval Rate"));
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java?rev=703208&r1=703207&r2=703208&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java Thu Oct 9 10:07:59 2008
@@ -33,23 +33,46 @@
*
*/
-public class ConnectionBinding implements Binding<Connection,ByteBuffer>
+public abstract class ConnectionBinding
+ implements Binding<Connection,ByteBuffer>
{
- private static final int MAX_FRAME_SIZE = 64 * 1024 - 1;
-
- private final ConnectionDelegate delegate;
+ public static Binding<Connection,ByteBuffer> get(final Connection connection)
+ {
+ return new ConnectionBinding()
+ {
+ public Connection connection()
+ {
+ return connection;
+ }
+ };
+ }
- public ConnectionBinding(ConnectionDelegate delegate)
+ public static Binding<Connection,ByteBuffer> get(final ConnectionDelegate delegate)
{
- this.delegate = delegate;
+ return new ConnectionBinding()
+ {
+ public Connection connection()
+ {
+ Connection conn = new Connection();
+ conn.setConnectionDelegate(delegate);
+ return conn;
+ }
+ };
}
+ public static final int MAX_FRAME_SIZE = 64 * 1024 - 1;
+
+ public abstract Connection connection();
+
public Connection endpoint(Sender<ByteBuffer> sender)
{
+ Connection conn = connection();
+
// XXX: hardcoded max-frame
- return new Connection
- (new Disassembler(sender, MAX_FRAME_SIZE), delegate);
+ Disassembler dis = new Disassembler(sender, MAX_FRAME_SIZE);
+ conn.setSender(dis);
+ return conn;
}
public Receiver<ByteBuffer> receiver(Connection conn)
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java?rev=703208&r1=703207&r2=703208&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java Thu Oct 9 10:07:59 2008
@@ -61,7 +61,7 @@
start();
}
- void close()
+ void close(boolean block)
{
if (!closed.getAndSet(true))
{
@@ -75,7 +75,7 @@
{
socket.shutdownInput();
}
- if (Thread.currentThread() != this)
+ if (block && Thread.currentThread() != this)
{
join(timeout);
if (isAlive())
@@ -121,6 +121,7 @@
}
}
}
+ socket.close();
}
catch (Throwable t)
{
@@ -129,7 +130,6 @@
finally
{
receiver.closed();
- transport.getSender().close();
}
}
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java?rev=703208&r1=703207&r2=703208&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java Thu Oct 9 10:07:59 2008
@@ -196,17 +196,12 @@
throw new TransportException("join timed out");
}
}
- transport.getReceiver().close();
- socket.close();
+ transport.getReceiver().close(false);
}
catch (InterruptedException e)
{
throw new TransportException(e);
}
- catch (IOException e)
- {
- throw new TransportException(e);
- }
if (reportException && exception != null)
{
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java?rev=703208&r1=703207&r2=703208&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java Thu Oct 9 10:07:59 2008
@@ -108,7 +108,7 @@
public static final Connection connect(String host, int port,
ConnectionDelegate delegate)
{
- return connect(host, port, new ConnectionBinding(delegate));
+ return connect(host, port, ConnectionBinding.get(delegate));
}
public static void connect_0_9(AMQVersionAwareProtocolSession session, String host, int port)
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaHandler.java?rev=703208&r1=703207&r2=703208&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaHandler.java Thu Oct 9 10:07:59 2008
@@ -262,13 +262,13 @@
ConnectionDelegate delegate)
throws IOException
{
- accept(host, port, new ConnectionBinding(delegate));
+ accept(host, port, ConnectionBinding.get(delegate));
}
public static final Connection connect(String host, int port,
ConnectionDelegate delegate)
{
- return connect(host, port, new ConnectionBinding(delegate));
+ return connect(host, port, ConnectionBinding.get(delegate));
}
}
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioHandler.java?rev=703208&r1=703207&r2=703208&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioHandler.java Thu Oct 9 10:07:59 2008
@@ -87,8 +87,9 @@
}
NioSender sender = new NioSender(_ch);
- Connection con = new Connection
- (new Disassembler(sender, 64*1024 - 1), delegate);
+ Connection con = new Connection();
+ con.setSender(new Disassembler(sender, 64*1024 - 1));
+ con.setConnectionDelegate(delegate);
con.setConnectionId(_count.incrementAndGet());
_handlers.put(con.getConnectionId(),sender);
Added: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/util/Waiter.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/util/Waiter.java?rev=703208&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/util/Waiter.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/util/Waiter.java Thu Oct 9 10:07:59 2008
@@ -0,0 +1,63 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport.util;
+
+
+/**
+ * Waiter
+ *
+ */
+
+public final class Waiter
+{
+
+ private final Object lock;
+ private final long timeout;
+ private final long start;
+ private long elapsed;
+
+ public Waiter(Object lock, long timeout)
+ {
+ this.lock = lock;
+ this.timeout = timeout;
+ this.start = System.currentTimeMillis();
+ this.elapsed = 0;
+ }
+
+ public boolean hasTime()
+ {
+ return elapsed < timeout;
+ }
+
+ public void await()
+ {
+ try
+ {
+ lock.wait(timeout - elapsed);
+ }
+ catch (InterruptedException e)
+ {
+ // pass
+ }
+ elapsed = System.currentTimeMillis() - start;
+ }
+
+}
Propchange: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/util/Waiter.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: incubator/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java?rev=703208&r1=703207&r2=703208&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java Thu Oct 9 10:07:59 2008
@@ -50,38 +50,30 @@
port = AvailablePortFinder.getNextAvailable(12000);
- ConnectionDelegate server = new ConnectionDelegate() {
- public void init(Channel ch, ProtocolHeader hdr) {
+ ConnectionDelegate server = new ServerDelegate() {
+ @Override public void connectionOpen(Channel ch, ConnectionOpen open)
+ {
+ super.connectionOpen(ch, open);
ch.getConnection().close();
}
-
- public SessionDelegate getSessionDelegate() {
- return new SessionDelegate() {};
- }
- public void exception(Throwable t) {
- log.error(t, "exception caught");
- }
- public void closed() {}
};
IoAcceptor ioa = new IoAcceptor
- ("localhost", port, new ConnectionBinding(server));
+ ("localhost", port, ConnectionBinding.get(server));
ioa.start();
}
private Connection connect(final Condition closed)
{
- Connection conn = IoTransport.connect("localhost", port, new ConnectionDelegate()
+ Connection conn = new Connection();
+ conn.setConnectionListener(new ConnectionListener()
{
- public SessionDelegate getSessionDelegate()
+ public void opened(Connection conn) {}
+ public void exception(Connection conn, ConnectionException exc)
{
- return new SessionDelegate() {};
+ exc.printStackTrace();
}
- public void exception(Throwable t)
- {
- t.printStackTrace();
- }
- public void closed()
+ public void closed(Connection conn)
{
if (closed != null)
{
@@ -89,8 +81,7 @@
}
}
});
-
- conn.send(new ProtocolHeader(1, 0, 10));
+ conn.connect("localhost", port, null, "guest", "guest");
return conn;
}
Modified: incubator/qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/QpidDatasource.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/QpidDatasource.java?rev=703208&r1=703207&r2=703208&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/QpidDatasource.java (original)
+++ incubator/qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/configuration/QpidDatasource.java Thu Oct 9 10:07:59 2008
@@ -28,140 +28,95 @@
import org.apache.commons.pool.ObjectPool;
import org.apache.commons.pool.impl.GenericObjectPool;
import org.apache.commons.pool.impl.GenericObjectPoolFactory;
-import org.apache.qpid.ErrorCode;
-import org.apache.qpid.QpidException;
-import org.apache.qpid.nclient.Client;
-import org.apache.qpid.nclient.ClosedListener;
-import org.apache.qpid.nclient.Connection;
-import org.apache.qpid.nclient.DtxSession;
-import org.apache.qpid.nclient.Session;
+import org.apache.qpid.transport.Connection;
+import org.apache.qpid.transport.ConnectionException;
import org.apache.qpid.transport.util.Logger;
/**
* Qpid datasource.
- * Basically it is a connection pool manager used for optimizing broker connections usage.
- *
+ * Basically it is a connection pool manager used for optimizing broker connections usage.
+ *
* @author Andrea Gazzarini
*/
-public final class QpidDatasource
+public final class QpidDatasource
{
private final static Logger LOGGER = Logger.get(QpidDatasource.class);
-
+
/**
* A connection decorator used for adding pool interaction behaviour to an existing connection.
- *
+ *
* @author Andrea Gazzarini
*/
- public class ConnectionDecorator implements Connection,ClosedListener
+ class PooledConnection extends Connection
{
- private final Connection _decoratee;
private final UUID _brokerId;
private boolean _valid;
-
+
/**
* Builds a new decorator with the given connection.
- *
+ *
* @param brokerId the broker identifier.
* @param decoratee the underlying connection.
*/
- private ConnectionDecorator(UUID brokerId, Connection decoratee)
+ private PooledConnection(UUID brokerId)
{
- this._decoratee = decoratee;
this._brokerId = brokerId;
- _decoratee.setClosedListener(this);
_valid = true;
}
-
+
/**
* Returns true if the underlying connection is still valid and can be used.
- *
+ *
* @return true if the underlying connection is still valid and can be used.
*/
boolean isValid()
{
return _valid;
}
-
+
+ void reallyClose()
+ {
+ super.close();
+ }
+
/**
* Returns the connection to the pool. That is, marks this connections as available.
* After that, this connection will be available for further operations.
*/
- public void close () throws QpidException
+ public void close()
{
try
{
pools.get(_brokerId).returnObject(this);
LOGGER.debug("<QMAN-200012> : Connection %s returned to the pool.", this);
- } catch (Exception exception)
+ }
+ catch (Exception e)
{
- throw new QpidException("Error while closing connection.",ErrorCode.CONNECTION_ERROR,exception);
- }
- }
-
- /**
- * Do nothing : underlying connection is already connected.
- */
- public void connect (String host, int port, String virtualHost, String username, String password)
- throws QpidException
- {
- // DO NOTHING : DECORATEE CONNECTION IS ALREADY CONNECTED.
- }
-
- /**
- * Do nothing : underlying connection is already connected.
- */
- public void connect (String url) throws QpidException
- {
- // DO NOTHING : DECORATEE CONNECTION IS ALREADY CONNECTED.
+ throw new ConnectionException(e);
+ }
}
- /**
- * @see Connection#createDTXSession(int)
- */
- public DtxSession createDTXSession (int expiryInSeconds)
+ public void exception(Throwable t)
{
- return _decoratee.createDTXSession(expiryInSeconds);
- }
-
- /**
- * @see Connection#createSession(long)
- */
- public Session createSession (long expiryInSeconds)
- {
- return _decoratee.createSession(expiryInSeconds);
+ super.exception(t);
+ _valid = false;
}
+ }
- /**
- * Do nothing : closed listener has been already injected.
- */
- public void setClosedListener (ClosedListener exceptionListner)
- {
- }
-
- /**
- * Callback method used for error notifications while underlying connection is closing.
- */
- public void onClosed (ErrorCode errorCode, String reason, Throwable t)
- {
- _valid = false;
- LOGGER.error(t,"<QMAN-100012> : Error on closing connection. Reason is : %s, error code is %s",reason,errorCode.getCode());
- }
- };
-
/**
- * This is the connection factory, that is, the factory used to manage the lifecycle (create, validate & destroy) of
+ * This is the connection factory, that is, the factory used to manage the lifecycle (create, validate & destroy) of
* the broker connection(s).
- *
+ *
* @author Andrea Gazzarini
*/
class QpidConnectionFactory extends BasePoolableObjectFactory
- {
+ {
private final BrokerConnectionData _connectionData;
private final UUID _brokerId;
-
+
/**
* Builds a new connection factory with the given parameters.
- *
+ *
* @param brokerId the broker identifier.
* @param connectionData the connecton data.
*/
@@ -170,35 +125,35 @@
this._connectionData = connectionData;
this._brokerId = brokerId;
}
-
+
/**
* Creates a new underlying connection.
*/
@Override
public Connection makeObject () throws Exception
{
- Connection connection = Client.createConnection();
+ PooledConnection connection = new PooledConnection(_brokerId);
connection.connect(
- _connectionData.getHost(),
- _connectionData.getPort(),
- _connectionData.getVirtualHost(),
- _connectionData.getUsername(),
+ _connectionData.getHost(),
+ _connectionData.getPort(),
+ _connectionData.getVirtualHost(),
+ _connectionData.getUsername(),
_connectionData.getPassword());
- return new ConnectionDecorator(_brokerId,connection);
+ return connection;
}
-
+
/**
* Validates the underlying connection.
*/
@Override
public boolean validateObject (Object obj)
{
- ConnectionDecorator connection = (ConnectionDecorator) obj;
+ PooledConnection connection = (PooledConnection) obj;
boolean isValid = connection.isValid();
LOGGER.debug("<QMAN-200013> : Test connection on reserve. Is valid? %s",isValid);
return isValid;
}
-
+
/**
* Closes the underlying connection.
*/
@@ -207,8 +162,8 @@
{
try
{
- ConnectionDecorator connection = (ConnectionDecorator) obj;
- connection._decoratee.close();
+ PooledConnection connection = (PooledConnection) obj;
+ connection.reallyClose();
LOGGER.debug("<QMAN-200014> : Connection has been destroyed.");
} catch (Exception e)
{
@@ -216,21 +171,21 @@
}
}
}
-
+
// Singleton instance.
private static QpidDatasource instance = new QpidDatasource();
// Each entry contains a connection pool for a specific broker.
private Map<UUID, ObjectPool> pools = new HashMap<UUID, ObjectPool>();
-
+
// Private constructor.
private QpidDatasource()
{
}
-
+
/**
* Gets an available connection from the pool of the given broker.
- *
+ *
* @param brokerId the broker identifier.
* @return a valid connection to the broker associated with the given identifier.
*/
@@ -238,20 +193,20 @@
{
return (Connection) pools.get(brokerId).borrowObject();
}
-
+
/**
* Entry point method for retrieving the singleton instance of this datasource.
- *
+ *
* @return the qpid datasource singleton instance.
*/
- public static QpidDatasource getInstance()
+ public static QpidDatasource getInstance()
{
return instance;
}
-
+
/**
* Adds a connection pool to this datasource.
- *
+ *
* @param brokerId the broker identifier that will be associated with the new connection pool.
* @param connectionData the broker connection data.
* @throws Exception when the pool cannot be created.
@@ -265,12 +220,12 @@
true,
false);
ObjectPool pool = factory.createPool();
-
+
for (int i = 0; i < connectionData.getInitialPoolCapacity(); i++)
{
pool.returnObject(pool.borrowObject());
}
-
+
pools.put(brokerId,pool);
}
}
\ No newline at end of file
Modified: incubator/qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/domain/services/QpidService.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/domain/services/QpidService.java?rev=703208&r1=703207&r2=703208&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/domain/services/QpidService.java (original)
+++ incubator/qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management/domain/services/QpidService.java Thu Oct 9 10:07:59 2008
@@ -22,42 +22,47 @@
import java.nio.ByteBuffer;
import java.util.Arrays;
+import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.qpid.QpidException;
import org.apache.qpid.management.Constants;
import org.apache.qpid.management.Names;
import org.apache.qpid.management.configuration.Configuration;
import org.apache.qpid.management.configuration.QpidDatasource;
-import org.apache.qpid.nclient.Connection;
-import org.apache.qpid.nclient.Session;
import org.apache.qpid.nclient.util.MessageListener;
import org.apache.qpid.nclient.util.MessagePartListenerAdapter;
+import org.apache.qpid.transport.Connection;
import org.apache.qpid.transport.MessageAcceptMode;
import org.apache.qpid.transport.MessageAcquireMode;
import org.apache.qpid.transport.MessageCreditUnit;
+import org.apache.qpid.transport.MessageTransfer;
import org.apache.qpid.transport.Option;
+import org.apache.qpid.transport.Session;
+import org.apache.qpid.transport.SessionException;
+import org.apache.qpid.transport.SessionListener;
import org.apache.qpid.transport.util.Logger;
/**
* Qpid Broker facade.
- *
+ *
* @author Andrea Gazzarini
*/
-public class QpidService
+public class QpidService implements SessionListener
{
private final static Logger LOGGER = Logger.get(QpidService.class);
// Inner static class used for logging and avoid conditional logic (isDebugEnabled()) duplication.
- private static class Log
- {
+ private static class Log
+ {
/**
* Logs the content f the message.
* This will be written on log only if DEBUG level is enabled.
- *
+ *
* @param messageContent the raw content of the message.
*/
- static void logMessageContent(byte [] messageContent)
+ static void logMessageContent(byte [] messageContent)
{
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
@@ -65,56 +70,81 @@
Arrays.toString(messageContent));
}
}
-
+
/**
* Logs the content f the message.
* This will be written on log only if DEBUG level is enabled.
- *
+ *
* @param messageContent the raw content of the message.
*/
- static void logMessageContent(ByteBuffer messageContent)
+ static void logMessageContent(ByteBuffer messageContent)
{
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
"<QMAN-200002> : Message has been sent to management exchange.");
}
- }
+ }
}
-
+
private UUID _brokerId;
private Connection _connection;
private Session _session;
-
+ private Map<String,MessagePartListenerAdapter> _listeners;
+
/**
* Builds a new service with the given connection data.
- *
+ *
* @param connectionData the connection data of the broker.
*/
- public QpidService(UUID brokerId)
+ public QpidService(UUID brokerId)
{
this._brokerId = brokerId;
}
-
+
/**
* Estabilishes a connection with the broker.
- *
+ *
* @throws QpidException in case of connection failure.
*/
public void connect() throws Exception
{
_connection = QpidDatasource.getInstance().getConnection(_brokerId);
+ _listeners = new ConcurrentHashMap<String,MessagePartListenerAdapter>();
_session = _connection.createSession(Constants.NO_EXPIRATION);
+ _session.setSessionListener(this);
}
-
+
+ public void opened(Session ssn) {}
+
+ public void message(Session ssn, MessageTransfer xfr)
+ {
+ MessagePartListenerAdapter l = _listeners.get(xfr.getDestination());
+ if (l == null)
+ {
+ LOGGER.error("unhandled message: %s", xfr);
+ }
+ else
+ {
+ l.messageTransfer(xfr);
+ }
+ }
+
+ public void exception(Session ssn, SessionException exc)
+ {
+ LOGGER.error(exc, "session %s exception", ssn);
+ }
+
+ public void closed(Session ssn) {}
+
/**
- * All the previously entered outstanding commands are asynchronous.
+ * All the previously entered outstanding commands are asynchronous.
* Synchronous behavior is achieved through invoking this method.
*/
- public void sync()
+ public void sync()
{
_session.sync();
}
-
+
/**
* Closes communication with broker.
*/
@@ -124,48 +154,50 @@
{
_session.close();
_session = null;
+ _listeners = null;
} catch (Exception e)
{
}
try
{
- _connection.close();
+ _connection.close();
_connection = null;
} catch (Exception e)
{
}
}
-
+
/**
* Associate a message listener with a destination therefore creating a new subscription.
- *
+ *
* @param queueName The name of the queue that the subscriber is receiving messages from.
* @param destinationName the name of the destination, or delivery tag, for the subscriber.
- * @param listener the listener for this destination.
- *
+ * @param listener the listener for this destination.
+ *
* @see Session#messageSubscribe(String, String, short, short, org.apache.qpid.nclient.MessagePartListener, java.util.Map, org.apache.qpid.transport.Option...)
*/
- public void createSubscription(String queueName, String destinationName,MessageListener listener)
+ public void createSubscription(String queueName, String destinationName, MessageListener listener)
{
- _session.messageSubscribe(
- queueName,
- destinationName,
- Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED,
- Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE,
- new MessagePartListenerAdapter(listener), null);
-
- _session.messageFlow(destinationName, MessageCreditUnit.BYTE, Session.MESSAGE_FLOW_MAX_BYTES);
- _session.messageFlow(destinationName, MessageCreditUnit.MESSAGE, Integer.MAX_VALUE);
-
+ _listeners.put(destinationName, new MessagePartListenerAdapter(listener));
+ _session.messageSubscribe
+ (queueName,
+ destinationName,
+ MessageAcceptMode.NONE,
+ MessageAcquireMode.PRE_ACQUIRED,
+ null, 0, null);
+
+ _session.messageFlow(destinationName, MessageCreditUnit.BYTE, Session.UNLIMITED_CREDIT);
+ _session.messageFlow(destinationName, MessageCreditUnit.MESSAGE, Session.UNLIMITED_CREDIT);
+
LOGGER.debug(
- "<QMAN-200003> : New subscription between queue %s and destination %s has been declared.",
+ "<QMAN-200003> : New subscription between queue %s and destination %s has been declared.",
queueName,
destinationName);
}
-
+
/**
* Removes a previously declared consumer from the broker.
- *
+ *
* @param destinationName the name of the destination, or delivery tag, for the subscriber.
* @see Session#messageCancel(String, Option...)
*/
@@ -173,10 +205,10 @@
{
_session.messageCancel(destinationName);
LOGGER.debug(
- "<QMAN-200026> : Subscription named %s has been removed from remote broker.",
+ "<QMAN-200026> : Subscription named %s has been removed from remote broker.",
destinationName);
- }
-
+ }
+
/**
* Declares a queue on the broker with the given name.
*
@@ -200,27 +232,27 @@
_session.queueDelete(queueName);
LOGGER.debug("<QMAN-2000025> : Queue with name %s has been removed.",queueName);
}
-
+
/**
* Binds (on the broker) a queue with an exchange.
*
- * @param queueName the name of the queue to bind.
+ * @param queueName the name of the queue to bind.
* @param exchangeName the exchange name.
- * @param routingKey the routing key used for the binding.
+ * @param routingKey the routing key used for the binding.
* @see Session#exchangeBind(String, String, String, java.util.Map, Option...)
*/
public void declareBinding(String queueName, String exchangeName, String routingKey)
{
_session.exchangeBind(queueName, exchangeName, routingKey, null);
LOGGER.debug(
- "<QMAN-200005> : New binding with %s as routing key has been declared between queue %s and exchange %s.",
+ "<QMAN-200005> : New binding with %s as routing key has been declared between queue %s and exchange %s.",
routingKey,queueName,
exchangeName);
}
-
+
/**
* Removes a previously declare binding between an exchange and a queue.
- *
+ *
* @param queueName the name of the queue.
* @param exchangeName the name of the exchange.
* @param routingKey the routing key used for binding.
@@ -229,42 +261,42 @@
{
_session.exchangeUnbind(queueName, exchangeName, routingKey);
LOGGER.debug(
- "<QMAN-200005> : Binding with %s as routing key has been removed between queue %s and exchange %s.",
+ "<QMAN-200005> : Binding with %s as routing key has been removed between queue %s and exchange %s.",
routingKey,queueName,
exchangeName);
}
-
+
/**
* Sends a command message with the given data on the management queue.
- *
+ *
* @param messageData the command message content.
*/
- public void sendCommandMessage(byte [] messageData)
+ public void sendCommandMessage(byte [] messageData)
{
_session.messageTransfer(
Names.MANAGEMENT_EXCHANGE,
MessageAcceptMode.EXPLICIT,
MessageAcquireMode.PRE_ACQUIRED,
Configuration.getInstance().getCommandMessageHeader(),
- messageData);
-
+ messageData);
+
Log.logMessageContent (messageData);
}
-
+
/**
* Sends a command message with the given data on the management queue.
- *
+ *
* @param messageData the command message content.
*/
- public void sendCommandMessage(ByteBuffer messageData)
+ public void sendCommandMessage(ByteBuffer messageData)
{
_session.messageTransfer(
Names.MANAGEMENT_EXCHANGE,
MessageAcceptMode.EXPLICIT,
MessageAcquireMode.PRE_ACQUIRED,
Configuration.getInstance().getCommandMessageHeader(),
- messageData);
-
+ messageData);
+
Log.logMessageContent (messageData);
- }
+ }
}
\ No newline at end of file
Modified: incubator/qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java?rev=703208&r1=703207&r2=703208&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java (original)
+++ incubator/qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java Thu Oct 9 10:07:59 2008
@@ -640,71 +640,47 @@
}
private static final org.apache.qpid.transport.Connection getConnection
- (Options opts, final SessionDelegate delegate)
+ (Options opts)
{
- final Object lock = new Object();
org.apache.qpid.transport.Connection conn =
- IoTransport.connect(opts.broker, opts.port,
- new ClientDelegate()
- {
- public SessionDelegate getSessionDelegate()
- {
- return delegate;
- }
- public void exception(Throwable t)
- {
- t.printStackTrace();
- }
- public void closed() {}
- @Override public void connectionOpenOk(Channel ch,
- ConnectionOpenOk ok)
- {
- synchronized (lock)
- {
- lock.notify();
- }
- }
- });
- conn.send(new ProtocolHeader(1, 0, 10));
-
- synchronized (lock)
+ new org.apache.qpid.transport.Connection();
+ conn.connect(opts.broker, opts.port, null, "guest", "guest");
+ return conn;
+ }
+
+ private static abstract class NativeListener implements SessionListener
+ {
+
+ public void opened(org.apache.qpid.transport.Session ssn) {}
+
+ public void exception(org.apache.qpid.transport.Session ssn,
+ SessionException exc)
{
- try
- {
- lock.wait();
- }
- catch (InterruptedException e)
- {
- throw new RuntimeException(e);
- }
+ exc.printStackTrace();
}
- return conn;
+ public void closed(org.apache.qpid.transport.Session ssn) {}
+
}
private static final void native_publisher(Options opts) throws Exception
{
final long[] echos = { 0 };
- org.apache.qpid.transport.Connection conn = getConnection
- (opts,
- new SessionDelegate() {
- @Override public void messageTransfer
- (org.apache.qpid.transport.Session ssn,
- MessageTransfer mt)
- {
- synchronized (echos)
- {
- echos[0]++;
- echos.notify();
- }
- ssn.processed(mt);
- }
- });
-
- Channel ch = conn.getChannel(0);
- org.apache.qpid.transport.Session ssn = new org.apache.qpid.transport.Session("spam-session".getBytes());
- ssn.attach(ch);
- ssn.sessionAttach(ssn.getName());
+ org.apache.qpid.transport.Connection conn = getConnection(opts);
+ org.apache.qpid.transport.Session ssn = conn.createSession();
+ ssn.setSessionListener(new NativeListener()
+ {
+ public void message(org.apache.qpid.transport.Session ssn,
+ MessageTransfer xfr)
+ {
+ synchronized (echos)
+ {
+ echos[0]++;
+ echos.notify();
+ }
+ ssn.processed(xfr);
+ }
+ });
ssn.invoke(new QueueDeclare().queue("test-queue").durable(false));
ssn.invoke(new QueueDeclare().queue("echo-queue").durable(false));
@@ -794,6 +770,7 @@
ssn.messageCancel("echo-queue");
ssn.sync();
+ ssn.close();
conn.close();
}
@@ -805,57 +782,51 @@
dp.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
final MessageProperties mp = new MessageProperties();
final Object done = new Object();
- org.apache.qpid.transport.Connection conn = getConnection
- (opts,
- new SessionDelegate() {
-
- private long count = 0;
- private long lastTime = 0;
- private long start;
-
- @Override public void messageTransfer
- (org.apache.qpid.transport.Session ssn,
- MessageTransfer mt)
- {
- if (count == 0)
- {
- start = System.currentTimeMillis();
- }
-
- boolean sample = opts.sample > 0 && (count % opts.sample) == 0;
- long time = sample ? System.currentTimeMillis() : 0;
-
- if (opts.window > 0 && (count % opts.window) == 0)
- {
- ssn.messageTransfer("amq.direct",
- MessageAcceptMode.NONE,
- MessageAcquireMode.PRE_ACQUIRED,
- new Header(dp, mp),
- echo);
- }
-
- if (sample)
- {
- sample(opts, Column.RIGHT, "NC", count, start, time, lastTime);
- lastTime = time;
- }
- ssn.processed(mt);
- count++;
-
- if (opts.count > 0 && count >= opts.count)
- {
- synchronized (done)
- {
- done.notify();
- }
- }
- }
- });
-
- Channel ch = conn.getChannel(0);
- org.apache.qpid.transport.Session ssn = new org.apache.qpid.transport.Session("listener-session".getBytes());
- ssn.attach(ch);
- ssn.sessionAttach(ssn.getName());
+ org.apache.qpid.transport.Connection conn = getConnection(opts);
+ org.apache.qpid.transport.Session ssn = conn.createSession();
+ ssn.setSessionListener(new NativeListener()
+ {
+ private long count = 0;
+ private long lastTime = 0;
+ private long start;
+
+ public void message(org.apache.qpid.transport.Session ssn,
+ MessageTransfer xfr)
+ {
+ if (count == 0)
+ {
+ start = System.currentTimeMillis();
+ }
+
+ boolean sample = opts.sample > 0 && (count % opts.sample) == 0;
+ long time = sample ? System.currentTimeMillis() : 0;
+
+ if (opts.window > 0 && (count % opts.window) == 0)
+ {
+ ssn.messageTransfer("amq.direct",
+ MessageAcceptMode.NONE,
+ MessageAcquireMode.PRE_ACQUIRED,
+ new Header(dp, mp),
+ echo);
+ }
+
+ if (sample)
+ {
+ sample(opts, Column.RIGHT, "NC", count, start, time, lastTime);
+ lastTime = time;
+ }
+ ssn.processed(xfr);
+ count++;
+
+ if (opts.count > 0 && count >= opts.count)
+ {
+ synchronized (done)
+ {
+ done.notify();
+ }
+ }
+ }
+ });
ssn.invoke(new QueueDeclare().queue("test-queue").durable(false));
ssn.invoke(new QueueDeclare().queue("echo-queue").durable(false));
@@ -879,6 +850,7 @@
ssn.messageCancel("test-queue");
ssn.sync();
+ ssn.close();
conn.close();
}