You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2006/09/20 00:07:25 UTC

svn commit: r447994 [36/46] - in /incubator/qpid/trunk/qpid: ./ cpp/ cpp/bin/ cpp/broker/ cpp/broker/inc/ cpp/broker/src/ cpp/broker/test/ cpp/client/ cpp/client/inc/ cpp/client/src/ cpp/client/test/ cpp/common/ cpp/common/concurrent/ cpp/common/concur...

Added: incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/bio/SocketConnector.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/bio/SocketConnector.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/bio/SocketConnector.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/bio/SocketConnector.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,150 @@
+/*
+ *   @(#) $Id: SocketConnector.java 389042 2006-03-27 07:49:41Z trustin $
+ *
+ *   Copyright 2004 The Apache Software Foundation
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ *
+ */
+package org.apache.qpid.bio;
+
+import org.apache.mina.common.ConnectFuture;
+import org.apache.mina.common.IoHandler;
+import org.apache.mina.common.IoServiceConfig;
+import org.apache.mina.common.support.BaseIoConnector;
+import org.apache.mina.common.support.DefaultConnectFuture;
+import org.apache.mina.transport.socket.nio.SocketConnectorConfig;
+import org.apache.mina.transport.socket.nio.SocketSessionConfig;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.nio.channels.ByteChannel;
+import java.nio.channels.SocketChannel;
+
+/**
+ */
+public class SocketConnector extends BaseIoConnector
+{
+    /**
+     * @noinspection StaticNonFinalField
+     */
+    private static final Sequence idSequence = new Sequence();
+
+    private final Object lock = new Object();
+    private final String threadName = "SocketConnector-" + idSequence.nextId();
+    private final IoServiceConfig defaultConfig = new SocketConnectorConfig();
+    private final Set managedSessions = Collections.synchronizedSet(new HashSet());
+
+    /**
+     * Create a connector with a single processing thread
+     */
+    public SocketConnector()
+    {
+    }
+
+    public IoServiceConfig getDefaultConfig()
+    {
+        return defaultConfig;
+    }
+
+    public ConnectFuture connect(SocketAddress address, IoHandler handler, IoServiceConfig config)
+    {
+        return connect(address, null, handler, config);
+    }
+
+    public ConnectFuture connect(SocketAddress address, SocketAddress localAddress,
+                                 IoHandler handler, IoServiceConfig config)
+    {
+        if (address == null)
+        {
+            throw new NullPointerException("address");
+        }
+        if (handler == null)
+        {
+            throw new NullPointerException("handler");
+        }
+
+        if (! (address instanceof InetSocketAddress))
+        {
+            throw new IllegalArgumentException("Unexpected address type: " + address.getClass());
+        }
+        if (localAddress != null && !(localAddress instanceof InetSocketAddress))
+        {
+            throw new IllegalArgumentException("Unexpected local address type: " + localAddress.getClass());
+        }
+        if (config == null)
+        {
+            config = getDefaultConfig();
+        }
+
+        DefaultConnectFuture future = new DefaultConnectFuture();
+        try
+        {
+
+            //Socket socket = new Socket();
+            //socket.connect(address);
+            //SimpleSocketChannel channel = new SimpleSocketChannel(socket);
+            //SocketAddress serviceAddress = socket.getRemoteSocketAddress();
+
+            SocketChannel channel = SocketChannel.open(address);
+            channel.configureBlocking(true);
+            SocketAddress serviceAddress = channel.socket().getRemoteSocketAddress();
+
+
+            SocketSessionImpl session = newSession(channel, handler, config, channel.socket().getRemoteSocketAddress());
+            future.setSession(session);
+        }
+        catch (IOException e)
+        {
+            future.setException(e);
+        }
+
+        return future;
+    }
+
+    private SocketSessionImpl newSession(ByteChannel channel, IoHandler handler, IoServiceConfig config, SocketAddress serviceAddress)
+            throws IOException
+    {
+        SocketSessionImpl session = new SocketSessionImpl(this,
+                                                          (SocketSessionConfig) config.getSessionConfig(),
+                                                          handler,
+                                                          channel,
+                                                          serviceAddress);
+        try
+        {
+            getFilterChainBuilder().buildFilterChain(session.getFilterChain());
+            config.getFilterChainBuilder().buildFilterChain(session.getFilterChain());
+            config.getThreadModel().buildFilterChain(session.getFilterChain());
+            ((SocketFilterChain) session.getFilterChain()).sessionCreated(session);
+
+            session.start();
+            //not sure if this will work... socket is already opened before the created callback is called...
+            ((SocketFilterChain) session.getFilterChain()).sessionOpened(session);
+        }
+        catch (Throwable e)
+        {
+            throw (IOException) new IOException("Failed to create a session.").initCause(e);
+        }
+
+        //TODO: figure out how the managed session are used/ what they are etc.
+        //session.getManagedSessions().add( session );
+
+
+        return session;
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/bio/SocketConnector.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/bio/SocketFilterChain.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/bio/SocketFilterChain.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/bio/SocketFilterChain.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/bio/SocketFilterChain.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,62 @@
+/*
+ *   @(#) $Id: SocketFilterChain.java 398039 2006-04-28 23:36:27Z proyal $
+ *
+ *   Copyright 2004 The Apache Software Foundation
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ *
+ */
+package org.apache.qpid.bio;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.IoFilter.WriteRequest;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.support.AbstractIoFilterChain;
+
+import java.io.IOException;
+import java.nio.channels.ClosedByInterruptException;
+
+/** Filter chain for the blocking-I/O transport: writes go straight to the session's channel and close shuts the session down.
+ */
+class SocketFilterChain extends AbstractIoFilterChain
+{
+
+    SocketFilterChain(IoSession parent)
+    {
+        super(parent);
+    }
+
+    protected void doWrite(IoSession session, WriteRequest writeRequest) throws Exception
+    {
+        SocketSessionImpl s = (SocketSessionImpl) session;
+
+        //write the NIO buffer behind the request's MINA ByteBuffer message to the session's channel
+        try
+        {
+            s.getChannel().write(((ByteBuffer) writeRequest.getMessage()).buf());
+
+            //notify of completion
+            writeRequest.getFuture().setWritten(true);
+        }
+        catch(ClosedByInterruptException e) // report the write as failed instead of propagating the interruption
+        {
+            writeRequest.getFuture().setWritten(false);
+        }
+    }
+
+    protected void doClose(IoSession session) throws IOException
+    {
+        SocketSessionImpl s = (SocketSessionImpl) session;
+        s.shutdown(); // the session itself fires sessionClosed, stops its reader and closes the channel
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/bio/SocketFilterChain.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/bio/SocketSessionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/bio/SocketSessionImpl.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/bio/SocketSessionImpl.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/bio/SocketSessionImpl.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,421 @@
+/*
+ *   @(#) $Id: SocketSessionImpl.java 398039 2006-04-28 23:36:27Z proyal $
+ *
+ *   Copyright 2004 The Apache Software Foundation
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ *
+ */
+package org.apache.qpid.bio;
+
+import org.apache.mina.common.IoFilter.WriteRequest;
+import org.apache.mina.common.IoFilterChain;
+import org.apache.mina.common.IoHandler;
+import org.apache.mina.common.IoService;
+import org.apache.mina.common.IoSessionConfig;
+import org.apache.mina.common.RuntimeIOException;
+import org.apache.mina.common.TransportType;
+import org.apache.mina.common.support.BaseIoSession;
+import org.apache.mina.common.support.BaseIoSessionConfig;
+import org.apache.mina.transport.socket.nio.SocketSessionConfig;
+import org.apache.mina.transport.socket.nio.SocketSessionConfigImpl;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.net.SocketException;
+import java.nio.channels.ByteChannel;
+import java.nio.channels.SocketChannel;
+
+/** Session for the blocking-I/O transport; incoming data is handled by a Reader that runs on a thread started per session.
+ */
+class SocketSessionImpl extends BaseIoSession
+{
+    private final IoService manager;
+    private final SocketSessionConfig config;
+    private final SocketFilterChain filterChain;
+    private final IoHandler handler;
+    private final SocketAddress remoteAddress;
+    private final SocketAddress localAddress;
+    private final SocketAddress serviceAddress;
+    private final Socket socket;
+    private final ByteChannel channel;
+    private final Reader reader;
+    private Thread runner;
+    private int readBufferSize;
+
+    /**
+     * Creates a new instance; the channel must be a SocketChannel or a SimpleSocketChannel so that its Socket can be obtained.
+     */
+    SocketSessionImpl(IoService manager,
+                      SocketSessionConfig config,
+                      IoHandler handler,
+                      ByteChannel channel,
+                      SocketAddress serviceAddress) throws IOException
+    {
+        this.manager = manager;
+        this.filterChain = new SocketFilterChain(this);
+        this.handler = handler;
+        this.channel = channel;
+        if(channel instanceof SocketChannel)
+        {
+            socket = ((SocketChannel) channel).socket();
+        }
+        else if(channel instanceof SimpleSocketChannel)
+        {
+            socket = ((SimpleSocketChannel) channel).socket();
+        }
+        else
+        {
+            throw new IllegalArgumentException("Unrecognised channel type: " + channel.getClass());
+        }
+
+        this.remoteAddress = socket.getRemoteSocketAddress();
+        this.localAddress = socket.getLocalSocketAddress();
+        this.serviceAddress = serviceAddress;
+
+        this.config = new SessionConfigImpl(config); // must follow the socket assignment: it copies the settings onto the socket
+
+        reader = new Reader(handler, this);
+    }
+
+    void start()
+    {
+        //create & start the thread that runs this session's reader
+        runner = new Thread(reader);
+        runner.start();
+    }
+
+    void shutdown() throws IOException
+    {
+        filterChain.sessionClosed( this ); // the closed event is fired before the reader is stopped and the channel closed
+        reader.stop();
+        channel.close();
+    }
+
+    ByteChannel getChannel()
+    {
+        return channel;
+    }
+
+    protected void write0(WriteRequest writeRequest)
+    {
+        filterChain.filterWrite(this, writeRequest);
+    }
+
+    protected void close0()
+    {
+        filterChain.filterClose(this);
+        super.close0();
+    }
+
+    protected void updateTrafficMask()
+    {
+        //TODO: not implemented, the traffic mask is currently ignored
+    }
+
+    public IoService getService()
+    {
+        return manager;
+    }
+
+    public IoSessionConfig getConfig()
+    {
+        return config;
+    }
+
+    public IoFilterChain getFilterChain()
+    {
+        return filterChain;
+    }
+
+    public IoHandler getHandler()
+    {
+        return handler;
+    }
+
+    public int getScheduledWriteRequests()
+    {
+        return 0; // this session keeps no count of pending writes
+    }
+
+    public int getScheduledWriteBytes()
+    {
+        return 0; // this session keeps no count of pending bytes
+    }
+
+    public TransportType getTransportType()
+    {
+        return TransportType.SOCKET;
+    }
+
+    public SocketAddress getRemoteAddress()
+    {
+        return remoteAddress;
+    }
+
+    public SocketAddress getLocalAddress()
+    {
+        return localAddress;
+    }
+
+    public SocketAddress getServiceAddress()
+    {
+        return serviceAddress;
+    }
+
+    int getReadBufferSize()
+    {
+        return readBufferSize;
+    }
+
+    private class SessionConfigImpl extends BaseIoSessionConfig implements SocketSessionConfig // every option is read from / written to the session's socket
+    {
+        SessionConfigImpl()
+        {
+        }
+
+        SessionConfigImpl(SocketSessionConfig cfg) // copies each setting of cfg onto the socket
+        {
+            setKeepAlive(cfg.isKeepAlive());
+            setOobInline(cfg.isOobInline());
+            setReceiveBufferSize(cfg.getReceiveBufferSize());
+            readBufferSize = cfg.getReceiveBufferSize(); // recorded even when setReceiveBufferSize() skipped the socket call
+            setReuseAddress(cfg.isReuseAddress());
+            setSendBufferSize(cfg.getSendBufferSize());
+            setSoLinger(cfg.getSoLinger());
+            setTcpNoDelay(cfg.isTcpNoDelay());
+            if (getTrafficClass() != cfg.getTrafficClass()) // only touch the traffic class when it actually differs
+            {
+                setTrafficClass(cfg.getTrafficClass());
+            }
+        }
+
+
+        public boolean isKeepAlive()
+        {
+            try
+            {
+                return socket.getKeepAlive();
+            }
+            catch (SocketException e)
+            {
+                throw new RuntimeIOException(e);
+            }
+        }
+
+        public void setKeepAlive(boolean on)
+        {
+            try
+            {
+                socket.setKeepAlive(on);
+            }
+            catch (SocketException e)
+            {
+                throw new RuntimeIOException(e);
+            }
+        }
+
+        public boolean isOobInline()
+        {
+            try
+            {
+                return socket.getOOBInline();
+            }
+            catch (SocketException e)
+            {
+                throw new RuntimeIOException(e);
+            }
+        }
+
+        public void setOobInline(boolean on)
+        {
+            try
+            {
+                socket.setOOBInline(on);
+            }
+            catch (SocketException e)
+            {
+                throw new RuntimeIOException(e);
+            }
+        }
+
+        public boolean isReuseAddress()
+        {
+            try
+            {
+                return socket.getReuseAddress();
+            }
+            catch (SocketException e)
+            {
+                throw new RuntimeIOException(e);
+            }
+        }
+
+        public void setReuseAddress(boolean on)
+        {
+            try
+            {
+                socket.setReuseAddress(on);
+            }
+            catch (SocketException e)
+            {
+                throw new RuntimeIOException(e);
+            }
+        }
+
+        public int getSoLinger()
+        {
+            try
+            {
+                return socket.getSoLinger();
+            }
+            catch (SocketException e)
+            {
+                throw new RuntimeIOException(e);
+            }
+        }
+
+        public void setSoLinger(int linger)
+        {
+            try
+            {
+                if (linger < 0) // a negative value switches SO_LINGER off
+                {
+                    socket.setSoLinger(false, 0);
+                }
+                else
+                {
+                    socket.setSoLinger(true, linger);
+                }
+            }
+            catch (SocketException e)
+            {
+                throw new RuntimeIOException(e);
+            }
+        }
+
+        public boolean isTcpNoDelay()
+        {
+            try
+            {
+                return socket.getTcpNoDelay();
+            }
+            catch (SocketException e)
+            {
+                throw new RuntimeIOException(e);
+            }
+        }
+
+        public void setTcpNoDelay(boolean on)
+        {
+            try
+            {
+                socket.setTcpNoDelay(on);
+            }
+            catch (SocketException e)
+            {
+                throw new RuntimeIOException(e);
+            }
+        }
+
+        public int getTrafficClass()
+        {
+            if (SocketSessionConfigImpl.isGetTrafficClassAvailable())
+            {
+                try
+                {
+                    return socket.getTrafficClass();
+                }
+                catch (SocketException e)
+                {
+                    throw new RuntimeIOException(e);
+                }
+            }
+            else
+            {
+                return 0; // reported as 0 when the availability check fails
+            }
+        }
+
+        public void setTrafficClass(int tc)
+        {
+            if (SocketSessionConfigImpl.isSetTrafficClassAvailable()) // silently a no-op otherwise
+            {
+                try
+                {
+                    socket.setTrafficClass(tc);
+                }
+                catch (SocketException e)
+                {
+                    throw new RuntimeIOException(e);
+                }
+            }
+        }
+
+        public int getSendBufferSize()
+        {
+            try
+            {
+                return socket.getSendBufferSize();
+            }
+            catch (SocketException e)
+            {
+                throw new RuntimeIOException(e);
+            }
+        }
+
+        public void setSendBufferSize(int size)
+        {
+            if (SocketSessionConfigImpl.isSetSendBufferSizeAvailable()) // silently a no-op otherwise
+            {
+                try
+                {
+                    socket.setSendBufferSize(size);
+                }
+                catch (SocketException e)
+                {
+                    throw new RuntimeIOException(e);
+                }
+            }
+        }
+
+        public int getReceiveBufferSize()
+        {
+            try
+            {
+                return socket.getReceiveBufferSize();
+            }
+            catch (SocketException e)
+            {
+                throw new RuntimeIOException(e);
+            }
+        }
+
+        public void setReceiveBufferSize(int size)
+        {
+            if (SocketSessionConfigImpl.isSetReceiveBufferSizeAvailable()) // silently a no-op otherwise
+            {
+                try
+                {
+                    socket.setReceiveBufferSize(size);
+                    SocketSessionImpl.this.readBufferSize = size; // keep the session's read buffer size in step with the socket
+                }
+                catch (SocketException e)
+                {
+                    throw new RuntimeIOException(e);
+                }
+            }
+        }
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/bio/SocketSessionImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/codec/AMQCodecFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/codec/AMQCodecFactory.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/codec/AMQCodecFactory.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/codec/AMQCodecFactory.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,50 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.codec;
+
+import org.apache.mina.filter.codec.ProtocolCodecFactory;
+import org.apache.mina.filter.codec.ProtocolDecoder;
+import org.apache.mina.filter.codec.ProtocolEncoder;
+
+public class AMQCodecFactory implements ProtocolCodecFactory
+{
+    private AMQEncoder _encoder = new AMQEncoder(); // the same encoder instance is returned by every getEncoder() call
+
+    private AMQDecoder _frameDecoder; // created once in the constructor and returned by every getDecoder() call
+
+    /**
+     * @param expectProtocolInitiation true if the first frame received is going to be
+     * a protocol initiation frame, false if it is going to be a standard AMQ data block.
+     * The former case is used for the broker, which always expects to receive the
+     * protocol initiation first from a newly connected client.
+     */
+    public AMQCodecFactory(boolean expectProtocolInitiation)
+    {
+        _frameDecoder = new AMQDecoder(expectProtocolInitiation);
+    }
+
+    public ProtocolEncoder getEncoder()
+    {
+        return _encoder;
+    }
+
+    public ProtocolDecoder getDecoder()
+    {
+        return _frameDecoder;
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/codec/AMQCodecFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/codec/AMQDecoder.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/codec/AMQDecoder.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/codec/AMQDecoder.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/codec/AMQDecoder.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,96 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.codec;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.filter.codec.CumulativeProtocolDecoder;
+import org.apache.mina.filter.codec.ProtocolDecoderOutput;
+import org.apache.qpid.framing.AMQDataBlockDecoder;
+import org.apache.qpid.framing.ProtocolInitiation;
+
+/**
+ * There is one instance of this class per session. Any changes or configuration done
+ * at run time to the encoders or decoders only affects decoding/encoding of the
+ * protocol session data to which it is bound.
+ *
+ */
+public class AMQDecoder extends CumulativeProtocolDecoder
+{
+    private AMQDataBlockDecoder _dataBlockDecoder = new AMQDataBlockDecoder();
+
+    private ProtocolInitiation.Decoder _piDecoder = new ProtocolInitiation.Decoder();
+
+    private boolean _expectProtocolInitiation; // selects which of the two decoders doDecode() uses
+
+    public AMQDecoder(boolean expectProtocolInitiation)
+    {
+        _expectProtocolInitiation = expectProtocolInitiation;
+    }
+
+    protected boolean doDecode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception
+    {
+        if (_expectProtocolInitiation)
+        {
+            return doDecodePI(session, in, out);
+        }
+        else
+        {
+            return doDecodeDataBlock(session, in, out);
+        }
+    }
+
+    protected boolean doDecodeDataBlock(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception
+    {
+        int pos = in.position(); // remembered so the decodable() probe below cannot leave the buffer advanced
+        boolean enoughData = _dataBlockDecoder.decodable(session, in);
+        in.position(pos);
+        if (!enoughData)
+        {
+            // returning false means it will leave the contents in the buffer and
+            // call us again when more data has been read
+            return false;
+        }
+        else
+        {
+            _dataBlockDecoder.decode(session, in, out);
+            return true;
+        }
+    }
+
+    private boolean doDecodePI(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception
+    {
+        boolean enoughData = _piDecoder.decodable(session, in); // NOTE(review): position is not restored here as it is in doDecodeDataBlock; presumably this probe does not consume input - confirm
+        if (!enoughData)
+        {
+            // returning false means it will leave the contents in the buffer and
+            // call us again when more data has been read
+            return false;
+        }
+        else
+        {
+            _piDecoder.decode(session, in, out);
+            return true;
+        }
+    }
+
+    public void setExpectProtocolInitiation(boolean expectProtocolInitiation)
+    {
+        _expectProtocolInitiation = expectProtocolInitiation;
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/codec/AMQDecoder.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/codec/AMQEncoder.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/codec/AMQEncoder.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/codec/AMQEncoder.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/codec/AMQEncoder.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,38 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.codec;
+
+import org.apache.mina.filter.codec.ProtocolEncoder;
+import org.apache.mina.filter.codec.ProtocolEncoderOutput;
+import org.apache.mina.common.IoSession;
+import org.apache.qpid.framing.AMQDataBlockEncoder;
+
+public class AMQEncoder implements ProtocolEncoder
+{
+    private AMQDataBlockEncoder _dataBlockEncoder = new AMQDataBlockEncoder(); // does the actual encoding
+
+    public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception
+    {
+        _dataBlockEncoder.encode(session, message, out); // pure delegation, nothing is added or changed here
+    }
+
+    public void dispose(IoSession session) throws Exception
+    {
+        // nothing to do: this encoder does nothing on disposal
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/codec/AMQEncoder.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/configuration/Configured.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/configuration/Configured.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/configuration/Configured.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/configuration/Configured.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,41 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.configuration;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Target;
+
+/**
+ * Marks a field as being "configured" externally; the annotation is retained at runtime so it can be read reflectively.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.FIELD)
+public @interface Configured
+{
+    /**
+     * The Commons Configuration path to the configuration element that supplies the field's value
+     */
+    String path();
+
+    /**
+     * The default value, in string form, to use should the path not be found in the configuration source
+     */
+    String defaultValue();
+}

Propchange: incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/configuration/Configured.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/configuration/PropertyException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/configuration/PropertyException.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/configuration/PropertyException.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/configuration/PropertyException.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,62 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.configuration;
+
+import org.apache.qpid.AMQException;
+import org.apache.log4j.Logger;
+
+/**
+ * Signals that a property expansion could not be parsed or resolved.
+ * <p>
+ * The class adds no state of its own: every constructor hands its arguments
+ * straight to the {@link AMQException} constructor with the same parameter list.
+ */
+public class PropertyException extends AMQException
+{
+    // ---- message based ----------------------------------------------------
+
+    /**
+     * @param message description of the failure
+     */
+    public PropertyException(String message)
+    {
+        super(message);
+    }
+
+    /**
+     * @param msg description of the failure
+     * @param t   underlying cause
+     */
+    public PropertyException(String msg, Throwable t)
+    {
+        super(msg, t);
+    }
+
+    // ---- error-code based -------------------------------------------------
+
+    /**
+     * @param errorCode error code forwarded to {@link AMQException}
+     * @param msg       description of the failure
+     */
+    public PropertyException(int errorCode, String msg)
+    {
+        super(errorCode, msg);
+    }
+
+    /**
+     * @param errorCode error code forwarded to {@link AMQException}
+     * @param msg       description of the failure
+     * @param t         underlying cause
+     */
+    public PropertyException(int errorCode, String msg, Throwable t)
+    {
+        super(errorCode, msg, t);
+    }
+
+    // ---- logger based -----------------------------------------------------
+
+    /**
+     * @param logger logger forwarded to {@link AMQException}
+     * @param msg    description of the failure
+     */
+    public PropertyException(Logger logger, String msg)
+    {
+        super(logger, msg);
+    }
+
+    /**
+     * @param logger logger forwarded to {@link AMQException}
+     * @param msg    description of the failure
+     * @param t      underlying cause
+     */
+    public PropertyException(Logger logger, String msg, Throwable t)
+    {
+        super(logger, msg, t);
+    }
+
+    /**
+     * @param logger    logger forwarded to {@link AMQException}
+     * @param errorCode error code forwarded to {@link AMQException}
+     * @param msg       description of the failure
+     */
+    public PropertyException(Logger logger, int errorCode, String msg)
+    {
+        super(logger, errorCode, msg);
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/configuration/PropertyException.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/configuration/PropertyUtils.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/configuration/PropertyUtils.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/configuration/PropertyUtils.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/configuration/PropertyUtils.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,153 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.configuration;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+
+/**
+ * Based on code in Apache Ant, this utility class handles property expansion. This
+ * is most useful in config files and so on.
+ */
+public class PropertyUtils
+{
+    /**
+     * Replaces <code>${xxx}</code> style constructions in the given value
+     * with the value of the system property of that name. Only system
+     * properties are consulted.
+     *
+     * @param value The string to be scanned for property references.
+     *              May be <code>null</code>, in which case this
+     *              method returns immediately with no effect.
+     * @return the original string with the properties replaced, or
+     *         <code>null</code> if the original string is <code>null</code>.
+     * @throws PropertyException if the string contains an opening
+     *                           <code>${</code> without a closing
+     *                           <code>}</code>, if a reference has an empty
+     *                           name (<code>${}</code>), or if a referenced
+     *                           system property has not been set
+     */
+    public static String replaceProperties(String value) throws PropertyException
+    {
+        if (value == null)
+        {
+            return null;
+        }
+
+        ArrayList<String> fragments = new ArrayList<String>();
+        ArrayList<String> propertyRefs = new ArrayList<String>();
+        parsePropertyString(value, fragments, propertyRefs);
+
+        StringBuilder sb = new StringBuilder(value.length());
+        Iterator<String> refs = propertyRefs.iterator();
+
+        for (String fragment : fragments)
+        {
+            if (fragment == null)
+            {
+                // a null fragment is a placeholder for the next property reference
+                String propertyName = refs.next();
+                String replacement = System.getProperty(propertyName);
+
+                if (replacement == null)
+                {
+                    throw new PropertyException("Property ${" + propertyName +
+                                                "} has not been set");
+                }
+                fragment = replacement;
+            }
+            sb.append(fragment);
+        }
+
+        return sb.toString();
+    }
+
+    /**
+     * Default parsing method. Parses the supplied value for properties which are specified
+     * using ${foo} syntax. $X is left as is, and $$ specifies a single $.
+     *
+     * @param value the property string to parse
+     * @param fragments is populated with the string fragments. A null means "insert a
+     * property value here". The number of nulls in the list when populated is equal to the
+     * size of the propertyRefs list
+     * @param propertyRefs populated with the property names to be added into the final
+     * String.
+     * @throws PropertyException if a <code>${</code> has no closing <code>}</code> or the
+     * property name between the braces is empty
+     */
+    private static void parsePropertyString(String value, ArrayList<String> fragments,
+                                            ArrayList<String> propertyRefs)
+            throws PropertyException
+    {
+        int prev = 0;
+        int pos;
+        //search for the next instance of $ from the 'prev' position
+        while ((pos = value.indexOf("$", prev)) >= 0)
+        {
+            //if there was any text between the previous match and this one, add it as a
+            //fragment (comparing against 'prev' avoids emitting empty fragments)
+            if (pos > prev)
+            {
+                fragments.add(value.substring(prev, pos));
+            }
+            //if we are at the end of the string, we tack on a $
+            //then move past it
+            if (pos == (value.length() - 1))
+            {
+                fragments.add("$");
+                prev = pos + 1;
+            }
+            else if (value.charAt(pos + 1) != '{')
+            {
+                //peek ahead to see if the next char is a property or not
+                //not a property: insert the char as a literal
+                if (value.charAt(pos + 1) == '$')
+                {
+                    // two $ map to one $
+                    fragments.add("$");
+                }
+                else
+                {
+                    // $X maps to $X for all values of X!='$'
+                    fragments.add(value.substring(pos, pos + 2));
+                }
+                prev = pos + 2;
+            }
+            else
+            {
+                // property found, extract its name or bail on a typo
+                int endName = value.indexOf('}', pos);
+                if (endName < 0)
+                {
+                    throw new PropertyException("Syntax error in property: " +
+                                                value);
+                }
+                String propertyName = value.substring(pos + 2, endName);
+                if (propertyName.length() == 0)
+                {
+                    // System.getProperty("") would throw IllegalArgumentException;
+                    // report the malformed reference as a PropertyException instead
+                    throw new PropertyException("Empty property name in: " + value);
+                }
+                fragments.add(null);
+                propertyRefs.add(propertyName);
+                prev = endName + 1;
+            }
+        }
+        //no more $ signs found
+        //if there is any tail to the string, append it
+        if (prev < value.length())
+        {
+            fragments.add(value.substring(prev));
+        }
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/configuration/PropertyUtils.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/exchange/ExchangeDefaults.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/exchange/ExchangeDefaults.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/exchange/ExchangeDefaults.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/exchange/ExchangeDefaults.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,33 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.exchange;
+
+/**
+ * String constants naming the default exchanges and their exchange classes.
+ */
+public class ExchangeDefaults
+{
+    // ---- topic exchange ----
+
+    /** Name of the default topic exchange. */
+    public static final String TOPIC_EXCHANGE_NAME = "amq.topic";
+
+    /** Exchange class of the topic exchange. */
+    public static final String TOPIC_EXCHANGE_CLASS = "topic";
+
+    // ---- direct exchange ----
+
+    /** Name of the default direct exchange. */
+    public static final String DIRECT_EXCHANGE_NAME = "amq.direct";
+
+    /** Exchange class of the direct exchange. */
+    public static final String DIRECT_EXCHANGE_CLASS = "direct";
+
+    // ---- headers exchange ----
+
+    /** Name of the default headers exchange. */
+    public static final String HEADERS_EXCHANGE_NAME = "amq.match";
+
+    /** Exchange class of the headers exchange. */
+    public static final String HEADERS_EXCHANGE_CLASS = "headers";
+}

Propchange: incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/exchange/ExchangeDefaults.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQBody.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQBody.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQBody.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQBody.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,35 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+import org.apache.mina.common.ByteBuffer;
+
+/**
+ * Base class for the body carried inside an {@link AMQFrame}. A body knows its
+ * type byte and its encoded size, and can write itself to and read itself from a buffer.
+ */
+public abstract class AMQBody
+{
+    /**
+     * @return the type byte identifying this kind of body; AMQFrame writes it as the
+     *         first byte of the encoded frame
+     */
+    protected abstract byte getType();
+
+    /**
+     * Get the size of the body
+     * @return unsigned short
+     */
+    // NOTE(review): AMQFrame encodes this value as an unsigned integer, not an
+    // unsigned short - confirm which range is intended.
+    protected abstract int getSize();
+
+    /**
+     * Writes the encoded body to the buffer.
+     * @param buffer the buffer to write to
+     */
+    protected abstract void writePayload(ByteBuffer buffer);
+
+    /**
+     * Populates this body from the buffer.
+     * @param buffer the buffer to read from
+     * @param size the body size taken from the frame header
+     * @throws AMQFrameDecodingException if the body cannot be decoded
+     */
+    protected abstract void populateFromBuffer(ByteBuffer buffer, long size) throws AMQFrameDecodingException;
+}

Propchange: incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQBody.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQDataBlock.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQDataBlock.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQDataBlock.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQDataBlock.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,40 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+import org.apache.mina.common.ByteBuffer;
+
+/**
+ * A data block represents something that has a size in bytes and the ability to write itself to a byte
+ * buffer (similar to a byte array).
+ */
+public abstract class AMQDataBlock implements EncodableAMQDataBlock
+{
+    /**
+     * Get the size of buffer needed to store the byte representation of this
+     * frame.
+     * @return unsigned integer
+     */
+    public abstract long getSize();
+
+    /**
+     * Writes the datablock to the specified buffer.
+     * @param buffer the buffer that receives the encoded bytes of this data block
+     */
+    public abstract void writePayload(ByteBuffer buffer);
+}

Propchange: incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQDataBlock.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQDataBlockDecoder.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQDataBlockDecoder.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQDataBlockDecoder.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQDataBlockDecoder.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,113 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+import org.apache.log4j.Logger;
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.filter.codec.ProtocolDecoderOutput;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class AMQDataBlockDecoder
+{
+    Logger _logger = Logger.getLogger(AMQDataBlockDecoder.class);
+
+    private final Map _supportedBodies = new HashMap();
+
+    public AMQDataBlockDecoder()
+    {
+        _supportedBodies.put(new Byte(AMQMethodBody.TYPE), AMQMethodBodyFactory.getInstance());
+        _supportedBodies.put(new Byte(ContentHeaderBody.TYPE), ContentHeaderBodyFactory.getInstance());
+        _supportedBodies.put(new Byte(ContentBody.TYPE), ContentBodyFactory.getInstance());
+        _supportedBodies.put(new Byte(HeartbeatBody.TYPE), new HeartbeatBodyFactory());
+    }
+
+    public boolean decodable(IoSession session, ByteBuffer in) throws AMQFrameDecodingException
+    {
+        // type, channel, body size and end byte
+        if (in.remaining() < (1 + 2 + 4 + 1))
+        {
+            return false;
+        }
+
+        final byte type = in.get();
+        final int channel = in.getUnsignedShort();
+        final long bodySize = in.getUnsignedInt();
+
+        // bodySize can be zero
+        if (type <= 0 || channel < 0 || bodySize < 0)
+        {
+            throw new AMQFrameDecodingException("Undecodable frame: type = " + type + " channel = " + channel +
+                                                " bodySize = " + bodySize);
+        }
+
+        if (in.remaining() < (bodySize + 1))
+        {
+            return false;
+        }
+        return true;
+    }
+
+    private boolean isSupportedFrameType(byte frameType)
+    {
+        final boolean result = _supportedBodies.containsKey(new Byte(frameType));
+
+        if (!result)
+        {
+        	_logger.warn("AMQDataBlockDecoder does not handle frame type " + frameType);
+        }
+
+        return result;
+    }
+
+    protected Object createAndPopulateFrame(ByteBuffer in)
+                    throws AMQFrameDecodingException
+    {
+        final byte type = in.get();
+        if (!isSupportedFrameType(type))
+        {
+            throw new AMQFrameDecodingException("Unsupported frame type: " + type);
+        }
+        final int channel = in.getUnsignedShort();
+        final long bodySize = in.getUnsignedInt();
+
+        BodyFactory bodyFactory = (BodyFactory) _supportedBodies.get(new Byte(type));
+        if (bodyFactory == null)
+        {
+            throw new AMQFrameDecodingException("Unsupported body type: " + type);
+        }
+        AMQFrame frame = new AMQFrame();
+
+        frame.populateFromBuffer(in, channel, bodySize, bodyFactory);
+
+        byte marker = in.get();
+        if ((marker & 0xFF) != 0xCE)
+        {
+            throw new AMQFrameDecodingException("End of frame marker not found. Read " + marker + " size=" + bodySize + " type=" + type);
+        }
+        return frame;
+    }
+
+    public void decode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out)
+        throws Exception
+    {
+        out.write(createAndPopulateFrame(in));
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQDataBlockDecoder.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQDataBlockEncoder.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQDataBlockEncoder.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQDataBlockEncoder.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQDataBlockEncoder.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,62 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+import org.apache.log4j.Logger;
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.filter.codec.demux.MessageEncoder;
+import org.apache.mina.filter.codec.ProtocolEncoderOutput;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Message encoder that serialises an {@link AMQDataBlock} into a freshly allocated
+ * buffer sized from the block's own reported size.
+ */
+public class AMQDataBlockEncoder implements MessageEncoder
+{
+    Logger _logger = Logger.getLogger(AMQDataBlockEncoder.class);
+
+    /** The message types handled by this encoder. */
+    private Set _messageTypes;
+
+    /**
+     * Creates an encoder that declares {@link EncodableAMQDataBlock} as its only message type.
+     */
+    public AMQDataBlockEncoder()
+    {
+        Set types = new HashSet();
+        types.add(EncodableAMQDataBlock.class);
+        _messageTypes = types;
+    }
+
+    /**
+     * Writes the data block into a new buffer, flips it and passes it to the output.
+     *
+     * @param session the session being written to (not used here)
+     * @param message the block to encode; must be an {@link AMQDataBlock}
+     * @param out receives the encoded buffer
+     * @throws Exception if encoding fails
+     */
+    public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception
+    {
+        final AMQDataBlock block = (AMQDataBlock) message;
+        final ByteBuffer encoded = ByteBuffer.allocate((int) block.getSize());
+        block.writePayload(encoded);
+
+        if (_logger.isDebugEnabled())
+        {
+            final String hex = EncodingUtils.convertToHexString(encoded);
+            _logger.debug("Encoded frame byte-buffer is '" + hex + "'");
+        }
+
+        // switch the buffer from writing to reading before handing it over
+        encoded.flip();
+        out.write(encoded);
+    }
+
+    /**
+     * @return the set of message types this encoder handles
+     */
+    public Set getMessageTypes()
+    {
+        return _messageTypes;
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQDataBlockEncoder.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQFrame.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQFrame.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQFrame.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQFrame.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,73 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+import org.apache.mina.common.ByteBuffer;
+
+/**
+ * A frame: a channel number plus a body, encoded as type byte, channel, body size,
+ * body payload and a trailing 0xCE marker byte.
+ */
+public class AMQFrame extends AMQDataBlock implements EncodableAMQDataBlock
+{
+    /** Channel this frame belongs to. */
+    public int channel;
+
+    /** The body carried by this frame. */
+    public AMQBody bodyFrame;
+
+    /**
+     * Creates an empty frame; channel and body are left at their defaults.
+     */
+    public AMQFrame()
+    {
+    }
+
+    /**
+     * @param channel the channel number
+     * @param bodyFrame the body to carry
+     */
+    public AMQFrame(int channel, AMQBody bodyFrame)
+    {
+        this.channel = channel;
+        this.bodyFrame = bodyFrame;
+    }
+
+    /**
+     * @return the encoded size of the frame: header, body and end marker
+     */
+    public long getSize()
+    {
+        // type (1) + channel (2) + body size (4)
+        final int headerSize = 1 + 2 + 4;
+        final int endMarkerSize = 1;
+        return headerSize + bodyFrame.getSize() + endMarkerSize;
+    }
+
+    /**
+     * Writes the header, the body payload and the end marker to the buffer.
+     *
+     * @param buffer the buffer to write to
+     */
+    public void writePayload(ByteBuffer buffer)
+    {
+        // header
+        buffer.put(bodyFrame.getType());
+        // TODO: how does channel get populated
+        EncodingUtils.writeUnsignedShort(buffer, channel);
+        EncodingUtils.writeUnsignedInteger(buffer, bodyFrame.getSize());
+
+        // body
+        bodyFrame.writePayload(buffer);
+
+        // end-of-frame marker
+        buffer.put((byte) 0xCE);
+    }
+
+    /**
+     * Sets the channel, creates the body via the factory and lets the body read itself
+     * from the buffer.
+     *
+     * @param buffer the buffer to read the body from
+     * @param channel unsigned short
+     * @param bodySize unsigned integer
+     * @param bodyFactory creates the body instance
+     * @throws AMQFrameDecodingException if the body cannot be created or decoded
+     */
+    public void populateFromBuffer(ByteBuffer buffer, int channel, long bodySize, BodyFactory bodyFactory)
+            throws AMQFrameDecodingException
+    {
+        this.channel = channel;
+        this.bodyFrame = bodyFactory.createBody(buffer);
+        this.bodyFrame.populateFromBuffer(buffer, bodySize);
+    }
+
+    /**
+     * @return a description containing the channel and the body
+     */
+    public String toString()
+    {
+        return "Frame channelId: " + channel + ", bodyFrame: " + bodyFrame;
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQFrame.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQFrameDecodingException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQFrameDecodingException.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQFrameDecodingException.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQFrameDecodingException.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,45 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+
+/**
+ * Thrown when a frame cannot be decoded. Every constructor forwards its arguments to
+ * the {@link AMQException} constructor with the same parameter list.
+ */
+public class AMQFrameDecodingException extends AMQException
+{
+    /**
+     * @param message description of the decoding failure
+     */
+    public AMQFrameDecodingException(String message)
+    {
+        super(message);
+    }
+
+    /**
+     * @param log logger forwarded to {@link AMQException}
+     * @param message description of the decoding failure
+     */
+    public AMQFrameDecodingException(Logger log, String message)
+    {
+        super(log, message);
+    }
+
+    /**
+     * @param message description of the decoding failure
+     * @param t underlying cause
+     */
+    public AMQFrameDecodingException(String message, Throwable t)
+    {
+        super(message, t);
+    }
+
+    /**
+     * @param log logger forwarded to {@link AMQException}
+     * @param message description of the decoding failure
+     * @param t underlying cause
+     */
+    public AMQFrameDecodingException(Logger log, String message, Throwable t)
+    {
+        super(log, message, t);
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQFrameDecodingException.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQMethodBody.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQMethodBody.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQMethodBody.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQMethodBody.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,87 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.AMQChannelException;
+
+/**
+ * Base class for method bodies. The encoded form is the class id and method id
+ * (two unsigned shorts) followed by the method-specific payload.
+ */
+public abstract class AMQMethodBody extends AMQBody
+{
+    /** Type byte for method bodies. */
+    public static final byte TYPE = 1;
+
+    // ---- to be supplied by concrete method bodies ----
+
+    /** unsigned short */
+    protected abstract int getBodySize();
+
+    /**
+     * @return unsigned short
+     */
+    protected abstract int getClazz();
+
+    /**
+     * @return unsigned short
+     */
+    protected abstract int getMethod();
+
+    /**
+     * Writes the method-specific payload, i.e. everything after the class and method ids.
+     */
+    protected abstract void writeMethodPayload(ByteBuffer buffer);
+
+    /**
+     * Reads the method-specific payload from the buffer.
+     */
+    protected abstract void populateMethodBodyFromBuffer(ByteBuffer buffer) throws AMQFrameDecodingException;
+
+    // ---- AMQBody implementation ----
+
+    protected byte getType()
+    {
+        return TYPE;
+    }
+
+    /**
+     * @return the method body size plus the space taken by the class and method ids
+     */
+    protected int getSize()
+    {
+        // class id (2) + method id (2)
+        final int idsSize = 2 + 2;
+        return idsSize + getBodySize();
+    }
+
+    /**
+     * Writes the class id, the method id and then the method-specific payload.
+     */
+    protected void writePayload(ByteBuffer buffer)
+    {
+        final int classId = getClazz();
+        EncodingUtils.writeUnsignedShort(buffer, classId);
+        final int methodId = getMethod();
+        EncodingUtils.writeUnsignedShort(buffer, methodId);
+        writeMethodPayload(buffer);
+    }
+
+    /**
+     * Delegates to {@link #populateMethodBodyFromBuffer(ByteBuffer)}; the size argument
+     * is not used.
+     */
+    protected void populateFromBuffer(ByteBuffer buffer, long size) throws AMQFrameDecodingException
+    {
+        populateMethodBodyFromBuffer(buffer);
+    }
+
+    /**
+     * @return the runtime class followed by the class and method ids
+     */
+    public String toString()
+    {
+        return getClass().toString() + " Class: " + getClazz() + " Method: " + getMethod();
+    }
+
+    // ---- exception helpers ----
+
+    /**
+     * Creates an AMQChannelException for the corresponding body type (a channel exception
+     * should include the class and method ids of the body it resulted from).
+     */
+    public AMQChannelException getChannelException(int code, String message)
+    {
+        return new AMQChannelException(code, message, getClazz(), getMethod());
+    }
+
+    /**
+     * As {@link #getChannelException(int, String)}, additionally recording the cause.
+     */
+    public AMQChannelException getChannelException(int code, String message, Throwable cause)
+    {
+        return new AMQChannelException(code, message, getClazz(), getMethod(), cause);
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQMethodBody.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQMethodBodyFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQMethodBodyFactory.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQMethodBodyFactory.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQMethodBodyFactory.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,43 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+import org.apache.log4j.Logger;
+import org.apache.mina.common.ByteBuffer;
+
+/**
+ * Singleton {@link BodyFactory} for method bodies. It reads the class and method ids
+ * from the buffer and asks {@link MethodBodyDecoderRegistry} for the matching body.
+ */
+public class AMQMethodBodyFactory implements BodyFactory
+{
+    // the logger must be initialised before the instance: the constructor logs
+    private static final Logger _log = Logger.getLogger(AMQMethodBodyFactory.class);
+
+    private static final AMQMethodBodyFactory _instance = new AMQMethodBodyFactory();
+
+    private AMQMethodBodyFactory()
+    {
+        _log.debug("Creating method body factory");
+    }
+
+    /**
+     * @return the shared factory instance
+     */
+    public static AMQMethodBodyFactory getInstance()
+    {
+        return _instance;
+    }
+
+    /**
+     * Reads two unsigned shorts from the buffer and returns the body the registry
+     * provides for them.
+     *
+     * @param in buffer positioned at the class id
+     * @return the body obtained from the registry
+     * @throws AMQFrameDecodingException if the registry cannot supply a body
+     */
+    public AMQBody createBody(ByteBuffer in) throws AMQFrameDecodingException
+    {
+        final int firstId = in.getUnsignedShort();
+        final int secondId = in.getUnsignedShort();
+        return MethodBodyDecoderRegistry.get(firstId, secondId);
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQMethodBodyFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQProtocolClassException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQProtocolClassException.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQProtocolClassException.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQProtocolClassException.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,26 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+/**
+ * A specialisation of {@link AMQProtocolHeaderException} that carries a descriptive message.
+ * NOTE(review): presumably raised when the protocol class found in a protocol header is not the
+ * expected one - confirm against the code that throws it.
+ */
+public class AMQProtocolClassException extends AMQProtocolHeaderException
+{
+    /**
+     * @param message description of the failure, passed unchanged to the superclass
+     */
+    public AMQProtocolClassException(final String message)
+    {
+        super(message);
+    }
+}
\ No newline at end of file

Propchange: incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQProtocolClassException.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQProtocolHeaderException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQProtocolHeaderException.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQProtocolHeaderException.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQProtocolHeaderException.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,28 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+import org.apache.qpid.AMQException;
+
+/**
+ * An {@link AMQException} that carries a descriptive message; it is the common superclass of the
+ * more specific protocol class, instance and version exceptions in this package.
+ * NOTE(review): presumably signals a problem with the protocol header exchanged when a
+ * connection is opened - confirm against the code that throws it.
+ */
+public class AMQProtocolHeaderException extends AMQException
+{
+    /**
+     * @param message description of the failure, passed unchanged to the superclass
+     */
+    public AMQProtocolHeaderException(final String message)
+    {
+        super(message);
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQProtocolHeaderException.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQProtocolInstanceException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQProtocolInstanceException.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQProtocolInstanceException.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQProtocolInstanceException.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,26 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+/**
+ * A specialisation of {@link AMQProtocolHeaderException} that carries a descriptive message.
+ * NOTE(review): presumably raised when the protocol instance found in a protocol header is not
+ * the expected one - confirm against the code that throws it.
+ */
+public class AMQProtocolInstanceException extends AMQProtocolHeaderException
+{
+    /**
+     * @param message description of the failure, passed unchanged to the superclass
+     */
+    public AMQProtocolInstanceException(final String message)
+    {
+        super(message);
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQProtocolInstanceException.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQProtocolVersionException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQProtocolVersionException.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQProtocolVersionException.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQProtocolVersionException.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,30 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+/**
+ * Thrown when the client and the server disagree about the expected protocol version (header)
+ * information.
+ */
+public class AMQProtocolVersionException extends AMQProtocolHeaderException
+{
+    /**
+     * @param message description of the version mismatch, passed unchanged to the superclass
+     */
+    public AMQProtocolVersionException(final String message)
+    {
+        super(message);
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQProtocolVersionException.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/BasicContentHeaderProperties.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/BasicContentHeaderProperties.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/BasicContentHeaderProperties.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/BasicContentHeaderProperties.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,592 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+import org.apache.log4j.Logger;
+import org.apache.mina.common.ByteBuffer;
+
+/**
+ * Holds the optional properties of a basic content header together with the property flags that
+ * record which of them are present.
+ * <p>
+ * Each property owns one bit of the flags word, from bit 15 (content type) down to bit 2
+ * (cluster id). Properties are encoded and decoded in that order, and only those whose bit is set
+ * appear in the encoded form. Calling any setter sets the corresponding bit.
+ * <p>
+ * When the properties are decoded from a buffer the raw bytes are retained, so that an unmodified
+ * header can be written out again without re-encoding. Any setter discards the retained bytes.
+ */
+public class BasicContentHeaderProperties implements ContentHeaderProperties
+{
+    private static final Logger _logger = Logger.getLogger(BasicContentHeaderProperties.class);
+
+    // One flag bit per property, listed in encoding order.
+    private static final int CONTENT_TYPE_MASK = 1 << 15;
+    private static final int ENCODING_MASK = 1 << 14;
+    private static final int HEADERS_MASK = 1 << 13;
+    private static final int DELIVERY_MODE_MASK = 1 << 12;
+    private static final int PRIORITY_MASK = 1 << 11;
+    private static final int CORRELATION_ID_MASK = 1 << 10;
+    private static final int REPLY_TO_MASK = 1 << 9;
+    private static final int EXPIRATION_MASK = 1 << 8;
+    private static final int MESSAGE_ID_MASK = 1 << 7;
+    private static final int TIMESTAMP_MASK = 1 << 6;
+    private static final int TYPE_MASK = 1 << 5;
+    private static final int USER_ID_MASK = 1 << 4;
+    private static final int APP_ID_MASK = 1 << 3;
+    private static final int CLUSTER_ID_MASK = 1 << 2;
+
+    /**
+     * We store the encoded form when we decode the content header so that if we need to
+     * write it out without modifying it we can do so without incurring the expense of
+     * reencoding it
+     */
+    private byte[] _encodedForm;
+
+    /**
+     * Flag indicating whether the entire content header has been decoded yet
+     */
+    private boolean _decoded = true;
+
+    /**
+     * We have some optimisations for partial decoding for maximum performance. The headers are used in the broker
+     * for routing in some cases so we can decode that separately.
+     */
+    private boolean _decodedHeaders = true;
+
+    /**
+     * We have some optimisations for partial decoding for maximum performance. The content type is used by all
+     * clients to determine the message type
+     */
+    private boolean _decodedContentType = true;
+
+    private String _contentType;
+
+    private String _encoding;
+
+    private FieldTable _headers;
+
+    private byte _deliveryMode;
+
+    private byte _priority;
+
+    private String _correlationId;
+
+    private String _replyTo;
+
+    private long _expiration;
+
+    private String _messageId;
+
+    private long _timestamp;
+
+    private String _type;
+
+    private String _userId;
+
+    private String _appId;
+
+    private String _clusterId;
+
+    private int _propertyFlags = 0;
+
+    public BasicContentHeaderProperties()
+    {
+    }
+
+    /**
+     * Tests a single property bit. All masks used by this class are positive, so a non-zero
+     * result is equivalent to the bit being set.
+     */
+    private boolean hasFlag(int mask)
+    {
+        return (_propertyFlags & mask) != 0;
+    }
+
+    /**
+     * @return the number of bytes {@link #writePropertyListPayload(ByteBuffer)} will write: the
+     *         length of the retained encoded form if there is one, otherwise the sum of the
+     *         encoded sizes of every property whose flag is set
+     */
+    public int getPropertyListSize()
+    {
+        if (_encodedForm != null)
+        {
+            return _encodedForm.length;
+        }
+
+        int size = 0;
+        if (hasFlag(CONTENT_TYPE_MASK))
+        {
+            size += EncodingUtils.encodedShortStringLength(_contentType);
+        }
+        if (hasFlag(ENCODING_MASK))
+        {
+            size += EncodingUtils.encodedShortStringLength(_encoding);
+        }
+        if (hasFlag(HEADERS_MASK))
+        {
+            size += EncodingUtils.encodedFieldTableLength(_headers);
+        }
+        if (hasFlag(DELIVERY_MODE_MASK))
+        {
+            // single byte
+            size += 1;
+        }
+        if (hasFlag(PRIORITY_MASK))
+        {
+            // single byte
+            size += 1;
+        }
+        if (hasFlag(CORRELATION_ID_MASK))
+        {
+            size += EncodingUtils.encodedShortStringLength(_correlationId);
+        }
+        if (hasFlag(REPLY_TO_MASK))
+        {
+            size += EncodingUtils.encodedShortStringLength(_replyTo);
+        }
+        if (hasFlag(EXPIRATION_MASK))
+        {
+            // the expiration travels as the decimal string form of the long
+            size += EncodingUtils.encodedShortStringLength(String.valueOf(_expiration));
+        }
+        if (hasFlag(MESSAGE_ID_MASK))
+        {
+            size += EncodingUtils.encodedShortStringLength(_messageId);
+        }
+        if (hasFlag(TIMESTAMP_MASK))
+        {
+            // two unsigned 32 bit words
+            size += 8;
+        }
+        if (hasFlag(TYPE_MASK))
+        {
+            size += EncodingUtils.encodedShortStringLength(_type);
+        }
+        if (hasFlag(USER_ID_MASK))
+        {
+            size += EncodingUtils.encodedShortStringLength(_userId);
+        }
+        if (hasFlag(APP_ID_MASK))
+        {
+            size += EncodingUtils.encodedShortStringLength(_appId);
+        }
+        if (hasFlag(CLUSTER_ID_MASK))
+        {
+            size += EncodingUtils.encodedShortStringLength(_clusterId);
+        }
+        return size;
+    }
+
+    /**
+     * Drops the retained encoded bytes. Called before every mutation so that a stale encoding is
+     * never written out. Lazy decoding is currently disabled (the header is always decoded
+     * eagerly), so nothing needs to be decoded before the bytes are discarded.
+     */
+    private void clearEncodedForm()
+    {
+        _encodedForm = null;
+    }
+
+    /**
+     * Replaces the whole flags word and discards any retained encoded form.
+     *
+     * @param propertyFlags the new property flags
+     */
+    public void setPropertyFlags(int propertyFlags)
+    {
+        clearEncodedForm();
+        _propertyFlags = propertyFlags;
+    }
+
+    /**
+     * @return the current property flags
+     */
+    public int getPropertyFlags()
+    {
+        return _propertyFlags;
+    }
+
+    /**
+     * Writes the property list to the buffer: the retained encoded bytes if present, otherwise
+     * each property whose flag is set, in flag order.
+     *
+     * @param buffer destination buffer
+     */
+    public void writePropertyListPayload(ByteBuffer buffer)
+    {
+        if (_encodedForm != null)
+        {
+            buffer.put(_encodedForm);
+            return;
+        }
+
+        if (hasFlag(CONTENT_TYPE_MASK))
+        {
+            EncodingUtils.writeShortStringBytes(buffer, _contentType);
+        }
+        if (hasFlag(ENCODING_MASK))
+        {
+            EncodingUtils.writeShortStringBytes(buffer, _encoding);
+        }
+        if (hasFlag(HEADERS_MASK))
+        {
+            EncodingUtils.writeFieldTableBytes(buffer, _headers);
+        }
+        if (hasFlag(DELIVERY_MODE_MASK))
+        {
+            buffer.put(_deliveryMode);
+        }
+        if (hasFlag(PRIORITY_MASK))
+        {
+            buffer.put(_priority);
+        }
+        if (hasFlag(CORRELATION_ID_MASK))
+        {
+            EncodingUtils.writeShortStringBytes(buffer, _correlationId);
+        }
+        if (hasFlag(REPLY_TO_MASK))
+        {
+            EncodingUtils.writeShortStringBytes(buffer, _replyTo);
+        }
+        if (hasFlag(EXPIRATION_MASK))
+        {
+            EncodingUtils.writeShortStringBytes(buffer, String.valueOf(_expiration));
+        }
+        if (hasFlag(MESSAGE_ID_MASK))
+        {
+            EncodingUtils.writeShortStringBytes(buffer, _messageId);
+        }
+        if (hasFlag(TIMESTAMP_MASK))
+        {
+            // NOTE(review): the most significant word is always written as zero and is discarded
+            // again when decoding, so a timestamp that needs more than 32 bits cannot round-trip.
+            // Confirm whether this is intended before relying on large values.
+            EncodingUtils.writeUnsignedInteger(buffer, 0/*timestamp msb*/);
+            EncodingUtils.writeUnsignedInteger(buffer, _timestamp);
+        }
+        if (hasFlag(TYPE_MASK))
+        {
+            EncodingUtils.writeShortStringBytes(buffer, _type);
+        }
+        if (hasFlag(USER_ID_MASK))
+        {
+            EncodingUtils.writeShortStringBytes(buffer, _userId);
+        }
+        if (hasFlag(APP_ID_MASK))
+        {
+            EncodingUtils.writeShortStringBytes(buffer, _appId);
+        }
+        if (hasFlag(CLUSTER_ID_MASK))
+        {
+            EncodingUtils.writeShortStringBytes(buffer, _clusterId);
+        }
+    }
+
+    /**
+     * Records the property flags and eagerly decodes every flagged property from the buffer.
+     * <p>
+     * The alternative of copying {@code size} raw bytes and decoding lazily is not currently
+     * used, which is why {@code size} is ignored here.
+     *
+     * @param buffer        buffer positioned at the start of the property list
+     * @param propertyFlags flags saying which properties are present
+     * @param size          size of the property list in bytes (currently unused)
+     * @throws AMQFrameDecodingException declared for callers; decoding failures are currently
+     *                                   reported as a RuntimeException wrapping the cause
+     */
+    public void populatePropertiesFromBuffer(ByteBuffer buffer, int propertyFlags, int size)
+            throws AMQFrameDecodingException
+    {
+        _propertyFlags = propertyFlags;
+
+        if (_logger.isDebugEnabled())
+        {
+            _logger.debug("Property flags: " + _propertyFlags);
+        }
+        decode(buffer);
+    }
+
+    /**
+     * Reads every flagged property from the buffer, then copies the bytes that were consumed into
+     * the retained encoded form. On normal return the buffer is positioned just after the last
+     * property and its limit is unchanged.
+     *
+     * @param buffer buffer positioned at the start of the property list
+     * @throws RuntimeException wrapping the AMQFrameDecodingException if the data is malformed
+     */
+    private void decode(ByteBuffer buffer)
+    {
+        final int startPos = buffer.position();
+        try
+        {
+            if (hasFlag(CONTENT_TYPE_MASK))
+            {
+                _contentType = EncodingUtils.readShortString(buffer);
+            }
+            if (hasFlag(ENCODING_MASK))
+            {
+                _encoding = EncodingUtils.readShortString(buffer);
+            }
+            if (hasFlag(HEADERS_MASK))
+            {
+                _headers = EncodingUtils.readFieldTable(buffer);
+            }
+            if (hasFlag(DELIVERY_MODE_MASK))
+            {
+                _deliveryMode = buffer.get();
+            }
+            if (hasFlag(PRIORITY_MASK))
+            {
+                _priority = buffer.get();
+            }
+            if (hasFlag(CORRELATION_ID_MASK))
+            {
+                _correlationId = EncodingUtils.readShortString(buffer);
+            }
+            if (hasFlag(REPLY_TO_MASK))
+            {
+                _replyTo = EncodingUtils.readShortString(buffer);
+            }
+            if (hasFlag(EXPIRATION_MASK))
+            {
+                _expiration = Long.parseLong(EncodingUtils.readShortString(buffer));
+            }
+            if (hasFlag(MESSAGE_ID_MASK))
+            {
+                _messageId = EncodingUtils.readShortString(buffer);
+            }
+            if (hasFlag(TIMESTAMP_MASK))
+            {
+                // Discard msb from AMQ timestamp
+                buffer.getUnsignedInt();
+                _timestamp = buffer.getUnsignedInt();
+            }
+            if (hasFlag(TYPE_MASK))
+            {
+                _type = EncodingUtils.readShortString(buffer);
+            }
+            if (hasFlag(USER_ID_MASK))
+            {
+                _userId = EncodingUtils.readShortString(buffer);
+            }
+            if (hasFlag(APP_ID_MASK))
+            {
+                _appId = EncodingUtils.readShortString(buffer);
+            }
+            if (hasFlag(CLUSTER_ID_MASK))
+            {
+                _clusterId = EncodingUtils.readShortString(buffer);
+            }
+        }
+        catch (AMQFrameDecodingException e)
+        {
+            // Keep the original exception as the cause so its stack trace is not lost
+            throw new RuntimeException("Error in content header data: " + e, e);
+        }
+
+        // Rewind and copy exactly the bytes just consumed, temporarily narrowing the limit,
+        // then put the buffer back to where decoding finished.
+        final int endPos = buffer.position();
+        final int length = endPos - startPos;
+        final int savedLimit = buffer.limit();
+        _encodedForm = new byte[length];
+        buffer.position(startPos);
+        buffer.limit(endPos);
+        buffer.get(_encodedForm, 0, length);
+        buffer.limit(savedLimit);
+        buffer.position(endPos);
+        _decoded = true;
+    }
+
+    /**
+     * Partial decode of the retained encoded form: skips the content type and encoding short
+     * strings, if flagged, and decodes only the headers field table.
+     *
+     * @throws RuntimeException wrapping the AMQFrameDecodingException if the data is malformed
+     */
+    private void decodeUpToHeaders()
+    {
+        ByteBuffer buffer = ByteBuffer.wrap(_encodedForm);
+        try
+        {
+            if (hasFlag(CONTENT_TYPE_MASK))
+            {
+                skipShortString(buffer);
+            }
+            if (hasFlag(ENCODING_MASK))
+            {
+                skipShortString(buffer);
+            }
+            if (hasFlag(HEADERS_MASK))
+            {
+                _headers = EncodingUtils.readFieldTable(buffer);
+            }
+            _decodedHeaders = true;
+        }
+        catch (AMQFrameDecodingException e)
+        {
+            // Keep the original exception as the cause so its stack trace is not lost
+            throw new RuntimeException("Error in content header data: " + e, e);
+        }
+    }
+
+    /**
+     * Advances the buffer past one short string without decoding it. The length prefix is a
+     * single unsigned octet, so it is masked to avoid a negative skip for lengths above 127.
+     */
+    private static void skipShortString(ByteBuffer buffer)
+    {
+        final int length = buffer.get() & 0xFF;
+        buffer.skip(length);
+    }
+
+    /**
+     * Partial decode of the retained encoded form: reads only the content type, if flagged.
+     */
+    private void decodeUpToContentType()
+    {
+        ByteBuffer buffer = ByteBuffer.wrap(_encodedForm);
+
+        if (hasFlag(CONTENT_TYPE_MASK))
+        {
+            _contentType = EncodingUtils.readShortString(buffer);
+        }
+
+        _decodedContentType = true;
+    }
+
+    /**
+     * Hook for a lazy full decode. Lazy decoding is currently disabled: the header is decoded
+     * eagerly in populatePropertiesFromBuffer and _decoded is never set to false, so there is
+     * nothing to do here.
+     */
+    private void decodeIfNecessary()
+    {
+    }
+
+    private void decodeHeadersIfNecessary()
+    {
+        if (!_decoded && !_decodedHeaders)
+        {
+            decodeUpToHeaders();
+        }
+    }
+
+    private void decodeContentTypeIfNecessary()
+    {
+        if (!_decoded && !_decodedContentType)
+        {
+            decodeUpToContentType();
+        }
+    }
+
+    public String getContentType()
+    {
+        decodeContentTypeIfNecessary();
+        return _contentType;
+    }
+
+    public void setContentType(String contentType)
+    {
+        clearEncodedForm();
+        _propertyFlags |= CONTENT_TYPE_MASK;
+        _contentType = contentType;
+    }
+
+    public String getEncoding()
+    {
+        decodeIfNecessary();
+        return _encoding;
+    }
+
+    public void setEncoding(String encoding)
+    {
+        clearEncodedForm();
+        _propertyFlags |= ENCODING_MASK;
+        _encoding = encoding;
+    }
+
+    public FieldTable getHeaders()
+    {
+        decodeHeadersIfNecessary();
+        return _headers;
+    }
+
+    public void setHeaders(FieldTable headers)
+    {
+        clearEncodedForm();
+        _propertyFlags |= HEADERS_MASK;
+        _headers = headers;
+    }
+
+    public byte getDeliveryMode()
+    {
+        decodeIfNecessary();
+        return _deliveryMode;
+    }
+
+    public void setDeliveryMode(byte deliveryMode)
+    {
+        clearEncodedForm();
+        _propertyFlags |= DELIVERY_MODE_MASK;
+        _deliveryMode = deliveryMode;
+    }
+
+    public byte getPriority()
+    {
+        decodeIfNecessary();
+        return _priority;
+    }
+
+    public void setPriority(byte priority)
+    {
+        clearEncodedForm();
+        _propertyFlags |= PRIORITY_MASK;
+        _priority = priority;
+    }
+
+    public String getCorrelationId()
+    {
+        decodeIfNecessary();
+        return _correlationId;
+    }
+
+    public void setCorrelationId(String correlationId)
+    {
+        clearEncodedForm();
+        _propertyFlags |= CORRELATION_ID_MASK;
+        _correlationId = correlationId;
+    }
+
+    public String getReplyTo()
+    {
+        decodeIfNecessary();
+        return _replyTo;
+    }
+
+    public void setReplyTo(String replyTo)
+    {
+        clearEncodedForm();
+        _propertyFlags |= REPLY_TO_MASK;
+        _replyTo = replyTo;
+    }
+
+    public long getExpiration()
+    {
+        decodeIfNecessary();
+        return _expiration;
+    }
+
+    public void setExpiration(long expiration)
+    {
+        clearEncodedForm();
+        _propertyFlags |= EXPIRATION_MASK;
+        _expiration = expiration;
+    }
+
+    public String getMessageId()
+    {
+        decodeIfNecessary();
+        return _messageId;
+    }
+
+    public void setMessageId(String messageId)
+    {
+        clearEncodedForm();
+        _propertyFlags |= MESSAGE_ID_MASK;
+        _messageId = messageId;
+    }
+
+    public long getTimestamp()
+    {
+        decodeIfNecessary();
+        return _timestamp;
+    }
+
+    public void setTimestamp(long timestamp)
+    {
+        clearEncodedForm();
+        _propertyFlags |= TIMESTAMP_MASK;
+        _timestamp = timestamp;
+    }
+
+    public String getType()
+    {
+        decodeIfNecessary();
+        return _type;
+    }
+
+    public void setType(String type)
+    {
+        clearEncodedForm();
+        _propertyFlags |= TYPE_MASK;
+        _type = type;
+    }
+
+    public String getUserId()
+    {
+        decodeIfNecessary();
+        return _userId;
+    }
+
+    public void setUserId(String userId)
+    {
+        clearEncodedForm();
+        _propertyFlags |= USER_ID_MASK;
+        _userId = userId;
+    }
+
+    public String getAppId()
+    {
+        decodeIfNecessary();
+        return _appId;
+    }
+
+    public void setAppId(String appId)
+    {
+        clearEncodedForm();
+        _propertyFlags |= APP_ID_MASK;
+        _appId = appId;
+    }
+
+    public String getClusterId()
+    {
+        decodeIfNecessary();
+        return _clusterId;
+    }
+
+    public void setClusterId(String clusterId)
+    {
+        clearEncodedForm();
+        _propertyFlags |= CLUSTER_ID_MASK;
+        _clusterId = clusterId;
+    }
+
+    /**
+     * @return a short diagnostic string showing only the reply-to value and the property flags
+     */
+    public String toString()
+    {
+        return "reply-to = " + _replyTo + " propertyFlags = " + _propertyFlags;
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/BasicContentHeaderProperties.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/BodyFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/BodyFactory.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/BodyFactory.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/BodyFactory.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,28 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+import org.apache.mina.common.ByteBuffer;
+
+/**
+ * Contract for anything that can turn a stream of bytes into an AMQ structure; every such class
+ * must implement this interface.
+ */
+public interface BodyFactory
+{
+    /**
+     * Builds a body from the bytes available in the given buffer.
+     *
+     * @param in buffer holding the encoded body
+     * @return the decoded body
+     * @throws AMQFrameDecodingException if the bytes cannot be decoded into a body
+     */
+    AMQBody createBody(ByteBuffer in) throws AMQFrameDecodingException;
+}

Propchange: incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/BodyFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native