You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2006/09/20 00:07:25 UTC
svn commit: r447994 [21/46] - in /incubator/qpid/trunk/qpid: ./ cpp/
cpp/bin/ cpp/broker/ cpp/broker/inc/ cpp/broker/src/ cpp/broker/test/
cpp/client/ cpp/client/inc/ cpp/client/src/ cpp/client/test/ cpp/common/
cpp/common/concurrent/ cpp/common/concur...
Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/JMSBytesMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/JMSBytesMessage.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/JMSBytesMessage.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/JMSBytesMessage.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,351 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.message;
+
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.AMQException;
+import org.apache.mina.common.ByteBuffer;
+
+import javax.jms.JMSException;
+import javax.jms.MessageNotReadableException;
+import javax.jms.MessageNotWriteableException;
+import java.io.*;
+import java.nio.charset.Charset;
+import java.nio.charset.CharacterCodingException;
+
+public class JMSBytesMessage extends AbstractJMSMessage implements javax.jms.BytesMessage
+{
+ private static final String MIME_TYPE = "application/octet-stream";
+
+ private boolean _readable = false;
+
+ /**
+ * The default initial size of the buffer. The buffer expands automatically.
+ */
+ private static final int DEFAULT_BUFFER_INITIAL_SIZE = 1024;
+
+ JMSBytesMessage()
+ {
+ this(null);
+ }
+
+ /**
+ * Construct a bytes message, optionally wrapping existing data.
+ * @param data the data that comprises this message. If no buffer is present after the superclass
+ * constructor runs, a 1024 byte auto-expanding buffer is allocated. Readable only if data is non-null.
+ */
+ JMSBytesMessage(ByteBuffer data)
+ {
+ super(data); // this instantiates a content header
+ getJmsContentHeaderProperties().setContentType(MIME_TYPE);
+
+ if (_data == null)
+ {
+ _data = ByteBuffer.allocate(DEFAULT_BUFFER_INITIAL_SIZE);
+ _data.setAutoExpand(true);
+ }
+ _readable = (data != null);
+ }
+
+ JMSBytesMessage(long messageNbr, ByteBuffer data, ContentHeaderBody contentHeader)
+ throws AMQException
+ {
+ // TODO: this casting is ugly. Need to review whole ContentHeaderBody idea
+ super(messageNbr, (BasicContentHeaderProperties) contentHeader.properties, data);
+ getJmsContentHeaderProperties().setContentType(MIME_TYPE);
+ _readable = true;
+ }
+
+ public void clearBody() throws JMSException
+ {
+ _data.clear();
+ _readable = false;
+ }
+
+ public String toBodyString() throws JMSException
+ {
+ checkReadable();
+ try
+ {
+ return getText();
+ }
+ catch (IOException e)
+ {
+ throw new JMSException(e.toString());
+ }
+ }
+
+ /**
+ * Decodes the body from its start as a UTF-8 string without disturbing the caller's read position:
+ * the buffer is rewound before decoding and the saved position is restored afterwards, so the
+ * caller can carry on reading as if this method had never been called.
+ * @return the decoded text, or null when there is no buffer or the rewound buffer has nothing remaining
+ * @throws IOException if the buffer contents cannot be decoded
+ */
+ private String getText() throws IOException
+ {
+ // NOTE: the decoding below is fixed to UTF-8; it does not use the platform default encoding
+ if (_data == null)
+ {
+ return null;
+ }
+ int pos = _data.position();
+ _data.rewind();
+ // nothing to decode when the rewound buffer has no bytes before its limit
+ if (_data.remaining() == 0)
+ {
+ // this is really redundant since pos must be zero
+ _data.position(pos);
+ return null;
+ }
+ else
+ {
+ String data = _data.getString(Charset.forName("UTF8").newDecoder());
+ _data.position(pos);
+ return data;
+ }
+ }
+
+ public String getMimeType()
+ {
+ return MIME_TYPE;
+ }
+
+ public long getBodyLength() throws JMSException
+ {
+ checkReadable();
+ return _data.limit();
+ }
+
+ private void checkReadable() throws MessageNotReadableException
+ {
+ if (!_readable)
+ {
+ throw new MessageNotReadableException("You need to call reset() to make the message readable");
+ }
+ }
+
+ private void checkWritable() throws MessageNotWriteableException
+ {
+ if (_readable)
+ {
+ throw new MessageNotWriteableException("You need to call clearBody() to make the message writable");
+ }
+ }
+
+ public boolean readBoolean() throws JMSException
+ {
+ checkReadable();
+ return _data.get() != 0;
+ }
+
+ public byte readByte() throws JMSException
+ {
+ checkReadable();
+ return _data.get();
+ }
+
+ public int readUnsignedByte() throws JMSException
+ {
+ checkReadable();
+ return _data.getUnsigned();
+ }
+
+ public short readShort() throws JMSException
+ {
+ checkReadable();
+ return _data.getShort();
+ }
+
+ public int readUnsignedShort() throws JMSException
+ {
+ checkReadable();
+ return _data.getUnsignedShort();
+ }
+
+ public char readChar() throws JMSException
+ {
+ checkReadable();
+ return _data.getChar();
+ }
+
+ public int readInt() throws JMSException
+ {
+ checkReadable();
+ return _data.getInt();
+ }
+
+ public long readLong() throws JMSException
+ {
+ checkReadable();
+ return _data.getLong();
+ }
+
+ public float readFloat() throws JMSException
+ {
+ checkReadable();
+ return _data.getFloat();
+ }
+
+ public double readDouble() throws JMSException
+ {
+ checkReadable();
+ return _data.getDouble();
+ }
+
+ public String readUTF() throws JMSException
+ {
+ checkReadable();
+ try
+ {
+ return _data.getString(Charset.forName("UTF-8").newDecoder());
+ }
+ catch (CharacterCodingException e)
+ {
+ JMSException je = new JMSException("Error decoding byte stream as a UTF8 string: " + e);
+ je.setLinkedException(e);
+ throw je;
+ }
+ }
+
+ /** Reads up to bytes.length bytes into the array; returns the number read, or -1 once the body is exhausted. */
+ public int readBytes(byte[] bytes) throws JMSException
+ {
+ if (bytes == null) { throw new IllegalArgumentException("byte array must not be null"); }
+ checkReadable();
+ int available = _data.remaining();
+ if (available == 0) { return -1; }
+ int count = Math.min(available, bytes.length);
+ _data.get(bytes, 0, count);
+ return count;
+ }
+
+ /** Reads up to maxLength bytes into the array; returns the number read, or -1 once the body is exhausted. */
+ public int readBytes(byte[] bytes, int maxLength) throws JMSException
+ {
+ if (bytes == null) { throw new IllegalArgumentException("byte array must not be null"); }
+ if (maxLength > bytes.length)
+ {
+ throw new IllegalArgumentException("maxLength must be <= bytes.length");
+ }
+ checkReadable();
+ int available = _data.remaining();
+ if (available == 0) { return -1; }
+ int count = Math.min(available, maxLength);
+ _data.get(bytes, 0, count);
+ return count;
+ }
+
+ public void writeBoolean(boolean b) throws JMSException
+ {
+ checkWritable();
+ _data.put(b?(byte)1:(byte)0);
+ }
+
+ public void writeByte(byte b) throws JMSException
+ {
+ checkWritable();
+ _data.put(b);
+ }
+
+ public void writeShort(short i) throws JMSException
+ {
+ checkWritable();
+ _data.putShort(i);
+ }
+
+ public void writeChar(char c) throws JMSException
+ {
+ checkWritable();
+ _data.putChar(c);
+ }
+
+ public void writeInt(int i) throws JMSException
+ {
+ checkWritable();
+ _data.putInt(i);
+ }
+
+ public void writeLong(long l) throws JMSException
+ {
+ checkWritable();
+ _data.putLong(l);
+ }
+
+ public void writeFloat(float v) throws JMSException
+ {
+ checkWritable();
+ _data.putFloat(v);
+ }
+
+ public void writeDouble(double v) throws JMSException
+ {
+ checkWritable();
+ _data.putDouble(v);
+ }
+
+ public void writeUTF(String string) throws JMSException
+ {
+ checkWritable();
+ try
+ {
+ _data.putString(string, Charset.forName("UTF-8").newEncoder());
+ }
+ catch (CharacterCodingException e)
+ {
+ JMSException ex = new JMSException("Unable to encode string: " + e);
+ ex.setLinkedException(e);
+ throw ex;
+ }
+ }
+
+ public void writeBytes(byte[] bytes) throws JMSException
+ {
+ checkWritable();
+ _data.put(bytes);
+ }
+
+ public void writeBytes(byte[] bytes, int offset, int length) throws JMSException
+ {
+ checkWritable();
+ _data.put(bytes, offset, length);
+ }
+
+ public void writeObject(Object object) throws JMSException
+ {
+ checkWritable();
+ if (object == null)
+ {
+ throw new NullPointerException("Argument must not be null");
+ }
+ _data.putObject(object);
+ }
+
+ /** Makes the message readable from offset 0; flips only when leaving write mode so a repeated reset() cannot truncate the body. */
+ public void reset() throws JMSException
+ {
+ if (_readable) { _data.rewind(); } else { _data.flip(); }
+ _readable = true;
+ }
+
+ public boolean isReadable()
+ {
+ return _readable;
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/JMSBytesMessage.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/JMSBytesMessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/JMSBytesMessageFactory.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/JMSBytesMessageFactory.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/JMSBytesMessageFactory.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,37 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.message;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.ContentHeaderBody;
+
+import javax.jms.JMSException;
+
+public class JMSBytesMessageFactory extends AbstractJMSMessageFactory
+{
+ protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data, ContentHeaderBody contentHeader) throws AMQException
+ {
+ return new JMSBytesMessage(deliveryTag, data, contentHeader);
+ }
+
+ public AbstractJMSMessage createMessage() throws JMSException
+ {
+ return new JMSBytesMessage();
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/JMSBytesMessageFactory.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/JMSObjectMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/JMSObjectMessage.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/JMSObjectMessage.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/JMSObjectMessage.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,175 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.message;
+
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.AMQException;
+import org.apache.mina.common.ByteBuffer;
+
+import javax.jms.ObjectMessage;
+import javax.jms.JMSException;
+import javax.jms.MessageFormatException;
+import javax.jms.MessageNotWriteableException;
+import java.io.*;
+import java.nio.charset.Charset;
+import java.nio.charset.CharacterCodingException;
+
+public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessage
+{
+ static final String MIME_TYPE = "application/java-object-stream";
+ private final boolean _readonly;
+
+ private static final int DEFAULT_BUFFER_SIZE = 1024;
+ /**
+ * Creates empty, writable message for use by producers
+ */
+ JMSObjectMessage()
+ {
+ this(null);
+ }
+
+ private JMSObjectMessage(ByteBuffer data)
+ {
+ super(data);
+ if (data == null)
+ {
+ _data = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
+ _data.setAutoExpand(true);
+ }
+ _readonly = (data != null);
+ getJmsContentHeaderProperties().setContentType(MIME_TYPE);
+ }
+
+ /**
+ * Creates read only message for delivery to consumers
+ */
+ JMSObjectMessage(long messageNbr, ByteBuffer data, ContentHeaderBody contentHeader) throws AMQException
+ {
+ super(messageNbr, (BasicContentHeaderProperties) contentHeader.properties, data);
+ _readonly = data != null;
+ }
+
+ public void clearBody() throws JMSException
+ {
+ if (_data != null)
+ {
+ _data.release();
+ }
+ _data = null;
+ }
+
+ public String toBodyString() throws JMSException
+ {
+ return toString(_data);
+ }
+
+ public String getMimeType()
+ {
+ return MIME_TYPE;
+ }
+
+ public void setObject(Serializable serializable) throws JMSException
+ {
+ if (_readonly)
+ {
+ throw new MessageNotWriteableException("Message is not writable.");
+ }
+
+ if (_data == null)
+ {
+ _data = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
+ _data.setAutoExpand(true);
+ }
+ try
+ {
+ ObjectOutputStream out = new ObjectOutputStream(_data.asOutputStream());
+ out.writeObject(serializable);
+ out.flush();
+ out.close();
+ _data.rewind();
+ }
+ catch (IOException e)
+ {
+ throw new MessageFormatException("Message not serializable: " + e);
+ }
+ }
+
+ /**
+ * Deserializes and returns the object held in the body, or null when there is no body.
+ * The buffer is rewound afterwards, whether or not deserialization succeeded. Failures are
+ * reported as MessageFormatException with the underlying exception linked.
+ * NOTE(review): this runs Java deserialization over the body bytes - confirm senders are trusted.
+ */
+ public Serializable getObject() throws JMSException
+ {
+ if (_data == null) { return null; }
+ ObjectInputStream in = null;
+ Exception failure;
+ try
+ {
+ in = new ObjectInputStream(_data.asInputStream());
+ return (Serializable) in.readObject();
+ }
+ catch (IOException e) { failure = e; }
+ catch (ClassNotFoundException e) { failure = e; }
+ finally
+ {
+ _data.rewind();
+ close(in);
+ }
+ MessageFormatException mfe = new MessageFormatException("Could not deserialize message: " + failure);
+ mfe.setLinkedException(failure);
+ throw mfe;
+ }
+
+ private static void close(InputStream in)
+ {
+ try
+ {
+ if (in != null)
+ {
+ in.close();
+ }
+ }
+ catch (IOException ignore)
+ {
+ }
+ }
+
+ private static String toString(ByteBuffer data)
+ {
+ if (data == null)
+ {
+ return null;
+ }
+ int pos = data.position();
+ try
+ {
+ return data.getString(Charset.forName("UTF8").newDecoder());
+ }
+ catch (CharacterCodingException e)
+ {
+ return null;
+ }
+ finally
+ {
+ data.position(pos);
+ }
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/JMSObjectMessage.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/JMSObjectMessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/JMSObjectMessageFactory.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/JMSObjectMessageFactory.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/JMSObjectMessageFactory.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,37 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.message;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.ContentHeaderBody;
+
+import javax.jms.JMSException;
+
+public class JMSObjectMessageFactory extends AbstractJMSMessageFactory
+{
+ protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data, ContentHeaderBody contentHeader) throws AMQException
+ {
+ return new JMSObjectMessage(deliveryTag, data, contentHeader);
+ }
+
+ public AbstractJMSMessage createMessage() throws JMSException
+ {
+ return new JMSObjectMessage();
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/JMSObjectMessageFactory.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/JMSTextMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/JMSTextMessage.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/JMSTextMessage.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/JMSTextMessage.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,159 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.message;
+
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.AMQException;
+import org.apache.mina.common.ByteBuffer;
+
+import javax.jms.JMSException;
+import java.io.UnsupportedEncodingException;
+import java.nio.charset.Charset;
+import java.nio.charset.CharacterCodingException;
+
+public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.TextMessage
+{
+ private static final String MIME_TYPE = "text/plain";
+
+ private String _decodedValue;
+
+ JMSTextMessage() throws JMSException
+ {
+ this(null, null);
+ }
+
+ JMSTextMessage(ByteBuffer data, String encoding) throws JMSException
+ {
+ super(data); // this instantiates a content header
+ getJmsContentHeaderProperties().setContentType(MIME_TYPE);
+ getJmsContentHeaderProperties().setEncoding(encoding);
+ }
+
+ JMSTextMessage(long deliveryTag, ByteBuffer data, BasicContentHeaderProperties contentHeader)
+ throws AMQException
+ {
+ super(deliveryTag, contentHeader, data);
+ contentHeader.setContentType(MIME_TYPE);
+ _data = data;
+ }
+
+ JMSTextMessage(ByteBuffer data) throws JMSException
+ {
+ this(data, null);
+ }
+
+ JMSTextMessage(String text) throws JMSException
+ {
+ super((ByteBuffer)null);
+ setText(text);
+ }
+
+ public void clearBody() throws JMSException
+ {
+ if (_data != null)
+ {
+ _data.release();
+ }
+ _data = null;
+ _decodedValue = null;
+ }
+
+ public String toBodyString() throws JMSException
+ {
+ return getText();
+ }
+
+ public void setData(ByteBuffer data)
+ {
+ _data = data;
+ }
+
+ public String getMimeType()
+ {
+ return MIME_TYPE;
+ }
+
+ /**
+ * Replaces the body with the given text, encoded with the header's encoding when one is set and with
+ * the platform default charset otherwise. A null string leaves the message with no body (getText() returns null).
+ */
+ public void setText(String string) throws JMSException
+ {
+ clearBody();
+ if (string == null) { return; }
+ String encoding = getJmsContentHeaderProperties().getEncoding();
+ try
+ {
+ // Encode first: the encoded byte count, not string.length(), is the real body size.
+ byte[] encoded = (encoding == null) ? string.getBytes() : string.getBytes(encoding);
+ _data = ByteBuffer.allocate(encoded.length);
+ _data.setAutoExpand(true);
+ _data.put(encoded);
+ _decodedValue = string;
+ }
+ catch (UnsupportedEncodingException e)
+ {
+ JMSException je = new JMSException("Unable to encode string data: " + e);
+ je.setLinkedException(e);
+ throw je;
+ }
+ }
+
+ public String getText() throws JMSException
+ {
+ if (_data == null && _decodedValue == null)
+ {
+ return null;
+ }
+ else if (_decodedValue != null)
+ {
+ return _decodedValue;
+ }
+ else
+ {
+ _data.rewind();
+ if (getJmsContentHeaderProperties().getEncoding() != null)
+ {
+ try
+ {
+ _decodedValue = _data.getString(Charset.forName(getJmsContentHeaderProperties().getEncoding()).newDecoder());
+ }
+ catch (CharacterCodingException e)
+ {
+ JMSException je = new JMSException("Could not decode string data: " + e);
+ je.setLinkedException(e);
+ throw je;
+ }
+ }
+ else
+ {
+ try
+ {
+ _decodedValue = _data.getString(Charset.defaultCharset().newDecoder());
+ }
+ catch (CharacterCodingException e)
+ {
+ JMSException je = new JMSException("Could not decode string data: " + e);
+ je.setLinkedException(e);
+ throw je;
+ }
+ }
+ return _decodedValue;
+ }
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/JMSTextMessage.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/JMSTextMessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/JMSTextMessageFactory.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/JMSTextMessageFactory.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/JMSTextMessageFactory.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,39 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.message;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
+
+import javax.jms.JMSException;
+
+public class JMSTextMessageFactory extends AbstractJMSMessageFactory
+{
+
+ public AbstractJMSMessage createMessage() throws JMSException
+ {
+ return new JMSTextMessage();
+ }
+
+ protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data, ContentHeaderBody contentHeader) throws AMQException
+ {
+ return new JMSTextMessage(deliveryTag, data, (BasicContentHeaderProperties)contentHeader.properties);
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/JMSTextMessageFactory.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/MessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/MessageFactory.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/MessageFactory.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/MessageFactory.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,35 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.message;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.ContentHeaderBody;
+
+import javax.jms.JMSException;
+import java.util.List;
+
+
+public interface MessageFactory
+{
+ AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered,
+ ContentHeaderBody contentHeader,
+ List bodies)
+ throws JMSException, AMQException;
+
+ AbstractJMSMessage createMessage() throws JMSException;
+}
Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/MessageFactory.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/MessageFactoryRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/MessageFactoryRegistry.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/MessageFactoryRegistry.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/MessageFactoryRegistry.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,105 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.message;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
+
+import javax.jms.JMSException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.List;
+
+public class MessageFactoryRegistry
+{
+ private final Map _mimeToFactoryMap = new HashMap();
+
+ public void registerFactory(String mimeType, MessageFactory mf)
+ {
+ if (mf == null)
+ {
+ throw new IllegalArgumentException("Message factory must not be null");
+ }
+ _mimeToFactoryMap.put(mimeType, mf);
+ }
+
+ public MessageFactory deregisterFactory(String mimeType)
+ {
+ return (MessageFactory) _mimeToFactoryMap.remove(mimeType);
+ }
+
+ /**
+ * Create a message. This looks up the MIME type from the content header and instantiates the appropriate
+ * concrete message type.
+ * @param deliveryTag the AMQ message id
+ * @param redelivered true if redelivered
+ * @param contentHeader the content header that was received
+ * @param bodies a list of ContentBody instances
+ * @return the message.
+ * @throws AMQException
+ * @throws JMSException
+ */
+ public AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered,
+ ContentHeaderBody contentHeader,
+ List bodies) throws AMQException, JMSException
+ {
+ BasicContentHeaderProperties properties = (BasicContentHeaderProperties) contentHeader.properties;
+ MessageFactory mf = (MessageFactory) _mimeToFactoryMap.get(properties.getContentType());
+ if (mf == null)
+ {
+ throw new AMQException("Unsupport MIME type of " + properties.getContentType());
+ }
+ else
+ {
+ return mf.createMessage(deliveryTag, redelivered, contentHeader, bodies);
+ }
+ }
+
+ /**
+ * Looks up the factory registered for mimeType and returns its createMessage(); rejects null or unregistered types.
+ */
+ public AbstractJMSMessage createMessage(String mimeType) throws AMQException, JMSException
+ {
+ if (mimeType == null)
+ {
+ throw new IllegalArgumentException("Mime type must not be null");
+ }
+ final MessageFactory factory = (MessageFactory) _mimeToFactoryMap.get(mimeType);
+ if (factory == null)
+ {
+ throw new AMQException("Unsupport MIME type of " + mimeType);
+ }
+ return factory.createMessage();
+ }
+
+ /**
+ * Construct a new registry with the default message factories registered
+ * @return a message factory registry
+ */
+ public static MessageFactoryRegistry newDefaultRegistry()
+ {
+ MessageFactoryRegistry mf = new MessageFactoryRegistry();
+ mf.registerFactory("text/plain", new JMSTextMessageFactory());
+ mf.registerFactory("text/xml", new JMSTextMessageFactory());
+ mf.registerFactory("application/octet-stream", new JMSBytesMessageFactory());
+ mf.registerFactory(JMSObjectMessage.MIME_TYPE, new JMSObjectMessageFactory());
+ mf.registerFactory(null, new JMSBytesMessageFactory());
+ return mf;
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/MessageFactoryRegistry.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/UnexpectedBodyReceivedException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/UnexpectedBodyReceivedException.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/UnexpectedBodyReceivedException.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/UnexpectedBodyReceivedException.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,40 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.message;
+
+import org.apache.qpid.AMQException;
+import org.apache.log4j.Logger;
+
+/**
+ * AMQException subtype declared by UnprocessedMessage.receiveBody() for a content body that was
+ * not expected. It adds no state of its own: every constructor forwards its arguments unchanged
+ * to the AMQException constructor with the same parameter list.
+ */
+public class UnexpectedBodyReceivedException extends AMQException
+{
+
+ /**
+ * @param logger passed through to AMQException
+ * @param msg the error message
+ * @param t the underlying cause
+ */
+ public UnexpectedBodyReceivedException(Logger logger, String msg, Throwable t)
+ {
+ super(logger, msg, t);
+ }
+
+ /**
+ * @param logger passed through to AMQException
+ * @param msg the error message
+ */
+ public UnexpectedBodyReceivedException(Logger logger, String msg)
+ {
+ super(logger, msg);
+ }
+
+ /**
+ * @param logger passed through to AMQException
+ * @param errorCode the error code passed through to AMQException
+ * @param msg the error message
+ */
+ public UnexpectedBodyReceivedException(Logger logger, int errorCode, String msg)
+ {
+ super(logger, errorCode, msg);
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/UnexpectedBodyReceivedException.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/UnprocessedMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/UnprocessedMessage.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/UnprocessedMessage.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/UnprocessedMessage.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,61 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.message;
+
+import org.apache.qpid.framing.*;
+
+import java.util.List;
+import java.util.LinkedList;
+
+/**
+ * This class contains everything needed to process a JMS message. It assembles the
+ * deliver body, the content header and the content body/ies.
+ *
+ * Note that the actual work of creating a JMS message for the client code's use is done
+ * outside of the MINA dispatcher thread in order to minimise the amount of work done in
+ * the MINA dispatcher thread.
+ *
+ */
+public class UnprocessedMessage
+{
+ /** Running total of payload bytes seen across all content bodies added so far. */
+ private long _bytesReceived = 0;
+
+ public BasicDeliverBody deliverBody;
+ public BasicReturnBody bounceBody; // TODO: check change (gustavo)
+ public int channelId;
+ public ContentHeaderBody contentHeader;
+
+ /**
+ * The ContentBody instances received so far, in arrival order. Because of fragmentation the
+ * eventual size of this list is not known in advance.
+ */
+ public List bodies = new LinkedList();
+
+ /**
+ * Appends a content body to this message and counts the bytes remaining in its payload.
+ * A body whose payload is null is still stored but contributes no bytes.
+ *
+ * @param body the content body frame that has arrived
+ * @throws UnexpectedBodyReceivedException declared, although this implementation never throws it
+ */
+ public void receiveBody(ContentBody body) throws UnexpectedBodyReceivedException
+ {
+ bodies.add(body);
+ if (body.payload == null)
+ {
+ return;
+ }
+ _bytesReceived += body.payload.remaining();
+ }
+
+ /**
+ * @return true when the number of payload bytes received equals the body size announced
+ * in the content header. The content header must already have been set.
+ */
+ public boolean isAllBodyDataReceived()
+ {
+ final long expected = contentHeader.bodySize;
+ return expected == _bytesReceived;
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/UnprocessedMessage.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/protocol/AMQMethodEvent.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/protocol/AMQMethodEvent.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/protocol/AMQMethodEvent.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/protocol/AMQMethodEvent.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,59 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.protocol;
+
+import org.apache.qpid.framing.AMQMethodBody;
+
+/**
+ * Value holder describing a method frame that has been received: the channel it arrived on,
+ * the method body itself and the protocol session it belongs to.
+ */
+public class AMQMethodEvent
+{
+ private AMQMethodBody _method;
+
+ private int _channelId;
+
+ private AMQProtocolSession _protocolSession;
+
+ /**
+ * @param channelId the channel the method frame arrived on
+ * @param method the method body carried by the frame
+ * @param protocolSession the protocol session associated with the event
+ */
+ public AMQMethodEvent(int channelId, AMQMethodBody method, AMQProtocolSession protocolSession)
+ {
+ this._channelId = channelId;
+ this._method = method;
+ this._protocolSession = protocolSession;
+ }
+
+ /** @return the method body supplied at construction */
+ public AMQMethodBody getMethod()
+ {
+ return _method;
+ }
+
+ /** @return the channel id supplied at construction */
+ public int getChannelId()
+ {
+ return _channelId;
+ }
+
+ /** @return the protocol session supplied at construction */
+ public AMQProtocolSession getProtocolSession()
+ {
+ return _protocolSession;
+ }
+
+ /**
+ * @return a multi-line description containing the channel id and the method; the protocol
+ * session is not included
+ */
+ public String toString()
+ {
+ return "Method event: "
+ + "\nChannel id: " + _channelId
+ + "\nMethod: " + _method;
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/protocol/AMQMethodEvent.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/protocol/AMQMethodListener.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/protocol/AMQMethodListener.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/protocol/AMQMethodListener.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/protocol/AMQMethodListener.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,41 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.protocol;
+
+import org.apache.qpid.AMQException;
+
+/**
+ * Callback interface for objects that want to see incoming method frames and be told about
+ * errors raised while those frames are processed.
+ */
+public interface AMQMethodListener
+{
+ /**
+ * Invoked when a method frame has been received
+ * @param evt the event
+ * @return true if the handler has processed the method frame, false otherwise. Note
+ * that this does not prohibit the method event being delivered to subsequent listeners
+ * but can be used to determine if nobody has dealt with an incoming method frame.
+ * @throws AMQException if an error has occurred. This exception will be delivered
+ * to all registered listeners using the error() method (see below) allowing them to
+ * perform cleanup if necessary.
+ */
+ boolean methodReceived(AMQMethodEvent evt) throws AMQException;
+
+ /**
+ * Callback when an error has occurred. Allows listeners to clean up.
+ * @param e the exception describing the error
+ */
+ void error(Exception e);
+}
Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/protocol/AMQMethodListener.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/protocol/AMQProtocolHandler.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/protocol/AMQProtocolHandler.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/protocol/AMQProtocolHandler.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,530 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.protocol;
+
+import org.apache.log4j.Logger;
+import org.apache.mina.common.IdleStatus;
+import org.apache.mina.common.IoHandlerAdapter;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.filter.SSLFilter;
+import org.apache.mina.filter.codec.ProtocolCodecFilter;
+import org.apache.qpid.AMQDisconnectedException;
+import org.apache.qpid.AMQException;
+
+import org.apache.qpid.AMQConnectionClosedException;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.failover.FailoverHandler;
+import org.apache.qpid.client.failover.FailoverState;
+
+import org.apache.qpid.client.state.AMQState;
+import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
+import org.apache.qpid.codec.AMQCodecFactory;
+import org.apache.qpid.framing.*;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.ssl.BogusSSLContextFactory;
+
+import java.util.Iterator;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * MINA {@link IoHandlerAdapter} for a single {@link AMQConnection}. It receives the session
+ * lifecycle callbacks, caught exceptions and decoded frames for that connection. Method frames are
+ * dispatched to the registered {@link AMQMethodListener}s, content header and content body frames
+ * are handed to the {@link AMQProtocolSession}, and failover is started when the session closes
+ * while the connection is still open and its policy allows failover.
+ */
+public class AMQProtocolHandler extends IoHandlerAdapter
+{
+ private static final Logger _logger = Logger.getLogger(AMQProtocolHandler.class);
+
+ /**
+ * The connection that this protocol handler is associated with. There is a 1-1
+ * mapping between connection instances and protocol handler instances.
+ */
+ private AMQConnection _connection;
+
+ /**
+ * Used only when determining whether to add the SSL filter or not. This should be made more
+ * generic in future since we will potentially have many transport layer options
+ */
+ private boolean _useSSL;
+
+ /**
+ * Our wrapper for a protocol session that provides access to session values
+ * in a typesafe manner.
+ */
+ private volatile AMQProtocolSession _protocolSession;
+
+ private AMQStateManager _stateManager = new AMQStateManager();
+
+ private final CopyOnWriteArraySet _frameListeners = new CopyOnWriteArraySet();
+
+ /**
+ * We create the failover handler when the session is created since it needs a reference to the IoSession in order
+ * to be able to send errors during failover back to the client application. The session won't be available in the
+ * case where we are failing over due to a Connection.Redirect message from the broker.
+ */
+ private FailoverHandler _failoverHandler;
+
+ /**
+ * This flag is used to track whether failover is being attempted. It is used to prevent the application constantly
+ * attempting failover where it is failing.
+ */
+ private FailoverState _failoverState = FailoverState.NOT_STARTED;
+
+ private CountDownLatch _failoverLatch;
+
+ /**
+ * Creates a handler bound to the given connection and registers a listener that forwards
+ * method frames and errors to whichever state manager is current at the time of the call.
+ *
+ * @param con the connection this handler serves
+ */
+ public AMQProtocolHandler(AMQConnection con)
+ {
+ _connection = con;
+
+ // We add a proxy for the state manager so that we can substitute the state manager easily in this class.
+ // We substitute the state manager when performing failover
+ _frameListeners.add(new AMQMethodListener()
+ {
+ public boolean methodReceived(AMQMethodEvent evt) throws AMQException
+ {
+ return _stateManager.methodReceived(evt);
+ }
+
+ public void error(Exception e)
+ {
+ _stateManager.error(e);
+ }
+ });
+ }
+
+ /** @return whether the SSL filter is added when a session is created */
+ public boolean isUseSSL()
+ {
+ return _useSSL;
+ }
+
+ /** @param useSSL true to add the SSL filter when a session is created */
+ public void setUseSSL(boolean useSSL)
+ {
+ _useSSL = useSSL;
+ }
+
+ /**
+ * Invoked by MINA when a session is created. Creates the failover handler, installs the
+ * protocol codec filter (and the SSL filter when enabled), then wraps the session in an
+ * {@link AMQProtocolSession} and initialises it.
+ *
+ * @param session the newly created MINA session
+ * @throws Exception declared by the MINA callback signature
+ */
+ public void sessionCreated(IoSession session) throws Exception
+ {
+ _logger.debug("Protocol session created for session " + System.identityHashCode(session));
+ _failoverHandler = new FailoverHandler(this, session);
+
+ final ProtocolCodecFilter pcf = new ProtocolCodecFilter(new AMQCodecFactory(false));
+
+ if (Boolean.getBoolean("amqj.shared_read_write_pool"))
+ {
+ session.getFilterChain().addBefore("AsynchronousWriteFilter", "protocolFilter", pcf);
+ }
+ else
+ {
+ session.getFilterChain().addLast("protocolFilter", pcf);
+ }
+ // we only add the SSL filter where we have an SSL connection
+ if (_useSSL)
+ {
+ //todo FIXME: Bogus context cannot be used in production.
+ SSLFilter sslFilter = new SSLFilter(BogusSSLContextFactory.getInstance(false));
+ sslFilter.setUseClientMode(true);
+ session.getFilterChain().addBefore("protocolFilter", "ssl", sslFilter);
+ }
+
+ _protocolSession = new AMQProtocolSession(this, session, _connection);
+ _protocolSession.init();
+ }
+
+ /**
+ * Invoked by MINA when the session is opened. All set-up is done in
+ * {@link #sessionCreated(IoSession)}, so there is nothing to do here.
+ *
+ * @param session the opened MINA session
+ * @throws Exception declared by the MINA callback signature
+ */
+ public void sessionOpened(IoSession session) throws Exception
+ {
+ // Intentionally empty: this callback must not touch JVM-wide state such as system properties.
+ }
+
+ /**
+ * When the broker connection dies we can either get sessionClosed() called or exceptionCaught() followed by
+ * sessionClosed() depending on whether we were trying to send data at the time of failure.
+ *
+ * If the connection was closed by the client this only logs. Otherwise failover is started when it
+ * has not been started yet and the connection allows it; if failover is not possible and is not
+ * already in progress, the connection is told that the server disconnected.
+ *
+ * @param session the MINA session that has closed
+ * @throws Exception declared by the MINA callback signature
+ */
+ public void sessionClosed(IoSession session) throws Exception
+ {
+ //todo server just closes session with no warning if auth fails.
+ if (_connection.isClosed())
+ {
+ _logger.info("Session closed called by client");
+ }
+ else
+ {
+ _logger.info("Session closed called with failover state currently " + _failoverState);
+
+ // reconnectability was introduced here so as not to disturb the client as they have made their intentions
+ // known through the policy settings.
+
+ if ((_failoverState != FailoverState.IN_PROGRESS) && _connection.failoverAllowed())
+ {
+ _logger.info("FAILOVER STARTING");
+ if (_failoverState == FailoverState.NOT_STARTED)
+ {
+ _failoverState = FailoverState.IN_PROGRESS;
+ startFailoverThread();
+ }
+ else
+ {
+ _logger.info("Not starting failover as state currently " + _failoverState);
+ }
+ }
+ else
+ {
+ // This branch is also taken when failover is already in progress, not only when the
+ // policy forbids failover.
+ _logger.info("Failover not allowed by policy.");
+
+ if (_failoverState != FailoverState.IN_PROGRESS)
+ {
+ _logger.info("sessionClose() not allowed to failover");
+ _connection.exceptionReceived(
+ new AMQDisconnectedException("Server closed connection and reconnection " +
+ "not permitted."));
+ }
+ else
+ {
+ _logger.info("sessionClose() failover in progress");
+ }
+ }
+ }
+
+ _logger.info("Protocol Session [" + this + "] closed");
+ }
+
+ /**
+ * Runs the failover handler on a new, non-daemon thread named "Failover".
+ * See {@link FailoverHandler} to see rationale for separate thread.
+ */
+ private void startFailoverThread()
+ {
+ Thread failoverThread = new Thread(_failoverHandler);
+ failoverThread.setName("Failover");
+ // Do not inherit daemon-ness from current thread as this can be a daemon
+ // thread such as a AnonymousIoService thread.
+ failoverThread.setDaemon(false);
+ failoverThread.start();
+ }
+
+ /**
+ * Invoked by MINA when the session has been idle. A writer-idle event sends a heartbeat frame;
+ * a reader-idle event is treated as a missed heartbeat from the peer and closes the session.
+ *
+ * @param session the idle MINA session
+ * @param status which direction has been idle
+ * @throws Exception declared by the MINA callback signature
+ */
+ public void sessionIdle(IoSession session, IdleStatus status) throws Exception
+ {
+ _logger.debug("Protocol Session [" + this + ":" + session + "] idle: " + status);
+ if (IdleStatus.WRITER_IDLE.equals(status))
+ {
+ //write heartbeat frame:
+ _logger.debug("Sent heartbeat");
+ session.write(HeartbeatBody.FRAME);
+ HeartbeatDiagnostics.sent();
+ }
+ else if (IdleStatus.READER_IDLE.equals(status))
+ {
+ //failover:
+ HeartbeatDiagnostics.timeout();
+ _logger.warn("Timed out while waiting for heartbeat from peer.");
+ session.close();
+ }
+ }
+
+ /**
+ * Invoked by MINA, and by {@link #messageReceived(IoSession, Object)}, when an exception occurs.
+ * Before failover has started, only an {@link AMQConnectionClosedException} triggers action (the
+ * session-closed handling, which may start failover). Once failover has failed, the exception is
+ * propagated to waiters and to the connection. In every other case the exception is logged at
+ * debug level and otherwise ignored.
+ *
+ * @param session the MINA session on which the exception occurred
+ * @param cause the exception that was caught
+ * @throws Exception declared by the MINA callback signature
+ */
+ public void exceptionCaught(IoSession session, Throwable cause) throws Exception
+ {
+ if (_failoverState == FailoverState.NOT_STARTED)
+ {
+ //if (!(cause instanceof AMQUndeliveredException) && (!(cause instanceof AMQAuthenticationException)))
+ if (cause instanceof AMQConnectionClosedException)
+ {
+ _logger.info("Exception caught therefore going to attempt failover: " + cause, cause);
+ // this will attempt failover
+
+ sessionClosed(session);
+ }
+ else
+ {
+ // Not acted upon, but recorded so that the exception does not vanish without trace.
+ _logger.debug("Exception caught but ignored as it does not indicate a closed connection: " + cause, cause);
+ }
+ }
+ // we reach this point if failover was attempted and failed therefore we need to let the calling app
+ // know since we cannot recover the situation
+ else if (_failoverState == FailoverState.FAILED)
+ {
+ _logger.error("Exception caught by protocol handler: " + cause, cause);
+ // we notify the state manager of the error in case we have any clients waiting on a state
+ // change. Those "waiters" will be interrupted and can handle the exception
+ AMQException amqe = new AMQException("Protocol handler error: " + cause, cause);
+ propagateExceptionToWaiters(amqe);
+ _connection.exceptionReceived(cause);
+ }
+ else
+ {
+ // Any other failover state: nothing is done with the exception, but it is recorded.
+ _logger.debug("Exception caught but ignored while failover state is " + _failoverState + ": " + cause, cause);
+ }
+ }
+
+ /**
+ * There are two cases where we have other threads potentially blocking for events to be handled by this
+ * class. These are for the state manager (waiting for a state change) or a frame listener (waiting for a
+ * particular type of frame to arrive). When an error occurs we need to notify these waiters so that they can
+ * react appropriately.
+ *
+ * @param e the exception to propagate
+ */
+ public void propagateExceptionToWaiters(Exception e)
+ {
+ _stateManager.error(e);
+ final Iterator it = _frameListeners.iterator();
+ while (it.hasNext())
+ {
+ final AMQMethodListener ml = (AMQMethodListener) it.next();
+ ml.error(e);
+ }
+ }
+
+ // Static, so shared by every handler instance, and incremented without synchronisation.
+ // It is only used to decide when to emit a debug log line.
+ private static int _messageReceivedCount;
+
+ /**
+ * Invoked by MINA for each decoded frame. Method frames are offered to every registered listener;
+ * if none reports having processed the frame, or a listener throws an AMQException, all listeners
+ * are told of the error and the exception is passed to
+ * {@link #exceptionCaught(IoSession, Throwable)}. Content header and content body frames are
+ * passed to the protocol session. Finally the connection is told the session's read-byte count.
+ *
+ * @param session the MINA session the frame arrived on
+ * @param message the decoded frame; cast to AMQFrame
+ * @throws Exception declared by the MINA callback signature
+ */
+ public void messageReceived(IoSession session, Object message) throws Exception
+ {
+
+ if (_messageReceivedCount++ % 1000 == 0)
+ {
+ _logger.debug("Received " + _messageReceivedCount + " protocol messages");
+ }
+ Iterator it = _frameListeners.iterator();
+ AMQFrame frame = (AMQFrame) message;
+
+ HeartbeatDiagnostics.received(frame.bodyFrame instanceof HeartbeatBody);
+
+ if (frame.bodyFrame instanceof AMQMethodBody)
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Method frame received: " + frame);
+ }
+
+ final AMQMethodEvent evt = new AMQMethodEvent(frame.channel, (AMQMethodBody)frame.bodyFrame, _protocolSession);
+ try
+ {
+ boolean wasAnyoneInterested = false;
+ while (it.hasNext())
+ {
+ final AMQMethodListener listener = (AMQMethodListener) it.next();
+ // The listener call is evaluated first so that every listener sees the event even
+ // after one of them has already claimed it.
+ wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested;
+ }
+ if (!wasAnyoneInterested)
+ {
+ throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener.");
+ }
+ }
+ catch (AMQException e)
+ {
+ it = _frameListeners.iterator();
+ while (it.hasNext())
+ {
+ final AMQMethodListener listener = (AMQMethodListener) it.next();
+ listener.error(e);
+ }
+ exceptionCaught(session, e);
+ }
+ }
+ else if (frame.bodyFrame instanceof ContentHeaderBody)
+ {
+ _protocolSession.messageContentHeaderReceived(frame.channel,
+ (ContentHeaderBody) frame.bodyFrame);
+ }
+ else if (frame.bodyFrame instanceof ContentBody)
+ {
+ _protocolSession.messageContentBodyReceived(frame.channel,
+ (ContentBody) frame.bodyFrame);
+ }
+ else if (frame.bodyFrame instanceof HeartbeatBody)
+ {
+ _logger.debug("Received heartbeat");
+ }
+ _connection.bytesReceived(_protocolSession.getIoSession().getReadBytes());
+ }
+
+ // Static and unsynchronised, like _messageReceivedCount; only used to throttle debug logging.
+ private static int _messagesOut;
+
+ /**
+ * Invoked by MINA after a frame has been written. Reports the session's written-byte count to
+ * the connection and logs the frame at debug level.
+ *
+ * @param session the MINA session the frame was written to
+ * @param message the frame that was sent
+ * @throws Exception declared by the MINA callback signature
+ */
+ public void messageSent(IoSession session, Object message) throws Exception
+ {
+ if (_messagesOut++ % 1000 == 0)
+ {
+ _logger.debug("Sent " + _messagesOut + " protocol messages");
+ }
+ _connection.bytesSent(session.getWrittenBytes());
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Sent frame " + message);
+ }
+ }
+
+ /** @param listener the listener to register for method frames and errors */
+ public void addFrameListener(AMQMethodListener listener)
+ {
+ _frameListeners.add(listener);
+ }
+
+ /** @param listener the listener to deregister */
+ public void removeFrameListener(AMQMethodListener listener)
+ {
+ _frameListeners.remove(listener);
+ }
+
+ /**
+ * Delegates to the current state manager's attainState().
+ *
+ * @param s the state to attain
+ * @throws AMQException if the state manager raises one
+ */
+ public void attainState(AMQState s) throws AMQException
+ {
+ _stateManager.attainState(s);
+ }
+
+ /**
+ * Convenience method that writes a frame to the protocol session. Equivalent
+ * to calling getProtocolSession().write().
+ *
+ * @param frame the frame to write
+ */
+ public void writeFrame(AMQDataBlock frame)
+ {
+ _protocolSession.writeFrame(frame);
+ }
+
+ /**
+ * Writes a frame to the protocol session, passing the wait flag through.
+ *
+ * @param frame the frame to write
+ * @param wait passed to the protocol session's writeFrame(frame, wait)
+ */
+ public void writeFrame(AMQDataBlock frame, boolean wait)
+ {
+ _protocolSession.writeFrame(frame, wait);
+ }
+
+ /**
+ * Convenience method that writes a frame to the protocol session and waits for
+ * a particular response. Equivalent to calling getProtocolSession().write() then
+ * waiting for the response.
+ *
+ * @param frame the frame to write
+ * @param listener the blocking listener. Note the calling thread will block.
+ * @return the event returned by the listener's blockForFrame()
+ * @throws AMQException if writing or waiting raises one
+ */
+ private AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame,
+ BlockingMethodFrameListener listener)
+ throws AMQException
+ {
+ _frameListeners.add(listener);
+ try
+ {
+ _protocolSession.writeFrame(frame);
+ // When blockForFrame() returns, a reply will have been received
+ // that matches the criteria defined in the blocking listener
+ return listener.blockForFrame();
+ }
+ finally
+ {
+ // This method registered the listener, so it deregisters it too, on success and on failure.
+ // Otherwise each synchronous call would leave one more listener in the set, to be invoked
+ // for every later method frame. remove() does nothing if the listener is already gone.
+ _frameListeners.remove(listener);
+ }
+ }
+
+ /**
+ * More convenient method to write a frame and wait for its response.
+ *
+ * @param frame the frame to write
+ * @param responseClass the class of method body to wait for on the frame's channel
+ * @return the event carrying the response
+ * @throws AMQException if writing or waiting raises one
+ */
+ public AMQMethodEvent syncWrite(AMQFrame frame, Class responseClass) throws AMQException
+ {
+ return writeCommandFrameAndWaitForReply(frame,
+ new SpecificMethodFrameListener(frame.channel, responseClass));
+ }
+
+ /**
+ * Convenience method to register an AMQSession with the protocol handler. Registering
+ * a session with the protocol handler will ensure that messages are delivered to the
+ * consumer(s) on that session.
+ *
+ * @param channelId the channel id of the session
+ * @param session the session instance.
+ */
+ public void addSessionByChannel(int channelId, AMQSession session)
+ {
+ _protocolSession.addSessionByChannel(channelId, session);
+ }
+
+ /**
+ * Convenience method to deregister an AMQSession with the protocol handler.
+ *
+ * @param channelId the channel id of the session
+ */
+ public void removeSessionByChannel(int channelId)
+ {
+ _protocolSession.removeSessionByChannel(channelId);
+ }
+
+ /**
+ * Delegates to the protocol session's closeSession().
+ *
+ * @param session the session to close
+ * @throws AMQException if the protocol session raises one
+ */
+ public void closeSession(AMQSession session) throws AMQException
+ {
+ _protocolSession.closeSession(session);
+ }
+
+ /**
+ * Closes the connection: moves the state manager to CONNECTION_CLOSING, sends a Connection.Close
+ * frame, blocks until the Close-Ok reply arrives and then closes the protocol session.
+ *
+ * @throws AMQException if the state change or the synchronous write raises one
+ */
+ public void closeConnection() throws AMQException
+ {
+ _stateManager.changeState(AMQState.CONNECTION_CLOSING);
+
+ final AMQFrame frame = ConnectionCloseBody.createAMQFrame(
+ 0, AMQConstant.REPLY_SUCCESS.getCode(), "JMS client is closing the connection.", 0, 0);
+ syncWrite(frame, ConnectionCloseOkBody.class);
+
+ _protocolSession.closeProtocolSession();
+ }
+
+ /**
+ * @return the number of bytes read from this protocol session
+ */
+ public long getReadBytes()
+ {
+ return _protocolSession.getIoSession().getReadBytes();
+ }
+
+ /**
+ * @return the number of bytes written to this protocol session
+ */
+ public long getWrittenBytes()
+ {
+ return _protocolSession.getIoSession().getWrittenBytes();
+ }
+
+ /**
+ * Points the failover handler at the given host and port and starts the failover thread.
+ *
+ * @param host the host to fail over to
+ * @param port the port to fail over to
+ */
+ public void failover(String host, int port)
+ {
+ _failoverHandler.setHost(host);
+ _failoverHandler.setPort(port);
+ // see javadoc for FailoverHandler to see rationale for separate thread
+ startFailoverThread();
+ }
+
+ /**
+ * Blocks the caller on the failover latch, if one is currently set; returns immediately otherwise.
+ *
+ * @throws InterruptedException if the calling thread is interrupted while waiting
+ */
+ public void blockUntilNotFailingOver() throws InterruptedException
+ {
+ // Read the field once: setFailoverLatch() may replace or clear it from another thread between
+ // a null check and the await() call, which would otherwise risk a NullPointerException.
+ final CountDownLatch latch = _failoverLatch;
+ if (latch != null)
+ {
+ latch.await();
+ }
+ }
+
+ /** @return a queue name generated by the protocol session */
+ public String generateQueueName()
+ {
+ return _protocolSession.generateQueueName();
+ }
+
+ /** @return the current failover latch, or null if none is set */
+ public CountDownLatch getFailoverLatch()
+ {
+ return _failoverLatch;
+ }
+
+ /** @param failoverLatch the latch that blockUntilNotFailingOver() waits on; may be null */
+ public void setFailoverLatch(CountDownLatch failoverLatch)
+ {
+ _failoverLatch = failoverLatch;
+ }
+
+ /** @return the connection this handler is associated with */
+ public AMQConnection getConnection()
+ {
+ return _connection;
+ }
+
+ /** @return the state manager currently in use */
+ public AMQStateManager getStateManager()
+ {
+ return _stateManager;
+ }
+
+ /** @param stateManager the state manager to use from now on */
+ public void setStateManager(AMQStateManager stateManager)
+ {
+ _stateManager = stateManager;
+ }
+
+ /** @return the current failover state */
+ FailoverState getFailoverState()
+ {
+ return _failoverState;
+ }
+
+ /** @param failoverState the new failover state */
+ public void setFailoverState(FailoverState failoverState)
+ {
+ _failoverState = failoverState;
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/protocol/AMQProtocolHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/protocol/AMQProtocolSession.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/protocol/AMQProtocolSession.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/protocol/AMQProtocolSession.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,379 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.protocol;
+
+import org.apache.log4j.Logger;
+import org.apache.mina.common.CloseFuture;
+import org.apache.mina.common.IdleStatus;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.WriteFuture;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.ConnectionTuneParameters;
+import org.apache.qpid.client.message.UnexpectedBodyReceivedException;
+import org.apache.qpid.client.message.UnprocessedMessage;
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.ProtocolInitiation;
+import org.apache.qpid.framing.ProtocolVersionList;
+
+import javax.jms.JMSException;
+import javax.security.sasl.SaslClient;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * Wrapper for protocol session that provides type-safe access to session attributes.
+ *
+ * The underlying protocol session is still available but clients should not
+ * use it to obtain session attributes.
+ */
+public class AMQProtocolSession implements ProtocolVersionList
+{
+ private static final Logger _logger = Logger.getLogger(AMQProtocolSession.class);
+
+ public static final String PROTOCOL_INITIATION_RECEIVED = "ProtocolInitiatiionReceived";
+
+ protected static final String CONNECTION_TUNE_PARAMETERS = "ConnectionTuneParameters";
+
+ private static final String AMQ_CONNECTION = "AMQConnection";
+
+ private static final String SASL_CLIENT = "SASLClient";
+
+ private final IoSession _minaProtocolSession;
+
+ /**
+ * The handler from which this session was created and which is used to handle protocol events.
+ * We send failover events to the handler.
+ */
+ private final AMQProtocolHandler _protocolHandler;
+
+ /**
+ * Maps from the channel id to the AMQSession that it represents.
+ */
+ private ConcurrentMap _channelId2SessionMap = new ConcurrentHashMap();
+
+ private ConcurrentMap _closingChannels = new ConcurrentHashMap();
+
+ /**
+ * Maps from a channel id to an unprocessed message. This is used to tie together the
+ * JmsDeliverBody (which arrives first) with the subsequent content header and content bodies.
+ */
+ private ConcurrentMap _channelId2UnprocessedMsgMap = new ConcurrentHashMap();
+
+ /**
+ * Counter to ensure unique queue names
+ */
+ private int _queueId = 1;
+ private final Object _queueIdLock = new Object();
+
+ /**
+ * Wraps the given MINA session and stores the connection on it as a session attribute.
+ *
+ * @param protocolHandler the handler that created this session
+ * @param protocolSession the underlying MINA session
+ * @param connection the connection, stored on the MINA session under the AMQ_CONNECTION key
+ */
+ public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection)
+ {
+ _protocolHandler = protocolHandler;
+ _minaProtocolSession = protocolSession;
+ // properties of the connection are made available to the event handlers
+ _minaProtocolSession.setAttribute(AMQ_CONNECTION, connection);
+ }
+
+ /**
+ * Starts connection set-up by writing a ProtocolInitiation to the MINA session. This is the
+ * first place that data is written to the server.
+ */
+ public void init()
+ {
+ // The last entry of the protocol version list is used. The build file (build-module.xml)
+ // must therefore list the latest protocol version last.
+ final int latest = pv.length - 1;
+ final ProtocolInitiation initiation =
+ new ProtocolInitiation(pv[latest][PROTOCOL_MAJOR], pv[latest][PROTOCOL_MINOR]);
+ _minaProtocolSession.write(initiation);
+ }
+
+ /**
+ * Returns the client id of the connection.
+ *
+ * @return the connection's client id, or null if reading it raised a JMSException
+ */
+ public String getClientID()
+ {
+ try
+ {
+ return getAMQConnection().getClientID();
+ }
+ catch (JMSException e)
+ {
+ // we never throw a JMSException here
+ // NOTE(review): if that assumption is ever wrong, the exception is discarded and the
+ // caller only sees null.
+ return null;
+ }
+ }
+
+ /**
+ * Sets the client id on the connection.
+ *
+ * @param clientID the client id to set
+ * @throws JMSException if the connection rejects the client id
+ */
+ public void setClientID(String clientID) throws JMSException
+ {
+ getAMQConnection().setClientID(clientID);
+ }
+
+ /** @return the virtual host reported by the connection */
+ public String getVirtualHost()
+ {
+ return getAMQConnection().getVirtualHost();
+ }
+
+ /** @return the username reported by the connection */
+ public String getUsername()
+ {
+ return getAMQConnection().getUsername();
+ }
+
+ /** @return the password reported by the connection */
+ public String getPassword()
+ {
+ return getAMQConnection().getPassword();
+ }
+
+ /** @return the underlying MINA session that this object wraps */
+ public IoSession getIoSession()
+ {
+ return _minaProtocolSession;
+ }
+
+ /**
+ * @return the SASL client stored on the MINA session under the SASL_CLIENT key, as last set by
+ * setSaslClient()
+ */
+ public SaslClient getSaslClient()
+ {
+ return (SaslClient) _minaProtocolSession.getAttribute(SASL_CLIENT);
+ }
+
+ /**
+ * Store the SASL client currently being used for the authentication handshake
+ * @param client if non-null, stores this in the session. if null clears any existing client
+ * being stored
+ */
+ public void setSaslClient(SaslClient client)
+ {
+ if (client == null)
+ {
+ _minaProtocolSession.removeAttribute(SASL_CLIENT);
+ }
+ else
+ {
+ _minaProtocolSession.setAttribute(SASL_CLIENT, client);
+ }
+ }
+
+ /**
+ * @return the tune parameters stored on the MINA session under the CONNECTION_TUNE_PARAMETERS
+ * key, as last set by setConnectionTuneParameters()
+ */
+ public ConnectionTuneParameters getConnectionTuneParameters()
+ {
+ return (ConnectionTuneParameters) _minaProtocolSession.getAttribute(CONNECTION_TUNE_PARAMETERS);
+ }
+
+ /**
+ * Stores the tune parameters on the MINA session and applies them: the channel maximum and the
+ * frame maximum are copied to the connection, and the heartbeat value is passed to
+ * initHeartbeats().
+ *
+ * @param params the tune parameters to store and apply
+ */
+ public void setConnectionTuneParameters(ConnectionTuneParameters params)
+ {
+ _minaProtocolSession.setAttribute(CONNECTION_TUNE_PARAMETERS, params);
+ AMQConnection con = getAMQConnection();
+ con.setMaximumChannelCount(params.getChannelMax());
+ con.setMaximumFrameSize(params.getFrameMax());
+ // The heartbeat value is narrowed to int before it is handed to initHeartbeats().
+ initHeartbeats((int) params.getHeartbeat());
+ }
+
+ /**
+ * Callback invoked from the BasicDeliverMethodHandler when a message has been received.
+ * This is invoked on the MINA dispatcher thread. The message is stored against its channel id,
+ * replacing any unprocessed message already held for that channel, so that the content header
+ * and content bodies that follow can be attached to it.
+ * @param message the partially assembled message; its channelId is used as the map key
+ * @throws AMQException declared, although this implementation never throws it
+ */
+ public void unprocessedMessageReceived(UnprocessedMessage message) throws AMQException
+ {
+ _channelId2UnprocessedMsgMap.put(message.channelId, message);
+ }
+
+ /**
+ * Attaches a content header to the unprocessed message pending on the given channel. If the
+ * header announces a body size of zero the message is complete and is delivered straight away.
+ *
+ * @param channelId the channel the content header arrived on
+ * @param contentHeader the content header that was received
+ * @throws AMQException if no message is pending on the channel, or the pending message already
+ * has a content header
+ */
+ public void messageContentHeaderReceived(int channelId, ContentHeaderBody contentHeader)
+ throws AMQException
+ {
+ final UnprocessedMessage pending = (UnprocessedMessage) _channelId2UnprocessedMsgMap.get(channelId);
+ if (pending == null)
+ {
+ throw new AMQException("Error: received content header without having received a BasicDeliver frame first");
+ }
+ if (pending.contentHeader != null)
+ {
+ throw new AMQException("Error: received duplicate content header or did not receive correct number of content body frames");
+ }
+ pending.contentHeader = contentHeader;
+ final boolean hasNoBody = (contentHeader.bodySize == 0);
+ if (hasNoBody)
+ {
+ deliverMessageToAMQSession(channelId, pending);
+ }
+ }
+
+ /**
+  * Attaches a content body frame to the message pending on the given channel and delivers
+  * the message once all of its body data has been received.
+  *
+  * @param channelId the channel the body frame arrived on
+  * @param contentBody the received content body frame
+  * @throws AMQException if no message is pending on the channel, if the pending message has
+  * no content header yet, or if the pending message rejects the body. In the last two cases
+  * the pending message is discarded before the exception is thrown.
+  */
+ public void messageContentBodyReceived(int channelId, ContentBody contentBody) throws AMQException
+ {
+     UnprocessedMessage msg = (UnprocessedMessage) _channelId2UnprocessedMsgMap.get(channelId);
+     if (msg == null)
+     {
+         // the frame that starts a delivery is BasicDeliver (see messageContentHeaderReceived)
+         throw new AMQException("Error: received content body without having received a BasicDeliver frame first");
+     }
+     if (msg.contentHeader == null)
+     {
+         // the partial message can never be completed, so drop it
+         _channelId2UnprocessedMsgMap.remove(channelId);
+         throw new AMQException("Error: received content body without having received a ContentHeader frame first");
+     }
+     try
+     {
+         msg.receiveBody(contentBody);
+     }
+     catch (UnexpectedBodyReceivedException e)
+     {
+         // the partial message is unusable, so drop it before propagating the failure
+         _channelId2UnprocessedMsgMap.remove(channelId);
+         throw e;
+     }
+     if (msg.isAllBodyDataReceived())
+     {
+         deliverMessageToAMQSession(channelId, msg);
+     }
+ }
+
+ /**
+  * Deliver a message to the session registered for the channel, removing the unprocessed
+  * message from our map. The map entry is removed even when delivery fails, so a failed
+  * delivery cannot leave a stale pending message behind. If no session is registered for
+  * the channel the message is dropped with a warning rather than failing with a
+  * NullPointerException.
+  *
+  * @param channelId the channel id the message should be delivered to
+  * @param msg the message
+  */
+ private void deliverMessageToAMQSession(int channelId, UnprocessedMessage msg)
+ {
+     try
+     {
+         AMQSession session = (AMQSession) _channelId2SessionMap.get(channelId);
+         if (session == null)
+         {
+             _logger.warn("Dropping message received on channel " + channelId
+                          + ": no session is registered for that channel");
+             return;
+         }
+         session.messageReceived(msg);
+     }
+     finally
+     {
+         _channelId2UnprocessedMsgMap.remove(channelId);
+     }
+ }
+
+ /**
+  * Convenience method: hands the frame to the underlying protocol session's write() method
+  * and ignores the result.
+  *
+  * @param frame the frame to write
+  */
+ public void writeFrame(AMQDataBlock frame)
+ {
+     this._minaProtocolSession.write(frame);
+ }
+
+ /**
+  * Hands the frame to the underlying protocol session's write() method and optionally
+  * joins on the returned WriteFuture.
+  *
+  * @param frame the frame to write
+  * @param wait if true, join() is called on the future returned by the write
+  */
+ public void writeFrame(AMQDataBlock frame, boolean wait)
+ {
+     final WriteFuture pendingWrite = _minaProtocolSession.write(frame);
+     if (!wait)
+     {
+         return;
+     }
+     pendingWrite.join();
+ }
+
+ /**
+  * Registers a session against a channel id.
+  *
+  * @param channelId the channel id; must be greater than zero
+  * @param session the session to register; must not be null
+  * @throws IllegalArgumentException if channelId is not positive or session is null
+  */
+ public void addSessionByChannel(int channelId, AMQSession session)
+ {
+     if (channelId < 1)
+     {
+         throw new IllegalArgumentException("Attempt to register a session with a channel id <= zero");
+     }
+     if (null == session)
+     {
+         throw new IllegalArgumentException("Attempt to register a null session");
+     }
+
+     _logger.debug("Add session with channel id " + channelId);
+     this._channelId2SessionMap.put(channelId, session);
+ }
+
+ /**
+  * Removes the session registered against a channel id, if any.
+  *
+  * @param channelId the channel id; must be greater than zero
+  * @throws IllegalArgumentException if channelId is not positive
+  */
+ public void removeSessionByChannel(int channelId)
+ {
+     if (channelId < 1)
+     {
+         throw new IllegalArgumentException("Attempt to deregister a session with a channel id <= zero");
+     }
+
+     _logger.debug("Removing session with channelId " + channelId);
+     this._channelId2SessionMap.remove(channelId);
+ }
+
+ /**
+  * Starts the process of closing a session by recording its channel as closing.
+  *
+  * @param session the AMQSession being closed
+  * @throws IllegalArgumentException if the session's channel id is not greater than zero
+  */
+ public void closeSession(AMQSession session)
+ {
+     final int channelId = session.getChannelId();
+     _logger.debug("closeSession called on protocol session for session " + channelId);
+     if (channelId <= 0)
+     {
+         // message matches the actual check: zero is rejected as well as negative ids
+         throw new IllegalArgumentException("Attempt to close a channel with id <= 0");
+     }
+     // we need to know when a channel is closing so that we can respond
+     // with a channel.close frame when we receive any other type of frame
+     // on that channel
+     _closingChannels.putIfAbsent(channelId, session);
+ }
+
+ /**
+  * Called from the ChannelClose handler when a channel close frame is received.
+  * This method decides whether this is a response or an initiation. The latter
+  * case causes the AMQSession registered for the channel (if there is one) to be
+  * told that it has been closed, passing it an AMQException built from the code and text.
+  *
+  * @param channelId the id of the channel (session)
+  * @param code the code carried by the close frame, passed on in the AMQException
+  * @param text the text carried by the close frame, passed on in the AMQException
+  * @return true if the client must respond to the server, i.e. if the server
+  * initiated the channel close, false if the channel close is just the server
+  * responding to the client's earlier request to close the channel.
+  */
+ public boolean channelClosed(int channelId, int code, String text)
+ {
+     final Integer chId = channelId;
+     // if this is a response to an earlier request to close the channel there is nothing to do
+     if (_closingChannels.remove(chId) != null)
+     {
+         return false;
+     }
+
+     final AMQSession session = (AMQSession) _channelId2SessionMap.get(chId);
+     if (session == null)
+     {
+         // no session to notify, but the server still initiated the close and expects a reply
+         _logger.warn("Received channel close for channel " + channelId + " (code " + code
+                      + ", text: " + text + ") but no session is registered for that channel");
+     }
+     else
+     {
+         session.closed(new AMQException(_logger, code, text));
+     }
+     return true;
+ }
+
+ /**
+  * Reads the connection stored as the AMQ_CONNECTION attribute of the underlying session.
+  *
+  * @return the stored attribute cast to AMQConnection
+  */
+ public AMQConnection getAMQConnection()
+ {
+     final Object stored = _minaProtocolSession.getAttribute(AMQ_CONNECTION);
+     return (AMQConnection) stored;
+ }
+
+ /**
+  * Closes the underlying session and joins on the returned close future before returning.
+  */
+ public void closeProtocolSession()
+ {
+     _logger.debug("Closing protocol session");
+     _minaProtocolSession.close().join();
+ }
+
+ /**
+  * Delegates a failover request to the protocol handler.
+  *
+  * @param host passed through unchanged to the protocol handler's failover()
+  * @param port passed through unchanged to the protocol handler's failover()
+  */
+ public void failover(String host, int port)
+ {
+     this._protocolHandler.failover(host, port);
+ }
+
+ String generateQueueName()
+ {
+ int id;
+ synchronized(_queueIdLock)
+ {
+ id = _queueId++;
+ }
+ //todo remove '/' and ':' from local Address as this doesn't conform to spec.
+ return "tmp_" + _minaProtocolSession.getLocalAddress() + "_" + id;
+ }
+
+ /**
+ *
+ * @param delay delay in seconds (not ms)
+ */
+ void initHeartbeats(int delay)
+ {
+ if (delay > 0)
+ {
+ _minaProtocolSession.setIdleTime(IdleStatus.WRITER_IDLE, delay);
+ _minaProtocolSession.setIdleTime(IdleStatus.READER_IDLE, HeartbeatConfig.CONFIG.getTimeout(delay));
+ HeartbeatDiagnostics.init(delay, HeartbeatConfig.CONFIG.getTimeout(delay));
+ }
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/protocol/AMQProtocolSession.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,133 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.protocol;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQMethodBody;
+
+public abstract class BlockingMethodFrameListener implements AMQMethodListener
+{
+ private volatile boolean _ready = false;
+
+ public abstract boolean processMethod(int channelId, AMQMethodBody frame) throws AMQException;
+
+ private final Object _lock = new Object();
+
+ /**
+ * This is set if there is an exception thrown from processCommandFrame and the
+ * exception is rethrown to the caller of blockForFrame()
+ */
+ private volatile Exception _error;
+
+ protected int _channelId;
+
+ protected AMQMethodEvent _doneEvt = null;
+
+ public BlockingMethodFrameListener(int channelId)
+ {
+ _channelId = channelId;
+ }
+
+ /**
+ * This method is called by the MINA dispatching thread. Note that it could
+ * be called before blockForFrame() has been called.
+ * @param evt the frame event
+ * @return true if the listener has dealt with this frame
+ * @throws AMQException
+ */
+ public boolean methodReceived(AMQMethodEvent evt) throws AMQException
+ {
+ AMQMethodBody method = evt.getMethod();
+
+ try
+ {
+ boolean ready = (evt.getChannelId() == _channelId) && processMethod(evt.getChannelId(), method);
+ if (ready)
+ {
+ // we only update the flag from inside the synchronized block
+ // so that the blockForFrame method cannot "miss" an update - it
+ // will only ever read the flag from within the synchronized block
+ synchronized (_lock)
+ {
+ _doneEvt = evt;
+ _ready = ready;
+ _lock.notify();
+ }
+ }
+ return ready;
+ }
+ catch (AMQException e)
+ {
+ error(e);
+ // we rethrow the error here, and the code in the frame dispatcher will go round
+ // each listener informing them that an exception has been thrown
+ throw e;
+ }
+ }
+
+ /**
+ * This method is called by the thread that wants to wait for a frame.
+ */
+ public AMQMethodEvent blockForFrame() throws AMQException
+ {
+ synchronized (_lock)
+ {
+ while (!_ready)
+ {
+ try
+ {
+ _lock.wait();
+ }
+ catch (InterruptedException e)
+ {
+ // IGNORE
+ }
+ }
+ }
+ if (_error != null)
+ {
+ if (_error instanceof AMQException)
+ {
+ throw (AMQException)_error;
+ }
+ else
+ {
+ throw new AMQException("Woken up due to exception", _error); // FIXME: This will wrap FailoverException and prevent it being caught.
+ }
+ }
+
+ return _doneEvt;
+ }
+
+ /**
+ * This is a callback, called by the MINA dispatcher thread only. It is also called from within this
+ * class to avoid code repetition but again is only called by the MINA dispatcher thread.
+ * @param e
+ */
+ public void error(Exception e)
+ {
+ // set the error so that the thread that is blocking (against blockForFrame())
+ // can pick up the exception and rethrow to the caller
+ _error = e;
+ synchronized (_lock)
+ {
+ _ready = true;
+ _lock.notify();
+ }
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/protocol/HeartbeatConfig.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/protocol/HeartbeatConfig.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/protocol/HeartbeatConfig.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/protocol/HeartbeatConfig.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,57 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.protocol;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Holds the factor used to derive the heartbeat timeout from the delay between heartbeats.
+ * The factor defaults to 2 and can be overridden with the amqj.heartbeat.timeoutFactor
+ * system property.
+ */
+class HeartbeatConfig
+{
+    private static final Logger _logger = Logger.getLogger(HeartbeatConfig.class);
+    static final HeartbeatConfig CONFIG = new HeartbeatConfig();
+
+    /**
+     * The factor used to get the timeout from the delay between heartbeats.
+     */
+    private float timeoutFactor = 2;
+
+    /**
+     * Reads amqj.heartbeat.timeoutFactor. A value that cannot be parsed as a float, or that
+     * parses to NaN or a negative number, is reported with a warning and the default is kept.
+     */
+    HeartbeatConfig()
+    {
+        String property = System.getProperty("amqj.heartbeat.timeoutFactor");
+        if(property != null)
+        {
+            try
+            {
+                float parsed = Float.parseFloat(property);
+                // parseFloat accepts "NaN" and negative values without throwing; neither
+                // produces a meaningful timeout, so treat them like any other invalid value
+                if(Float.isNaN(parsed) || parsed < 0)
+                {
+                    _logger.warn("Invalid timeout factor (amqj.heartbeat.timeoutFactor): " + property);
+                }
+                else
+                {
+                    timeoutFactor = parsed;
+                }
+            }
+            catch(NumberFormatException e)
+            {
+                _logger.warn("Invalid timeout factor (amqj.heartbeat.timeoutFactor): " + property);
+            }
+        }
+    }
+
+    /**
+     * @return the factor currently in effect
+     */
+    float getTimeoutFactor()
+    {
+        return timeoutFactor;
+    }
+
+    /**
+     * @param writeDelay the delay between heartbeats
+     * @return the delay multiplied by the timeout factor, truncated to an int
+     */
+    int getTimeout(int writeDelay)
+    {
+        return (int) (timeoutFactor * writeDelay);
+    }
+}
Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/protocol/HeartbeatConfig.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/protocol/HeartbeatDiagnostics.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/protocol/HeartbeatDiagnostics.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/protocol/HeartbeatDiagnostics.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/protocol/HeartbeatDiagnostics.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,118 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.protocol;
+
+class HeartbeatDiagnostics
+{
+ private static final Diagnostics _impl = init();
+
+ private static Diagnostics init()
+ {
+ return Boolean.getBoolean("amqj.heartbeat.diagnostics") ? new On() : new Off();
+ }
+
+ static void sent()
+ {
+ _impl.sent();
+ }
+
+ static void timeout()
+ {
+ _impl.timeout();
+ }
+
+ static void received(boolean heartbeat)
+ {
+ _impl.received(heartbeat);
+ }
+
+ static void init(int delay, int timeout)
+ {
+ _impl.init(delay, timeout);
+ }
+
+ private static interface Diagnostics
+ {
+ void sent();
+ void timeout();
+ void received(boolean heartbeat);
+ void init(int delay, int timeout);
+ }
+
+ private static class On implements Diagnostics
+ {
+ private final String[] messages = new String[50];
+ private int i;
+
+ private void save(String msg)
+ {
+ messages[i++] = msg;
+ if(i >= messages.length){
+ i = 0;//i.e. a circular buffer
+ }
+ }
+
+ public void sent()
+ {
+ save(System.currentTimeMillis() + ": sent heartbeat");
+ }
+
+ public void timeout()
+ {
+ for(int i = 0; i < messages.length; i++)
+ {
+ if(messages[i] != null)
+ {
+ System.out.println(messages[i]);
+ }
+ }
+ System.out.println(System.currentTimeMillis() + ": timed out");
+ }
+
+ public void received(boolean heartbeat)
+ {
+ save(System.currentTimeMillis() + ": received " + (heartbeat ? "heartbeat" : "data"));
+ }
+
+ public void init(int delay, int timeout)
+ {
+ System.out.println(System.currentTimeMillis() + ": initialised delay=" + delay + ", timeout=" + timeout);
+ }
+ }
+
+ private static class Off implements Diagnostics
+ {
+ public void sent()
+ {
+
+ }
+ public void timeout()
+ {
+
+ }
+ public void received(boolean heartbeat)
+ {
+
+ }
+
+ public void init(int delay, int timeout)
+ {
+
+ }
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/protocol/HeartbeatDiagnostics.java
------------------------------------------------------------------------------
svn:eol-style = native