You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2007/09/13 23:43:04 UTC

svn commit: r575474 [2/2] - in /incubator/qpid/trunk/qpid/java: client/src/main/java/org/apache/qpid/client/ client/src/main/java/org/apache/qpid/client/message/ client/src/main/java/org/apache/qpidity/client/ client/src/main/java/org/apache/qpidity/cl...

Copied: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Connection.java (from r573657, incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Connection.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Connection.java?p2=incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Connection.java&p1=incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Connection.java&r1=573657&r2=575474&rev=575474&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Connection.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Connection.java Thu Sep 13 14:42:57 2007
@@ -18,7 +18,7 @@
  * under the License.
  *
  */
-package org.apache.qpidity;
+package org.apache.qpidity.transport;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -37,25 +37,19 @@
  */
 
 // RA making this public until we sort out the package issues
-public class Connection implements ProtocolActions
+public class Connection
+    implements Receiver<ConnectionEvent>, Sender<ConnectionEvent>
 {
 
-    final private Handler<ByteBuffer> input;
-    final private Handler<ByteBuffer> output;
+    final private Sender<ConnectionEvent> sender;
     final private ConnectionDelegate delegate;
 
     final private Map<Integer,Channel> channels = new HashMap<Integer,Channel>();
-    // XXX: hardcoded versions
-    private ProtocolHeader header = new ProtocolHeader((byte) 1, (byte) 0, (byte) 10);
-    // XXX
-    private int maxFrame = 64*1024;
-
-    public Connection(Handler<ByteBuffer> output,
-                      ConnectionDelegate delegate,
-                      InputHandler.State state)
+
+    public Connection(Sender<ConnectionEvent> sender,
+                      ConnectionDelegate delegate)
     {
-        this.input = new InputHandler(this, state);
-        this.output = output;
+        this.sender = sender;
         this.delegate = delegate;
     }
 
@@ -63,79 +57,48 @@
     {
         return delegate;
     }
-    
-    public Connection(Handler<ByteBuffer> output,
-                      ConnectionDelegate delegate)
-    {
-        this(output, delegate, InputHandler.State.PROTO_HDR);
-    }
 
-    public Handler<ByteBuffer> getInputHandler()
+    public void received(ConnectionEvent event)
     {
-        return input;
+        Channel channel = getChannel(event.getChannel());
+        channel.received(event.getProtocolEvent());
     }
 
-    public Handler<ByteBuffer> getOutputHandler()
+    public void send(ConnectionEvent event)
     {
-        return output;
+        sender.send(event);
     }
 
-    public ProtocolHeader getHeader()
-    {
-        return header;
-    }
-
-    public byte getMajor()
-    {
-        return header.getMajor();
-    }
-
-    public byte getMinor()
-    {
-        return header.getMinor();
-    }
-
-    public int getMaxFrame()
-    {
-        return maxFrame;
-    }
-
-    public void init(ProtocolHeader hdr)
+    public Channel getChannel(int number)
     {
-        System.out.println(header);
-        if (hdr.getMajor() != header.getMajor() &&
-            hdr.getMinor() != header.getMinor())
+        synchronized (channels)
         {
-            output.handle(header.toByteBuffer());
-            // XXX: how do we close the connection?
+            Channel channel = channels.get(number);
+            if (channel == null)
+            {
+                channel = new Channel(this, number, delegate.getSessionDelegate());
+                channels.put(number, channel);
+            }
+            return channel;
         }
-        
-        // not sure if this is the right place
-        System.out.println("\n--------------------Broker Start Connection Negotiation -----------------------\n");
-        
-        getChannel(0).connectionStart(header.getMajor(), header.getMinor(), null, "PLAIN", "utf8");
     }
 
-    public Channel getChannel(int number)
+    void removeChannel(int number)
     {
-        Channel channel = channels.get(number);
-        if (channel == null)
+        synchronized (channels)
         {
-            channel = new Channel(this, number, delegate.getSessionDelegate());
-            channels.put(number, channel);
+            channels.remove(number);
         }
-        return channel;
     }
 
-    public void frame(Frame frame)
+    public void closed()
     {
-        Channel channel = getChannel(frame.getChannel());
-        channel.handle(frame);
+        System.out.println("connection closed: " + this);
     }
 
-    public void error(ProtocolError error)
+    public void close()
     {
-        throw new RuntimeException(error.getMessage());
+        sender.close();
     }
 
 }

Copied: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ConnectionDelegate.java (from r573657, incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ConnectionDelegate.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ConnectionDelegate.java?p2=incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ConnectionDelegate.java&p1=incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ConnectionDelegate.java&r1=573657&r2=575474&rev=575474&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ConnectionDelegate.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ConnectionDelegate.java Thu Sep 13 14:42:57 2007
@@ -18,7 +18,10 @@
  * under the License.
  *
  */
-package org.apache.qpidity;
+package org.apache.qpidity.transport;
+
+import org.apache.qpidity.SecurityHelper;
+import org.apache.qpidity.QpidException;
 
 import java.io.UnsupportedEncodingException;
 import java.util.HashMap;
@@ -41,7 +44,7 @@
 /**
  * Currently only implemented client specific methods
  * the server specific methods are dummy impls for testing
- * 
+ *
  * the connectionClose is kind of different for both sides
  */
 public abstract class ConnectionDelegate extends Delegate<Channel>
@@ -56,23 +59,47 @@
     private int maxFrame = 64*1024;
     private Condition _negotiationComplete;
     private Lock _negotiationCompleteLock;
-    
+
     public abstract SessionDelegate getSessionDelegate();
-    
+
     public void setCondition(Lock negotiationCompleteLock,Condition negotiationComplete)
     {
         _negotiationComplete = negotiationComplete;
         _negotiationCompleteLock = negotiationCompleteLock;
     }
-    
+
+    @Override public void init(Channel ch, ProtocolHeader hdr)
+    {
+        System.out.println(hdr);
+        // XXX: hardcoded version
+        if (hdr.getMajor() != 0 && hdr.getMinor() != 10)
+        {
+            // XXX
+            ch.getConnection().send(new ConnectionEvent(0, new ProtocolHeader(1, 0, 10)));
+            ch.getConnection().close();
+        }
+        else
+        {
+
+            System.out.println("\n--------------------Broker Start Connection Negotiation -----------------------\n");
+
+            ch.connectionStart(hdr.getMajor(), hdr.getMinor(), null, "PLAIN", "utf8");
+        }
+    }
+
+    @Override public void error(Channel ch, ProtocolError error)
+    {
+        throw new RuntimeException(error.getMessage());
+    }
+
     // ----------------------------------------------
-    //           Client side 
+    //           Client side
     //-----------------------------------------------
-    @Override public void connectionStart(Channel context, ConnectionStart struct) 
+    @Override public void connectionStart(Channel context, ConnectionStart struct)
     {
         System.out.println("\n--------------------Client Start Connection Negotiation -----------------------\n");
         System.out.println("The broker has sent connection-start");
-        
+
         String mechanism = null;
         String response = null;
         try
@@ -94,15 +121,15 @@
         {
           //  need error handling
         }
-                
-        Map<String,?> props = new HashMap<String,String>();        
+
+        Map<String,?> props = new HashMap<String,String>();
         context.connectionStartOk(props, mechanism, response, _locale);
     }
-    
-    @Override public void connectionSecure(Channel context, ConnectionSecure struct) 
+
+    @Override public void connectionSecure(Channel context, ConnectionSecure struct)
     {
         System.out.println("The broker has sent connection-secure with chanllenge " + struct.getChallenge());
-        
+
         try
         {
             String response = new String(saslClient.evaluateChallenge(struct.getChallenge().getBytes()),_locale);
@@ -115,20 +142,20 @@
         catch (SaslException e)
         {
           // need error handling
-        }        
+        }
     }
-    
-    @Override public void connectionTune(Channel context, ConnectionTune struct) 
+
+    @Override public void connectionTune(Channel context, ConnectionTune struct)
     {
         System.out.println("The broker has sent connection-tune " + struct.toString());
-        
+
         // should update the channel max given by the broker.
-        context.connectionTuneOk(struct.getChannelMax(), struct.getFrameMax(), struct.getHeartbeat());    
+        context.connectionTuneOk(struct.getChannelMax(), struct.getFrameMax(), struct.getHeartbeat());
         context.connectionOpen(_virtualHost, null, Option.INSIST);
     }
-   
-    
-    @Override public void connectionOpenOk(Channel context, ConnectionOpenOk struct) 
+
+
+    @Override public void connectionOpenOk(Channel context, ConnectionOpenOk struct)
     {
         String knownHosts = struct.getKnownHosts();
         System.out.println("The broker has opened the connection for use");
@@ -147,23 +174,23 @@
         }
         System.out.println("\n-------------------- Client End Connection Negotiation -----------------------\n");
     }
-    
-    public void connectionRedirect(Channel context, ConnectionRedirect struct) 
+
+    public void connectionRedirect(Channel context, ConnectionRedirect struct)
     {
         // not going to bother at the moment
     }
-    
+
     //  ----------------------------------------------
-    //           Server side 
+    //           Server side
     //-----------------------------------------------
-    @Override public void connectionStartOk(Channel context, ConnectionStartOk struct) 
+    @Override public void connectionStartOk(Channel context, ConnectionStartOk struct)
     {
         //set the client side locale on the server side
         _locale = struct.getLocale();
         _mechanism = struct.getMechanism();
-        
+
         System.out.println("The client has sent connection-start-ok");
-        
+
         //try
         //{
             //saslServer = Sasl.createSaslServer(_mechanism, "AMQP", "ABC",null,SecurityHelper.createCallbackHandler(_mechanism,_username,_password));
@@ -183,11 +210,11 @@
                 }
                 catch(Exception e)
                 {
-                    
+
                 }
             }
-            
-            
+
+
         /*}
         catch (SaslException e)
         {
@@ -198,13 +225,13 @@
           //  need error handling
         }*/
     }
-    
-    @Override public void connectionTuneOk(Channel context, ConnectionTuneOk struct) 
+
+    @Override public void connectionTuneOk(Channel context, ConnectionTuneOk struct)
     {
         System.out.println("The client has excepted the tune params");
     }
-    
-    @Override public void connectionSecureOk(Channel context, ConnectionSecureOk struct) 
+
+    @Override public void connectionSecureOk(Channel context, ConnectionSecureOk struct)
     {
         System.out.println("The client has sent connection-secure-ok");
         try
@@ -225,11 +252,11 @@
                 }
                 catch(Exception e)
                 {
-                    
+
                 }
             }
-            
-            
+
+
         }
         catch (SaslException e)
         {
@@ -240,17 +267,16 @@
           //  need error handling
         }
     }
-    
-    
-    @Override public void connectionOpen(Channel context, ConnectionOpen struct) 
+
+
+    @Override public void connectionOpen(Channel context, ConnectionOpen struct)
     {
        String hosts = "amqp:1223243232325";
        System.out.println("The client has sent connection-open");
        context.connectionOpenOk(hosts);
        System.out.println("\n-------------------- Broker End Connection Negotiation -----------------------\n");
     }
-    
-    
+
     public String getPassword()
     {
         return _password;

Added: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ConnectionEvent.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ConnectionEvent.java?rev=575474&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ConnectionEvent.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ConnectionEvent.java Thu Sep 13 14:42:57 2007
@@ -0,0 +1,51 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpidity.transport;
+
+
+/**
+ * ConnectionEvent
+ *
+ */
+
+public class ConnectionEvent
+{
+
+    private final int channel;
+    private final ProtocolEvent event;
+
+    public ConnectionEvent(int channel, ProtocolEvent event)
+    {
+        this.channel = channel;
+        this.event = event;
+    }
+
+    public int getChannel()
+    {
+        return channel;
+    }
+
+    public ProtocolEvent getProtocolEvent()
+    {
+        return event;
+    }
+
+}

Propchange: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ConnectionEvent.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Data.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Data.java?rev=575474&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Data.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Data.java Thu Sep 13 14:42:57 2007
@@ -0,0 +1,84 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpidity.transport;
+
+import org.apache.qpidity.transport.network.Frame;
+
+import java.nio.ByteBuffer;
+
+import java.util.Collections;
+
+
+/**
+ * Data
+ *
+ */
+
+public class Data implements ProtocolEvent
+{
+
+    private final Iterable<ByteBuffer> fragments;
+    private final boolean first;
+    private final boolean last;
+
+    public Data(Iterable<ByteBuffer> fragments, boolean first, boolean last)
+    {
+        this.fragments = fragments;
+        this.first = first;
+        this.last = last;
+    }
+
+    public Data(ByteBuffer buf, boolean first, boolean last)
+    {
+        this(Collections.singletonList(buf), first, last);
+    }
+
+    public Iterable<ByteBuffer> getFragments()
+    {
+        return fragments;
+    }
+
+    public boolean isFirst()
+    {
+        return first;
+    }
+
+    public boolean isLast()
+    {
+        return last;
+    }
+
+    public byte getEncodedTrack()
+    {
+        return Frame.L4;
+    }
+
+    public <C> void delegate(C context, Delegate<C> delegate)
+    {
+        delegate.data(context, this);
+    }
+
+    public <C> void delegate(C context, Switch sw)
+    {
+        sw.data(context, this);
+    }
+
+}

Propchange: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Data.java
------------------------------------------------------------------------------
    svn:eol-style = native

Copied: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Future.java (from r573657, incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Future.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Future.java?p2=incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Future.java&p1=incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Future.java&r1=573657&r2=575474&rev=575474&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Future.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Future.java Thu Sep 13 14:42:57 2007
@@ -18,7 +18,7 @@
  * under the License.
  *
  */
-package org.apache.qpidity;
+package org.apache.qpidity.transport;
 
 
 /**

Copied: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Header.java (from r573657, incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Header.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Header.java?p2=incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Header.java&p1=incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Header.java&r1=573657&r2=575474&rev=575474&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Header.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Header.java Thu Sep 13 14:42:57 2007
@@ -18,7 +18,11 @@
  * under the License.
  *
  */
-package org.apache.qpidity;
+package org.apache.qpidity.transport;
+
+import org.apache.qpidity.transport.network.Frame;
+
+import java.util.List;
 
 
 /**
@@ -27,4 +31,46 @@
  * @author Rafael H. Schloming
  */
 
-public abstract class Header extends Struct {}
+public class Header implements ProtocolEvent {
+
+    private final List<Struct> structs;
+
+    public Header(List<Struct> structs)
+    {
+        this.structs = structs;
+    }
+
+    public List<Struct> getStructs()
+    {
+        return structs;
+    }
+
+    public <T> T get(Class<T> klass)
+    {
+        for (Struct st : structs)
+        {
+            if (klass.isInstance(st))
+            {
+                return klass.cast(st);
+            }
+        }
+
+        return null;
+    }
+
+    public byte getEncodedTrack()
+    {
+        return Frame.L4;
+    }
+
+    public <C> void delegate(C context, Delegate<C> delegate)
+    {
+        delegate.header(context, this);
+    }
+
+    public <C> void delegate(C context, Switch sw)
+    {
+        sw.header(context, this);
+    }
+
+}

Copied: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Method.java (from r573657, incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Method.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Method.java?p2=incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Method.java&p1=incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Method.java&r1=573657&r2=575474&rev=575474&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Method.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Method.java Thu Sep 13 14:42:57 2007
@@ -18,7 +18,7 @@
  * under the License.
  *
  */
-package org.apache.qpidity;
+package org.apache.qpidity.transport;
 
 
 /**
@@ -27,7 +27,7 @@
  * @author Rafael H. Schloming
  */
 
-public abstract class Method extends Struct
+public abstract class Method extends Struct implements ProtocolEvent
 {
 
     public static final Method create(int type)
@@ -54,11 +54,9 @@
 
     public abstract byte getEncodedTrack();
 
-    // XXX: do we need a segment base type?
-    public byte getSegmentType()
+    public <C> void delegate(C context, Switch sw)
     {
-        // XXX
-        return Frame.METHOD;
+        sw.method(context, this);
     }
 
 }

Copied: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ProtocolError.java (from r573657, incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ProtocolError.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ProtocolError.java?p2=incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ProtocolError.java&p1=incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ProtocolError.java&r1=573657&r2=575474&rev=575474&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ProtocolError.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ProtocolError.java Thu Sep 13 14:42:57 2007
@@ -18,7 +18,10 @@
  * under the License.
  *
  */
-package org.apache.qpidity;
+package org.apache.qpidity.transport;
+
+import org.apache.qpidity.transport.network.NetworkDelegate;
+import org.apache.qpidity.transport.network.NetworkEvent;
 
 
 /**
@@ -27,21 +30,43 @@
  * @author Rafael H. Schloming
  */
 
-class ProtocolError
+public class ProtocolError implements NetworkEvent, ProtocolEvent
 {
 
+    private final byte track;
     private final String format;
     private final Object[] args;
 
-    public ProtocolError(String format, Object ... args)
+    public ProtocolError(byte track, String format, Object ... args)
     {
+        this.track = track;
         this.format = format;
         this.args = args;
     }
 
+    public byte getEncodedTrack()
+    {
+        return track;
+    }
+
     public String getMessage()
     {
         return String.format(format, args);
+    }
+
+    public <C> void delegate(C context, Switch sw)
+    {
+        sw.error(context, this);
+    }
+
+    public void delegate(NetworkDelegate delegate)
+    {
+        delegate.error(this);
+    }
+
+    public <C> void delegate(C context, Delegate<C> delegate)
+    {
+        delegate.error(context, this);
     }
 
 }

Added: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ProtocolEvent.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ProtocolEvent.java?rev=575474&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ProtocolEvent.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ProtocolEvent.java Thu Sep 13 14:42:57 2007
@@ -0,0 +1,49 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpidity.transport;
+
+
+/**
+ * ProtocolEvent
+ *
+ */
+
+public interface ProtocolEvent
+{
+
+    public interface Switch<C>
+    {
+        void init(C context, ProtocolHeader header);
+        void method(C context, Method method);
+        void header(C context, Header header);
+        void data(C context, Data data);
+        void error(C context, ProtocolError error);
+    }
+
+    // XXX: could do this switching with cascading defaults for the
+    // specific dispatch methods
+    <C> void delegate(C context, Switch sw);
+
+    <C> void delegate(C context, Delegate<C> delegate);
+
+    byte getEncodedTrack();
+
+}

Propchange: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ProtocolEvent.java
------------------------------------------------------------------------------
    svn:eol-style = native

Copied: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ProtocolHeader.java (from r573657, incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ProtocolHeader.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ProtocolHeader.java?p2=incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ProtocolHeader.java&p1=incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ProtocolHeader.java&r1=573657&r2=575474&rev=575474&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ProtocolHeader.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ProtocolHeader.java Thu Sep 13 14:42:57 2007
@@ -18,10 +18,14 @@
  * under the License.
  *
  */
-package org.apache.qpidity;
+package org.apache.qpidity.transport;
 
 import java.nio.ByteBuffer;
 
+import org.apache.qpidity.transport.network.NetworkDelegate;
+import org.apache.qpidity.transport.network.NetworkEvent;
+import org.apache.qpidity.transport.network.Frame;
+
 
 /**
  * ProtocolHeader
@@ -31,7 +35,7 @@
 
 //RA making this public until we sort out the package issues
 
-public class ProtocolHeader
+public class ProtocolHeader implements NetworkEvent, ProtocolEvent
 {
 
     private static final byte[] AMQP = {'A', 'M', 'Q', 'P' };
@@ -48,6 +52,11 @@
         this.minor = minor;
     }
 
+    public ProtocolHeader(int instance, int major, int minor)
+    {
+        this((byte) instance, (byte) major, (byte) minor);
+    }
+
     public byte getInstance()
     {
         return instance;
@@ -63,6 +72,11 @@
         return minor;
     }
 
+    public byte getEncodedTrack()
+    {
+        return Frame.L1;
+    }
+
     public ByteBuffer toByteBuffer()
     {
         ByteBuffer buf = ByteBuffer.allocate(8);
@@ -73,6 +87,21 @@
         buf.put(minor);
         buf.flip();
         return buf;
+    }
+
+    public <C> void delegate(C context, Switch sw)
+    {
+        sw.init(context, this);
+    }
+
+    public void delegate(NetworkDelegate delegate)
+    {
+        delegate.init(this);
+    }
+
+    public <C> void delegate(C context, Delegate<C> delegate)
+    {
+        delegate.init(context, this);
     }
 
     public String toString()

Copied: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Range.java (from r573657, incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Range.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Range.java?p2=incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Range.java&p1=incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Range.java&r1=573657&r2=575474&rev=575474&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Range.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Range.java Thu Sep 13 14:42:57 2007
@@ -18,7 +18,7 @@
  * under the License.
  *
  */
-package org.apache.qpidity;
+package org.apache.qpidity.transport;
 
 import static java.lang.Math.*;
 

Copied: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/RangeSet.java (from r573657, incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/RangeSet.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/RangeSet.java?p2=incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/RangeSet.java&p1=incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/RangeSet.java&r1=573657&r2=575474&rev=575474&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/RangeSet.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/RangeSet.java Thu Sep 13 14:42:57 2007
@@ -18,7 +18,7 @@
  * under the License.
  *
  */
-package org.apache.qpidity;
+package org.apache.qpidity.transport;
 
 import java.util.Collection;
 import java.util.Iterator;

Added: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Receiver.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Receiver.java?rev=575474&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Receiver.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Receiver.java Thu Sep 13 14:42:57 2007
@@ -0,0 +1,36 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpidity.transport;
+
+
+/**
+ * Receiver
+ *
+ */
+
+public interface Receiver<T>
+{
+
+    void received(T msg);
+
+    void closed();
+
+}

Propchange: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Receiver.java
------------------------------------------------------------------------------
    svn:eol-style = native

Copied: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Result.java (from r573657, incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Result.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Result.java?p2=incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Result.java&p1=incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Result.java&r1=573657&r2=575474&rev=575474&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Result.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Result.java Thu Sep 13 14:42:57 2007
@@ -18,7 +18,7 @@
  * under the License.
  *
  */
-package org.apache.qpidity;
+package org.apache.qpidity.transport;
 
 
 /**

Added: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Sender.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Sender.java?rev=575474&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Sender.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Sender.java Thu Sep 13 14:42:57 2007
@@ -0,0 +1,36 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpidity.transport;
+
+
+/**
+ * Sender
+ *
+ */
+
+public interface Sender<T>
+{
+
+    void send(T msg);
+
+    void close();
+
+}

Propchange: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Sender.java
------------------------------------------------------------------------------
    svn:eol-style = native

Copied: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java (from r573657, incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Session.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java?p2=incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java&p1=incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Session.java&r1=573657&r2=575474&rev=575474&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Session.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java Thu Sep 13 14:42:57 2007
@@ -18,10 +18,14 @@
  * under the License.
  *
  */
-package org.apache.qpidity;
+package org.apache.qpidity.transport;
+
+import org.apache.qpidity.transport.network.Frame;
 
 import java.nio.ByteBuffer;
+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 
@@ -42,12 +46,12 @@
     // completed incoming commands
     private final RangeSet processed = new RangeSet();
     private Range syncPoint = null;
-    
+
     // outgoing command count
     private long commandsOut = 0;
     private Map<Long,Method> commands = new HashMap<Long,Method>();
     private long mark = 0;
-    
+
 
     public Map<Long,Method> getOutstandingCommands()
     {
@@ -104,19 +108,19 @@
     }
 
     void flushProcessed()
-    {           
+    {
         for (Range r: processed)
         {
-            System.out.println("Completed Range [" + r.getLower() + "," + r.getUpper() +"]" );     
+            System.out.println("Completed Range [" + r.getLower() + "," + r.getUpper() +"]" );
         }
-        System.out.println("Notifying peer with execution complete");        
+        System.out.println("Notifying peer with execution complete");
         executionComplete(0, processed);
     }
 
     void syncPoint()
     {
         System.out.println("===========Request received to sync==========================");
-        
+
         Range range = new Range(0, getCommandsIn() - 1);
         boolean flush;
         synchronized (processed)
@@ -154,8 +158,8 @@
             for (long id = lower; id <= upper; id++)
             {
                 commands.remove(id);
-            }  
-             
+            }
+
             if (commands.isEmpty())
             {
                 System.out.println("\n All outstanding commands are completed !!!! \n");
@@ -177,15 +181,25 @@
             synchronized (commands)
             {
                 System.out.println("sent command " + m.getClass().getName() + " command Id" + commandsOut);
-                commands.put(commandsOut++, m);                
+                commands.put(commandsOut++, m);
             }
         }
         channel.method(m);
     }
 
-    public void headers(Struct ... headers)
+    public void header(Header header)
+    {
+        channel.header(header);
+    }
+
+    public void header(List<Struct> structs)
+    {
+        header(new Header(structs));
+    }
+
+    public void header(Struct ... structs)
     {
-        channel.headers(headers);
+        header(Arrays.asList(structs));
     }
 
     public void data(ByteBuffer buf)
@@ -210,7 +224,7 @@
 
     public void sync()
     {
-        System.out.println("calling sync()"); 
+        System.out.println("calling sync()");
         synchronized (commands)
         {
             if (!commands.isEmpty())
@@ -221,7 +235,7 @@
             while (!commands.isEmpty())
             {
                 try {
-                    System.out.println("\n============sync() waiting for commmands to be completed ==============\n"); 
+                    System.out.println("\n============sync() waiting for commmands to be completed ==============\n");
                     commands.wait();
                     System.out.println("\n============sync() got notified=========================================\n");
                 }
@@ -312,6 +326,12 @@
             return result != null;
         }
 
+    }
+
+    public void close()
+    {
+        sessionClose();
+        channel.close();
     }
 
 }

Copied: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/SessionDelegate.java (from r573657, incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/SessionDelegate.java?p2=incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/SessionDelegate.java&p1=incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java&r1=573657&r2=575474&rev=575474&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/SessionDelegate.java Thu Sep 13 14:42:57 2007
@@ -18,7 +18,7 @@
  * under the License.
  *
  */
-package org.apache.qpidity;
+package org.apache.qpidity.transport;
 
 
 /**
@@ -30,9 +30,7 @@
 public abstract class SessionDelegate extends Delegate<Session>
 {
 
-    public abstract void headers(Session ssn, Struct ... headers);
-
-    public abstract void data(Session ssn, Frame frame);
+    private static final Struct[] EMPTY_STRUCT_ARRAY = {};
 
     @Override public void executionResult(Session ssn, ExecutionResult result)
     {
@@ -47,7 +45,7 @@
             for (Range range : ranges)
             {
                 System.out.println("completed command range: " + range.getLower() + " to " + range.getUpper());
-                ssn.complete(range.getLower(), range.getUpper());                
+                ssn.complete(range.getLower(), range.getUpper());
             }
         }
         ssn.complete(excmp.getCumulativeExecutionMark());

Copied: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Struct.java (from r573657, incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Struct.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Struct.java?p2=incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Struct.java&p1=incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Struct.java&r1=573657&r2=575474&rev=575474&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Struct.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Struct.java Thu Sep 13 14:42:57 2007
@@ -18,7 +18,7 @@
  * under the License.
  *
  */
-package org.apache.qpidity;
+package org.apache.qpidity.transport;
 
 import org.apache.qpidity.codec.Encodable;
 
@@ -29,7 +29,7 @@
  * @author Rafael H. Schloming
  */
 
-public abstract class Struct implements Delegator, Encodable
+public abstract class Struct implements Encodable
 {
 
     public static Struct create(int type)

Added: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/Assembler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/Assembler.java?rev=575474&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/Assembler.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/Assembler.java Thu Sep 13 14:42:57 2007
@@ -0,0 +1,184 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpidity.transport.network;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import java.nio.ByteBuffer;
+
+import org.apache.qpidity.codec.FragmentDecoder;
+
+import org.apache.qpidity.transport.ConnectionEvent;
+import org.apache.qpidity.transport.Data;
+import org.apache.qpidity.transport.Header;
+import org.apache.qpidity.transport.Method;
+import org.apache.qpidity.transport.ProtocolError;
+import org.apache.qpidity.transport.ProtocolEvent;
+import org.apache.qpidity.transport.ProtocolHeader;
+import org.apache.qpidity.transport.Receiver;
+import org.apache.qpidity.transport.Struct;
+
+
+/**
+ * Assembler
+ *
+ */
+
+public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate
+{
+
+    private final Receiver<ConnectionEvent> receiver;
+    private final byte major;
+    private final byte minor;
+    private final Map<Integer,List<ByteBuffer>> segments;
+
+    public Assembler(Receiver<ConnectionEvent> receiver, byte major, byte minor)
+    {
+        this.receiver = receiver;
+        this.major = major;
+        this.minor = minor;
+        segments = new HashMap<Integer,List<ByteBuffer>>();
+    }
+
+    private int segmentKey(Frame frame)
+    {
+        // XXX: can this overflow?
+        return (frame.getTrack() + 1) * frame.getChannel();
+    }
+
+    private List<ByteBuffer> getSegment(Frame frame)
+    {
+        return segments.get(segmentKey(frame));
+    }
+
+    private void setSegment(Frame frame, List<ByteBuffer> segment)
+    {
+        int key = segmentKey(frame);
+        if (segments.containsKey(key))
+        {
+            error(new ProtocolError(Frame.L2, "segment in progress: %s",
+                                    frame));
+        }
+        segments.put(segmentKey(frame), segment);
+    }
+
+    private void clearSegment(Frame frame)
+    {
+        segments.remove(segmentKey(frame));
+    }
+
+    private void emit(int channel, ProtocolEvent event)
+    {
+        receiver.received(new ConnectionEvent(channel, event));
+    }
+
+    private void emit(Frame frame, ProtocolEvent event)
+    {
+        emit(frame.getChannel(), event);
+    }
+
+    public void received(NetworkEvent event)
+    {
+        event.delegate(this);
+    }
+
+    public void closed()
+    {
+        this.receiver.closed();
+    }
+
+    public void init(ProtocolHeader header)
+    {
+        emit(0, header);
+    }
+
+    public void frame(Frame frame)
+    {
+        switch (frame.getType())
+        {
+        case Frame.BODY:
+            emit(frame, new Data(frame, frame.isFirstFrame(),
+                                 frame.isLastFrame()));
+            break;
+        default:
+            assemble(frame);
+            break;
+        }
+    }
+
+    public void error(ProtocolError error)
+    {
+        emit(0, error);
+    }
+
+    private void assemble(Frame frame)
+    {
+        List<ByteBuffer> segment;
+        if (frame.isFirstFrame())
+        {
+            segment = new ArrayList<ByteBuffer>();
+            setSegment(frame, segment);
+        }
+        else
+        {
+            segment = getSegment(frame);
+        }
+
+        for (ByteBuffer buf : frame)
+        {
+            segment.add(buf);
+        }
+
+        if (frame.isLastFrame())
+        {
+            clearSegment(frame);
+            emit(frame, decode(frame.getType(), segment));
+        }
+    }
+
+    private ProtocolEvent decode(byte type, List<ByteBuffer> segment)
+    {
+        FragmentDecoder dec =
+            new FragmentDecoder(major, minor, segment.iterator());
+
+        switch (type)
+        {
+        case Frame.METHOD:
+            int methodType = (int) dec.readLong();
+            Method method = Method.create(methodType);
+            method.read(dec, major, minor);
+            return method;
+        case Frame.HEADER:
+            List<Struct> structs = new ArrayList();
+            while (dec.hasRemaining())
+            {
+                structs.add(dec.readLongStruct());
+            }
+            return new Header(structs);
+        default:
+            throw new IllegalStateException("unknown frame type: " + type);
+        }
+    }
+
+}

Propchange: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/Assembler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/Disassembler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/Disassembler.java?rev=575474&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/Disassembler.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/Disassembler.java Thu Sep 13 14:42:57 2007
@@ -0,0 +1,168 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpidity.transport.network;
+
+import org.apache.qpidity.codec.BBEncoder;
+import org.apache.qpidity.codec.SizeEncoder;
+
+import org.apache.qpidity.transport.ConnectionEvent;
+import org.apache.qpidity.transport.Data;
+import org.apache.qpidity.transport.Header;
+import org.apache.qpidity.transport.Method;
+import org.apache.qpidity.transport.ProtocolHeader;
+import org.apache.qpidity.transport.ProtocolError;
+import org.apache.qpidity.transport.ProtocolEvent;
+import org.apache.qpidity.transport.Sender;
+import org.apache.qpidity.transport.Struct;
+
+import java.nio.ByteBuffer;
+
+import static org.apache.qpidity.transport.network.Frame.*;
+
+import static java.lang.Math.*;
+
+
+/**
+ * Disassembler
+ *
+ */
+
+public class Disassembler
+    implements Sender<ConnectionEvent>, ProtocolEvent.Switch<ConnectionEvent>
+{
+
+    private final Sender<NetworkEvent> sender;
+    private final int maxFrame;
+    private final byte major;
+    private final byte minor;
+
+    public Disassembler(Sender<NetworkEvent> sender, byte major, byte minor,
+                        int maxFrame)
+    {
+        this.sender = sender;
+        this.major = major;
+        this.minor = minor;
+        this.maxFrame = maxFrame;
+    }
+
+    public void send(ConnectionEvent event)
+    {
+        event.getProtocolEvent().delegate(event, this);
+    }
+
+    public void close()
+    {
+        sender.close();
+    }
+
+    private void fragment(byte flags, byte type, ConnectionEvent event,
+                          ByteBuffer buf, boolean first, boolean last)
+    {
+        while (buf.hasRemaining())
+        {
+            ByteBuffer slice = buf.slice();
+            slice.limit(min(maxFrame, slice.remaining()));
+            buf.position(buf.position() + slice.remaining());
+
+            byte newflags = flags;
+            if (first)
+            {
+                newflags |= FIRST_FRAME;
+                first = false;
+            }
+            if (last && !buf.hasRemaining())
+            {
+                newflags |= LAST_FRAME;
+            }
+
+            Frame frame = new Frame(newflags, type,
+                                    event.getProtocolEvent().getEncodedTrack(),
+                                    event.getChannel());
+            frame.addFragment(slice);
+            sender.send(frame);
+        }
+    }
+
+    public void init(ConnectionEvent event, ProtocolHeader header)
+    {
+        sender.send(header);
+    }
+
+    public void method(ConnectionEvent event, Method method)
+    {
+        SizeEncoder sizer = new SizeEncoder(major, minor);
+        sizer.writeLong(method.getEncodedType());
+        method.write(sizer, major, minor);
+        sizer.flush();
+        int size = sizer.getSize();
+
+        ByteBuffer buf = ByteBuffer.allocate(size);
+        BBEncoder enc = new BBEncoder(major, minor, buf);
+        enc.writeLong(method.getEncodedType());
+        method.write(enc, major, minor);
+        enc.flush();
+        buf.flip();
+
+        byte flags = FIRST_SEG;
+
+        if (!method.hasPayload())
+        {
+            flags |= LAST_SEG;
+        }
+
+        fragment(flags, METHOD, event, buf, true, true);
+    }
+
+    public void header(ConnectionEvent event, Header header)
+    {
+        SizeEncoder sizer = new SizeEncoder(major, minor);
+        for (Struct st : header.getStructs())
+        {
+            sizer.writeLongStruct(st);
+        }
+
+        ByteBuffer buf = ByteBuffer.allocate(sizer.getSize());
+        BBEncoder enc = new BBEncoder(major, minor, buf);
+        for (Struct st : header.getStructs())
+        {
+            enc.writeLongStruct(st);
+            enc.flush();
+        }
+        buf.flip();
+
+        fragment((byte) 0x0, HEADER, event, buf, true, true);
+    }
+
+    public void data(ConnectionEvent event, Data data)
+    {
+        for (ByteBuffer buf : data.getFragments())
+        {
+            fragment(LAST_SEG, BODY, event, buf, data.isFirst(),
+                     data.isLast());
+        }
+    }
+
+    public void error(ConnectionEvent event, ProtocolError error)
+    {
+        sender.send(error);
+    }
+
+}

Propchange: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/Disassembler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Copied: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/Frame.java (from r573657, incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Frame.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/Frame.java?p2=incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/Frame.java&p1=incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Frame.java&r1=573657&r2=575474&rev=575474&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Frame.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/Frame.java Thu Sep 13 14:42:57 2007
@@ -18,7 +18,9 @@
  * under the License.
  *
  */
-package org.apache.qpidity;
+package org.apache.qpidity.transport.network;
+
+import org.apache.qpidity.transport.util.SliceIterator;
 
 import java.nio.ByteBuffer;
 
@@ -26,7 +28,7 @@
 import java.util.List;
 import java.util.Iterator;
 
-import static org.apache.qpidity.Functions.*;
+import static org.apache.qpidity.transport.util.Functions.*;
 
 
 /**
@@ -36,7 +38,7 @@
  */
 
 // RA: changed it to public until we sort the package issues
-public class Frame implements Iterable<ByteBuffer>
+public class Frame implements NetworkEvent, Iterable<ByteBuffer>
 {
     public static final int HEADER_SIZE = 12;
 
@@ -82,6 +84,11 @@
         size += fragment.remaining();
     }
 
+    public byte getFlags()
+    {
+        return flags;
+    }
+
     public int getChannel()
     {
         return channel;
@@ -135,6 +142,11 @@
     public Iterator<ByteBuffer> iterator()
     {
         return getFragments();
+    }
+
+    public void delegate(NetworkDelegate delegate)
+    {
+        delegate.frame(this);
     }
 
     public String toString()

Added: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java?rev=575474&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java Thu Sep 13 14:42:57 2007
@@ -0,0 +1,239 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpidity.transport.network;
+
+import java.nio.ByteBuffer;
+
+import org.apache.qpidity.transport.ProtocolError;
+import org.apache.qpidity.transport.ProtocolHeader;
+import org.apache.qpidity.transport.Receiver;
+
+import static org.apache.qpidity.transport.network.InputHandler.State.*;
+
+
+/**
+ * InputHandler
+ *
+ * @author Rafael H. Schloming
+ */
+
+public class InputHandler implements Receiver<ByteBuffer>
+{
+
+    public enum State
+    {
+        PROTO_HDR,
+        PROTO_HDR_M,
+        PROTO_HDR_Q,
+        PROTO_HDR_P,
+        PROTO_HDR_CLASS,
+        PROTO_HDR_INSTANCE,
+        PROTO_HDR_MAJOR,
+        PROTO_HDR_MINOR,
+        FRAME_HDR,
+        FRAME_HDR_TYPE,
+        FRAME_HDR_SIZE1,
+        FRAME_HDR_SIZE2,
+        FRAME_HDR_RSVD1,
+        FRAME_HDR_TRACK,
+        FRAME_HDR_CH1,
+        FRAME_HDR_CH2,
+        FRAME_HDR_RSVD2,
+        FRAME_HDR_RSVD3,
+        FRAME_HDR_RSVD4,
+        FRAME_HDR_RSVD5,
+        FRAME_PAYLOAD,
+        FRAME_FRAGMENT,
+        ERROR;
+    }
+
+    private final Receiver<NetworkEvent> receiver;
+    private State state;
+
+    private byte instance;
+    private byte major;
+    private byte minor;
+
+    private byte flags;
+    private byte type;
+    private byte track;
+    private int channel;
+    private int size;
+    private Frame frame;
+
+    public InputHandler(Receiver<NetworkEvent> receiver, State state)
+    {
+        this.receiver = receiver;
+        this.state = state;
+    }
+
+    public InputHandler(Receiver<NetworkEvent> receiver)
+    {
+        this(receiver, PROTO_HDR);
+    }
+
+    private void init()
+    {
+        receiver.received(new ProtocolHeader(instance, major, minor));
+    }
+
+    private void frame()
+    {
+        assert size == frame.getSize();
+        receiver.received(frame);
+        frame = null;
+    }
+
+    private void error(String fmt, Object ... args)
+    {
+        receiver.received(new ProtocolError(Frame.L1, fmt, args));
+    }
+
+    public void received(ByteBuffer buf)
+    {
+        while (buf.hasRemaining())
+        {
+            state = next(buf);
+        }
+    }
+
+    private State next(ByteBuffer buf)
+    {
+        switch (state) {
+        case PROTO_HDR:
+            return expect(buf, 'A', PROTO_HDR_M);
+        case PROTO_HDR_M:
+            return expect(buf, 'M', PROTO_HDR_Q);
+        case PROTO_HDR_Q:
+            return expect(buf, 'Q', PROTO_HDR_P);
+        case PROTO_HDR_P:
+            return expect(buf, 'P', PROTO_HDR_CLASS);
+        case PROTO_HDR_CLASS:
+            return expect(buf, 1, PROTO_HDR_INSTANCE);
+        case PROTO_HDR_INSTANCE:
+            instance = buf.get();
+            return PROTO_HDR_MAJOR;
+        case PROTO_HDR_MAJOR:
+            major = buf.get();
+            return PROTO_HDR_MINOR;
+        case PROTO_HDR_MINOR:
+            minor = buf.get();
+            init();
+            return FRAME_HDR;
+        case FRAME_HDR:
+            flags = buf.get();
+            return FRAME_HDR_TYPE;
+        case FRAME_HDR_TYPE:
+            type = buf.get();
+            return FRAME_HDR_SIZE1;
+        case FRAME_HDR_SIZE1:
+            size = buf.get() << 8;
+            return FRAME_HDR_SIZE2;
+        case FRAME_HDR_SIZE2:
+            size += buf.get();
+            size -= 12;
+            return FRAME_HDR_RSVD1;
+        case FRAME_HDR_RSVD1:
+            return expect(buf, 0, FRAME_HDR_TRACK);
+        case FRAME_HDR_TRACK:
+            byte b = buf.get();
+            if ((b & 0xF0) != 0) {
+                error("non-zero reserved bits in upper nibble of " +
+                      "frame header byte 5: '%x'", b);
+                return ERROR;
+            } else {
+                track = (byte) (b & 0xF);
+                return FRAME_HDR_CH1;
+            }
+        case FRAME_HDR_CH1:
+            channel = buf.get() << 8;
+            return FRAME_HDR_CH2;
+        case FRAME_HDR_CH2:
+            channel += buf.get();
+            return FRAME_HDR_RSVD2;
+        case FRAME_HDR_RSVD2:
+            return expect(buf, 0, FRAME_HDR_RSVD3);
+        case FRAME_HDR_RSVD3:
+            return expect(buf, 0, FRAME_HDR_RSVD4);
+        case FRAME_HDR_RSVD4:
+            return expect(buf, 0, FRAME_HDR_RSVD5);
+        case FRAME_HDR_RSVD5:
+            return expect(buf, 0, FRAME_PAYLOAD);
+        case FRAME_PAYLOAD:
+            frame = new Frame(flags, type, track, channel);
+            if (size > buf.remaining()) {
+                frame.addFragment(buf.slice());
+                buf.position(buf.limit());
+                return FRAME_FRAGMENT;
+            } else {
+                ByteBuffer payload = buf.slice();
+                payload.limit(size);
+                buf.position(buf.position() + size);
+                frame.addFragment(payload);
+                frame();
+                return FRAME_HDR;
+            }
+        case FRAME_FRAGMENT:
+            int delta = size - frame.getSize();
+            if (delta > buf.remaining()) {
+                frame.addFragment(buf.slice());
+                buf.position(buf.limit());
+                return FRAME_FRAGMENT;
+            } else {
+                ByteBuffer fragment = buf.slice();
+                fragment.limit(delta);
+                buf.position(buf.position() + delta);
+                frame.addFragment(fragment);
+                frame();
+                return FRAME_HDR;
+            }
+        default:
+            throw new IllegalStateException();
+        }
+    }
+
+    private State expect(ByteBuffer buf, int expected, State next)
+    {
+        return expect(buf, (byte) expected, next);
+    }
+
+    private State expect(ByteBuffer buf, char expected, State next)
+    {
+        return expect(buf, (byte) expected, next);
+    }
+
+    private State expect(ByteBuffer buf, byte expected, State next)
+    {
+        byte b = buf.get();
+        if (b == expected) {
+            return next;
+        } else {
+            error("expecting '%x', got '%x'", expected, b);
+            return ERROR;
+        }
+    }
+
+    public void closed()
+    {
+        receiver.closed();
+    }
+
+}

Propchange: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Copied: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/NetworkDelegate.java (from r573657, incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ProtocolActions.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/NetworkDelegate.java?p2=incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/NetworkDelegate.java&p1=incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ProtocolActions.java&r1=573657&r2=575474&rev=575474&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ProtocolActions.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/NetworkDelegate.java Thu Sep 13 14:42:57 2007
@@ -18,16 +18,19 @@
  * under the License.
  *
  */
-package org.apache.qpidity;
+package org.apache.qpidity.transport.network;
+
+import org.apache.qpidity.transport.ProtocolError;
+import org.apache.qpidity.transport.ProtocolHeader;
 
 
 /**
- * ProtocolActions
+ * NetworkDelegate
  *
  * @author Rafael H. Schloming
  */
 
-interface ProtocolActions
+public interface NetworkDelegate
 {
 
     void init(ProtocolHeader header);

Added: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/NetworkEvent.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/NetworkEvent.java?rev=575474&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/NetworkEvent.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/NetworkEvent.java Thu Sep 13 14:42:57 2007
@@ -0,0 +1,34 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpidity.transport.network;
+
+
+/**
+ * NetworkEvent
+ *
+ */
+
+public interface NetworkEvent
+{
+
+    void delegate(NetworkDelegate delegate);
+
+}

Propchange: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/NetworkEvent.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java?rev=575474&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java Thu Sep 13 14:42:57 2007
@@ -0,0 +1,99 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpidity.transport.network;
+
+import java.nio.ByteBuffer;
+
+import org.apache.qpidity.transport.ProtocolError;
+import org.apache.qpidity.transport.ProtocolHeader;
+import org.apache.qpidity.transport.Sender;
+
+import static org.apache.qpidity.transport.network.Frame.*;
+
+
+/**
+ * OutputHandler
+ *
+ */
+
+public class OutputHandler implements Sender<NetworkEvent>, NetworkDelegate
+{
+
+    private Sender<ByteBuffer> sender;
+    private Object lock = new Object();
+
+    public OutputHandler(Sender<ByteBuffer> sender)
+    {
+        this.sender = sender;
+    }
+
+    public void send(NetworkEvent event)
+    {
+        event.delegate(this);
+    }
+
+    public void close()
+    {
+        synchronized (lock)
+        {
+            sender.close();
+        }
+    }
+
+    public void init(ProtocolHeader header)
+    {
+        synchronized (lock)
+        {
+            sender.send(header.toByteBuffer());
+        }
+    }
+
+    public void frame(Frame frame)
+    {
+        ByteBuffer hdr = ByteBuffer.allocate(HEADER_SIZE);
+        hdr.put(frame.getFlags());
+        hdr.put(frame.getType());
+        hdr.putShort((short) (frame.getSize() + HEADER_SIZE));
+        hdr.put(RESERVED);
+        hdr.put(frame.getTrack());
+        hdr.putShort((short) frame.getChannel());
+        hdr.put(RESERVED);
+        hdr.put(RESERVED);
+        hdr.put(RESERVED);
+        hdr.put(RESERVED);
+        hdr.flip();
+
+        synchronized (lock)
+        {
+            sender.send(hdr);
+            for (ByteBuffer buf : frame)
+            {
+                sender.send(buf);
+            }
+        }
+    }
+
+    public void error(ProtocolError error)
+    {
+        throw new IllegalStateException("XXX");
+    }
+
+}

Propchange: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Copied: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaHandler.java (from r573657, incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/MinaHandler.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaHandler.java?p2=incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaHandler.java&p1=incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/MinaHandler.java&r1=573657&r2=575474&rev=575474&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/MinaHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaHandler.java Thu Sep 13 14:42:57 2007
@@ -18,7 +18,7 @@
  * under the License.
  *
  */
-package org.apache.qpidity;
+package org.apache.qpidity.transport.network.mina;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -28,7 +28,6 @@
 import org.apache.mina.common.ConnectFuture;
 import org.apache.mina.common.IdleStatus;
 import org.apache.mina.common.IoAcceptor;
-import org.apache.mina.common.IoConnector;
 import org.apache.mina.common.IoHandler;
 import org.apache.mina.common.IoSession;
 import org.apache.mina.common.SimpleByteBufferAllocator;
@@ -36,6 +35,16 @@
 import org.apache.mina.transport.socket.nio.SocketAcceptor;
 import org.apache.mina.transport.socket.nio.SocketConnector;
 
+import org.apache.qpidity.transport.Connection;
+import org.apache.qpidity.transport.ConnectionDelegate;
+import org.apache.qpidity.transport.Receiver;
+import org.apache.qpidity.transport.Sender;
+
+import org.apache.qpidity.transport.network.Assembler;
+import org.apache.qpidity.transport.network.Disassembler;
+import org.apache.qpidity.transport.network.InputHandler;
+import org.apache.qpidity.transport.network.OutputHandler;
+
 
 /**
  * MinaHandler
@@ -57,9 +66,9 @@
 
     public void messageReceived(IoSession ssn, Object obj)
     {
-        Connection conn = (Connection) ssn.getAttachment();
+        Attachment attachment = (Attachment) ssn.getAttachment();
         ByteBuffer buf = (ByteBuffer) obj;
-        conn.getInputHandler().handle(buf.buf());
+        attachment.receiver.received(buf.buf());
     }
 
     public void messageSent(IoSession ssn, Object obj)
@@ -80,16 +89,15 @@
     public void sessionOpened(final IoSession ssn)
     {
         System.out.println("opened " + ssn);
-        Connection conn = new Connection(new Handler<java.nio.ByteBuffer>()
-                                         {
-                                             public void handle(java.nio.ByteBuffer buf)
-                                             {
-                                                 ssn.write(ByteBuffer.wrap(buf));
-                                             }
-                                         },
-                                         delegate,
-                                         state);
-        ssn.setAttachment(conn);
+        // XXX: hardcoded version + max-frame
+        Connection conn = new Connection
+            (new Disassembler(new OutputHandler(new MinaSender(ssn)),
+                              (byte)0, (byte)10, 64*1024),
+             delegate);
+        // XXX: hardcoded version
+        Receiver<java.nio.ByteBuffer> receiver =
+            new InputHandler(new Assembler(conn, (byte)0, (byte)10), state);
+        ssn.setAttachment(new Attachment(conn, receiver));
         // XXX
         synchronized (ssn)
         {
@@ -100,21 +108,27 @@
     public void sessionClosed(IoSession ssn)
     {
         System.out.println("closed " + ssn);
+        Attachment attachment = (Attachment) ssn.getAttachment();
+        attachment.receiver.closed();
         ssn.setAttachment(null);
     }
 
     public void sessionIdle(IoSession ssn, IdleStatus status)
     {
-        System.out.println(status);
+        // do nothing
     }
 
-    public static final void main(String[] args) throws IOException
+    private class Attachment
     {
-        ByteBuffer.setAllocator(new SimpleByteBufferAllocator());
-        if (args[0].equals("accept")) {
-            accept("0.0.0.0", 5672, SessionDelegateStub.source());
-        } else if (args[0].equals("connect")) {
-            connect("0.0.0.0", 5672, SessionDelegateStub.source());
+
+        Connection connection;
+        Receiver<java.nio.ByteBuffer> receiver;
+
+        Attachment(Connection connection,
+                   Receiver<java.nio.ByteBuffer> receiver)
+        {
+            this.connection = connection;
+            this.receiver = receiver;
         }
     }
 
@@ -124,8 +138,7 @@
     {
         IoAcceptor acceptor = new SocketAcceptor();
         acceptor.bind(new InetSocketAddress(host, port),
-                      new MinaHandler(delegate, InputHandler.State.PROTO_HDR));       
-        
+                      new MinaHandler(delegate, InputHandler.State.PROTO_HDR));
     }
 
     public static final Connection connect(String host, int port,
@@ -134,7 +147,8 @@
         MinaHandler handler = new MinaHandler(delegate,
                                               InputHandler.State.FRAME_HDR);
         SocketAddress addr = new InetSocketAddress(host, port);
-        IoConnector connector = new SocketConnector();
+        SocketConnector connector = new SocketConnector();
+        connector.setWorkerTimeout(0);
         ConnectFuture cf = connector.connect(addr, handler);
         cf.join();
         IoSession ssn = cf.getSession();
@@ -153,8 +167,8 @@
                 }
             }
         }
-        Connection conn = (Connection) ssn.getAttachment();
-        return conn;
+        Attachment attachment = (Attachment) ssn.getAttachment();
+        return attachment.connection;
     }
 
 }

Added: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaSender.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaSender.java?rev=575474&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaSender.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaSender.java Thu Sep 13 14:42:57 2007
@@ -0,0 +1,54 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpidity.transport.network.mina;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.IoSession;
+
+import org.apache.qpidity.transport.Sender;
+
+
+/**
+ * MinaSender
+ *
+ */
+
+public class MinaSender implements Sender<java.nio.ByteBuffer>
+{
+
+    private final IoSession session;
+
+    public MinaSender(IoSession session)
+    {
+        this.session = session;
+    }
+
+    public void send(java.nio.ByteBuffer buf)
+    {
+        session.write(ByteBuffer.wrap(buf));
+    }
+
+    public void close()
+    {
+        session.close();
+    }
+
+}

Propchange: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaSender.java
------------------------------------------------------------------------------
    svn:eol-style = native

Copied: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/util/Functions.java (from r573657, incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Functions.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/util/Functions.java?p2=incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/util/Functions.java&p1=incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Functions.java&r1=573657&r2=575474&rev=575474&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Functions.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/util/Functions.java Thu Sep 13 14:42:57 2007
@@ -18,7 +18,7 @@
  * under the License.
  *
  */
-package org.apache.qpidity;
+package org.apache.qpidity.transport.util;
 
 import java.nio.ByteBuffer;
 

Copied: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/util/SliceIterator.java (from r573657, incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/SliceIterator.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/util/SliceIterator.java?p2=incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/util/SliceIterator.java&p1=incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/SliceIterator.java&r1=573657&r2=575474&rev=575474&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/SliceIterator.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/util/SliceIterator.java Thu Sep 13 14:42:57 2007
@@ -18,7 +18,7 @@
  * under the License.
  *
  */
-package org.apache.qpidity;
+package org.apache.qpidity.transport.util;
 
 import java.nio.ByteBuffer;
 
@@ -31,7 +31,7 @@
  * @author Rafael H. Schloming
  */
 
-class SliceIterator implements Iterator<ByteBuffer>
+public class SliceIterator implements Iterator<ByteBuffer>
 {
 
     final private Iterator<ByteBuffer> iterator;

Modified: incubator/qpid/trunk/qpid/java/plugins/src/main/java/org/apache/qpid/plugins/JythonMojo.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/plugins/src/main/java/org/apache/qpid/plugins/JythonMojo.java?rev=575474&r1=575473&r2=575474&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/plugins/src/main/java/org/apache/qpid/plugins/JythonMojo.java (original)
+++ incubator/qpid/trunk/qpid/java/plugins/src/main/java/org/apache/qpid/plugins/JythonMojo.java Thu Sep 13 14:42:57 2007
@@ -21,6 +21,7 @@
 package org.apache.qpid.plugins;
 
 import java.io.File;
+import java.io.IOException;
 
 import org.apache.maven.plugin.AbstractMojo;
 import org.apache.maven.plugin.MojoExecutionException;
@@ -46,9 +47,44 @@
      */
     private String[] params = new String[0];
 
+    /**
+     * Source file.
+     *
+     * @parameter
+     */
+    private File source;
+
+    /**
+     * Optional timestamp.
+     *
+     * @parameter
+     */
+    private File timestamp;
+
     public void execute() throws MojoExecutionException
     {
+        if (source != null && timestamp != null)
+        {
+            if (timestamp.lastModified() > source.lastModified())
+            {
+                return;
+            }
+        }
+
         jython.main(params);
+
+        if (timestamp != null)
+        {
+            try
+            {
+                timestamp.createNewFile();
+            }
+            catch (IOException e)
+            {
+                throw new MojoExecutionException("cannot create timestamp", e);
+            }
+            timestamp.setLastModified(System.currentTimeMillis());
+        }
     }
 
 }