You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2008/10/13 18:07:01 UTC
svn commit: r704147 - in /incubator/qpid/trunk/qpid/java/common: ./
src/main/java/org/apache/qpid/ src/main/java/org/apache/qpid/transport/
src/main/java/org/apache/qpid/transport/network/io/
src/test/java/org/apache/qpid/transport/
Author: rhs
Date: Mon Oct 13 09:07:01 2008
New Revision: 704147
URL: http://svn.apache.org/viewvc?rev=704147&view=rev
Log:
QPID-1339:
- Removed the Channel class in order to simplify the state management
surrounding Sessions and Connections.
- Consolidated the ChannelDelegate into the ConnectionDelegate.
- Modified MethodDelegate to invoke a generic handle method as the
default action for each dispatched method.
- Modified the code generator to produce a separate ConnectionInvoker
and SessionInvoker.
- Modified the invoker template to use package-level visibility for
all controls rather than public visibility.
Removed:
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Channel.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ChannelDelegate.java
Modified:
incubator/qpid/trunk/qpid/java/common/Composite.tpl
incubator/qpid/trunk/qpid/java/common/Invoker.tpl
incubator/qpid/trunk/qpid/java/common/MethodDelegate.tpl
incubator/qpid/trunk/qpid/java/common/codegen
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/ToyClient.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Binary.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Echo.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionClosedException.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionException.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Sink.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
incubator/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java
Modified: incubator/qpid/trunk/qpid/java/common/Composite.tpl
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/Composite.tpl?rev=704147&r1=704146&r2=704147&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/Composite.tpl (original)
+++ incubator/qpid/trunk/qpid/java/common/Composite.tpl Mon Oct 13 09:07:01 2008
@@ -130,9 +130,13 @@
}
}
- public <C> void dispatch(C context, MethodDelegate<C> delegate) {
+${
+
+if base == "Method":
+ out(""" public <C> void dispatch(C context, MethodDelegate<C> delegate) {
delegate.$(dromedary(name))(context, this);
- }
+ }""")
+}
${
for f in fields:
Modified: incubator/qpid/trunk/qpid/java/common/Invoker.tpl
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/Invoker.tpl?rev=704147&r1=704146&r2=704147&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/Invoker.tpl (original)
+++ incubator/qpid/trunk/qpid/java/common/Invoker.tpl Mon Oct 13 09:07:01 2008
@@ -5,14 +5,12 @@
import java.util.Map;
import java.util.UUID;
-public abstract class Invoker {
-
- protected abstract void invoke(Method method);
- protected abstract <T> Future<T> invoke(Method method, Class<T> resultClass);
-
+public abstract class $(invoker) {
${
from genutil import *
+results = False
+
for c in composites:
name = cname(c)
fields = get_fields(c)
@@ -20,6 +18,7 @@
args = get_arguments(c, fields)
result = c["result"]
if result:
+ results = True
if not result["@type"]:
rname = cname(result["struct"])
else:
@@ -32,11 +31,22 @@
jreturn = ""
jclass = ""
+ if c.name == "command":
+ access = "public "
+ else:
+ access = ""
+
out("""
- public final $jresult $(dromedary(name))($(", ".join(params))) {
+ $(access)final $jresult $(dromedary(name))($(", ".join(params))) {
$(jreturn)invoke(new $name($(", ".join(args)))$jclass);
}
""")
}
-
+ protected abstract void invoke(Method method);
+${
+if results:
+ out("""
+ protected abstract <T> Future<T> invoke(Method method, Class<T> resultClass);
+""")
+}
}
Modified: incubator/qpid/trunk/qpid/java/common/MethodDelegate.tpl
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/MethodDelegate.tpl?rev=704147&r1=704146&r2=704147&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/MethodDelegate.tpl (original)
+++ incubator/qpid/trunk/qpid/java/common/MethodDelegate.tpl Mon Oct 13 09:07:01 2008
@@ -2,11 +2,15 @@
public abstract class MethodDelegate<C> {
+ public abstract void handle(C context, Method method);
${
from genutil import *
for c in composites:
name = cname(c)
- out(" public void $(dromedary(name))(C context, $name struct) {}\n")
+ out("""
+ public void $(dromedary(name))(C context, $name method) {
+ handle(context, method);
+ }""")
}
}
Modified: incubator/qpid/trunk/qpid/java/common/codegen
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/codegen?rev=704147&r1=704146&r2=704147&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/codegen (original)
+++ incubator/qpid/trunk/qpid/java/common/codegen Mon Oct 13 09:07:01 2008
@@ -43,14 +43,20 @@
commands = spec.query["amqp/class/command", excludes]
composites = structs + controls + commands
+actions = controls + commands
+connection = [c for c in actions if c.parent["@name"] == "connection"]
+session = [c for c in actions if c.parent["@name"] != "connection"]
for c in composites:
name = cname(c)
execute("%s.java" % name, "Composite.tpl", type = c, name = name)
-execute("MethodDelegate.java", "MethodDelegate.tpl", composites = composites)
+execute("MethodDelegate.java", "MethodDelegate.tpl", composites = actions)
execute("Option.java", "Option.tpl", composites = composites)
-execute("Invoker.java", "Invoker.tpl", composites = controls + commands)
+execute("ConnectionInvoker.java", "Invoker.tpl", invoker = "ConnectionInvoker",
+ composites = connection)
+execute("SessionInvoker.java", "Invoker.tpl", invoker = "SessionInvoker",
+ composites = session)
execute("StructFactory.java", "StructFactory.tpl", composites = composites)
def is_enum(nd):
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/ToyClient.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/ToyClient.java?rev=704147&r1=704146&r2=704147&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/ToyClient.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/ToyClient.java Mon Oct 13 09:07:01 2008
@@ -33,48 +33,28 @@
* @author Rafael H. Schloming
*/
-class ToyClient extends SessionDelegate
+class ToyClient implements SessionListener
{
+ public void opened(Session ssn) {}
- @Override public void messageReject(Session ssn, MessageReject reject)
+ public void exception(Session ssn, SessionException exc)
{
- for (Range range : reject.getTransfers())
- {
- for (long l = range.getLower(); l <= range.getUpper(); l++)
- {
- System.out.println("message rejected: " +
- ssn.getCommand((int) l));
- }
- }
+ exc.printStackTrace();
}
- @Override public void messageTransfer(Session ssn, MessageTransfer xfr)
+ public void message(Session ssn, MessageTransfer xfr)
{
System.out.println("msg: " + xfr);
}
+ public void closed(Session ssn) {}
+
public static final void main(String[] args)
{
- Connection conn = MinaHandler.connect("0.0.0.0", 5672,
- new ClientDelegate(null, "guest", "guest")
- {
- public SessionDelegate getSessionDelegate()
- {
- return new ToyClient();
- }
- public void exception(Throwable t)
- {
- t.printStackTrace();
- }
- public void closed() {}
- });
- conn.send(new ProtocolHeader
- (1, 0, 10));
-
- Channel ch = conn.getChannel(0);
- Session ssn = new Session("my-session".getBytes());
- ssn.attach(ch);
- ssn.sessionAttach(ssn.getName());
+ Connection conn = new Connection();
+ conn.connect("0.0.0.0", 5672, null, "guest", "guest");
+ Session ssn = conn.createSession();
+ ssn.setSessionListener(new ToyClient());
ssn.queueDeclare("asdf", null, null);
ssn.sync();
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Binary.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Binary.java?rev=704147&r1=704146&r2=704147&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Binary.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Binary.java Mon Oct 13 09:07:01 2008
@@ -20,6 +20,10 @@
*/
package org.apache.qpid.transport;
+import java.nio.ByteBuffer;
+
+import static org.apache.qpid.transport.util.Functions.*;
+
/**
* Binary
@@ -51,6 +55,13 @@
this(bytes, 0, bytes.length);
}
+ public final byte[] getBytes()
+ {
+ byte[] result = new byte[size];
+ System.arraycopy(bytes, offset, result, 0, size);
+ return result;
+ }
+
public final byte[] array()
{
return bytes;
@@ -126,4 +137,9 @@
return true;
}
+ public String toString()
+ {
+ return str(ByteBuffer.wrap(bytes, offset, size));
+ }
+
}
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java?rev=704147&r1=704146&r2=704147&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java Mon Oct 13 09:07:01 2008
@@ -62,23 +62,20 @@
this.password = password;
}
- public void init(Channel ch, ProtocolHeader hdr)
+ public void init(Connection conn, ProtocolHeader hdr)
{
if (!(hdr.getMajor() == 0 && hdr.getMinor() == 10))
{
- Connection conn = ch.getConnection();
conn.exception(new ProtocolVersionException(hdr.getMajor(), hdr.getMinor()));
}
-
}
- @Override public void connectionStart(Channel ch, ConnectionStart start)
+ @Override public void connectionStart(Connection conn, ConnectionStart start)
{
- Connection conn = ch.getConnection();
List<Object> mechanisms = start.getMechanisms();
if (mechanisms == null || mechanisms.isEmpty())
{
- ch.connectionStartOk
+ conn.connectionStartOk
(Collections.EMPTY_MAP, null, null, conn.getLocale());
return;
}
@@ -97,7 +94,7 @@
byte[] response = sc.hasInitialResponse() ?
sc.evaluateChallenge(new byte[0]) : null;
- ch.connectionStartOk
+ conn.connectionStartOk
(Collections.EMPTY_MAP, sc.getMechanismName(), response,
conn.getLocale());
}
@@ -107,14 +104,13 @@
}
}
- @Override public void connectionSecure(Channel ch, ConnectionSecure secure)
+ @Override public void connectionSecure(Connection conn, ConnectionSecure secure)
{
- Connection conn = ch.getConnection();
SaslClient sc = conn.getSaslClient();
try
{
byte[] response = sc.evaluateChallenge(secure.getChallenge());
- ch.connectionSecureOk(response);
+ conn.connectionSecureOk(response);
}
catch (SaslException e)
{
@@ -122,20 +118,19 @@
}
}
- @Override public void connectionTune(Channel ch, ConnectionTune tune)
+ @Override public void connectionTune(Connection conn, ConnectionTune tune)
{
- Connection conn = ch.getConnection();
conn.setChannelMax(tune.getChannelMax());
- ch.connectionTuneOk(tune.getChannelMax(), tune.getMaxFrameSize(), tune.getHeartbeatMax());
- ch.connectionOpen(vhost, null, Option.INSIST);
+ conn.connectionTuneOk(tune.getChannelMax(), tune.getMaxFrameSize(), tune.getHeartbeatMax());
+ conn.connectionOpen(vhost, null, Option.INSIST);
}
- @Override public void connectionOpenOk(Channel ch, ConnectionOpenOk ok)
+ @Override public void connectionOpenOk(Connection conn, ConnectionOpenOk ok)
{
- ch.getConnection().setState(OPEN);
+ conn.setState(OPEN);
}
- @Override public void connectionRedirect(Channel ch, ConnectionRedirect redir)
+ @Override public void connectionRedirect(Connection conn, ConnectionRedirect redir)
{
throw new UnsupportedOperationException();
}
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java?rev=704147&r1=704146&r2=704147&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java Mon Oct 13 09:07:01 2008
@@ -24,6 +24,7 @@
import org.apache.qpid.transport.network.io.IoTransport;
import org.apache.qpid.transport.util.Logger;
import org.apache.qpid.transport.util.Waiter;
+import org.apache.qpid.util.Strings;
import java.util.ArrayList;
import java.util.HashMap;
@@ -48,7 +49,7 @@
* short instead of Short
*/
-public class Connection
+public class Connection extends ConnectionInvoker
implements Receiver<ProtocolEvent>, Sender<ProtocolEvent>
{
@@ -69,7 +70,8 @@
private ConnectionDelegate delegate;
private Sender<ProtocolEvent> sender;
- final private Map<Integer,Channel> channels = new HashMap<Integer,Channel>();
+ final private Map<Binary,Session> sessions = new HashMap<Binary,Session>();
+ final private Map<Integer,Session> channels = new HashMap<Integer,Session>();
private State state = NEW;
private Object lock = new Object();
@@ -200,14 +202,45 @@
return createSession(0);
}
- public Session createSession(long expiryInSeconds)
+ public Session createSession(long timeout)
{
- Channel ch = getChannel();
- Session ssn = new Session(UUID.randomUUID().toString().getBytes());
- ssn.attach(ch);
- ssn.sessionAttach(ssn.getName());
- ssn.sessionRequestTimeout(expiryInSeconds);
- return ssn;
+ return createSession(UUID.randomUUID().toString(), timeout);
+ }
+
+ public Session createSession(String name)
+ {
+ return createSession(name, 0);
+ }
+
+ public Session createSession(String name, long timeout)
+ {
+ return createSession(Strings.toUTF8(name), timeout);
+ }
+
+ public Session createSession(byte[] name, long timeout)
+ {
+ return createSession(new Binary(name), timeout);
+ }
+
+ public Session createSession(Binary name, long timeout)
+ {
+ synchronized (lock)
+ {
+ Session ssn = new Session(this, name);
+ sessions.put(name, ssn);
+ map(ssn);
+ ssn.sessionAttach(name.getBytes());
+ ssn.sessionRequestTimeout(timeout);
+ return ssn;
+ }
+ }
+
+ void removeSession(Session ssn)
+ {
+ synchronized (lock)
+ {
+ sessions.remove(ssn.getName());
+ }
}
public void setConnectionId(int id)
@@ -228,8 +261,7 @@
public void received(ProtocolEvent event)
{
log.debug("RECV: [%s] %s", this, event);
- Channel channel = getChannel(event.getChannel());
- channel.received(event);
+ event.delegate(this, delegate);
}
public void send(ProtocolEvent event)
@@ -249,6 +281,22 @@
sender.flush();
}
+ protected void invoke(Method method)
+ {
+ method.setChannel(0);
+ send(method);
+ if (!method.isBatch())
+ {
+ flush();
+ }
+ }
+
+ public void dispatch(Method method)
+ {
+ Session ssn = getSession(method.getChannel());
+ ssn.received(method);
+ }
+
public int getChannelMax()
{
return channelMax;
@@ -259,7 +307,7 @@
channelMax = max;
}
- public Channel getChannel()
+ private int map(Session ssn)
{
synchronized (lock)
{
@@ -267,7 +315,8 @@
{
if (!channels.containsKey(i))
{
- return getChannel(i);
+ map(ssn, i);
+ return i;
}
}
@@ -275,25 +324,28 @@
}
}
- public Channel getChannel(int number)
+ void map(Session ssn, int channel)
{
synchronized (lock)
{
- Channel channel = channels.get(number);
- if (channel == null)
- {
- channel = new Channel(this, number, delegate.getSessionDelegate());
- channels.put(number, channel);
- }
- return channel;
+ channels.put(channel, ssn);
+ ssn.setChannel(channel);
+ }
+ }
+
+ void unmap(Session ssn)
+ {
+ synchronized (lock)
+ {
+ channels.remove(ssn.getChannel());
}
}
- void removeChannel(int number)
+ Session getSession(int channel)
{
synchronized (lock)
{
- channels.remove(number);
+ return channels.get(channel);
}
}
@@ -324,9 +376,9 @@
{
synchronized (lock)
{
- for (Channel ch : channels.values())
+ for (Session ssn : channels.values())
{
- ch.closeCode(close);
+ ssn.closeCode(close);
}
ConnectionCloseCode code = close.getReplyCode();
if (code != ConnectionCloseCode.NORMAL)
@@ -347,10 +399,10 @@
synchronized (lock)
{
- List<Channel> values = new ArrayList<Channel>(channels.values());
- for (Channel ch : values)
+ List<Session> values = new ArrayList<Session>(channels.values());
+ for (Session ssn : values)
{
- ch.closed();
+ ssn.closed();
}
sender = null;
@@ -367,9 +419,8 @@
switch (state)
{
case OPEN:
- Channel ch = getChannel(0);
state = CLOSING;
- ch.connectionClose(ConnectionCloseCode.NORMAL, null);
+ connectionClose(ConnectionCloseCode.NORMAL, null);
Waiter w = new Waiter(lock, timeout);
while (w.hasTime() && state == CLOSING && error == null)
{
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java?rev=704147&r1=704146&r2=704147&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java Mon Oct 13 09:07:01 2008
@@ -37,31 +37,64 @@
*
* the connectionClose is kind of different for both sides
*/
-public abstract class ConnectionDelegate extends MethodDelegate<Channel>
+public abstract class ConnectionDelegate
+ extends MethodDelegate<Connection>
+ implements ProtocolDelegate<Connection>
{
private static final Logger log = Logger.get(ConnectionDelegate.class);
- public SessionDelegate getSessionDelegate()
+ public void control(Connection conn, Method method)
{
- return new SessionDelegate();
+ method.dispatch(conn, this);
}
- public abstract void init(Channel ch, ProtocolHeader hdr);
+ public void command(Connection conn, Method method)
+ {
+ method.dispatch(conn, this);
+ }
+
+ public void error(Connection conn, ProtocolError error)
+ {
+ conn.exception(new ConnectionException(error.getMessage()));
+ }
- @Override public void connectionClose(Channel ch, ConnectionClose close)
+ public void handle(Connection conn, Method method)
{
- Connection conn = ch.getConnection();
- ch.connectionCloseOk();
+ conn.dispatch(method);
+ }
+
+ @Override public void connectionClose(Connection conn, ConnectionClose close)
+ {
+ conn.connectionCloseOk();
conn.getSender().close();
conn.closeCode(close);
conn.setState(CLOSE_RCVD);
}
- @Override public void connectionCloseOk(Channel ch, ConnectionCloseOk ok)
+ @Override public void connectionCloseOk(Connection conn, ConnectionCloseOk ok)
{
- Connection conn = ch.getConnection();
conn.getSender().close();
}
+ @Override public void sessionAttached(Connection conn, SessionAttached atc)
+ {
+
+ }
+
+ @Override public void sessionDetach(Connection conn, SessionDetach dtc)
+ {
+ Session ssn = conn.getSession(dtc.getChannel());
+ conn.unmap(ssn);
+ ssn.sessionDetached(dtc.getName(), SessionDetachCode.NORMAL);
+ ssn.closed();
+ }
+
+ @Override public void sessionDetached(Connection conn, SessionDetached dtc)
+ {
+ Session ssn = conn.getSession(dtc.getChannel());
+ conn.unmap(ssn);
+ ssn.closed();
+ }
+
}
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Echo.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Echo.java?rev=704147&r1=704146&r2=704147&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Echo.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Echo.java Mon Oct 13 09:07:01 2008
@@ -32,22 +32,33 @@
*
*/
-public class Echo extends SessionDelegate
+public class Echo implements SessionListener
{
- public void messageTransfer(Session ssn, MessageTransfer xfr)
+ public void opened(Session ssn) {}
+
+ public void message(Session ssn, MessageTransfer xfr)
{
ssn.invoke(xfr);
ssn.processed(xfr);
}
+ public void exception(Session ssn, SessionException exc)
+ {
+ exc.printStackTrace();
+ }
+
+ public void closed(Session ssn) {}
+
public static final void main(String[] args) throws IOException
{
ConnectionDelegate delegate = new ServerDelegate()
{
- public SessionDelegate getSessionDelegate()
+ @Override public Session getSession(Connection conn, SessionAttach atc)
{
- return new Echo();
+ Session ssn = super.getSession(conn, atc);
+ ssn.setSessionListener(new Echo());
+ return ssn;
}
};
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java?rev=704147&r1=704146&r2=704147&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java Mon Oct 13 09:07:01 2008
@@ -55,24 +55,22 @@
private SaslServer saslServer;
- public void init(Channel ch, ProtocolHeader hdr)
+ public void init(Connection conn, ProtocolHeader hdr)
{
- Connection conn = ch.getConnection();
conn.send(new ProtocolHeader(1, 0, 10));
List<Object> utf8 = new ArrayList<Object>();
utf8.add("utf8");
- ch.connectionStart(null, Collections.EMPTY_LIST, utf8);
+ conn.connectionStart(null, Collections.EMPTY_LIST, utf8);
}
- @Override public void connectionStartOk(Channel ch, ConnectionStartOk ok)
+ @Override public void connectionStartOk(Connection conn, ConnectionStartOk ok)
{
- Connection conn = ch.getConnection();
conn.setLocale(ok.getLocale());
String mechanism = ok.getMechanism();
if (mechanism == null || mechanism.length() == 0)
{
- ch.connectionTune
+ conn.connectionTune
(Integer.MAX_VALUE,
org.apache.qpid.transport.network.ConnectionBinding.MAX_FRAME_SIZE,
0, Integer.MAX_VALUE);
@@ -85,13 +83,13 @@
(mechanism, "AMQP", "localhost", null, null);
if (ss == null)
{
- ch.connectionClose
+ conn.connectionClose
(ConnectionCloseCode.CONNECTION_FORCED,
"null SASL mechanism: " + mechanism);
return;
}
conn.setSaslServer(ss);
- secure(ch, ok.getResponse());
+ secure(conn, ok.getResponse());
}
catch (SaslException e)
{
@@ -99,9 +97,8 @@
}
}
- private void secure(Channel ch, byte[] response)
+ private void secure(Connection conn, byte[] response)
{
- Connection conn = ch.getConnection();
SaslServer ss = conn.getSaslServer();
try
{
@@ -109,14 +106,14 @@
if (ss.isComplete())
{
ss.dispose();
- ch.connectionTune
+ conn.connectionTune
(Integer.MAX_VALUE,
org.apache.qpid.transport.network.ConnectionBinding.MAX_FRAME_SIZE,
0, Integer.MAX_VALUE);
}
else
{
- ch.connectionSecure(challenge);
+ conn.connectionSecure(challenge);
}
}
catch (SaslException e)
@@ -125,21 +122,32 @@
}
}
- @Override public void connectionSecureOk(Channel ch, ConnectionSecureOk ok)
+ @Override public void connectionSecureOk(Connection conn, ConnectionSecureOk ok)
{
- secure(ch, ok.getResponse());
+ secure(conn, ok.getResponse());
}
- @Override public void connectionTuneOk(Channel ch, ConnectionTuneOk ok)
+ @Override public void connectionTuneOk(Connection conn, ConnectionTuneOk ok)
{
}
- @Override public void connectionOpen(Channel ch, ConnectionOpen open)
+ @Override public void connectionOpen(Connection conn, ConnectionOpen open)
{
- Connection conn = ch.getConnection();
- ch.connectionOpenOk(Collections.EMPTY_LIST);
+ conn.connectionOpenOk(Collections.EMPTY_LIST);
conn.setState(OPEN);
}
+ public Session getSession(Connection conn, SessionAttach atc)
+ {
+ return new Session(conn, new Binary(atc.getName()));
+ }
+
+ @Override public void sessionAttach(Connection conn, SessionAttach atc)
+ {
+ Session ssn = getSession(conn, atc);
+ conn.map(ssn, atc.getChannel());
+ ssn.sessionAttached(atc.getName());
+ }
+
}
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java?rev=704147&r1=704146&r2=704147&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java Mon Oct 13 09:07:01 2008
@@ -44,7 +44,7 @@
* @author Rafael H. Schloming
*/
-public class Session extends Invoker
+public class Session extends SessionInvoker
{
private static final Logger log = Logger.get(Session.class);
@@ -84,14 +84,14 @@
}
}
- private byte[] name;
+ private Connection connection;
+ private Binary name;
+ private int channel;
+ private SessionDelegate delegate = new SessionDelegate();
private SessionListener listener = new DefaultSessionListener();
private long timeout = 60000;
private boolean autoSync = false;
- // channel may be null
- Channel channel;
-
// incoming command count
int commandsIn = 0;
// completed incoming commands
@@ -108,16 +108,27 @@
private AtomicBoolean closed = new AtomicBoolean(false);
- public Session(byte[] name)
+ Session(Connection connection, Binary name)
{
+ this.connection = connection;
this.name = name;
}
- public byte[] getName()
+ public Binary getName()
{
return name;
}
+ int getChannel()
+ {
+ return channel;
+ }
+
+ void setChannel(int channel)
+ {
+ this.channel = channel;
+ }
+
public void setSessionListener(SessionListener listener)
{
if (listener == null)
@@ -212,8 +223,12 @@
{
maxProcessed = max(maxProcessed, upper);
}
- flush = lt(old, syncPoint) && ge(maxProcessed, syncPoint);
- syncPoint = maxProcessed;
+ boolean synced = ge(maxProcessed, syncPoint);
+ flush = lt(old, syncPoint) && synced;
+ if (synced)
+ {
+ syncPoint = maxProcessed;
+ }
}
if (flush)
{
@@ -266,12 +281,6 @@
}
}
- public void attach(Channel channel)
- {
- this.channel = channel;
- channel.setSession(this);
- }
-
public Method getCommand(int id)
{
synchronized (commands)
@@ -304,6 +313,22 @@
}
}
+ void received(Method m)
+ {
+ m.delegate(this, delegate);
+ }
+
+ private void send(Method m)
+ {
+ m.setChannel(channel);
+ connection.send(m);
+
+ if (!m.isBatch())
+ {
+ connection.flush();
+ }
+ }
+
public void invoke(Method m)
{
if (closed.get())
@@ -342,7 +367,7 @@
m.setSync(true);
}
needSync = !m.isSync();
- channel.method(m);
+ send(m);
if (autoSync)
{
sync();
@@ -358,7 +383,7 @@
}
else
{
- channel.method(m);
+ send(m);
}
}
@@ -564,7 +589,7 @@
public void close()
{
sessionRequestTimeout(0);
- sessionDetach(name);
+ sessionDetach(name.getBytes());
synchronized (commands)
{
long start = System.currentTimeMillis();
@@ -576,12 +601,19 @@
commands.wait(timeout - elapsed);
elapsed = System.currentTimeMillis() - start;
}
+
+ if (!closed.get())
+ {
+ throw new SessionException("close() timed out");
+ }
}
catch (InterruptedException e)
{
throw new RuntimeException(e);
}
}
+
+ connection.removeSession(this);
}
public void exception(Throwable t)
@@ -606,13 +638,11 @@
}
}
}
- channel.setSession(null);
- channel = null;
}
public String toString()
{
- return String.format("ssn:%s", str(name));
+ return String.format("ssn:%s", name);
}
}
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionClosedException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionClosedException.java?rev=704147&r1=704146&r2=704147&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionClosedException.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionClosedException.java Mon Oct 13 09:07:01 2008
@@ -38,7 +38,7 @@
public SessionClosedException(Throwable cause)
{
- super(null, cause);
+ super("session closed", null, cause);
}
@Override public void rethrow()
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java?rev=704147&r1=704146&r2=704147&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java Mon Oct 13 09:07:01 2008
@@ -20,7 +20,7 @@
*/
package org.apache.qpid.transport;
-import org.apache.qpid.transport.network.Frame;
+import org.apache.qpid.transport.util.Logger;
/**
@@ -33,6 +33,8 @@
extends MethodDelegate<Session>
implements ProtocolDelegate<Session>
{
+ private static final Logger log = Logger.get(SessionDelegate.class);
+
public void init(Session ssn, ProtocolHeader hdr) { }
public void control(Session ssn, Method method) {
@@ -50,15 +52,12 @@
public void error(Session ssn, ProtocolError error) { }
- @Override public void executionResult(Session ssn, ExecutionResult result)
+ public void handle(Session ssn, Method method)
{
- ssn.result(result.getCommandId(), result.getValue());
+ log.warn("UNHANDLED: [%s] %s", ssn, method);
}
- @Override public void executionException(Session ssn, ExecutionException exc)
- {
- ssn.setException(exc);
- }
+ @Override public void sessionTimeout(Session ssn, SessionTimeout t) {}
@Override public void sessionCompleted(Session ssn, SessionCompleted cmp)
{
@@ -122,6 +121,16 @@
ssn.syncPoint();
}
+ @Override public void executionResult(Session ssn, ExecutionResult result)
+ {
+ ssn.result(result.getCommandId(), result.getValue());
+ }
+
+ @Override public void executionException(Session ssn, ExecutionException exc)
+ {
+ ssn.setException(exc);
+ }
+
@Override public void messageTransfer(Session ssn, MessageTransfer xfr)
{
ssn.getSessionListener().message(ssn, xfr);
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionException.java?rev=704147&r1=704146&r2=704147&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionException.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionException.java Mon Oct 13 09:07:01 2008
@@ -32,15 +32,20 @@
private ExecutionException exception;
- public SessionException(ExecutionException exception, Throwable cause)
+ public SessionException(String message, ExecutionException exception, Throwable cause)
{
- super(String.valueOf(exception), cause);
+ super(message, cause);
this.exception = exception;
}
public SessionException(ExecutionException exception)
{
- this(exception, null);
+ this(String.valueOf(exception), exception, null);
+ }
+
+ public SessionException(String message)
+ {
+ this(message, null, null);
}
public ExecutionException getException()
@@ -50,7 +55,7 @@
@Override public void rethrow()
{
- throw new SessionException(exception, this);
+ throw new SessionException(getMessage(), exception, this);
}
}
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Sink.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Sink.java?rev=704147&r1=704146&r2=704147&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Sink.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Sink.java Mon Oct 13 09:07:01 2008
@@ -31,7 +31,7 @@
*
*/
-public class Sink extends SessionDelegate
+public class Sink implements SessionListener
{
private static final String FORMAT_HDR = "%-12s %-18s %-18s %-18s";
@@ -85,7 +85,9 @@
return String.format("%d/%.2f", count, ((double) bytes)/(1024*1024));
}
- public void messageTransfer(Session ssn, MessageTransfer xfr)
+ public void opened(Session ssn) {}
+
+ public void message(Session ssn, MessageTransfer xfr)
{
count++;
bytes += xfr.getBody().remaining();
@@ -101,14 +103,22 @@
ssn.processed(xfr);
}
+ public void exception(Session ssn, SessionException exc)
+ {
+ exc.printStackTrace();
+ }
+
+ public void closed(Session ssn) {}
+
public static final void main(String[] args) throws IOException
{
ConnectionDelegate delegate = new ServerDelegate()
{
-
- public SessionDelegate getSessionDelegate()
+ @Override public Session getSession(Connection conn, SessionAttach atc)
{
- return new Sink();
+ Session ssn = super.getSession(conn, atc);
+ ssn.setSessionListener(new Sink());
+ return ssn;
}
};
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java?rev=704147&r1=704146&r2=704147&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java Mon Oct 13 09:07:01 2008
@@ -181,6 +181,11 @@
{
if (!closed.getAndSet(true))
{
+ synchronized (notFull)
+ {
+ notFull.notify();
+ }
+
synchronized (notEmpty)
{
notEmpty.notify();
Modified: incubator/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java?rev=704147&r1=704146&r2=704147&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java Mon Oct 13 09:07:01 2008
@@ -51,10 +51,10 @@
port = AvailablePortFinder.getNextAvailable(12000);
ConnectionDelegate server = new ServerDelegate() {
- @Override public void connectionOpen(Channel ch, ConnectionOpen open)
+ @Override public void connectionOpen(Connection conn, ConnectionOpen open)
{
- super.connectionOpen(ch, open);
- ch.getConnection().close();
+ super.connectionOpen(conn, open);
+ conn.close();
}
};
@@ -94,13 +94,9 @@
fail("never got notified of connection close");
}
- Channel ch = conn.getChannel(0);
- Session ssn = new Session("test".getBytes());
- ssn.attach(ch);
-
try
{
- ssn.sessionAttach(ssn.getName());
+ conn.connectionCloseOk();
fail("writing to a closed socket succeeded");
}
catch (TransportException e)