You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ar...@apache.org on 2007/09/04 15:02:58 UTC
svn commit: r572656 - in
/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms:
./ message/
Author: arnaudsimon
Date: Tue Sep 4 06:02:58 2007
New Revision: 572656
URL: http://svn.apache.org/viewvc?rev=572656&view=rev
Log:
added byteBuffer to Stream converter
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/BytesMessageImpl.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/MapMessageImpl.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/MessageImpl.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/ObjectMessageImpl.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/TextMessageImpl.java
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java?rev=572656&r1=572655&r2=572656&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java Tue Sep 4 06:02:58 2007
@@ -40,7 +40,8 @@
/**
* Implementation of JMS message consumer
*/
-public class MessageConsumerImpl extends MessageActor implements MessageConsumer, org.apache.qpidity.client.util.MessageListener
+public class MessageConsumerImpl extends MessageActor
+ implements MessageConsumer, org.apache.qpidity.client.util.MessageListener
{
// we can receive up to 100 messages for an asynchronous listener
public static final int MAX_MESSAGE_TRANSFERRED = 100;
@@ -78,28 +79,19 @@
private MessageListener _messageListener;
/**
- * The synchronous message just delivered
- */
- private QpidMessage _incomingMessage;
-
- /**
* A lcok on the syncrhonous message
*/
private final Object _incomingMessageLock = new Object();
- /**
- * Indicates that this consumer is receiving a synch message
- */
- private boolean _isReceiving = false;
/**
* Number of mesages received asynchronously
* Nether exceed MAX_MESSAGE_TRANSFERRED
*/
private int _messageAsyncrhonouslyReceived = 0;
-
+
private LinkedBlockingQueue<QpidMessage> _queue = new LinkedBlockingQueue<QpidMessage>();
-
+
//----- Constructors
/**
* Create a new MessageProducerImpl.
@@ -126,7 +118,7 @@
_subscriptionName = subscriptionName;
_isStopped = getSession().isStopped();
// let's create a message part assembler
-
+
MessagePartListener messageAssembler = new MessagePartListenerAdapter(this);
if (destination instanceof Queue)
@@ -283,12 +275,10 @@
// Check if we can get a message immediately
Message result;
result = receiveNoWait();
-
- if(result != null)
+ if (result != null)
{
return result;
}
-
try
{
// Now issue a credit and wait for the broker to send a message
@@ -296,7 +286,7 @@
// This will only overload the broker. After the initial try we can wait
// for the broker to send a message when it gets one
requestCredit(1);
- return (Message)_queue.take();
+ return (Message) _queue.take();
}
catch (Exception e)
{
@@ -323,19 +313,19 @@
{
throw new JMSException("Invalid timeout value: " + timeout);
}
-
+
Message result;
try
{
// first check if we have any in the queue already
- result = (Message)_queue.poll();
- if(result == null)
+ result = (Message) _queue.poll();
+ if (result == null)
{
requestCredit(1);
requestFlush();
// We shouldn't do a sync(). Bcos the timeout can happen
// before the sync() returns
- return (Message)_queue.poll(timeout,TimeUnit.MILLISECONDS);
+ return (Message) _queue.poll(timeout, TimeUnit.MILLISECONDS);
}
else
{
@@ -362,50 +352,71 @@
try
{
// first check if we have any in the queue already
- result = (Message)_queue.poll();
- if(result == null)
+ result = (Message) _queue.poll();
+ if (result == null)
{
requestCredit(1);
requestFlush();
requestSync();
- return (Message)_queue.poll();
+ return (Message) _queue.poll();
}
else
{
return result;
- }
+ }
}
catch (Exception e)
{
throw ExceptionHelper.convertQpidExceptionToJMSException(e);
}
}
-
- // not public methods
- private void requestCredit(int units)
+
+ // not public methods
+ /**
+ * Upon receipt of this method, the broker adds "value"
+ * number of messages to the available credit balance for this consumer.
+ *
+ * @param value Number of credits, a value of 0 indicates an infinite amount of credit.
+ */
+ private void requestCredit(int value)
{
getSession().getQpidSession()
- .messageFlow(getMessageActorID(), org.apache.qpidity.client.Session.MESSAGE_FLOW_UNIT_MESSAGE, units);
+ .messageFlow(getMessageActorID(), org.apache.qpidity.client.Session.MESSAGE_FLOW_UNIT_MESSAGE, value);
}
-
+
+ /**
+ * Forces the broker to exhaust its credit supply.
+ * <p> The broker's credit will always be zero when
+ * this method completes.
+ */
private void requestFlush()
{
getSession().getQpidSession().messageFlush(getMessageActorID());
}
-
+
+ /**
+ * Sync method will block until all outstanding broker
+ * commands
+ * are executed.
+ */
private void requestSync()
{
getSession().getQpidSession().sync();
}
-
+
+ /**
+ * Check whether this consumer is closed.
+ *
+ * @throws JMSException If this consumer is closed.
+ */
private void checkClosed() throws JMSException
{
- if(_isStopped)
+ if (_isStopped)
{
throw new JMSException("Session is closed");
}
}
-
+
/**
* Stop the delivery of messages to this consumer.
* <p>For asynchronous receiver, this operation blocks until the message listener
@@ -428,10 +439,14 @@
{
synchronized (_incomingMessageLock)
{
- _isStopped = false;
+ _isStopped = false;
}
}
+ /**
+ * This method notifies this consumer that a message has been delivered
+ * @param message The received message.
+ */
public void onMessage(org.apache.qpidity.api.Message message)
{
try
@@ -440,7 +455,7 @@
if (checkPreConditions(jmsMessage))
{
preApplicationProcessing(jmsMessage);
-
+
if (_messageListener == null)
{
_queue.offer(jmsMessage);
@@ -453,16 +468,16 @@
notifyMessageListener(jmsMessage);
}
}
- }
+ }
catch (Exception e)
{
throw new RuntimeException(e.getMessage());
}
}
-
-
- public void notifyMessageListener(QpidMessage message)throws RuntimeException
- {
+
+
+ public void notifyMessageListener(QpidMessage message) throws RuntimeException
+ {
try
{
_messageAsyncrhonouslyReceived++;
@@ -471,8 +486,7 @@
// ask the server for the delivery of MAX_MESSAGE_TRANSFERRED more messages
resetAsynchMessageReceived();
}
-
-
+
// The JMS specs says:
/* The result of a listener throwing a RuntimeException depends on the session?s
* acknowledgment mode.
@@ -484,9 +498,9 @@
*
* The number of time we try redelivering the message is 0
**/
- try
+ try
{
-
+
_messageListener.onMessage((Message) message);
}
catch (RuntimeException re)
@@ -494,14 +508,19 @@
// do nothing as this message will not be redelivered
}
-
+
}
catch (Exception e)
{
throw new RuntimeException(e.getMessage());
}
}
-
+
+ /**
+ * Check whether this consumer is asynchronous
+ *
+ * @throws javax.jms.IllegalStateException If this consumer is asynchronous.
+ */
private void checkIfListenerSet() throws javax.jms.IllegalStateException
{
@@ -510,8 +529,14 @@
throw new javax.jms.IllegalStateException("A listener has already been set.");
}
}
-
- private void preApplicationProcessing(QpidMessage message)throws Exception
+
+ /**
+ * Pre-process a received message.
+ *
+ * @param message The message to pre-process.
+ * @throws Exception If the message cannot be pre-processed due to some internal error.
+ */
+ private void preApplicationProcessing(QpidMessage message) throws Exception
{
getSession().preProcessMessage(message);
// If the session is transacted we need to ack the message first
@@ -522,41 +547,54 @@
}
message.afterMessageReceive();
}
-
- private boolean checkPreConditions(QpidMessage message)throws QpidException
+
+ /**
+ * Check whether a message can be delivered to this consumer.
+ *
+ * @param message The message to be checked.
+ * @return true if the message matches the selector and can be acquired, false otherwise.
+ * @throws QpidException If the message preConditions cannot be checked due to some internal error.
+ */
+ private boolean checkPreConditions(QpidMessage message) throws QpidException
{
boolean messageOk = true;
if (_messageSelector != null)
{
- messageOk = _filter.matches((Message) message);
- if (!messageOk)
- {
- System.out.println("Message not OK, releasing");
- releaseMessage(message);
- return false;
- }
+ messageOk = _filter.matches((Message) message);
+ }
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("messageOk " + messageOk);
+ _logger.debug("_preAcquire " + _preAcquire);
}
-
- System.out.println("messageOk " + messageOk);
- System.out.println("_preAcquire " + _preAcquire);
-
if (!messageOk && _preAcquire)
{
// this is the case for topics
// We need to ack this message
- System.out.println("filterMessage - trying to ack message");
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("filterMessage - trying to ack message");
+ }
acknowledgeMessage(message);
- System.out.println("filterMessage - acked message");
+ }
+ else if (!messageOk)
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Message not OK, releasing");
+ }
+ releaseMessage(message);
}
// now we need to acquire this message if needed
// this is the case of queue with a message selector set
if (!_preAcquire && messageOk)
{
- System.out.println("filterMessage - trying to acquire message");
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("filterMessage - trying to acquire message");
+ }
messageOk = acquireMessage(message);
- System.out.println("filterMessage - acquired message");
}
-
return messageOk;
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/BytesMessageImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/BytesMessageImpl.java?rev=572656&r1=572655&r2=572656&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/BytesMessageImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/BytesMessageImpl.java Tue Sep 4 06:02:58 2007
@@ -836,25 +836,10 @@
{
try
{
- /*
- * messageData.array() throws an UnsupportedOperationException
- System.out.println("messageData Array : " +messageData.array().length);
-
- _dataIn = new DataInputStream(
- new ByteArrayInputStream(messageData.array(), messageData.arrayOffset() + messageData.position()
- , messageData.remaining()));
- */
-
- // temp hack
- byte[] b = new byte[messageData.limit()];
- messageData.get(b);
- _dataIn = new DataInputStream(
- new ByteArrayInputStream(b));
-
+ _dataIn = new DataInputStream(asInputStream());
}
catch (Exception e)
{
- e.printStackTrace();
throw new QpidException("Cannot retrieve data from message ", null, e);
}
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/MapMessageImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/MapMessageImpl.java?rev=572656&r1=572655&r2=572656&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/MapMessageImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/MapMessageImpl.java Tue Sep 4 06:02:58 2007
@@ -594,10 +594,7 @@
{
try
{
- ByteArrayInputStream bais = new ByteArrayInputStream(messageData.array(),
- messageData.arrayOffset() + messageData.position(),
- messageData.remaining());
- ObjectInputStream ois = new ObjectInputStream(bais);
+ ObjectInputStream ois = new ObjectInputStream(asInputStream());
_map = (Map<String, Object>) ois.readObject();
}
catch (IOException ioe)
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/MessageImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/MessageImpl.java?rev=572656&r1=572655&r2=572656&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/MessageImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/MessageImpl.java Tue Sep 4 06:02:58 2007
@@ -23,6 +23,12 @@
import javax.jms.*;
import java.util.Enumeration;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.CharsetDecoder;
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.CoderResult;
/**
* Implementation of javax.jms.Message
@@ -56,7 +62,7 @@
/**
* Indicate whether the message properties are in writeable status.
*/
- protected boolean _proertiesReadOnly = false;
+ protected boolean _propertiesReadOnly = false;
/**
* The message consumer through which this message was received.
@@ -83,7 +89,7 @@
{
super(message);
}
-
+
//---- javax.jms.Message interface
/**
* Get the message ID.
@@ -506,7 +512,7 @@
{
// The properties can now be written
// Properties are read only when the message is received.
- _proertiesReadOnly = false;
+ _propertiesReadOnly = false;
super.clearMessageProperties();
}
@@ -827,7 +833,7 @@
*/
public void setObjectProperty(String name, Object value) throws JMSException
{
- if (_proertiesReadOnly)
+ if (_propertiesReadOnly)
{
throw new MessageNotWriteableException("Error the message properties are read only");
}
@@ -895,7 +901,7 @@
// recreate a destination object for the encoded ReplyTo destination (if it exists)
// _replyTo = // todo
- _proertiesReadOnly = true;
+ _propertiesReadOnly = true;
_readOnly = true;
}
@@ -924,4 +930,87 @@
_messageConsumer = messageConsumer;
}
+ /**
+ * Returns an {@link java.io.InputStream} that reads the data from this message buffer; every call goes straight to getMessageData(), no copy of the data is made.
+ * {@link java.io.InputStream#read()} returns <tt>-1</tt> if the buffer position
+ * has reached the limit. mark/reset are supported and delegate to the buffer's own mark()/reset(); the readlimit argument is ignored.
+ * NOTE(review): read(byte[], int, int) returns -1 (not 0) when len is 0 and no data remains, and skip() forwards a negative n to position() unchecked; confirm this is intended.
+ * @return An {@link java.io.InputStream} that reads the data from this message buffer.
+ */
+ public InputStream asInputStream()
+ {
+ return new InputStream()
+ {
+ @Override
+ public int available()
+ {
+ return getMessageData().remaining();
+ }
+
+ @Override
+ public synchronized void mark(int readlimit)
+ {
+ getMessageData().mark();
+ }
+
+ @Override
+ public boolean markSupported()
+ {
+ return true;
+ }
+
+ @Override
+ public int read()
+ {
+ if (getMessageData().hasRemaining())
+ {
+ return getMessageData().get() & 0xff;
+ }
+ else
+ {
+ return -1;
+ }
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len)
+ {
+ int remaining = getMessageData().remaining();
+ if (remaining > 0)
+ {
+ int readBytes = Math.min(remaining, len);
+ getMessageData().get(b, off, readBytes);
+ return readBytes;
+ }
+ else
+ {
+ return -1;
+ }
+ }
+
+ @Override
+ public synchronized void reset()
+ {
+ getMessageData().reset();
+ }
+
+ @Override
+ public long skip(long n)
+ {
+ int bytes;
+ if (n > Integer.MAX_VALUE)
+ {
+ bytes = getMessageData().remaining();
+ }
+ else
+ {
+ bytes = Math.min(getMessageData().remaining(), (int) n);
+ }
+ getMessageData().position(getMessageData().position() + bytes);
+ return bytes;
+ }
+ };
+ }
+
+
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/ObjectMessageImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/ObjectMessageImpl.java?rev=572656&r1=572655&r2=572656&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/ObjectMessageImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/ObjectMessageImpl.java Tue Sep 4 06:02:58 2007
@@ -158,10 +158,7 @@
{
try
{
- ByteArrayInputStream bais = new ByteArrayInputStream(messageData.array(),
- messageData.arrayOffset() + messageData.position(),
- messageData.remaining());
- ObjectInputStream ois = new ObjectInputStream(bais);
+ ObjectInputStream ois = new ObjectInputStream(asInputStream());
_object = (Serializable) ois.readObject();
}
catch (IOException ioe)
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/TextMessageImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/TextMessageImpl.java?rev=572656&r1=572655&r2=572656&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/TextMessageImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/TextMessageImpl.java Tue Sep 4 06:02:58 2007
@@ -22,6 +22,8 @@
import javax.jms.TextMessage;
import javax.jms.JMSException;
import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.*;
import java.io.UnsupportedEncodingException;
/**
@@ -119,10 +121,9 @@
{
try
{
- _messageText = new String(messageData.array(), messageData.arrayOffset() + messageData.position(),
- messageData.remaining(), CHARACTER_ENCODING);
+ _messageText = getString();
}
- catch (UnsupportedEncodingException e)
+ catch (Exception e)
{
throw new QpidException("Problem when decoding text", null, e);
}
@@ -142,6 +143,186 @@
{
super.clearBody();
_messageText = null;
+ }
+
+ /**
+ * This method is taken from Mina code.
+ *
+ * Reads a <code>NUL</code>-terminated string from the message data buffer using a
+ * decoder created for <code>CHARACTER_ENCODING</code> and returns it. This method reads
+ * until the limit of this buffer if no <tt>NUL</tt> is found.
+ *
+ * @return The decoded text, or an empty string if the buffer has no remaining bytes or begins with the terminator.
+ * @throws java.nio.charset.CharacterCodingException If the decoder reports an error; the buffer's limit and position are restored before it is thrown.
+ *
+ */
+ public String getString() throws CharacterCodingException
+ {
+ if (!getMessageData().hasRemaining())
+ {
+ return "";
+ }
+ Charset charset = Charset.forName(CHARACTER_ENCODING);
+ CharsetDecoder decoder = charset.newDecoder();
+
+ boolean utf16 = decoder.charset().name().startsWith("UTF-16");
+
+ int oldPos = getMessageData().position();
+ int oldLimit = getMessageData().limit();
+ int end = -1;
+ int newPos;
+
+ if (!utf16)
+ {
+ end = indexOf((byte) 0x00);
+ if (end < 0)
+ {
+ newPos = end = oldLimit;
+ }
+ else
+ {
+ newPos = end + 1;
+ }
+ }
+ else
+ {
+ int i = oldPos;
+ for (; ;)
+ {
+ boolean wasZero = getMessageData().get(i) == 0;
+ i++;
+
+ if (i >= oldLimit)
+ {
+ break;
+ }
+
+ if (getMessageData().get(i) != 0)
+ {
+ i++;
+ if (i >= oldLimit)
+ {
+ break;
+ }
+ else
+ {
+ continue;
+ }
+ }
+
+ if (wasZero)
+ {
+ end = i - 1;
+ break;
+ }
+ }
+
+ if (end < 0)
+ {
+ newPos = end = oldPos + ((oldLimit - oldPos) & 0xFFFFFFFE);
+ }
+ else
+ {
+ if (end + 2 <= oldLimit)
+ {
+ newPos = end + 2;
+ }
+ else
+ {
+ newPos = end;
+ }
+ }
+ }
+
+ if (oldPos == end)
+ {
+ getMessageData().position(newPos);
+ return "";
+ }
+
+ getMessageData().limit(end);
+ decoder.reset();
+
+ int expectedLength = (int) (getMessageData().remaining() * decoder.averageCharsPerByte()) + 1;
+ CharBuffer out = CharBuffer.allocate(expectedLength);
+ for (; ;)
+ {
+ CoderResult cr;
+ if (getMessageData().hasRemaining())
+ {
+ cr = decoder.decode(getMessageData(), out, true);
+ }
+ else
+ {
+ cr = decoder.flush(out);
+ }
+
+ if (cr.isUnderflow())
+ {
+ break;
+ }
+
+ if (cr.isOverflow())
+ {
+ CharBuffer o = CharBuffer.allocate(out.capacity() + expectedLength);
+ out.flip();
+ o.put(out);
+ out = o;
+ continue;
+ }
+
+ if (cr.isError())
+ {
+ // Revert the buffer back to the previous state.
+ getMessageData().limit(oldLimit);
+ getMessageData().position(oldPos);
+ cr.throwException();
+ }
+ }
+
+ getMessageData().limit(oldLimit);
+ getMessageData().position(newPos);
+ return out.flip().toString();
+ }
+
+ /**
+ * Returns the position of the first occurrence of the specified byte between the current position and
+ * the current limit of the message data buffer; the buffer's position is not changed by the search.
+ *
+ * @return The buffer index of the first match, or <tt>-1</tt> if the specified byte is not found
+ * @param b The byte value to search for.
+ */
+ public int indexOf(byte b)
+ {
+ if (getMessageData().hasArray())
+ {
+ int arrayOffset = getMessageData().arrayOffset();
+ int beginPos = arrayOffset + getMessageData().position();
+ int limit = arrayOffset + getMessageData().limit();
+ byte[] array = getMessageData().array();
+
+ for (int i = beginPos; i < limit; i++)
+ {
+ if (array[i] == b)
+ {
+ return i - arrayOffset;
+ }
+ }
+ }
+ else
+ {
+ int beginPos = getMessageData().position();
+ int limit = getMessageData().limit();
+
+ for (int i = beginPos; i < limit; i++)
+ {
+ if (getMessageData().get(i) == b)
+ {
+ return i;
+ }
+ }
+ }
+ return -1;
}
}