You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2007/09/13 23:43:04 UTC
svn commit: r575474 [2/2] - in /incubator/qpid/trunk/qpid/java:
client/src/main/java/org/apache/qpid/client/
client/src/main/java/org/apache/qpid/client/message/
client/src/main/java/org/apache/qpidity/client/
client/src/main/java/org/apache/qpidity/cl...
Copied: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Connection.java (from r573657, incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Connection.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Connection.java?p2=incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Connection.java&p1=incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Connection.java&r1=573657&r2=575474&rev=575474&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Connection.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Connection.java Thu Sep 13 14:42:57 2007
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpidity;
+package org.apache.qpidity.transport;
import java.util.HashMap;
import java.util.Map;
@@ -37,25 +37,19 @@
*/
// RA making this public until we sort out the package issues
-public class Connection implements ProtocolActions
+public class Connection
+ implements Receiver<ConnectionEvent>, Sender<ConnectionEvent>
{
- final private Handler<ByteBuffer> input;
- final private Handler<ByteBuffer> output;
+ final private Sender<ConnectionEvent> sender;
final private ConnectionDelegate delegate;
final private Map<Integer,Channel> channels = new HashMap<Integer,Channel>();
- // XXX: hardcoded versions
- private ProtocolHeader header = new ProtocolHeader((byte) 1, (byte) 0, (byte) 10);
- // XXX
- private int maxFrame = 64*1024;
-
- public Connection(Handler<ByteBuffer> output,
- ConnectionDelegate delegate,
- InputHandler.State state)
+
+ public Connection(Sender<ConnectionEvent> sender,
+ ConnectionDelegate delegate)
{
- this.input = new InputHandler(this, state);
- this.output = output;
+ this.sender = sender;
this.delegate = delegate;
}
@@ -63,79 +57,48 @@
{
return delegate;
}
-
- public Connection(Handler<ByteBuffer> output,
- ConnectionDelegate delegate)
- {
- this(output, delegate, InputHandler.State.PROTO_HDR);
- }
- public Handler<ByteBuffer> getInputHandler()
+ public void received(ConnectionEvent event)
{
- return input;
+ Channel channel = getChannel(event.getChannel());
+ channel.received(event.getProtocolEvent());
}
- public Handler<ByteBuffer> getOutputHandler()
+ public void send(ConnectionEvent event)
{
- return output;
+ sender.send(event);
}
- public ProtocolHeader getHeader()
- {
- return header;
- }
-
- public byte getMajor()
- {
- return header.getMajor();
- }
-
- public byte getMinor()
- {
- return header.getMinor();
- }
-
- public int getMaxFrame()
- {
- return maxFrame;
- }
-
- public void init(ProtocolHeader hdr)
+ public Channel getChannel(int number)
{
- System.out.println(header);
- if (hdr.getMajor() != header.getMajor() &&
- hdr.getMinor() != header.getMinor())
+ synchronized (channels)
{
- output.handle(header.toByteBuffer());
- // XXX: how do we close the connection?
+ Channel channel = channels.get(number);
+ if (channel == null)
+ {
+ channel = new Channel(this, number, delegate.getSessionDelegate());
+ channels.put(number, channel);
+ }
+ return channel;
}
-
- // not sure if this is the right place
- System.out.println("\n--------------------Broker Start Connection Negotiation -----------------------\n");
-
- getChannel(0).connectionStart(header.getMajor(), header.getMinor(), null, "PLAIN", "utf8");
}
- public Channel getChannel(int number)
+ void removeChannel(int number)
{
- Channel channel = channels.get(number);
- if (channel == null)
+ synchronized (channels)
{
- channel = new Channel(this, number, delegate.getSessionDelegate());
- channels.put(number, channel);
+ channels.remove(number);
}
- return channel;
}
- public void frame(Frame frame)
+ public void closed()
{
- Channel channel = getChannel(frame.getChannel());
- channel.handle(frame);
+ System.out.println("connection closed: " + this);
}
- public void error(ProtocolError error)
+ public void close()
{
- throw new RuntimeException(error.getMessage());
+ sender.close();
}
}
Copied: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ConnectionDelegate.java (from r573657, incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ConnectionDelegate.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ConnectionDelegate.java?p2=incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ConnectionDelegate.java&p1=incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ConnectionDelegate.java&r1=573657&r2=575474&rev=575474&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ConnectionDelegate.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ConnectionDelegate.java Thu Sep 13 14:42:57 2007
@@ -18,7 +18,10 @@
* under the License.
*
*/
-package org.apache.qpidity;
+package org.apache.qpidity.transport;
+
+import org.apache.qpidity.SecurityHelper;
+import org.apache.qpidity.QpidException;
import java.io.UnsupportedEncodingException;
import java.util.HashMap;
@@ -41,7 +44,7 @@
/**
* Currently only implemented client specific methods
* the server specific methods are dummy impls for testing
- *
+ *
* the connectionClose is kind of different for both sides
*/
public abstract class ConnectionDelegate extends Delegate<Channel>
@@ -56,23 +59,47 @@
private int maxFrame = 64*1024;
private Condition _negotiationComplete;
private Lock _negotiationCompleteLock;
-
+
public abstract SessionDelegate getSessionDelegate();
-
+
public void setCondition(Lock negotiationCompleteLock,Condition negotiationComplete)
{
_negotiationComplete = negotiationComplete;
_negotiationCompleteLock = negotiationCompleteLock;
}
-
+
+ /**
+  * Handles the protocol header received from the peer on the given channel.
+  *
+  * The supported version is currently hardcoded as major 0, minor 10. If
+  * the peer's header differs in either number, a ProtocolHeader(1, 0, 10)
+  * is sent back on channel 0 and the connection is closed. Otherwise
+  * connection-start is issued on the channel using the peer's major and
+  * minor numbers.
+  *
+  * @param ch the channel the header was received on
+  * @param hdr the protocol header sent by the peer
+  */
+ @Override public void init(Channel ch, ProtocolHeader hdr)
+ {
+     System.out.println(hdr);
+     // XXX: hardcoded version
+     // A mismatch in EITHER number makes the version unsupported, so the
+     // two comparisons are joined with || (with && a 0-9 or 1-10 header
+     // would have been accepted as if it were 0-10).
+     if (hdr.getMajor() != 0 || hdr.getMinor() != 10)
+     {
+         // XXX
+         ch.getConnection().send(new ConnectionEvent(0, new ProtocolHeader(1, 0, 10)));
+         ch.getConnection().close();
+     }
+     else
+     {
+
+         System.out.println("\n--------------------Broker Start Connection Negotiation -----------------------\n");
+
+         ch.connectionStart(hdr.getMajor(), hdr.getMinor(), null, "PLAIN", "utf8");
+     }
+ }
+
+ /**
+ * Reports a protocol error by throwing a RuntimeException whose message
+ * is taken from error.getMessage(). The channel argument is not used.
+ */
+ @Override public void error(Channel ch, ProtocolError error)
+ {
+ throw new RuntimeException(error.getMessage());
+ }
+
// ----------------------------------------------
- // Client side
+ // Client side
//-----------------------------------------------
- @Override public void connectionStart(Channel context, ConnectionStart struct)
+ @Override public void connectionStart(Channel context, ConnectionStart struct)
{
System.out.println("\n--------------------Client Start Connection Negotiation -----------------------\n");
System.out.println("The broker has sent connection-start");
-
+
String mechanism = null;
String response = null;
try
@@ -94,15 +121,15 @@
{
// need error handling
}
-
- Map<String,?> props = new HashMap<String,String>();
+
+ Map<String,?> props = new HashMap<String,String>();
context.connectionStartOk(props, mechanism, response, _locale);
}
-
- @Override public void connectionSecure(Channel context, ConnectionSecure struct)
+
+ @Override public void connectionSecure(Channel context, ConnectionSecure struct)
{
System.out.println("The broker has sent connection-secure with chanllenge " + struct.getChallenge());
-
+
try
{
String response = new String(saslClient.evaluateChallenge(struct.getChallenge().getBytes()),_locale);
@@ -115,20 +142,20 @@
catch (SaslException e)
{
// need error handling
- }
+ }
}
-
- @Override public void connectionTune(Channel context, ConnectionTune struct)
+
+ @Override public void connectionTune(Channel context, ConnectionTune struct)
{
System.out.println("The broker has sent connection-tune " + struct.toString());
-
+
// should update the channel max given by the broker.
- context.connectionTuneOk(struct.getChannelMax(), struct.getFrameMax(), struct.getHeartbeat());
+ context.connectionTuneOk(struct.getChannelMax(), struct.getFrameMax(), struct.getHeartbeat());
context.connectionOpen(_virtualHost, null, Option.INSIST);
}
-
-
- @Override public void connectionOpenOk(Channel context, ConnectionOpenOk struct)
+
+
+ @Override public void connectionOpenOk(Channel context, ConnectionOpenOk struct)
{
String knownHosts = struct.getKnownHosts();
System.out.println("The broker has opened the connection for use");
@@ -147,23 +174,23 @@
}
System.out.println("\n-------------------- Client End Connection Negotiation -----------------------\n");
}
-
- public void connectionRedirect(Channel context, ConnectionRedirect struct)
+
+ public void connectionRedirect(Channel context, ConnectionRedirect struct)
{
// not going to bother at the moment
}
-
+
// ----------------------------------------------
- // Server side
+ // Server side
//-----------------------------------------------
- @Override public void connectionStartOk(Channel context, ConnectionStartOk struct)
+ @Override public void connectionStartOk(Channel context, ConnectionStartOk struct)
{
//set the client side locale on the server side
_locale = struct.getLocale();
_mechanism = struct.getMechanism();
-
+
System.out.println("The client has sent connection-start-ok");
-
+
//try
//{
//saslServer = Sasl.createSaslServer(_mechanism, "AMQP", "ABC",null,SecurityHelper.createCallbackHandler(_mechanism,_username,_password));
@@ -183,11 +210,11 @@
}
catch(Exception e)
{
-
+
}
}
-
-
+
+
/*}
catch (SaslException e)
{
@@ -198,13 +225,13 @@
// need error handling
}*/
}
-
- @Override public void connectionTuneOk(Channel context, ConnectionTuneOk struct)
+
+ @Override public void connectionTuneOk(Channel context, ConnectionTuneOk struct)
{
System.out.println("The client has excepted the tune params");
}
-
- @Override public void connectionSecureOk(Channel context, ConnectionSecureOk struct)
+
+ @Override public void connectionSecureOk(Channel context, ConnectionSecureOk struct)
{
System.out.println("The client has sent connection-secure-ok");
try
@@ -225,11 +252,11 @@
}
catch(Exception e)
{
-
+
}
}
-
-
+
+
}
catch (SaslException e)
{
@@ -240,17 +267,16 @@
// need error handling
}
}
-
-
- @Override public void connectionOpen(Channel context, ConnectionOpen struct)
+
+
+ @Override public void connectionOpen(Channel context, ConnectionOpen struct)
{
String hosts = "amqp:1223243232325";
System.out.println("The client has sent connection-open");
context.connectionOpenOk(hosts);
System.out.println("\n-------------------- Broker End Connection Negotiation -----------------------\n");
}
-
-
+
public String getPassword()
{
return _password;
Added: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ConnectionEvent.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ConnectionEvent.java?rev=575474&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ConnectionEvent.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ConnectionEvent.java Thu Sep 13 14:42:57 2007
@@ -0,0 +1,51 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpidity.transport;
+
+
+/**
+ * ConnectionEvent
+ *
+ * Pairs a channel number with a ProtocolEvent. Both fields are final and
+ * are set once in the constructor.
+ */
+
+public class ConnectionEvent
+{
+
+ // channel number supplied at construction
+ private final int channel;
+ // protocol event supplied at construction
+ private final ProtocolEvent event;
+
+ /**
+ * @param channel the channel number to pair with the event
+ * @param event the protocol event; stored as given (no null check)
+ */
+ public ConnectionEvent(int channel, ProtocolEvent event)
+ {
+ this.channel = channel;
+ this.event = event;
+ }
+
+ /** @return the channel number passed to the constructor */
+ public int getChannel()
+ {
+ return channel;
+ }
+
+ /** @return the protocol event passed to the constructor */
+ public ProtocolEvent getProtocolEvent()
+ {
+ return event;
+ }
+
+}
Propchange: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ConnectionEvent.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Data.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Data.java?rev=575474&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Data.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Data.java Thu Sep 13 14:42:57 2007
@@ -0,0 +1,84 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpidity.transport;
+
+import org.apache.qpidity.transport.network.Frame;
+
+import java.nio.ByteBuffer;
+
+import java.util.Collections;
+
+
+/**
+ * Data
+ *
+ * A ProtocolEvent holding a sequence of ByteBuffer fragments together with
+ * two boolean flags, first and last.
+ * NOTE(review): the flags presumably mark the first/last piece of a larger
+ * body -- confirm against the code that produces Data events.
+ */
+
+public class Data implements ProtocolEvent
+{
+
+ // the buffers making up this event; the reference is stored as given
+ private final Iterable<ByteBuffer> fragments;
+ private final boolean first;
+ private final boolean last;
+
+ /**
+ * @param fragments the buffers carried by this event (not copied)
+ * @param first value later returned by isFirst()
+ * @param last value later returned by isLast()
+ */
+ public Data(Iterable<ByteBuffer> fragments, boolean first, boolean last)
+ {
+ this.fragments = fragments;
+ this.first = first;
+ this.last = last;
+ }
+
+ /**
+ * Convenience constructor for a single buffer; wraps it in a
+ * singleton list and delegates to the Iterable-based constructor.
+ */
+ public Data(ByteBuffer buf, boolean first, boolean last)
+ {
+ this(Collections.singletonList(buf), first, last);
+ }
+
+ /** @return the fragments passed to the constructor */
+ public Iterable<ByteBuffer> getFragments()
+ {
+ return fragments;
+ }
+
+ /** @return the first flag passed to the constructor */
+ public boolean isFirst()
+ {
+ return first;
+ }
+
+ /** @return the last flag passed to the constructor */
+ public boolean isLast()
+ {
+ return last;
+ }
+
+ /** @return always Frame.L4 */
+ public byte getEncodedTrack()
+ {
+ return Frame.L4;
+ }
+
+ /** Dispatches this event to delegate.data(context, this). */
+ public <C> void delegate(C context, Delegate<C> delegate)
+ {
+ delegate.data(context, this);
+ }
+
+ /**
+ * Dispatches this event to sw.data(context, this).
+ * Switch is used here as a raw type, so the call is unchecked.
+ */
+ public <C> void delegate(C context, Switch sw)
+ {
+ sw.data(context, this);
+ }
+
+}
Propchange: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Data.java
------------------------------------------------------------------------------
svn:eol-style = native
Copied: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Future.java (from r573657, incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Future.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Future.java?p2=incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Future.java&p1=incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Future.java&r1=573657&r2=575474&rev=575474&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Future.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Future.java Thu Sep 13 14:42:57 2007
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpidity;
+package org.apache.qpidity.transport;
/**
Copied: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Header.java (from r573657, incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Header.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Header.java?p2=incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Header.java&p1=incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Header.java&r1=573657&r2=575474&rev=575474&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Header.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Header.java Thu Sep 13 14:42:57 2007
@@ -18,7 +18,11 @@
* under the License.
*
*/
-package org.apache.qpidity;
+package org.apache.qpidity.transport;
+
+import org.apache.qpidity.transport.network.Frame;
+
+import java.util.List;
/**
@@ -27,4 +31,46 @@
* @author Rafael H. Schloming
*/
-public abstract class Header extends Struct {}
+// A ProtocolEvent that wraps a list of Struct instances.
+public class Header implements ProtocolEvent {
+
+ // the wrapped structs; the list reference is stored as given (not copied)
+ private final List<Struct> structs;
+
+ /** @param structs the structs making up this header */
+ public Header(List<Struct> structs)
+ {
+ this.structs = structs;
+ }
+
+ /** @return the list passed to the constructor */
+ public List<Struct> getStructs()
+ {
+ return structs;
+ }
+
+ /**
+ * Looks up a struct by type.
+ *
+ * @param klass the type to search for
+ * @return the first struct in list order that is an instance of klass,
+ * cast to that type, or null when none matches
+ */
+ public <T> T get(Class<T> klass)
+ {
+ for (Struct st : structs)
+ {
+ if (klass.isInstance(st))
+ {
+ return klass.cast(st);
+ }
+ }
+
+ return null;
+ }
+
+ /** @return always Frame.L4 */
+ public byte getEncodedTrack()
+ {
+ return Frame.L4;
+ }
+
+ /** Dispatches this event to delegate.header(context, this). */
+ public <C> void delegate(C context, Delegate<C> delegate)
+ {
+ delegate.header(context, this);
+ }
+
+ /**
+ * Dispatches this event to sw.header(context, this).
+ * Switch is used here as a raw type, so the call is unchecked.
+ */
+ public <C> void delegate(C context, Switch sw)
+ {
+ sw.header(context, this);
+ }
+
+}
Copied: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Method.java (from r573657, incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Method.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Method.java?p2=incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Method.java&p1=incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Method.java&r1=573657&r2=575474&rev=575474&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Method.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Method.java Thu Sep 13 14:42:57 2007
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpidity;
+package org.apache.qpidity.transport;
/**
@@ -27,7 +27,7 @@
* @author Rafael H. Schloming
*/
-public abstract class Method extends Struct
+public abstract class Method extends Struct implements ProtocolEvent
{
public static final Method create(int type)
@@ -54,11 +54,9 @@
public abstract byte getEncodedTrack();
- // XXX: do we need a segment base type?
- public byte getSegmentType()
+ public <C> void delegate(C context, Switch sw)
{
- // XXX
- return Frame.METHOD;
+ sw.method(context, this);
}
}
Copied: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ProtocolError.java (from r573657, incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ProtocolError.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ProtocolError.java?p2=incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ProtocolError.java&p1=incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ProtocolError.java&r1=573657&r2=575474&rev=575474&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ProtocolError.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ProtocolError.java Thu Sep 13 14:42:57 2007
@@ -18,7 +18,10 @@
* under the License.
*
*/
-package org.apache.qpidity;
+package org.apache.qpidity.transport;
+
+import org.apache.qpidity.transport.network.NetworkDelegate;
+import org.apache.qpidity.transport.network.NetworkEvent;
/**
@@ -27,21 +30,43 @@
* @author Rafael H. Schloming
*/
-class ProtocolError
+public class ProtocolError implements NetworkEvent, ProtocolEvent
{
+ private final byte track;
private final String format;
private final Object[] args;
- public ProtocolError(String format, Object ... args)
+ public ProtocolError(byte track, String format, Object ... args)
{
+ this.track = track;
this.format = format;
this.args = args;
}
+ /** @return the track value that was passed to the constructor */
+ public byte getEncodedTrack()
+ {
+ return track;
+ }
+
public String getMessage()
{
return String.format(format, args);
+ }
+
+ public <C> void delegate(C context, Switch sw)
+ {
+ sw.error(context, this);
+ }
+
+ public void delegate(NetworkDelegate delegate)
+ {
+ delegate.error(this);
+ }
+
+ public <C> void delegate(C context, Delegate<C> delegate)
+ {
+ delegate.error(context, this);
}
}
Added: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ProtocolEvent.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ProtocolEvent.java?rev=575474&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ProtocolEvent.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ProtocolEvent.java Thu Sep 13 14:42:57 2007
@@ -0,0 +1,49 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpidity.transport;
+
+
+/**
+ * ProtocolEvent
+ *
+ * Common interface for events that can dispatch themselves either to a
+ * Switch or to a Delegate, and that report an encoded track byte.
+ */
+
+public interface ProtocolEvent
+{
+
+ /**
+ * Receiver of type-specific callbacks: one method per kind of event
+ * (protocol header, method, header, data, error), each taking a
+ * caller-supplied context of type C.
+ */
+ public interface Switch<C>
+ {
+ void init(C context, ProtocolHeader header);
+ void method(C context, Method method);
+ void header(C context, Header header);
+ void data(C context, Data data);
+ void error(C context, ProtocolError error);
+ }
+
+ // XXX: could do this switching with cascading defaults for the
+ // specific dispatch methods
+ // NOTE(review): Switch is declared generic but used here as a raw
+ // type, so C is not enforced between context and sw.
+ <C> void delegate(C context, Switch sw);
+
+ // Dispatches this event to the matching method of the given Delegate.
+ <C> void delegate(C context, Delegate<C> delegate);
+
+ // Track byte for this event; implementations return Frame constants
+ // or a stored value.
+ byte getEncodedTrack();
+
+}
Propchange: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ProtocolEvent.java
------------------------------------------------------------------------------
svn:eol-style = native
Copied: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ProtocolHeader.java (from r573657, incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ProtocolHeader.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ProtocolHeader.java?p2=incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ProtocolHeader.java&p1=incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ProtocolHeader.java&r1=573657&r2=575474&rev=575474&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ProtocolHeader.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ProtocolHeader.java Thu Sep 13 14:42:57 2007
@@ -18,10 +18,14 @@
* under the License.
*
*/
-package org.apache.qpidity;
+package org.apache.qpidity.transport;
import java.nio.ByteBuffer;
+import org.apache.qpidity.transport.network.NetworkDelegate;
+import org.apache.qpidity.transport.network.NetworkEvent;
+import org.apache.qpidity.transport.network.Frame;
+
/**
* ProtocolHeader
@@ -31,7 +35,7 @@
//RA making this public until we sort out the package issues
-public class ProtocolHeader
+public class ProtocolHeader implements NetworkEvent, ProtocolEvent
{
private static final byte[] AMQP = {'A', 'M', 'Q', 'P' };
@@ -48,6 +52,11 @@
this.minor = minor;
}
+ /**
+ * Convenience constructor taking ints. Each argument is narrowed to a
+ * byte with a cast (values outside the byte range are truncated, not
+ * rejected) and passed to the byte-based constructor.
+ */
+ public ProtocolHeader(int instance, int major, int minor)
+ {
+ this((byte) instance, (byte) major, (byte) minor);
+ }
+
public byte getInstance()
{
return instance;
@@ -63,6 +72,11 @@
return minor;
}
+ /** @return always Frame.L1 */
+ public byte getEncodedTrack()
+ {
+ return Frame.L1;
+ }
+
public ByteBuffer toByteBuffer()
{
ByteBuffer buf = ByteBuffer.allocate(8);
@@ -73,6 +87,21 @@
buf.put(minor);
buf.flip();
return buf;
+ }
+
+ public <C> void delegate(C context, Switch sw)
+ {
+ sw.init(context, this);
+ }
+
+ public void delegate(NetworkDelegate delegate)
+ {
+ delegate.init(this);
+ }
+
+ public <C> void delegate(C context, Delegate<C> delegate)
+ {
+ delegate.init(context, this);
}
public String toString()
Copied: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Range.java (from r573657, incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Range.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Range.java?p2=incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Range.java&p1=incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Range.java&r1=573657&r2=575474&rev=575474&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Range.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Range.java Thu Sep 13 14:42:57 2007
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpidity;
+package org.apache.qpidity.transport;
import static java.lang.Math.*;
Copied: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/RangeSet.java (from r573657, incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/RangeSet.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/RangeSet.java?p2=incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/RangeSet.java&p1=incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/RangeSet.java&r1=573657&r2=575474&rev=575474&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/RangeSet.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/RangeSet.java Thu Sep 13 14:42:57 2007
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpidity;
+package org.apache.qpidity.transport;
import java.util.Collection;
import java.util.Iterator;
Added: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Receiver.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Receiver.java?rev=575474&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Receiver.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Receiver.java Thu Sep 13 14:42:57 2007
@@ -0,0 +1,36 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpidity.transport;
+
+
+/**
+ * Receiver
+ *
+ * Callback interface for something that is handed messages of type T and
+ * is told when it has been closed.
+ */
+
+public interface Receiver<T>
+{
+
+ // Called with each message delivered to this receiver.
+ void received(T msg);
+
+ // NOTE(review): presumably called once the source of messages has
+ // closed -- confirm against the transport that drives receivers.
+ void closed();
+
+}
Propchange: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Receiver.java
------------------------------------------------------------------------------
svn:eol-style = native
Copied: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Result.java (from r573657, incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Result.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Result.java?p2=incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Result.java&p1=incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Result.java&r1=573657&r2=575474&rev=575474&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Result.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Result.java Thu Sep 13 14:42:57 2007
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpidity;
+package org.apache.qpidity.transport;
/**
Added: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Sender.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Sender.java?rev=575474&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Sender.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Sender.java Thu Sep 13 14:42:57 2007
@@ -0,0 +1,36 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpidity.transport;
+
+
+/**
+ * Sender
+ *
+ * Outbound counterpart of Receiver: a sink that accepts messages of
+ * type <code>T</code>. The outbound stages added in this package
+ * (Disassembler, OutputHandler) implement it and write their own
+ * output to a downstream Sender.
+ *
+ * @param <T> the type of message accepted by this sender
+ */
+
+public interface Sender<T>
+{
+ /**
+ * Hands one outbound message to this sender.
+ *
+ * @param msg the message to send
+ */
+ void send(T msg);
+ /**
+ * Closes this sender.
+ * NOTE(review): presumably send() must not be called afterwards;
+ * the implementations in this commit forward the call to the sender
+ * they wrap -- confirm the intended contract.
+ */
+ void close();
+
+}
Propchange: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Sender.java
------------------------------------------------------------------------------
svn:eol-style = native
Copied: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java (from r573657, incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Session.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java?p2=incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java&p1=incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Session.java&r1=573657&r2=575474&rev=575474&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Session.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java Thu Sep 13 14:42:57 2007
@@ -18,10 +18,14 @@
* under the License.
*
*/
-package org.apache.qpidity;
+package org.apache.qpidity.transport;
+
+import org.apache.qpidity.transport.network.Frame;
import java.nio.ByteBuffer;
+import java.util.Arrays;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
@@ -42,12 +46,12 @@
// completed incoming commands
private final RangeSet processed = new RangeSet();
private Range syncPoint = null;
-
+
// outgoing command count
private long commandsOut = 0;
private Map<Long,Method> commands = new HashMap<Long,Method>();
private long mark = 0;
-
+
public Map<Long,Method> getOutstandingCommands()
{
@@ -104,19 +108,19 @@
}
void flushProcessed()
- {
+ {
for (Range r: processed)
{
- System.out.println("Completed Range [" + r.getLower() + "," + r.getUpper() +"]" );
+ System.out.println("Completed Range [" + r.getLower() + "," + r.getUpper() +"]" );
}
- System.out.println("Notifying peer with execution complete");
+ System.out.println("Notifying peer with execution complete");
executionComplete(0, processed);
}
void syncPoint()
{
System.out.println("===========Request received to sync==========================");
-
+
Range range = new Range(0, getCommandsIn() - 1);
boolean flush;
synchronized (processed)
@@ -154,8 +158,8 @@
for (long id = lower; id <= upper; id++)
{
commands.remove(id);
- }
-
+ }
+
if (commands.isEmpty())
{
System.out.println("\n All outstanding commands are completed !!!! \n");
@@ -177,15 +181,25 @@
synchronized (commands)
{
System.out.println("sent command " + m.getClass().getName() + " command Id" + commandsOut);
- commands.put(commandsOut++, m);
+ commands.put(commandsOut++, m);
}
}
channel.method(m);
}
- public void headers(Struct ... headers)
+ public void header(Header header)
+ {
+ channel.header(header);
+ }
+
+ public void header(List<Struct> structs)
+ {
+ header(new Header(structs));
+ }
+
+ public void header(Struct ... structs)
{
- channel.headers(headers);
+ header(Arrays.asList(structs));
}
public void data(ByteBuffer buf)
@@ -210,7 +224,7 @@
public void sync()
{
- System.out.println("calling sync()");
+ System.out.println("calling sync()");
synchronized (commands)
{
if (!commands.isEmpty())
@@ -221,7 +235,7 @@
while (!commands.isEmpty())
{
try {
- System.out.println("\n============sync() waiting for commmands to be completed ==============\n");
+ System.out.println("\n============sync() waiting for commmands to be completed ==============\n");
commands.wait();
System.out.println("\n============sync() got notified=========================================\n");
}
@@ -312,6 +326,12 @@
return result != null;
}
+ }
+
+ public void close()
+ {
+ sessionClose();
+ channel.close();
}
}
Copied: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/SessionDelegate.java (from r573657, incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/SessionDelegate.java?p2=incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/SessionDelegate.java&p1=incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java&r1=573657&r2=575474&rev=575474&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/SessionDelegate.java Thu Sep 13 14:42:57 2007
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpidity;
+package org.apache.qpidity.transport;
/**
@@ -30,9 +30,7 @@
public abstract class SessionDelegate extends Delegate<Session>
{
- public abstract void headers(Session ssn, Struct ... headers);
-
- public abstract void data(Session ssn, Frame frame);
+ private static final Struct[] EMPTY_STRUCT_ARRAY = {};
@Override public void executionResult(Session ssn, ExecutionResult result)
{
@@ -47,7 +45,7 @@
for (Range range : ranges)
{
System.out.println("completed command range: " + range.getLower() + " to " + range.getUpper());
- ssn.complete(range.getLower(), range.getUpper());
+ ssn.complete(range.getLower(), range.getUpper());
}
}
ssn.complete(excmp.getCumulativeExecutionMark());
Copied: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Struct.java (from r573657, incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Struct.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Struct.java?p2=incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Struct.java&p1=incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Struct.java&r1=573657&r2=575474&rev=575474&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Struct.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Struct.java Thu Sep 13 14:42:57 2007
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpidity;
+package org.apache.qpidity.transport;
import org.apache.qpidity.codec.Encodable;
@@ -29,7 +29,7 @@
* @author Rafael H. Schloming
*/
-public abstract class Struct implements Delegator, Encodable
+public abstract class Struct implements Encodable
{
public static Struct create(int type)
Added: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/Assembler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/Assembler.java?rev=575474&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/Assembler.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/Assembler.java Thu Sep 13 14:42:57 2007
@@ -0,0 +1,184 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpidity.transport.network;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import java.nio.ByteBuffer;
+
+import org.apache.qpidity.codec.FragmentDecoder;
+
+import org.apache.qpidity.transport.ConnectionEvent;
+import org.apache.qpidity.transport.Data;
+import org.apache.qpidity.transport.Header;
+import org.apache.qpidity.transport.Method;
+import org.apache.qpidity.transport.ProtocolError;
+import org.apache.qpidity.transport.ProtocolEvent;
+import org.apache.qpidity.transport.ProtocolHeader;
+import org.apache.qpidity.transport.Receiver;
+import org.apache.qpidity.transport.Struct;
+
+
+/**
+ * Assembler
+ *
+ * Turns the stream of network events (protocol header, frames, errors)
+ * into ConnectionEvents for the downstream receiver. BODY frames are
+ * forwarded one by one as Data events; frames of every other type are
+ * collected per (track, channel) until the frame flagged as last
+ * arrives, and the collected bytes are then decoded into a Method or a
+ * Header.
+ */
+
+public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate
+{
+
+    private final Receiver<ConnectionEvent> receiver;
+    private final byte major;
+    private final byte minor;
+    // segments that are still being collected, keyed by segmentKey(frame)
+    private final Map<Integer,List<ByteBuffer>> segments;
+
+    /**
+     * @param receiver downstream consumer of the assembled events
+     * @param major protocol major version handed to the decoder
+     * @param minor protocol minor version handed to the decoder
+     */
+    public Assembler(Receiver<ConnectionEvent> receiver, byte major, byte minor)
+    {
+        this.receiver = receiver;
+        this.major = major;
+        this.minor = minor;
+        segments = new HashMap<Integer,List<ByteBuffer>>();
+    }
+
+    /**
+     * Builds the key under which the segment a frame belongs to is
+     * stored: the track is placed in the bits above the 16-bit channel
+     * number. Unlike a product of the two values this can neither
+     * overflow nor map two different (track, channel) pairs to the same
+     * key (with a product, every track of channel 0 yields 0).
+     */
+    private int segmentKey(Frame frame)
+    {
+        return ((frame.getTrack() & 0xFF) << 16)
+            | (frame.getChannel() & 0xFFFF);
+    }
+
+    /** Returns the segment in progress for the frame, or null if none. */
+    private List<ByteBuffer> getSegment(Frame frame)
+    {
+        return segments.get(segmentKey(frame));
+    }
+
+    /**
+     * Starts a new segment for the frame's track and channel. If one is
+     * already in progress a protocol error is reported and the old
+     * segment is replaced.
+     */
+    private void setSegment(Frame frame, List<ByteBuffer> segment)
+    {
+        int key = segmentKey(frame);
+        if (segments.containsKey(key))
+        {
+            error(new ProtocolError(Frame.L2, "segment in progress: %s",
+                                    frame));
+        }
+        segments.put(key, segment);
+    }
+
+    /** Forgets the segment in progress for the frame's track and channel. */
+    private void clearSegment(Frame frame)
+    {
+        segments.remove(segmentKey(frame));
+    }
+
+    /** Wraps the event for the given channel and passes it downstream. */
+    private void emit(int channel, ProtocolEvent event)
+    {
+        receiver.received(new ConnectionEvent(channel, event));
+    }
+
+    /** Emits the event on the channel the frame arrived on. */
+    private void emit(Frame frame, ProtocolEvent event)
+    {
+        emit(frame.getChannel(), event);
+    }
+
+    /**
+     * Dispatches the event to init(), frame() or error() by letting the
+     * event call back into this NetworkDelegate.
+     */
+    public void received(NetworkEvent event)
+    {
+        event.delegate(this);
+    }
+
+    /** Forwards the close notification to the downstream receiver. */
+    public void closed()
+    {
+        this.receiver.closed();
+    }
+
+    /** Passes the protocol header downstream on channel 0. */
+    public void init(ProtocolHeader header)
+    {
+        emit(0, header);
+    }
+
+    /**
+     * Handles one frame: BODY frames are emitted immediately as Data,
+     * all other frame types go through segment assembly.
+     */
+    public void frame(Frame frame)
+    {
+        switch (frame.getType())
+        {
+        case Frame.BODY:
+            emit(frame, new Data(frame, frame.isFirstFrame(),
+                                 frame.isLastFrame()));
+            break;
+        default:
+            assemble(frame);
+            break;
+        }
+    }
+
+    /** Passes a protocol error downstream on channel 0. */
+    public void error(ProtocolError error)
+    {
+        emit(0, error);
+    }
+
+    /**
+     * Adds the frame's buffers to the segment for its track and channel
+     * and, when the frame is flagged as last, decodes and emits the
+     * completed segment.
+     */
+    private void assemble(Frame frame)
+    {
+        List<ByteBuffer> segment;
+        if (frame.isFirstFrame())
+        {
+            segment = new ArrayList<ByteBuffer>();
+            setSegment(frame, segment);
+        }
+        else
+        {
+            segment = getSegment(frame);
+            if (segment == null)
+            {
+                // continuation frame without a preceding first frame:
+                // report it instead of failing with a NullPointerException
+                error(new ProtocolError(Frame.L2,
+                                        "no segment in progress: %s",
+                                        frame));
+                return;
+            }
+        }
+
+        for (ByteBuffer buf : frame)
+        {
+            segment.add(buf);
+        }
+
+        if (frame.isLastFrame())
+        {
+            clearSegment(frame);
+            emit(frame, decode(frame.getType(), segment));
+        }
+    }
+
+    /**
+     * Decodes the bytes of a completed segment.
+     *
+     * @param type the frame type the segment was carried in
+     * @param segment the buffers making up the segment, in arrival order
+     * @return a Method for METHOD segments, a Header for HEADER segments
+     * @throws IllegalStateException if the frame type is neither
+     */
+    private ProtocolEvent decode(byte type, List<ByteBuffer> segment)
+    {
+        FragmentDecoder dec =
+            new FragmentDecoder(major, minor, segment.iterator());
+
+        switch (type)
+        {
+        case Frame.METHOD:
+            int methodType = (int) dec.readLong();
+            Method method = Method.create(methodType);
+            method.read(dec, major, minor);
+            return method;
+        case Frame.HEADER:
+            List<Struct> structs = new ArrayList<Struct>();
+            while (dec.hasRemaining())
+            {
+                structs.add(dec.readLongStruct());
+            }
+            return new Header(structs);
+        default:
+            throw new IllegalStateException("unknown frame type: " + type);
+        }
+    }
+
+}
Propchange: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/Assembler.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/Disassembler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/Disassembler.java?rev=575474&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/Disassembler.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/Disassembler.java Thu Sep 13 14:42:57 2007
@@ -0,0 +1,168 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpidity.transport.network;
+
+import org.apache.qpidity.codec.BBEncoder;
+import org.apache.qpidity.codec.SizeEncoder;
+
+import org.apache.qpidity.transport.ConnectionEvent;
+import org.apache.qpidity.transport.Data;
+import org.apache.qpidity.transport.Header;
+import org.apache.qpidity.transport.Method;
+import org.apache.qpidity.transport.ProtocolHeader;
+import org.apache.qpidity.transport.ProtocolError;
+import org.apache.qpidity.transport.ProtocolEvent;
+import org.apache.qpidity.transport.Sender;
+import org.apache.qpidity.transport.Struct;
+
+import java.nio.ByteBuffer;
+
+import static org.apache.qpidity.transport.network.Frame.*;
+
+import static java.lang.Math.*;
+
+
+/**
+ * Disassembler
+ *
+ * Outbound counterpart of the Assembler: encodes each protocol event
+ * of a ConnectionEvent into bytes and cuts those bytes into frames
+ * that fit the configured maximum frame size before handing them to
+ * the network-level sender. Protocol headers and protocol errors are
+ * passed through unchanged.
+ */
+
+public class Disassembler
+    implements Sender<ConnectionEvent>, ProtocolEvent.Switch<ConnectionEvent>
+{
+
+    // largest value of the frame size field, which OutputHandler
+    // writes as a 16-bit quantity covering header plus payload
+    private static final int SIZE_FIELD_MAX = 0xFFFF;
+
+    private final Sender<NetworkEvent> sender;
+    // largest payload carried by a single frame
+    private final int maxPayload;
+    private final byte major;
+    private final byte minor;
+
+    /**
+     * @param sender downstream sender receiving the frames
+     * @param major protocol major version handed to the encoders
+     * @param minor protocol minor version handed to the encoders
+     * @param maxFrame maximum size of a frame including its header;
+     *                 values above what the 16-bit size field can
+     *                 express are capped
+     * @throws IllegalArgumentException if maxFrame leaves no room for
+     *                                  any payload
+     */
+    public Disassembler(Sender<NetworkEvent> sender, byte major, byte minor,
+                        int maxFrame)
+    {
+        if (maxFrame <= HEADER_SIZE)
+        {
+            throw new IllegalArgumentException
+                ("maxFrame must be greater than the frame header size (" +
+                 HEADER_SIZE + "): " + maxFrame);
+        }
+
+        this.sender = sender;
+        this.major = major;
+        this.minor = minor;
+        // the header is sent in addition to the payload, so reserve room
+        // for it and keep the total within the 16-bit size field
+        this.maxPayload = min(maxFrame, SIZE_FIELD_MAX) - HEADER_SIZE;
+    }
+
+    /**
+     * Dispatches on the kind of protocol event carried by the
+     * connection event; one of init(), method(), header(), data() or
+     * error() is called back.
+     */
+    public void send(ConnectionEvent event)
+    {
+        event.getProtocolEvent().delegate(event, this);
+    }
+
+    /** Closes the downstream sender. */
+    public void close()
+    {
+        sender.close();
+    }
+
+    /**
+     * Sends the remaining bytes of buf as one or more frames of at most
+     * maxPayload bytes each. Nothing is sent for an empty buffer.
+     *
+     * @param flags segment flags to set on every frame
+     * @param type frame type
+     * @param event supplies the track and channel of the frames
+     * @param buf bytes to send; fully consumed on return
+     * @param first whether the first frame gets the FIRST_FRAME flag
+     * @param last whether the final frame gets the LAST_FRAME flag
+     */
+    private void fragment(byte flags, byte type, ConnectionEvent event,
+                          ByteBuffer buf, boolean first, boolean last)
+    {
+        while (buf.hasRemaining())
+        {
+            ByteBuffer slice = buf.slice();
+            slice.limit(min(maxPayload, slice.remaining()));
+            buf.position(buf.position() + slice.remaining());
+
+            byte newflags = flags;
+            if (first)
+            {
+                newflags |= FIRST_FRAME;
+                first = false;
+            }
+            if (last && !buf.hasRemaining())
+            {
+                newflags |= LAST_FRAME;
+            }
+
+            Frame frame = new Frame(newflags, type,
+                                    event.getProtocolEvent().getEncodedTrack(),
+                                    event.getChannel());
+            frame.addFragment(slice);
+            sender.send(frame);
+        }
+    }
+
+    /** Passes the protocol header to the downstream sender as is. */
+    public void init(ConnectionEvent event, ProtocolHeader header)
+    {
+        sender.send(header);
+    }
+
+    /**
+     * Encodes the method (type code followed by its fields) and sends
+     * it as a METHOD segment. The segment is flagged as the last one of
+     * the command only when the method has no payload.
+     */
+    public void method(ConnectionEvent event, Method method)
+    {
+        // first pass: measure, second pass: encode into an exact buffer
+        SizeEncoder sizer = new SizeEncoder(major, minor);
+        sizer.writeLong(method.getEncodedType());
+        method.write(sizer, major, minor);
+        sizer.flush();
+        int size = sizer.getSize();
+
+        ByteBuffer buf = ByteBuffer.allocate(size);
+        BBEncoder enc = new BBEncoder(major, minor, buf);
+        enc.writeLong(method.getEncodedType());
+        method.write(enc, major, minor);
+        enc.flush();
+        buf.flip();
+
+        byte flags = FIRST_SEG;
+
+        if (!method.hasPayload())
+        {
+            flags |= LAST_SEG;
+        }
+
+        fragment(flags, METHOD, event, buf, true, true);
+    }
+
+    /**
+     * Encodes all structs of the header and sends them as a HEADER
+     * segment.
+     */
+    public void header(ConnectionEvent event, Header header)
+    {
+        SizeEncoder sizer = new SizeEncoder(major, minor);
+        for (Struct st : header.getStructs())
+        {
+            sizer.writeLongStruct(st);
+        }
+        // flush before reading the size, as method() does
+        sizer.flush();
+
+        ByteBuffer buf = ByteBuffer.allocate(sizer.getSize());
+        BBEncoder enc = new BBEncoder(major, minor, buf);
+        for (Struct st : header.getStructs())
+        {
+            enc.writeLongStruct(st);
+            enc.flush();
+        }
+        buf.flip();
+
+        fragment((byte) 0x0, HEADER, event, buf, true, true);
+    }
+
+    /**
+     * Sends the buffers of a Data event as BODY frames. FIRST_FRAME is
+     * set only on the very first frame (if the data is flagged first)
+     * and LAST_FRAME only on the very last one (if the data is flagged
+     * last), no matter how many buffers the data consists of.
+     */
+    public void data(ConnectionEvent event, Data data)
+    {
+        boolean first = data.isFirst();
+        // one buffer is held back so that the final non-empty buffer is
+        // known when it is sent
+        ByteBuffer pending = null;
+
+        for (ByteBuffer buf : data.getFragments())
+        {
+            if (!buf.hasRemaining())
+            {
+                // produces no frame, so it must not consume a flag
+                continue;
+            }
+
+            if (pending != null)
+            {
+                fragment(LAST_SEG, BODY, event, pending, first, false);
+                first = false;
+            }
+            pending = buf;
+        }
+
+        if (pending != null)
+        {
+            fragment(LAST_SEG, BODY, event, pending, first, data.isLast());
+        }
+    }
+
+    /** Passes the protocol error to the downstream sender as is. */
+    public void error(ConnectionEvent event, ProtocolError error)
+    {
+        sender.send(error);
+    }
+
+}
Propchange: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/Disassembler.java
------------------------------------------------------------------------------
svn:eol-style = native
Copied: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/Frame.java (from r573657, incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Frame.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/Frame.java?p2=incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/Frame.java&p1=incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Frame.java&r1=573657&r2=575474&rev=575474&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Frame.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/Frame.java Thu Sep 13 14:42:57 2007
@@ -18,7 +18,9 @@
* under the License.
*
*/
-package org.apache.qpidity;
+package org.apache.qpidity.transport.network;
+
+import org.apache.qpidity.transport.util.SliceIterator;
import java.nio.ByteBuffer;
@@ -26,7 +28,7 @@
import java.util.List;
import java.util.Iterator;
-import static org.apache.qpidity.Functions.*;
+import static org.apache.qpidity.transport.util.Functions.*;
/**
@@ -36,7 +38,7 @@
*/
// RA: changed it to public until we sort the package issues
-public class Frame implements Iterable<ByteBuffer>
+public class Frame implements NetworkEvent, Iterable<ByteBuffer>
{
public static final int HEADER_SIZE = 12;
@@ -82,6 +84,11 @@
size += fragment.remaining();
}
+ public byte getFlags()
+ {
+ return flags;
+ }
+
public int getChannel()
{
return channel;
@@ -135,6 +142,11 @@
public Iterator<ByteBuffer> iterator()
{
return getFragments();
+ }
+
+ public void delegate(NetworkDelegate delegate)
+ {
+ delegate.frame(this);
}
public String toString()
Added: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java?rev=575474&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java Thu Sep 13 14:42:57 2007
@@ -0,0 +1,239 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpidity.transport.network;
+
+import java.nio.ByteBuffer;
+
+import org.apache.qpidity.transport.ProtocolError;
+import org.apache.qpidity.transport.ProtocolHeader;
+import org.apache.qpidity.transport.Receiver;
+
+import static org.apache.qpidity.transport.network.InputHandler.State.*;
+
+
+/**
+ * InputHandler
+ *
+ * @author Rafael H. Schloming
+ */
+
+public class InputHandler implements Receiver<ByteBuffer>
+{
+
+ public enum State
+ {
+ PROTO_HDR,
+ PROTO_HDR_M,
+ PROTO_HDR_Q,
+ PROTO_HDR_P,
+ PROTO_HDR_CLASS,
+ PROTO_HDR_INSTANCE,
+ PROTO_HDR_MAJOR,
+ PROTO_HDR_MINOR,
+ FRAME_HDR,
+ FRAME_HDR_TYPE,
+ FRAME_HDR_SIZE1,
+ FRAME_HDR_SIZE2,
+ FRAME_HDR_RSVD1,
+ FRAME_HDR_TRACK,
+ FRAME_HDR_CH1,
+ FRAME_HDR_CH2,
+ FRAME_HDR_RSVD2,
+ FRAME_HDR_RSVD3,
+ FRAME_HDR_RSVD4,
+ FRAME_HDR_RSVD5,
+ FRAME_PAYLOAD,
+ FRAME_FRAGMENT,
+ ERROR;
+ }
+
+ private final Receiver<NetworkEvent> receiver;
+ private State state;
+
+ private byte instance;
+ private byte major;
+ private byte minor;
+
+ private byte flags;
+ private byte type;
+ private byte track;
+ private int channel;
+ private int size;
+ private Frame frame;
+
+ public InputHandler(Receiver<NetworkEvent> receiver, State state)
+ {
+ this.receiver = receiver;
+ this.state = state;
+ }
+
+ public InputHandler(Receiver<NetworkEvent> receiver)
+ {
+ this(receiver, PROTO_HDR);
+ }
+
+ private void init()
+ {
+ receiver.received(new ProtocolHeader(instance, major, minor));
+ }
+
+ private void frame()
+ {
+ assert size == frame.getSize();
+ receiver.received(frame);
+ frame = null;
+ }
+
+ private void error(String fmt, Object ... args)
+ {
+ receiver.received(new ProtocolError(Frame.L1, fmt, args));
+ }
+
+ public void received(ByteBuffer buf)
+ {
+ while (buf.hasRemaining())
+ {
+ state = next(buf);
+ }
+ }
+
+ private State next(ByteBuffer buf)
+ {
+ switch (state) {
+ case PROTO_HDR:
+ return expect(buf, 'A', PROTO_HDR_M);
+ case PROTO_HDR_M:
+ return expect(buf, 'M', PROTO_HDR_Q);
+ case PROTO_HDR_Q:
+ return expect(buf, 'Q', PROTO_HDR_P);
+ case PROTO_HDR_P:
+ return expect(buf, 'P', PROTO_HDR_CLASS);
+ case PROTO_HDR_CLASS:
+ return expect(buf, 1, PROTO_HDR_INSTANCE);
+ case PROTO_HDR_INSTANCE:
+ instance = buf.get();
+ return PROTO_HDR_MAJOR;
+ case PROTO_HDR_MAJOR:
+ major = buf.get();
+ return PROTO_HDR_MINOR;
+ case PROTO_HDR_MINOR:
+ minor = buf.get();
+ init();
+ return FRAME_HDR;
+ case FRAME_HDR:
+ flags = buf.get();
+ return FRAME_HDR_TYPE;
+ case FRAME_HDR_TYPE:
+ type = buf.get();
+ return FRAME_HDR_SIZE1;
+ case FRAME_HDR_SIZE1:
+ size = buf.get() << 8;
+ return FRAME_HDR_SIZE2;
+ case FRAME_HDR_SIZE2:
+ size += buf.get();
+ size -= 12;
+ return FRAME_HDR_RSVD1;
+ case FRAME_HDR_RSVD1:
+ return expect(buf, 0, FRAME_HDR_TRACK);
+ case FRAME_HDR_TRACK:
+ byte b = buf.get();
+ if ((b & 0xF0) != 0) {
+ error("non-zero reserved bits in upper nibble of " +
+ "frame header byte 5: '%x'", b);
+ return ERROR;
+ } else {
+ track = (byte) (b & 0xF);
+ return FRAME_HDR_CH1;
+ }
+ case FRAME_HDR_CH1:
+ channel = buf.get() << 8;
+ return FRAME_HDR_CH2;
+ case FRAME_HDR_CH2:
+ channel += buf.get();
+ return FRAME_HDR_RSVD2;
+ case FRAME_HDR_RSVD2:
+ return expect(buf, 0, FRAME_HDR_RSVD3);
+ case FRAME_HDR_RSVD3:
+ return expect(buf, 0, FRAME_HDR_RSVD4);
+ case FRAME_HDR_RSVD4:
+ return expect(buf, 0, FRAME_HDR_RSVD5);
+ case FRAME_HDR_RSVD5:
+ return expect(buf, 0, FRAME_PAYLOAD);
+ case FRAME_PAYLOAD:
+ frame = new Frame(flags, type, track, channel);
+ if (size > buf.remaining()) {
+ frame.addFragment(buf.slice());
+ buf.position(buf.limit());
+ return FRAME_FRAGMENT;
+ } else {
+ ByteBuffer payload = buf.slice();
+ payload.limit(size);
+ buf.position(buf.position() + size);
+ frame.addFragment(payload);
+ frame();
+ return FRAME_HDR;
+ }
+ case FRAME_FRAGMENT:
+ int delta = size - frame.getSize();
+ if (delta > buf.remaining()) {
+ frame.addFragment(buf.slice());
+ buf.position(buf.limit());
+ return FRAME_FRAGMENT;
+ } else {
+ ByteBuffer fragment = buf.slice();
+ fragment.limit(delta);
+ buf.position(buf.position() + delta);
+ frame.addFragment(fragment);
+ frame();
+ return FRAME_HDR;
+ }
+ default:
+ throw new IllegalStateException();
+ }
+ }
+
+ private State expect(ByteBuffer buf, int expected, State next)
+ {
+ return expect(buf, (byte) expected, next);
+ }
+
+ private State expect(ByteBuffer buf, char expected, State next)
+ {
+ return expect(buf, (byte) expected, next);
+ }
+
+ private State expect(ByteBuffer buf, byte expected, State next)
+ {
+ byte b = buf.get();
+ if (b == expected) {
+ return next;
+ } else {
+ error("expecting '%x', got '%x'", expected, b);
+ return ERROR;
+ }
+ }
+
+ public void closed()
+ {
+ receiver.closed();
+ }
+
+}
Propchange: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Copied: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/NetworkDelegate.java (from r573657, incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ProtocolActions.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/NetworkDelegate.java?p2=incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/NetworkDelegate.java&p1=incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ProtocolActions.java&r1=573657&r2=575474&rev=575474&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ProtocolActions.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/NetworkDelegate.java Thu Sep 13 14:42:57 2007
@@ -18,16 +18,19 @@
* under the License.
*
*/
-package org.apache.qpidity;
+package org.apache.qpidity.transport.network;
+
+import org.apache.qpidity.transport.ProtocolError;
+import org.apache.qpidity.transport.ProtocolHeader;
/**
- * ProtocolActions
+ * NetworkDelegate
*
* @author Rafael H. Schloming
*/
-interface ProtocolActions
+public interface NetworkDelegate
{
void init(ProtocolHeader header);
Added: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/NetworkEvent.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/NetworkEvent.java?rev=575474&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/NetworkEvent.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/NetworkEvent.java Thu Sep 13 14:42:57 2007
@@ -0,0 +1,34 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpidity.transport.network;
+
+
+/**
+ * NetworkEvent
+ *
+ * An event exchanged with the network layer. Each event knows which
+ * NetworkDelegate callback represents it, so handlers dispatch by
+ * calling <code>event.delegate(this)</code> instead of inspecting the
+ * event's type; Frame, for example, answers with
+ * <code>delegate.frame(this)</code>.
+ */
+
+public interface NetworkEvent
+{
+ /**
+ * Invokes the callback of the given delegate that corresponds to
+ * this kind of event, passing this event to it.
+ *
+ * @param delegate the handler to call back
+ */
+ void delegate(NetworkDelegate delegate);
+
+}
Propchange: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/NetworkEvent.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java?rev=575474&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java Thu Sep 13 14:42:57 2007
@@ -0,0 +1,99 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpidity.transport.network;
+
+import java.nio.ByteBuffer;
+
+import org.apache.qpidity.transport.ProtocolError;
+import org.apache.qpidity.transport.ProtocolHeader;
+import org.apache.qpidity.transport.Sender;
+
+import static org.apache.qpidity.transport.network.Frame.*;
+
+
+/**
+ * OutputHandler
+ *
+ */
+
+public class OutputHandler implements Sender<NetworkEvent>, NetworkDelegate
+{
+
+ private Sender<ByteBuffer> sender;
+ private Object lock = new Object();
+
+ public OutputHandler(Sender<ByteBuffer> sender)
+ {
+ this.sender = sender;
+ }
+
+ public void send(NetworkEvent event)
+ {
+ event.delegate(this);
+ }
+
+ public void close()
+ {
+ synchronized (lock)
+ {
+ sender.close();
+ }
+ }
+
+ public void init(ProtocolHeader header)
+ {
+ synchronized (lock)
+ {
+ sender.send(header.toByteBuffer());
+ }
+ }
+
+ public void frame(Frame frame)
+ {
+ ByteBuffer hdr = ByteBuffer.allocate(HEADER_SIZE);
+ hdr.put(frame.getFlags());
+ hdr.put(frame.getType());
+ hdr.putShort((short) (frame.getSize() + HEADER_SIZE));
+ hdr.put(RESERVED);
+ hdr.put(frame.getTrack());
+ hdr.putShort((short) frame.getChannel());
+ hdr.put(RESERVED);
+ hdr.put(RESERVED);
+ hdr.put(RESERVED);
+ hdr.put(RESERVED);
+ hdr.flip();
+
+ synchronized (lock)
+ {
+ sender.send(hdr);
+ for (ByteBuffer buf : frame)
+ {
+ sender.send(buf);
+ }
+ }
+ }
+
+ public void error(ProtocolError error)
+ {
+ throw new IllegalStateException("XXX");
+ }
+
+}
Propchange: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Copied: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaHandler.java (from r573657, incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/MinaHandler.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaHandler.java?p2=incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaHandler.java&p1=incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/MinaHandler.java&r1=573657&r2=575474&rev=575474&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/MinaHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaHandler.java Thu Sep 13 14:42:57 2007
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpidity;
+package org.apache.qpidity.transport.network.mina;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -28,7 +28,6 @@
import org.apache.mina.common.ConnectFuture;
import org.apache.mina.common.IdleStatus;
import org.apache.mina.common.IoAcceptor;
-import org.apache.mina.common.IoConnector;
import org.apache.mina.common.IoHandler;
import org.apache.mina.common.IoSession;
import org.apache.mina.common.SimpleByteBufferAllocator;
@@ -36,6 +35,16 @@
import org.apache.mina.transport.socket.nio.SocketAcceptor;
import org.apache.mina.transport.socket.nio.SocketConnector;
+import org.apache.qpidity.transport.Connection;
+import org.apache.qpidity.transport.ConnectionDelegate;
+import org.apache.qpidity.transport.Receiver;
+import org.apache.qpidity.transport.Sender;
+
+import org.apache.qpidity.transport.network.Assembler;
+import org.apache.qpidity.transport.network.Disassembler;
+import org.apache.qpidity.transport.network.InputHandler;
+import org.apache.qpidity.transport.network.OutputHandler;
+
/**
* MinaHandler
@@ -57,9 +66,9 @@
public void messageReceived(IoSession ssn, Object obj) // passes the received buffer to the receiver stored in the session attachment
{
- Connection conn = (Connection) ssn.getAttachment();
+ Attachment attachment = (Attachment) ssn.getAttachment();
ByteBuffer buf = (ByteBuffer) obj;
- conn.getInputHandler().handle(buf.buf());
+ attachment.receiver.received(buf.buf()); // Attachment.receiver is declared as Receiver<java.nio.ByteBuffer>
}
public void messageSent(IoSession ssn, Object obj)
@@ -80,16 +89,15 @@
public void sessionOpened(final IoSession ssn)
{
System.out.println("opened " + ssn);
- Connection conn = new Connection(new Handler<java.nio.ByteBuffer>()
- {
- public void handle(java.nio.ByteBuffer buf)
- {
- ssn.write(ByteBuffer.wrap(buf));
- }
- },
- delegate,
- state);
- ssn.setAttachment(conn);
+ // XXX: hardcoded version + max-frame
+ Connection conn = new Connection
+ (new Disassembler(new OutputHandler(new MinaSender(ssn)),
+ (byte)0, (byte)10, 64*1024),
+ delegate);
+ // XXX: hardcoded version
+ Receiver<java.nio.ByteBuffer> receiver =
+ new InputHandler(new Assembler(conn, (byte)0, (byte)10), state);
+ ssn.setAttachment(new Attachment(conn, receiver));
// XXX
synchronized (ssn)
{
@@ -100,21 +108,27 @@
public void sessionClosed(IoSession ssn) // logs the close, tells the receiver it is closed, then clears the attachment
{
System.out.println("closed " + ssn);
+ Attachment attachment = (Attachment) ssn.getAttachment();
+ attachment.receiver.closed(); // NOTE(review): would throw NullPointerException if no attachment is set -- confirm sessionOpened always runs first
ssn.setAttachment(null);
}
public void sessionIdle(IoSession ssn, IdleStatus status) // idle notifications are ignored
{
- System.out.println(status);
+ // do nothing: the idle status is no longer printed
}
- public static final void main(String[] args) throws IOException
+ private class Attachment // holds the Connection and Receiver stored together as a session's attachment; NOTE(review): declared without static -- confirm an enclosing-instance reference is wanted
{
- ByteBuffer.setAllocator(new SimpleByteBufferAllocator());
- if (args[0].equals("accept")) {
- accept("0.0.0.0", 5672, SessionDelegateStub.source());
- } else if (args[0].equals("connect")) {
- connect("0.0.0.0", 5672, SessionDelegateStub.source());
+
+ Connection connection; // returned to the caller by connect()
+ Receiver<java.nio.ByteBuffer> receiver; // used by messageReceived() and sessionClosed()
+
+ Attachment(Connection connection,
+ Receiver<java.nio.ByteBuffer> receiver)
+ {
+ this.connection = connection;
+ this.receiver = receiver;
}
}
@@ -124,8 +138,7 @@
{
IoAcceptor acceptor = new SocketAcceptor();
acceptor.bind(new InetSocketAddress(host, port),
- new MinaHandler(delegate, InputHandler.State.PROTO_HDR));
-
+ new MinaHandler(delegate, InputHandler.State.PROTO_HDR));
}
public static final Connection connect(String host, int port,
@@ -134,7 +147,8 @@
MinaHandler handler = new MinaHandler(delegate,
InputHandler.State.FRAME_HDR);
SocketAddress addr = new InetSocketAddress(host, port);
- IoConnector connector = new SocketConnector();
+ SocketConnector connector = new SocketConnector();
+ connector.setWorkerTimeout(0);
ConnectFuture cf = connector.connect(addr, handler);
cf.join();
IoSession ssn = cf.getSession();
@@ -153,8 +167,8 @@
}
}
}
- Connection conn = (Connection) ssn.getAttachment();
- return conn;
+ Attachment attachment = (Attachment) ssn.getAttachment();
+ return attachment.connection;
}
}
Added: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaSender.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaSender.java?rev=575474&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaSender.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaSender.java Thu Sep 13 14:42:57 2007
@@ -0,0 +1,54 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpidity.transport.network.mina;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.IoSession;
+
+import org.apache.qpidity.transport.Sender;
+
+
+/**
+ * MinaSender: a Sender of java.nio.ByteBuffer that forwards to a MINA IoSession.
+ *
+ */
+
+public class MinaSender implements Sender<java.nio.ByteBuffer>
+{
+
+ private final IoSession session; // session that send() writes to and close() closes
+
+ public MinaSender(IoSession session)
+ {
+ this.session = session;
+ }
+
+ public void send(java.nio.ByteBuffer buf) // wraps buf in a MINA ByteBuffer and writes it to the session
+ {
+ session.write(ByteBuffer.wrap(buf));
+ }
+
+ public void close() // closes the session
+ {
+ session.close();
+ }
+
+}
Propchange: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaSender.java
------------------------------------------------------------------------------
svn:eol-style = native
Copied: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/util/Functions.java (from r573657, incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Functions.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/util/Functions.java?p2=incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/util/Functions.java&p1=incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Functions.java&r1=573657&r2=575474&rev=575474&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Functions.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/util/Functions.java Thu Sep 13 14:42:57 2007
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpidity;
+package org.apache.qpidity.transport.util;
import java.nio.ByteBuffer;
Copied: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/util/SliceIterator.java (from r573657, incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/SliceIterator.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/util/SliceIterator.java?p2=incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/util/SliceIterator.java&p1=incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/SliceIterator.java&r1=573657&r2=575474&rev=575474&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/SliceIterator.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/util/SliceIterator.java Thu Sep 13 14:42:57 2007
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpidity;
+package org.apache.qpidity.transport.util;
import java.nio.ByteBuffer;
@@ -31,7 +31,7 @@
* @author Rafael H. Schloming
*/
-class SliceIterator implements Iterator<ByteBuffer>
+public class SliceIterator implements Iterator<ByteBuffer>
{
final private Iterator<ByteBuffer> iterator;
Modified: incubator/qpid/trunk/qpid/java/plugins/src/main/java/org/apache/qpid/plugins/JythonMojo.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/plugins/src/main/java/org/apache/qpid/plugins/JythonMojo.java?rev=575474&r1=575473&r2=575474&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/plugins/src/main/java/org/apache/qpid/plugins/JythonMojo.java (original)
+++ incubator/qpid/trunk/qpid/java/plugins/src/main/java/org/apache/qpid/plugins/JythonMojo.java Thu Sep 13 14:42:57 2007
@@ -21,6 +21,7 @@
package org.apache.qpid.plugins;
import java.io.File;
+import java.io.IOException;
import org.apache.maven.plugin.AbstractMojo;
import org.apache.maven.plugin.MojoExecutionException;
@@ -46,9 +47,44 @@
*/
private String[] params = new String[0];
+ /**
+ * Source file.
+ *
+ * @parameter
+ */
+ private File source;
+
+ /**
+ * Optional timestamp.
+ *
+ * @parameter
+ */
+ private File timestamp;
+
public void execute() throws MojoExecutionException // runs jython with params unless the timestamp file is newer than source, then refreshes the timestamp
{
+ if (source != null && timestamp != null)
+ {
+ if (timestamp.lastModified() > source.lastModified()) // timestamp is newer than source: skip the run
+ {
+ return;
+ }
+ }
+
jython.main(params);
+
+ if (timestamp != null)
+ {
+ try
+ {
+ timestamp.createNewFile(); // boolean result ignored, so an already-existing file is not treated as an error
+ }
+ catch (IOException e)
+ {
+ throw new MojoExecutionException("cannot create timestamp", e);
+ }
+ timestamp.setLastModified(System.currentTimeMillis()); // NOTE(review): boolean result is ignored, so a failed update goes unnoticed
+ }
}
}