You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2006/09/20 00:07:25 UTC
svn commit: r447994 [36/46] - in /incubator/qpid/trunk/qpid: ./ cpp/
cpp/bin/ cpp/broker/ cpp/broker/inc/ cpp/broker/src/ cpp/broker/test/
cpp/client/ cpp/client/inc/ cpp/client/src/ cpp/client/test/ cpp/common/
cpp/common/concurrent/ cpp/common/concur...
Added: incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/bio/SocketConnector.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/bio/SocketConnector.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/bio/SocketConnector.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/bio/SocketConnector.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,150 @@
+/*
+ * @(#) $Id: SocketConnector.java 389042 2006-03-27 07:49:41Z trustin $
+ *
+ * Copyright 2004 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.bio;
+
+import org.apache.mina.common.ConnectFuture;
+import org.apache.mina.common.IoHandler;
+import org.apache.mina.common.IoServiceConfig;
+import org.apache.mina.common.support.BaseIoConnector;
+import org.apache.mina.common.support.DefaultConnectFuture;
+import org.apache.mina.transport.socket.nio.SocketConnectorConfig;
+import org.apache.mina.transport.socket.nio.SocketSessionConfig;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.nio.channels.ByteChannel;
+import java.nio.channels.SocketChannel;
+
+/**
+ */
+public class SocketConnector extends BaseIoConnector
+{
+ /**
+ * @noinspection StaticNonFinalField
+ */
+ private static final Sequence idSequence = new Sequence();
+
+ private final Object lock = new Object();
+ private final String threadName = "SocketConnector-" + idSequence.nextId();
+ private final IoServiceConfig defaultConfig = new SocketConnectorConfig();
+ private final Set managedSessions = Collections.synchronizedSet(new HashSet());
+
+ /**
+ * Create a connector with a single processing thread
+ */
+ public SocketConnector()
+ {
+ }
+
+ public IoServiceConfig getDefaultConfig()
+ {
+ return defaultConfig;
+ }
+
+ public ConnectFuture connect(SocketAddress address, IoHandler handler, IoServiceConfig config)
+ {
+ return connect(address, null, handler, config);
+ }
+
+ public ConnectFuture connect(SocketAddress address, SocketAddress localAddress,
+ IoHandler handler, IoServiceConfig config)
+ {
+ if (address == null)
+ {
+ throw new NullPointerException("address");
+ }
+ if (handler == null)
+ {
+ throw new NullPointerException("handler");
+ }
+
+ if (! (address instanceof InetSocketAddress))
+ {
+ throw new IllegalArgumentException("Unexpected address type: " + address.getClass());
+ }
+ if (localAddress != null && !(localAddress instanceof InetSocketAddress))
+ {
+ throw new IllegalArgumentException("Unexpected local address type: " + localAddress.getClass());
+ }
+ if (config == null)
+ {
+ config = getDefaultConfig();
+ }
+
+ DefaultConnectFuture future = new DefaultConnectFuture();
+ try
+ {
+
+ //Socket socket = new Socket();
+ //socket.connect(address);
+ //SimpleSocketChannel channel = new SimpleSocketChannel(socket);
+ //SocketAddress serviceAddress = socket.getRemoteSocketAddress();
+
+ SocketChannel channel = SocketChannel.open(address);
+ channel.configureBlocking(true);
+ SocketAddress serviceAddress = channel.socket().getRemoteSocketAddress();
+
+
+ SocketSessionImpl session = newSession(channel, handler, config, channel.socket().getRemoteSocketAddress());
+ future.setSession(session);
+ }
+ catch (IOException e)
+ {
+ future.setException(e);
+ }
+
+ return future;
+ }
+
+ private SocketSessionImpl newSession(ByteChannel channel, IoHandler handler, IoServiceConfig config, SocketAddress serviceAddress)
+ throws IOException
+ {
+ SocketSessionImpl session = new SocketSessionImpl(this,
+ (SocketSessionConfig) config.getSessionConfig(),
+ handler,
+ channel,
+ serviceAddress);
+ try
+ {
+ getFilterChainBuilder().buildFilterChain(session.getFilterChain());
+ config.getFilterChainBuilder().buildFilterChain(session.getFilterChain());
+ config.getThreadModel().buildFilterChain(session.getFilterChain());
+ ((SocketFilterChain) session.getFilterChain()).sessionCreated(session);
+
+ session.start();
+ //not sure if this will work... socket is already opened before the created callback is called...
+ ((SocketFilterChain) session.getFilterChain()).sessionOpened(session);
+ }
+ catch (Throwable e)
+ {
+ throw (IOException) new IOException("Failed to create a session.").initCause(e);
+ }
+
+ //TODO: figure out how the managed session are used/ what they are etc.
+ //session.getManagedSessions().add( session );
+
+
+ return session;
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/bio/SocketConnector.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/bio/SocketFilterChain.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/bio/SocketFilterChain.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/bio/SocketFilterChain.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/bio/SocketFilterChain.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,62 @@
+/*
+ * @(#) $Id: SocketFilterChain.java 398039 2006-04-28 23:36:27Z proyal $
+ *
+ * Copyright 2004 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.bio;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.IoFilter.WriteRequest;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.support.AbstractIoFilterChain;
+
+import java.io.IOException;
+import java.nio.channels.ClosedByInterruptException;
+
+/**
+ */
+class SocketFilterChain extends AbstractIoFilterChain
+{
+
+ SocketFilterChain(IoSession parent)
+ {
+ super(parent);
+ }
+
+ protected void doWrite(IoSession session, WriteRequest writeRequest) throws Exception
+ {
+ SocketSessionImpl s = (SocketSessionImpl) session;
+
+ //write to socket
+ try
+ {
+ s.getChannel().write(((ByteBuffer) writeRequest.getMessage()).buf());
+
+ //notify of completion
+ writeRequest.getFuture().setWritten(true);
+ }
+ catch(ClosedByInterruptException e)
+ {
+ writeRequest.getFuture().setWritten(false);
+ }
+ }
+
+ protected void doClose(IoSession session) throws IOException
+ {
+ SocketSessionImpl s = (SocketSessionImpl) session;
+ s.shutdown();
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/bio/SocketFilterChain.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/bio/SocketSessionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/bio/SocketSessionImpl.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/bio/SocketSessionImpl.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/bio/SocketSessionImpl.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,421 @@
+/*
+ * @(#) $Id: SocketSessionImpl.java 398039 2006-04-28 23:36:27Z proyal $
+ *
+ * Copyright 2004 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.bio;
+
+import org.apache.mina.common.IoFilter.WriteRequest;
+import org.apache.mina.common.IoFilterChain;
+import org.apache.mina.common.IoHandler;
+import org.apache.mina.common.IoService;
+import org.apache.mina.common.IoSessionConfig;
+import org.apache.mina.common.RuntimeIOException;
+import org.apache.mina.common.TransportType;
+import org.apache.mina.common.support.BaseIoSession;
+import org.apache.mina.common.support.BaseIoSessionConfig;
+import org.apache.mina.transport.socket.nio.SocketSessionConfig;
+import org.apache.mina.transport.socket.nio.SocketSessionConfigImpl;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.net.SocketException;
+import java.nio.channels.ByteChannel;
+import java.nio.channels.SocketChannel;
+
+/**
+ */
+class SocketSessionImpl extends BaseIoSession
+{
+ private final IoService manager;
+ private final SocketSessionConfig config;
+ private final SocketFilterChain filterChain;
+ private final IoHandler handler;
+ private final SocketAddress remoteAddress;
+ private final SocketAddress localAddress;
+ private final SocketAddress serviceAddress;
+ private final Socket socket;
+ private final ByteChannel channel;
+ private final Reader reader;
+ private Thread runner;
+ private int readBufferSize;
+
+ /**
+ * Creates a new instance.
+ */
+ SocketSessionImpl(IoService manager,
+ SocketSessionConfig config,
+ IoHandler handler,
+ ByteChannel channel,
+ SocketAddress serviceAddress) throws IOException
+ {
+ this.manager = manager;
+ this.filterChain = new SocketFilterChain(this);
+ this.handler = handler;
+ this.channel = channel;
+ if(channel instanceof SocketChannel)
+ {
+ socket = ((SocketChannel) channel).socket();
+ }
+ else if(channel instanceof SimpleSocketChannel)
+ {
+ socket = ((SimpleSocketChannel) channel).socket();
+ }
+ else
+ {
+ throw new IllegalArgumentException("Unrecognised channel type: " + channel.getClass());
+ }
+
+ this.remoteAddress = socket.getRemoteSocketAddress();
+ this.localAddress = socket.getLocalSocketAddress();
+ this.serviceAddress = serviceAddress;
+
+ this.config = new SessionConfigImpl(config);
+
+ reader = new Reader(handler, this);
+ }
+
+ void start()
+ {
+ //create & start thread for this...
+ runner = new Thread(reader);
+ runner.start();
+ }
+
+ void shutdown() throws IOException
+ {
+ filterChain.sessionClosed( this );
+ reader.stop();
+ channel.close();
+ }
+
+ ByteChannel getChannel()
+ {
+ return channel;
+ }
+
+ protected void write0(WriteRequest writeRequest)
+ {
+ filterChain.filterWrite(this, writeRequest);
+ }
+
+ protected void close0()
+ {
+ filterChain.filterClose(this);
+ super.close0();
+ }
+
+ protected void updateTrafficMask()
+ {
+ //TODO
+ }
+
+ public IoService getService()
+ {
+ return manager;
+ }
+
+ public IoSessionConfig getConfig()
+ {
+ return config;
+ }
+
+ public IoFilterChain getFilterChain()
+ {
+ return filterChain;
+ }
+
+ public IoHandler getHandler()
+ {
+ return handler;
+ }
+
+ public int getScheduledWriteRequests()
+ {
+ return 0;
+ }
+
+ public int getScheduledWriteBytes()
+ {
+ return 0;
+ }
+
+ public TransportType getTransportType()
+ {
+ return TransportType.SOCKET;
+ }
+
+ public SocketAddress getRemoteAddress()
+ {
+ return remoteAddress;
+ }
+
+ public SocketAddress getLocalAddress()
+ {
+ return localAddress;
+ }
+
+ public SocketAddress getServiceAddress()
+ {
+ return serviceAddress;
+ }
+
+ int getReadBufferSize()
+ {
+ return readBufferSize;
+ }
+
+ private class SessionConfigImpl extends BaseIoSessionConfig implements SocketSessionConfig
+ {
+ SessionConfigImpl()
+ {
+ }
+
+ SessionConfigImpl(SocketSessionConfig cfg)
+ {
+ setKeepAlive(cfg.isKeepAlive());
+ setOobInline(cfg.isOobInline());
+ setReceiveBufferSize(cfg.getReceiveBufferSize());
+ readBufferSize = cfg.getReceiveBufferSize();
+ setReuseAddress(cfg.isReuseAddress());
+ setSendBufferSize(cfg.getSendBufferSize());
+ setSoLinger(cfg.getSoLinger());
+ setTcpNoDelay(cfg.isTcpNoDelay());
+ if (getTrafficClass() != cfg.getTrafficClass())
+ {
+ setTrafficClass(cfg.getTrafficClass());
+ }
+ }
+
+
+ public boolean isKeepAlive()
+ {
+ try
+ {
+ return socket.getKeepAlive();
+ }
+ catch (SocketException e)
+ {
+ throw new RuntimeIOException(e);
+ }
+ }
+
+ public void setKeepAlive(boolean on)
+ {
+ try
+ {
+ socket.setKeepAlive(on);
+ }
+ catch (SocketException e)
+ {
+ throw new RuntimeIOException(e);
+ }
+ }
+
+ public boolean isOobInline()
+ {
+ try
+ {
+ return socket.getOOBInline();
+ }
+ catch (SocketException e)
+ {
+ throw new RuntimeIOException(e);
+ }
+ }
+
+ public void setOobInline(boolean on)
+ {
+ try
+ {
+ socket.setOOBInline(on);
+ }
+ catch (SocketException e)
+ {
+ throw new RuntimeIOException(e);
+ }
+ }
+
+ public boolean isReuseAddress()
+ {
+ try
+ {
+ return socket.getReuseAddress();
+ }
+ catch (SocketException e)
+ {
+ throw new RuntimeIOException(e);
+ }
+ }
+
+ public void setReuseAddress(boolean on)
+ {
+ try
+ {
+ socket.setReuseAddress(on);
+ }
+ catch (SocketException e)
+ {
+ throw new RuntimeIOException(e);
+ }
+ }
+
+ public int getSoLinger()
+ {
+ try
+ {
+ return socket.getSoLinger();
+ }
+ catch (SocketException e)
+ {
+ throw new RuntimeIOException(e);
+ }
+ }
+
+ public void setSoLinger(int linger)
+ {
+ try
+ {
+ if (linger < 0)
+ {
+ socket.setSoLinger(false, 0);
+ }
+ else
+ {
+ socket.setSoLinger(true, linger);
+ }
+ }
+ catch (SocketException e)
+ {
+ throw new RuntimeIOException(e);
+ }
+ }
+
+ public boolean isTcpNoDelay()
+ {
+ try
+ {
+ return socket.getTcpNoDelay();
+ }
+ catch (SocketException e)
+ {
+ throw new RuntimeIOException(e);
+ }
+ }
+
+ public void setTcpNoDelay(boolean on)
+ {
+ try
+ {
+ socket.setTcpNoDelay(on);
+ }
+ catch (SocketException e)
+ {
+ throw new RuntimeIOException(e);
+ }
+ }
+
+ public int getTrafficClass()
+ {
+ if (SocketSessionConfigImpl.isGetTrafficClassAvailable())
+ {
+ try
+ {
+ return socket.getTrafficClass();
+ }
+ catch (SocketException e)
+ {
+ throw new RuntimeIOException(e);
+ }
+ }
+ else
+ {
+ return 0;
+ }
+ }
+
+ public void setTrafficClass(int tc)
+ {
+ if (SocketSessionConfigImpl.isSetTrafficClassAvailable())
+ {
+ try
+ {
+ socket.setTrafficClass(tc);
+ }
+ catch (SocketException e)
+ {
+ throw new RuntimeIOException(e);
+ }
+ }
+ }
+
+ public int getSendBufferSize()
+ {
+ try
+ {
+ return socket.getSendBufferSize();
+ }
+ catch (SocketException e)
+ {
+ throw new RuntimeIOException(e);
+ }
+ }
+
+ public void setSendBufferSize(int size)
+ {
+ if (SocketSessionConfigImpl.isSetSendBufferSizeAvailable())
+ {
+ try
+ {
+ socket.setSendBufferSize(size);
+ }
+ catch (SocketException e)
+ {
+ throw new RuntimeIOException(e);
+ }
+ }
+ }
+
+ public int getReceiveBufferSize()
+ {
+ try
+ {
+ return socket.getReceiveBufferSize();
+ }
+ catch (SocketException e)
+ {
+ throw new RuntimeIOException(e);
+ }
+ }
+
+ public void setReceiveBufferSize(int size)
+ {
+ if (SocketSessionConfigImpl.isSetReceiveBufferSizeAvailable())
+ {
+ try
+ {
+ socket.setReceiveBufferSize(size);
+ SocketSessionImpl.this.readBufferSize = size;
+ }
+ catch (SocketException e)
+ {
+ throw new RuntimeIOException(e);
+ }
+ }
+ }
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/bio/SocketSessionImpl.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/codec/AMQCodecFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/codec/AMQCodecFactory.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/codec/AMQCodecFactory.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/codec/AMQCodecFactory.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,50 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.codec;
+
+import org.apache.mina.filter.codec.ProtocolCodecFactory;
+import org.apache.mina.filter.codec.ProtocolDecoder;
+import org.apache.mina.filter.codec.ProtocolEncoder;
+
+public class AMQCodecFactory implements ProtocolCodecFactory
+{
+ private AMQEncoder _encoder = new AMQEncoder();
+
+ private AMQDecoder _frameDecoder;
+
+ /**
+ * @param expectProtocolInitiation true if the first frame received is going to be
+ * a protocol initiation frame, false if it is going to be a standard AMQ data block.
+ * The former case is used for the broker, which always expects to received the
+ * protocol initiation first from a newly connected client.
+ */
+ public AMQCodecFactory(boolean expectProtocolInitiation)
+ {
+ _frameDecoder = new AMQDecoder(expectProtocolInitiation);
+ }
+
+ public ProtocolEncoder getEncoder()
+ {
+ return _encoder;
+ }
+
+ public ProtocolDecoder getDecoder()
+ {
+ return _frameDecoder;
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/codec/AMQCodecFactory.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/codec/AMQDecoder.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/codec/AMQDecoder.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/codec/AMQDecoder.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/codec/AMQDecoder.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,96 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.codec;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.filter.codec.CumulativeProtocolDecoder;
+import org.apache.mina.filter.codec.ProtocolDecoderOutput;
+import org.apache.qpid.framing.AMQDataBlockDecoder;
+import org.apache.qpid.framing.ProtocolInitiation;
+
+/**
+ * There is one instance of this class per session. Any changes or configuration done
+ * at run time to the encoders or decoders only affects decoding/encoding of the
+ * protocol session data to which is it bound.
+ *
+ */
+public class AMQDecoder extends CumulativeProtocolDecoder
+{
+ private AMQDataBlockDecoder _dataBlockDecoder = new AMQDataBlockDecoder();
+
+ private ProtocolInitiation.Decoder _piDecoder = new ProtocolInitiation.Decoder();
+
+ private boolean _expectProtocolInitiation;
+
+ public AMQDecoder(boolean expectProtocolInitiation)
+ {
+ _expectProtocolInitiation = expectProtocolInitiation;
+ }
+
+ protected boolean doDecode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception
+ {
+ if (_expectProtocolInitiation)
+ {
+ return doDecodePI(session, in, out);
+ }
+ else
+ {
+ return doDecodeDataBlock(session, in, out);
+ }
+ }
+
+ protected boolean doDecodeDataBlock(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception
+ {
+ int pos = in.position();
+ boolean enoughData = _dataBlockDecoder.decodable(session, in);
+ in.position(pos);
+ if (!enoughData)
+ {
+ // returning false means it will leave the contents in the buffer and
+ // call us again when more data has been read
+ return false;
+ }
+ else
+ {
+ _dataBlockDecoder.decode(session, in, out);
+ return true;
+ }
+ }
+
+ private boolean doDecodePI(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception
+ {
+ boolean enoughData = _piDecoder.decodable(session, in);
+ if (!enoughData)
+ {
+ // returning false means it will leave the contents in the buffer and
+ // call us again when more data has been read
+ return false;
+ }
+ else
+ {
+ _piDecoder.decode(session, in, out);
+ return true;
+ }
+ }
+
+ public void setExpectProtocolInitiation(boolean expectProtocolInitiation)
+ {
+ _expectProtocolInitiation = expectProtocolInitiation;
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/codec/AMQDecoder.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/codec/AMQEncoder.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/codec/AMQEncoder.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/codec/AMQEncoder.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/codec/AMQEncoder.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,38 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.codec;
+
+import org.apache.mina.filter.codec.ProtocolEncoder;
+import org.apache.mina.filter.codec.ProtocolEncoderOutput;
+import org.apache.mina.common.IoSession;
+import org.apache.qpid.framing.AMQDataBlockEncoder;
+
+public class AMQEncoder implements ProtocolEncoder
+{
+ private AMQDataBlockEncoder _dataBlockEncoder = new AMQDataBlockEncoder();
+
+ public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception
+ {
+ _dataBlockEncoder.encode(session, message, out);
+ }
+
+ public void dispose(IoSession session) throws Exception
+ {
+
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/codec/AMQEncoder.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/configuration/Configured.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/configuration/Configured.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/configuration/Configured.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/configuration/Configured.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,41 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.configuration;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Target;
+
+/**
+ * Marks a field as being "configured" externally.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.FIELD)
+public @interface Configured
+{
+ /**
+ * The Commons Configuration path to the configuration element
+ */
+ String path();
+
+ /**
+ * The default value to use should the path not be found in the configuration source
+ */
+ String defaultValue();
+}
Propchange: incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/configuration/Configured.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/configuration/PropertyException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/configuration/PropertyException.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/configuration/PropertyException.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/configuration/PropertyException.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,62 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.configuration;
+
+import org.apache.qpid.AMQException;
+import org.apache.log4j.Logger;
+
+/**
+ * Indicates an error parsing a property expansion.
+ */
+public class PropertyException extends AMQException
+{
+ public PropertyException(String message)
+ {
+ super(message);
+ }
+
+ public PropertyException(String msg, Throwable t)
+ {
+ super(msg, t);
+ }
+
+ public PropertyException(int errorCode, String msg, Throwable t)
+ {
+ super(errorCode, msg, t);
+ }
+
+ public PropertyException(int errorCode, String msg)
+ {
+ super(errorCode, msg);
+ }
+
+ public PropertyException(Logger logger, String msg, Throwable t)
+ {
+ super(logger, msg, t);
+ }
+
+ public PropertyException(Logger logger, String msg)
+ {
+ super(logger, msg);
+ }
+
+ public PropertyException(Logger logger, int errorCode, String msg)
+ {
+ super(logger, errorCode, msg);
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/configuration/PropertyException.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/configuration/PropertyUtils.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/configuration/PropertyUtils.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/configuration/PropertyUtils.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/configuration/PropertyUtils.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,153 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.configuration;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+
+/**
+ * Based on code in Apache Ant, this utility class handles property expansion. This
+ * is most useful in config files and so on.
+ */
+public class PropertyUtils
+{
+ /**
+ * Replaces <code>${xxx}</code> style constructions in the given value
+ * with the string value of the corresponding data types. Replaces only system
+ * properties
+ *
+ * @param value The string to be scanned for property references.
+ * May be <code>null</code>, in which case this
+ * method returns immediately with no effect.
+ * @return the original string with the properties replaced, or
+ * <code>null</code> if the original string is <code>null</code>.
+ * @throws PropertyException if the string contains an opening
+ * <code>${</code> without a closing
+ * <code>}</code>
+ */
+ public static String replaceProperties(String value) throws PropertyException
+ {
+ if (value == null)
+ {
+ return null;
+ }
+
+ ArrayList<String> fragments = new ArrayList<String>();
+ ArrayList<String> propertyRefs = new ArrayList<String>();
+ parsePropertyString(value, fragments, propertyRefs);
+
+ StringBuffer sb = new StringBuffer();
+ Iterator j = propertyRefs.iterator();
+
+ for (String fragment : fragments)
+ {
+ if (fragment == null)
+ {
+ String propertyName = (String) j.next();
+
+ // try to get it from the project or keys
+ // Backward compatibility
+ String replacement = System.getProperty(propertyName);
+
+ if (replacement == null)
+ {
+ throw new PropertyException("Property ${" + propertyName +
+ "} has not been set");
+ }
+ fragment = replacement;
+ }
+ sb.append(fragment);
+ }
+
+ return sb.toString();
+ }
+
+ /**
+ * Default parsing method. Parses the supplied value for properties which are specified
+ * using ${foo} syntax. $X is left as is, and $$ specifies a single $.
+ * @param value the property string to parse
+ * @param fragments is populated with the string fragments. A null means "insert a
+ * property value here. The number of nulls in the list when populated is equal to the
+ * size of the propertyRefs list
+ * @param propertyRefs populated with the property names to be added into the final
+ * String.
+ */
+ private static void parsePropertyString(String value, ArrayList<String> fragments,
+ ArrayList<String> propertyRefs)
+ throws PropertyException
+ {
+ int prev = 0;
+ int pos;
+ //search for the next instance of $ from the 'prev' position
+ while ((pos = value.indexOf("$", prev)) >= 0)
+ {
+
+ //if there was any text before this, add it as a fragment
+ if (pos > 0)
+ {
+ fragments.add(value.substring(prev, pos));
+ }
+ //if we are at the end of the string, we tack on a $
+ //then move past it
+ if (pos == (value.length() - 1))
+ {
+ fragments.add("$");
+ prev = pos + 1;
+ }
+ else if (value.charAt(pos + 1) != '{')
+ {
+ //peek ahead to see if the next char is a property or not
+ //not a property: insert the char as a literal
+ if (value.charAt(pos + 1) == '$')
+ {
+ // two $ map to one $
+ fragments.add("$");
+ prev = pos + 2;
+ }
+ else
+ {
+ // $X maps to $X for all values of X!='$'
+ fragments.add(value.substring(pos, pos + 2));
+ prev = pos + 2;
+ }
+ }
+ else
+ {
+ // property found, extract its name or bail on a typo
+ int endName = value.indexOf('}', pos);
+ if (endName < 0)
+ {
+ throw new PropertyException("Syntax error in property: " +
+ value);
+ }
+ String propertyName = value.substring(pos + 2, endName);
+ fragments.add(null);
+ propertyRefs.add(propertyName);
+ prev = endName + 1;
+ }
+ }
+ //no more $ signs found
+ //if there is any tail to the file, append it
+ if (prev < value.length())
+ {
+ fragments.add(value.substring(prev));
+ }
+ }
+
+
+}
Propchange: incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/configuration/PropertyUtils.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/exchange/ExchangeDefaults.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/exchange/ExchangeDefaults.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/exchange/ExchangeDefaults.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/exchange/ExchangeDefaults.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,33 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.exchange;
+
+public class ExchangeDefaults
+{
+ public final static String TOPIC_EXCHANGE_NAME = "amq.topic";
+
+ public final static String TOPIC_EXCHANGE_CLASS = "topic";
+
+ public final static String DIRECT_EXCHANGE_NAME = "amq.direct";
+
+ public final static String DIRECT_EXCHANGE_CLASS = "direct";
+
+ public final static String HEADERS_EXCHANGE_NAME = "amq.match";
+
+ public final static String HEADERS_EXCHANGE_CLASS = "headers";
+}
Propchange: incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/exchange/ExchangeDefaults.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQBody.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQBody.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQBody.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQBody.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,35 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+import org.apache.mina.common.ByteBuffer;
+
+public abstract class AMQBody
+{
+ protected abstract byte getType();
+
+ /**
+ * Get the size of the body
+ * @return unsigned short
+ */
+ protected abstract int getSize();
+
+ protected abstract void writePayload(ByteBuffer buffer);
+
+ protected abstract void populateFromBuffer(ByteBuffer buffer, long size) throws AMQFrameDecodingException;
+}
Propchange: incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQBody.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQDataBlock.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQDataBlock.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQDataBlock.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQDataBlock.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,40 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+import org.apache.mina.common.ByteBuffer;
+
+/**
+ * A data block represents something that has a size in bytes and the ability to write itself to a byte
+ * buffer (similar to a byte array).
+ */
+public abstract class AMQDataBlock implements EncodableAMQDataBlock
+{
+ /**
+ * Get the size of buffer needed to store the byte representation of this
+ * frame.
+ * @return unsigned integer
+ */
+ public abstract long getSize();
+
+ /**
+ * Writes the datablock to the specified buffer.
+ * @param buffer
+ */
+ public abstract void writePayload(ByteBuffer buffer);
+}
Propchange: incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQDataBlock.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQDataBlockDecoder.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQDataBlockDecoder.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQDataBlockDecoder.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQDataBlockDecoder.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,113 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+import org.apache.log4j.Logger;
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.filter.codec.ProtocolDecoderOutput;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class AMQDataBlockDecoder
+{
+ Logger _logger = Logger.getLogger(AMQDataBlockDecoder.class);
+
+ private final Map _supportedBodies = new HashMap();
+
+ public AMQDataBlockDecoder()
+ {
+ _supportedBodies.put(new Byte(AMQMethodBody.TYPE), AMQMethodBodyFactory.getInstance());
+ _supportedBodies.put(new Byte(ContentHeaderBody.TYPE), ContentHeaderBodyFactory.getInstance());
+ _supportedBodies.put(new Byte(ContentBody.TYPE), ContentBodyFactory.getInstance());
+ _supportedBodies.put(new Byte(HeartbeatBody.TYPE), new HeartbeatBodyFactory());
+ }
+
+ public boolean decodable(IoSession session, ByteBuffer in) throws AMQFrameDecodingException
+ {
+ // type, channel, body size and end byte
+ if (in.remaining() < (1 + 2 + 4 + 1))
+ {
+ return false;
+ }
+
+ final byte type = in.get();
+ final int channel = in.getUnsignedShort();
+ final long bodySize = in.getUnsignedInt();
+
+ // bodySize can be zero
+ if (type <= 0 || channel < 0 || bodySize < 0)
+ {
+ throw new AMQFrameDecodingException("Undecodable frame: type = " + type + " channel = " + channel +
+ " bodySize = " + bodySize);
+ }
+
+ if (in.remaining() < (bodySize + 1))
+ {
+ return false;
+ }
+ return true;
+ }
+
+ private boolean isSupportedFrameType(byte frameType)
+ {
+ final boolean result = _supportedBodies.containsKey(new Byte(frameType));
+
+ if (!result)
+ {
+ _logger.warn("AMQDataBlockDecoder does not handle frame type " + frameType);
+ }
+
+ return result;
+ }
+
+ protected Object createAndPopulateFrame(ByteBuffer in)
+ throws AMQFrameDecodingException
+ {
+ final byte type = in.get();
+ if (!isSupportedFrameType(type))
+ {
+ throw new AMQFrameDecodingException("Unsupported frame type: " + type);
+ }
+ final int channel = in.getUnsignedShort();
+ final long bodySize = in.getUnsignedInt();
+
+ BodyFactory bodyFactory = (BodyFactory) _supportedBodies.get(new Byte(type));
+ if (bodyFactory == null)
+ {
+ throw new AMQFrameDecodingException("Unsupported body type: " + type);
+ }
+ AMQFrame frame = new AMQFrame();
+
+ frame.populateFromBuffer(in, channel, bodySize, bodyFactory);
+
+ byte marker = in.get();
+ if ((marker & 0xFF) != 0xCE)
+ {
+ throw new AMQFrameDecodingException("End of frame marker not found. Read " + marker + " size=" + bodySize + " type=" + type);
+ }
+ return frame;
+ }
+
+ public void decode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out)
+ throws Exception
+ {
+ out.write(createAndPopulateFrame(in));
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQDataBlockDecoder.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQDataBlockEncoder.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQDataBlockEncoder.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQDataBlockEncoder.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQDataBlockEncoder.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,62 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+import org.apache.log4j.Logger;
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.filter.codec.demux.MessageEncoder;
+import org.apache.mina.filter.codec.ProtocolEncoderOutput;
+
+import java.util.HashSet;
+import java.util.Set;
+
+public class AMQDataBlockEncoder implements MessageEncoder
+{
+ Logger _logger = Logger.getLogger(AMQDataBlockEncoder.class);
+
+ private Set _messageTypes;
+
+ public AMQDataBlockEncoder()
+ {
+ _messageTypes = new HashSet();
+ _messageTypes.add(EncodableAMQDataBlock.class);
+ }
+
+ public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception
+ {
+ final AMQDataBlock frame = (AMQDataBlock) message;
+ int frameSize = (int)frame.getSize();
+ final ByteBuffer buffer = ByteBuffer.allocate(frameSize);
+ //buffer.setAutoExpand(true);
+ frame.writePayload(buffer);
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Encoded frame byte-buffer is '" + EncodingUtils.convertToHexString(buffer) + "'");
+ }
+
+ buffer.flip();
+ out.write(buffer);
+ }
+
+ public Set getMessageTypes()
+ {
+ return _messageTypes;
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQDataBlockEncoder.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQFrame.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQFrame.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQFrame.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQFrame.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,73 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+import org.apache.mina.common.ByteBuffer;
+
+public class AMQFrame extends AMQDataBlock implements EncodableAMQDataBlock
+{
+ public int channel;
+
+ public AMQBody bodyFrame;
+
+ public AMQFrame()
+ {
+ }
+
+ public AMQFrame(int channel, AMQBody bodyFrame)
+ {
+ this.channel = channel;
+ this.bodyFrame = bodyFrame;
+ }
+
+ public long getSize()
+ {
+ return 1 + 2 + 4 + bodyFrame.getSize() + 1;
+ }
+
+ public void writePayload(ByteBuffer buffer)
+ {
+ buffer.put(bodyFrame.getType());
+ // TODO: how does channel get populated
+ EncodingUtils.writeUnsignedShort(buffer, channel);
+ EncodingUtils.writeUnsignedInteger(buffer, bodyFrame.getSize());
+ bodyFrame.writePayload(buffer);
+ buffer.put((byte) 0xCE);
+ }
+
+ /**
+ *
+ * @param buffer
+ * @param channel unsigned short
+ * @param bodySize unsigned integer
+ * @param bodyFactory
+ * @throws AMQFrameDecodingException
+ */
+ public void populateFromBuffer(ByteBuffer buffer, int channel, long bodySize, BodyFactory bodyFactory)
+ throws AMQFrameDecodingException
+ {
+ this.channel = channel;
+ bodyFrame = bodyFactory.createBody(buffer);
+ bodyFrame.populateFromBuffer(buffer, bodySize);
+ }
+
+ public String toString()
+ {
+ return "Frame channelId: " + channel + ", bodyFrame: " + String.valueOf(bodyFrame);
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQFrame.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQFrameDecodingException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQFrameDecodingException.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQFrameDecodingException.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQFrameDecodingException.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,45 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+
+public class AMQFrameDecodingException extends AMQException
+{
+ public AMQFrameDecodingException(String message)
+ {
+ super(message);
+ }
+
+ public AMQFrameDecodingException(String message, Throwable t)
+ {
+ super(message, t);
+ }
+
+ public AMQFrameDecodingException(Logger log, String message)
+ {
+ super(log, message);
+ }
+
+ public AMQFrameDecodingException(Logger log, String message, Throwable t)
+ {
+ super(log, message, t);
+ }
+
+}
Propchange: incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQFrameDecodingException.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQMethodBody.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQMethodBody.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQMethodBody.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQMethodBody.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,87 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.AMQChannelException;
+
+public abstract class AMQMethodBody extends AMQBody
+{
+ public static final byte TYPE = 1;
+
+ /** unsigned short */
+ protected abstract int getBodySize();
+
+ /**
+ * @return unsigned short
+ */
+ protected abstract int getClazz();
+
+ /**
+ * @return unsigned short
+ */
+ protected abstract int getMethod();
+
+ protected abstract void writeMethodPayload(ByteBuffer buffer);
+
+ protected byte getType()
+ {
+ return TYPE;
+ }
+
+ protected int getSize()
+ {
+ return 2 + 2 + getBodySize();
+ }
+
+ protected void writePayload(ByteBuffer buffer)
+ {
+ EncodingUtils.writeUnsignedShort(buffer, getClazz());
+ EncodingUtils.writeUnsignedShort(buffer, getMethod());
+ writeMethodPayload(buffer);
+ }
+
+ protected abstract void populateMethodBodyFromBuffer(ByteBuffer buffer) throws AMQFrameDecodingException;
+
+ protected void populateFromBuffer(ByteBuffer buffer, long size) throws AMQFrameDecodingException
+ {
+ populateMethodBodyFromBuffer(buffer);
+ }
+
+ public String toString()
+ {
+ StringBuffer buf = new StringBuffer(getClass().toString());
+ buf.append(" Class: ").append(getClazz());
+ buf.append(" Method: ").append(getMethod());
+ return buf.toString();
+ }
+
+ /**
+ * Creates an AMQChannelException for the corresponding body type (a channel exception
+ * should include the class and method ids of the body it resulted from).
+ */
+ public AMQChannelException getChannelException(int code, String message)
+ {
+ return new AMQChannelException(code, message, getClazz(), getMethod());
+ }
+
+ public AMQChannelException getChannelException(int code, String message, Throwable cause)
+ {
+ return new AMQChannelException(code, message, getClazz(), getMethod(), cause);
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQMethodBody.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQMethodBodyFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQMethodBodyFactory.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQMethodBodyFactory.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQMethodBodyFactory.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,43 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+import org.apache.log4j.Logger;
+import org.apache.mina.common.ByteBuffer;
+
+public class AMQMethodBodyFactory implements BodyFactory
+{
+ private static final Logger _log = Logger.getLogger(AMQMethodBodyFactory.class);
+
+ private static final AMQMethodBodyFactory _instance = new AMQMethodBodyFactory();
+
+ public static AMQMethodBodyFactory getInstance()
+ {
+ return _instance;
+ }
+
+ private AMQMethodBodyFactory()
+ {
+ _log.debug("Creating method body factory");
+ }
+
+ public AMQBody createBody(ByteBuffer in) throws AMQFrameDecodingException
+ {
+ return MethodBodyDecoderRegistry.get(in.getUnsignedShort(), in.getUnsignedShort());
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQMethodBodyFactory.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQProtocolClassException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQProtocolClassException.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQProtocolClassException.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQProtocolClassException.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,26 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+public class AMQProtocolClassException extends AMQProtocolHeaderException
+{
+ public AMQProtocolClassException(String message)
+ {
+ super(message);
+ }
+}
\ No newline at end of file
Propchange: incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQProtocolClassException.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQProtocolHeaderException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQProtocolHeaderException.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQProtocolHeaderException.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQProtocolHeaderException.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,28 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+import org.apache.qpid.AMQException;
+
+public class AMQProtocolHeaderException extends AMQException
+{
+ public AMQProtocolHeaderException(String message)
+ {
+ super(message);
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQProtocolHeaderException.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQProtocolInstanceException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQProtocolInstanceException.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQProtocolInstanceException.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQProtocolInstanceException.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,26 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+public class AMQProtocolInstanceException extends AMQProtocolHeaderException
+{
+ public AMQProtocolInstanceException(String message)
+ {
+ super(message);
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQProtocolInstanceException.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQProtocolVersionException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQProtocolVersionException.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQProtocolVersionException.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQProtocolVersionException.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,30 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+/**
+ * Exception that is thrown when the client and server differ on expected protocol version (header) information.
+ *
+ */
+public class AMQProtocolVersionException extends AMQProtocolHeaderException
+{
+ public AMQProtocolVersionException(String message)
+ {
+ super(message);
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/AMQProtocolVersionException.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/BasicContentHeaderProperties.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/BasicContentHeaderProperties.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/BasicContentHeaderProperties.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/BasicContentHeaderProperties.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,592 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+import org.apache.log4j.Logger;
+import org.apache.mina.common.ByteBuffer;
+
+public class BasicContentHeaderProperties implements ContentHeaderProperties
+{
+ private static final Logger _logger = Logger.getLogger(BasicContentHeaderProperties.class);
+
+ /**
+ * We store the encoded form when we decode the content header so that if we need to
+ * write it out without modifying it we can do so without incurring the expense of
+ * reencoding it
+ */
+ private byte[] _encodedForm;
+
+ /**
+ * Flag indicating whether the entire content header has been decoded yet
+ */
+ private boolean _decoded = true;
+
+ /**
+ * We have some optimisations for partial decoding for maximum performance. The headers are used in the broker
+ * for routing in some cases so we can decode that separately.
+ */
+ private boolean _decodedHeaders = true;
+
+ /**
+ * We have some optimisations for partial decoding for maximum performance. The content type is used by all
+ * clients to determine the message type
+ */
+ private boolean _decodedContentType = true;
+
+ private String _contentType;
+
+ private String _encoding;
+
+ private FieldTable _headers;
+
+ private byte _deliveryMode;
+
+ private byte _priority;
+
+ private String _correlationId;
+
+ private String _replyTo;
+
+ private long _expiration;
+
+ private String _messageId;
+
+ private long _timestamp;
+
+ private String _type;
+
+ private String _userId;
+
+ private String _appId;
+
+ private String _clusterId;
+
+ private int _propertyFlags = 0;
+
+ public BasicContentHeaderProperties()
+ {
+ }
+
+ public int getPropertyListSize()
+ {
+ if (_encodedForm != null)
+ {
+ return _encodedForm.length;
+ }
+ else
+ {
+ int size = 0;
+
+ if ((_propertyFlags & (1 << 15)) > 0)
+ {
+ size += EncodingUtils.encodedShortStringLength(_contentType);
+ }
+ if ((_propertyFlags & (1 << 14)) > 0)
+ {
+ size += EncodingUtils.encodedShortStringLength(_encoding);
+ }
+ if ((_propertyFlags & (1 << 13)) > 0)
+ {
+ size += EncodingUtils.encodedFieldTableLength(_headers);
+ }
+ if ((_propertyFlags & (1 << 12)) > 0)
+ {
+ size += 1;
+ }
+ if ((_propertyFlags & (1 << 11)) > 0)
+ {
+ size += 1;
+ }
+ if ((_propertyFlags & (1 << 10)) > 0)
+ {
+ size += EncodingUtils.encodedShortStringLength(_correlationId);
+ }
+ if ((_propertyFlags & (1 << 9)) > 0)
+ {
+ size += EncodingUtils.encodedShortStringLength(_replyTo);
+ }
+ if ((_propertyFlags & (1 << 8)) > 0)
+ {
+ size += EncodingUtils.encodedShortStringLength(String.valueOf(_expiration));
+ }
+ if ((_propertyFlags & (1 << 7)) > 0)
+ {
+ size += EncodingUtils.encodedShortStringLength(_messageId);
+ }
+ if ((_propertyFlags & (1 << 6)) > 0)
+ {
+ size += 8;
+ }
+ if ((_propertyFlags & (1 << 5)) > 0)
+ {
+ size += EncodingUtils.encodedShortStringLength(_type);
+ }
+ if ((_propertyFlags & (1 << 4)) > 0)
+ {
+ size += EncodingUtils.encodedShortStringLength(_userId);
+ }
+ if ((_propertyFlags & (1 << 3)) > 0)
+ {
+ size += EncodingUtils.encodedShortStringLength(_appId);
+ }
+ if ((_propertyFlags & (1 << 2)) > 0)
+ {
+ size += EncodingUtils.encodedShortStringLength(_clusterId);
+ }
+ return size;
+ }
+ }
+
+ private void clearEncodedForm()
+ {
+ if (!_decoded && _encodedForm != null)
+ {
+ //decode();
+ }
+ _encodedForm = null;
+ }
+
+ public void setPropertyFlags(int propertyFlags)
+ {
+ clearEncodedForm();
+ _propertyFlags = propertyFlags;
+ }
+
+ public int getPropertyFlags()
+ {
+ return _propertyFlags;
+ }
+
+ public void writePropertyListPayload(ByteBuffer buffer)
+ {
+ if (_encodedForm != null)
+ {
+ buffer.put(_encodedForm);
+ }
+ else
+ {
+ if ((_propertyFlags & (1 << 15)) > 0)
+ {
+ EncodingUtils.writeShortStringBytes(buffer, _contentType);
+ }
+ if ((_propertyFlags & (1 << 14)) > 0)
+ {
+ EncodingUtils.writeShortStringBytes(buffer, _encoding);
+ }
+ if ((_propertyFlags & (1 << 13)) > 0)
+ {
+ EncodingUtils.writeFieldTableBytes(buffer, _headers);
+ }
+ if ((_propertyFlags & (1 << 12)) > 0)
+ {
+ buffer.put(_deliveryMode);
+ }
+ if ((_propertyFlags & (1 << 11)) > 0)
+ {
+ buffer.put(_priority);
+ }
+ if ((_propertyFlags & (1 << 10)) > 0)
+ {
+ EncodingUtils.writeShortStringBytes(buffer, _correlationId);
+ }
+ if ((_propertyFlags & (1 << 9)) > 0)
+ {
+ EncodingUtils.writeShortStringBytes(buffer, _replyTo);
+ }
+ if ((_propertyFlags & (1 << 8)) > 0)
+ {
+ EncodingUtils.writeShortStringBytes(buffer, String.valueOf(_expiration));
+ }
+ if ((_propertyFlags & (1 << 7)) > 0)
+ {
+ EncodingUtils.writeShortStringBytes(buffer, _messageId);
+ }
+ if ((_propertyFlags & (1 << 6)) > 0)
+ {
+ EncodingUtils.writeUnsignedInteger(buffer, 0/*timestamp msb*/);
+ EncodingUtils.writeUnsignedInteger(buffer, _timestamp);
+ }
+ if ((_propertyFlags & (1 << 5)) > 0)
+ {
+ EncodingUtils.writeShortStringBytes(buffer, _type);
+ }
+ if ((_propertyFlags & (1 << 4)) > 0)
+ {
+ EncodingUtils.writeShortStringBytes(buffer, _userId);
+ }
+ if ((_propertyFlags & (1 << 3)) > 0)
+ {
+ EncodingUtils.writeShortStringBytes(buffer, _appId);
+ }
+ if ((_propertyFlags & (1 << 2)) > 0)
+ {
+ EncodingUtils.writeShortStringBytes(buffer, _clusterId);
+ }
+ }
+ }
+
+ public void populatePropertiesFromBuffer(ByteBuffer buffer, int propertyFlags, int size)
+ throws AMQFrameDecodingException
+ {
+ _propertyFlags = propertyFlags;
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Property flags: " + _propertyFlags);
+ }
+ decode(buffer);
+ /*_encodedForm = new byte[size];
+ buffer.get(_encodedForm, 0, size);
+ _decoded = false;
+ _decodedHeaders = false;
+ _decodedContentType = false;*/
+ }
+
+ private void decode(ByteBuffer buffer)
+ {
+ //ByteBuffer buffer = ByteBuffer.wrap(_encodedForm);
+ int pos = buffer.position();
+ try
+ {
+ if ((_propertyFlags & (1 << 15)) > 0)
+ {
+ _contentType = EncodingUtils.readShortString(buffer);
+ }
+ if ((_propertyFlags & (1 << 14)) > 0)
+ {
+ _encoding = EncodingUtils.readShortString(buffer);
+ }
+ if ((_propertyFlags & (1 << 13)) > 0)
+ {
+ _headers = EncodingUtils.readFieldTable(buffer);
+ }
+ if ((_propertyFlags & (1 << 12)) > 0)
+ {
+ _deliveryMode = buffer.get();
+ }
+ if ((_propertyFlags & (1 << 11)) > 0)
+ {
+ _priority = buffer.get();
+ }
+ if ((_propertyFlags & (1 << 10)) > 0)
+ {
+ _correlationId = EncodingUtils.readShortString(buffer);
+ }
+ if ((_propertyFlags & (1 << 9)) > 0)
+ {
+ _replyTo = EncodingUtils.readShortString(buffer);
+ }
+ if ((_propertyFlags & (1 << 8)) > 0)
+ {
+ _expiration = Long.parseLong(EncodingUtils.readShortString(buffer));
+ }
+ if ((_propertyFlags & (1 << 7)) > 0)
+ {
+ _messageId = EncodingUtils.readShortString(buffer);
+ }
+ if ((_propertyFlags & (1 << 6)) > 0)
+ {
+ // Discard msb from AMQ timestamp
+ buffer.getUnsignedInt();
+ _timestamp = buffer.getUnsignedInt();
+ }
+ if ((_propertyFlags & (1 << 5)) > 0)
+ {
+ _type = EncodingUtils.readShortString(buffer);
+ }
+ if ((_propertyFlags & (1 << 4)) > 0)
+ {
+ _userId = EncodingUtils.readShortString(buffer);
+ }
+ if ((_propertyFlags & (1 << 3)) > 0)
+ {
+ _appId = EncodingUtils.readShortString(buffer);
+ }
+ if ((_propertyFlags & (1 << 2)) > 0)
+ {
+ _clusterId = EncodingUtils.readShortString(buffer);
+ }
+ }
+ catch (AMQFrameDecodingException e)
+ {
+ throw new RuntimeException("Error in content header data: " + e);
+ }
+
+ final int endPos = buffer.position();
+ buffer.position(pos);
+ final int len = endPos - pos;
+ _encodedForm = new byte[len];
+ final int limit = buffer.limit();
+ buffer.limit(endPos);
+ buffer.get(_encodedForm, 0, len);
+ buffer.limit(limit);
+ buffer.position(endPos);
+ _decoded = true;
+ }
+
+
+ private void decodeUpToHeaders()
+ {
+ ByteBuffer buffer = ByteBuffer.wrap(_encodedForm);
+ try
+ {
+ if ((_propertyFlags & (1 << 15)) > 0)
+ {
+ byte length = buffer.get();
+ buffer.skip(length);
+ }
+ if ((_propertyFlags & (1 << 14)) > 0)
+ {
+ byte length = buffer.get();
+ buffer.skip(length);
+ }
+ if ((_propertyFlags & (1 << 13)) > 0)
+ {
+ _headers = EncodingUtils.readFieldTable(buffer);
+ }
+ _decodedHeaders = true;
+ }
+ catch (AMQFrameDecodingException e)
+ {
+ throw new RuntimeException("Error in content header data: " + e);
+ }
+ }
+
+ private void decodeUpToContentType()
+ {
+ ByteBuffer buffer = ByteBuffer.wrap(_encodedForm);
+
+ if ((_propertyFlags & (1 << 15)) > 0)
+ {
+ _contentType = EncodingUtils.readShortString(buffer);
+ }
+
+ _decodedContentType = true;
+ }
+
+ private void decodeIfNecessary()
+ {
+ if (!_decoded)
+ {
+ //decode();
+ }
+ }
+
+ private void decodeHeadersIfNecessary()
+ {
+ if (!_decoded && !_decodedHeaders)
+ {
+ decodeUpToHeaders();
+ }
+ }
+
+ private void decodeContentTypeIfNecessary()
+ {
+ if (!_decoded && !_decodedContentType)
+ {
+ decodeUpToContentType();
+ }
+ }
+ public String getContentType()
+ {
+ decodeContentTypeIfNecessary();
+ return _contentType;
+ }
+
+ public void setContentType(String contentType)
+ {
+ clearEncodedForm();
+ _propertyFlags |= (1 << 15);
+ _contentType = contentType;
+ }
+
+ public String getEncoding()
+ {
+ decodeIfNecessary();
+ return _encoding;
+ }
+
+ public void setEncoding(String encoding)
+ {
+ clearEncodedForm();
+ _propertyFlags |= (1 << 14);
+ _encoding = encoding;
+ }
+
+ public FieldTable getHeaders()
+ {
+ decodeHeadersIfNecessary();
+ return _headers;
+ }
+
+ public void setHeaders(FieldTable headers)
+ {
+ clearEncodedForm();
+ _propertyFlags |= (1 << 13);
+ _headers = headers;
+ }
+
+ public byte getDeliveryMode()
+ {
+ decodeIfNecessary();
+ return _deliveryMode;
+ }
+
+ public void setDeliveryMode(byte deliveryMode)
+ {
+ clearEncodedForm();
+ _propertyFlags |= (1 << 12);
+ _deliveryMode = deliveryMode;
+ }
+
+ public byte getPriority()
+ {
+ decodeIfNecessary();
+ return _priority;
+ }
+
+ public void setPriority(byte priority)
+ {
+ clearEncodedForm();
+ _propertyFlags |= (1 << 11);
+ _priority = priority;
+ }
+
+ public String getCorrelationId()
+ {
+ decodeIfNecessary();
+ return _correlationId;
+ }
+
+ public void setCorrelationId(String correlationId)
+ {
+ clearEncodedForm();
+ _propertyFlags |= (1 << 10);
+ _correlationId = correlationId;
+ }
+
+ public String getReplyTo()
+ {
+ decodeIfNecessary();
+ return _replyTo;
+ }
+
+ public void setReplyTo(String replyTo)
+ {
+ clearEncodedForm();
+ _propertyFlags |= (1 << 9);
+ _replyTo = replyTo;
+ }
+
+ public long getExpiration()
+ {
+ decodeIfNecessary();
+ return _expiration;
+ }
+
+ public void setExpiration(long expiration)
+ {
+ clearEncodedForm();
+ _propertyFlags |= (1 << 8);
+ _expiration = expiration;
+ }
+
+
+ public String getMessageId()
+ {
+ decodeIfNecessary();
+ return _messageId;
+ }
+
+ public void setMessageId(String messageId)
+ {
+ clearEncodedForm();
+ _propertyFlags |= (1 << 7);
+ _messageId = messageId;
+ }
+
+ public long getTimestamp()
+ {
+ decodeIfNecessary();
+ return _timestamp;
+ }
+
+ public void setTimestamp(long timestamp)
+ {
+ clearEncodedForm();
+ _propertyFlags |= (1 << 6);
+ _timestamp = timestamp;
+ }
+
+ public String getType()
+ {
+ decodeIfNecessary();
+ return _type;
+ }
+
+ public void setType(String type)
+ {
+ clearEncodedForm();
+ _propertyFlags |= (1 << 5);
+ _type = type;
+ }
+
+ public String getUserId()
+ {
+ decodeIfNecessary();
+ return _userId;
+ }
+
+ public void setUserId(String userId)
+ {
+ clearEncodedForm();
+ _propertyFlags |= (1 << 4);
+ _userId = userId;
+ }
+
+ public String getAppId()
+ {
+ decodeIfNecessary();
+ return _appId;
+ }
+
+ public void setAppId(String appId)
+ {
+ clearEncodedForm();
+ _propertyFlags |= (1 << 3);
+ _appId = appId;
+ }
+
+ public String getClusterId()
+ {
+ decodeIfNecessary();
+ return _clusterId;
+ }
+
+ public void setClusterId(String clusterId)
+ {
+ clearEncodedForm();
+ _propertyFlags |= (1 << 2);
+ _clusterId = clusterId;
+ }
+
+ public String toString()
+ {
+ return "reply-to = " + _replyTo + " propertyFlags = " + _propertyFlags;
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/BasicContentHeaderProperties.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/BodyFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/BodyFactory.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/BodyFactory.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/BodyFactory.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,28 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+import org.apache.mina.common.ByteBuffer;
+
+/**
+ * Any class that is capable of turning a stream of bytes into an AMQ structure must implement this interface.
+ */
+public interface BodyFactory
+{
+ AMQBody createBody(ByteBuffer in) throws AMQFrameDecodingException;
+}
Propchange: incubator/qpid/trunk/qpid/java/common/src/org/apache/qpid/framing/BodyFactory.java
------------------------------------------------------------------------------
svn:eol-style = native