You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ar...@apache.org on 2007/09/04 15:02:58 UTC

svn commit: r572656 - in /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms: ./ message/

Author: arnaudsimon
Date: Tue Sep  4 06:02:58 2007
New Revision: 572656

URL: http://svn.apache.org/viewvc?rev=572656&view=rev
Log:
added byteBuffer to Stream converter 

Modified:
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/BytesMessageImpl.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/MapMessageImpl.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/MessageImpl.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/ObjectMessageImpl.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/TextMessageImpl.java

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java?rev=572656&r1=572655&r2=572656&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java Tue Sep  4 06:02:58 2007
@@ -40,7 +40,8 @@
 /**
  * Implementation of JMS message consumer
  */
-public class MessageConsumerImpl extends MessageActor implements MessageConsumer, org.apache.qpidity.client.util.MessageListener
+public class MessageConsumerImpl extends MessageActor
+        implements MessageConsumer, org.apache.qpidity.client.util.MessageListener
 {
     // we can receive up to 100 messages for an asynchronous listener
     public static final int MAX_MESSAGE_TRANSFERRED = 100;
@@ -78,28 +79,19 @@
     private MessageListener _messageListener;
 
     /**
-     * The synchronous message just delivered
-     */
-    private QpidMessage _incomingMessage;
-
-    /**
      * A lcok on the syncrhonous message
      */
     private final Object _incomingMessageLock = new Object();
 
-    /**
-     * Indicates that this consumer is receiving a synch message
-     */
-    private boolean _isReceiving = false;
 
     /**
      * Number of mesages received asynchronously
      * Nether exceed MAX_MESSAGE_TRANSFERRED
      */
     private int _messageAsyncrhonouslyReceived = 0;
-    
+
     private LinkedBlockingQueue<QpidMessage> _queue = new LinkedBlockingQueue<QpidMessage>();
-    
+
     //----- Constructors
     /**
      * Create a new MessageProducerImpl.
@@ -126,7 +118,7 @@
         _subscriptionName = subscriptionName;
         _isStopped = getSession().isStopped();
         // let's create a message part assembler
-        
+
         MessagePartListener messageAssembler = new MessagePartListenerAdapter(this);
 
         if (destination instanceof Queue)
@@ -283,12 +275,10 @@
         // Check if we can get a message immediately
         Message result;
         result = receiveNoWait();
-        
-        if(result != null)
+        if (result != null)
         {
             return result;
         }
-        
         try
         {
             // Now issue a credit and wait for the broker to send a message
@@ -296,7 +286,7 @@
             // This will only overload the broker. After the initial try we can wait
             // for the broker to send a message when it gets one
             requestCredit(1);
-            return (Message)_queue.take();
+            return (Message) _queue.take();
         }
         catch (Exception e)
         {
@@ -323,19 +313,19 @@
         {
             throw new JMSException("Invalid timeout value: " + timeout);
         }
-        
+
         Message result;
         try
         {
             // first check if we have any in the queue already
-            result = (Message)_queue.poll();
-            if(result == null)
+            result = (Message) _queue.poll();
+            if (result == null)
             {
                 requestCredit(1);
                 requestFlush();
                 // We shouldn't do a sync(). Bcos the timeout can happen
                 // before the sync() returns
-                return (Message)_queue.poll(timeout,TimeUnit.MILLISECONDS);
+                return (Message) _queue.poll(timeout, TimeUnit.MILLISECONDS);
             }
             else
             {
@@ -362,50 +352,71 @@
         try
         {
             // first check if we have any in the queue already
-            result = (Message)_queue.poll();
-            if(result == null)
+            result = (Message) _queue.poll();
+            if (result == null)
             {
                 requestCredit(1);
                 requestFlush();
                 requestSync();
-                return (Message)_queue.poll();
+                return (Message) _queue.poll();
             }
             else
             {
                 return result;
-            }            
+            }
         }
         catch (Exception e)
         {
             throw ExceptionHelper.convertQpidExceptionToJMSException(e);
         }
     }
-    
-    // not public methods    
-    private void requestCredit(int units)
+
+    // not public methods
+    /**
+     * Upon receipt of this method, the broker adds "value"
+     * number of messages to the available credit balance for this consumer.
+     *
+     * @param value Number of credits, a value of 0 indicates an infinite amount of credit.
+     */
+    private void requestCredit(int value)
     {
         getSession().getQpidSession()
-        .messageFlow(getMessageActorID(), org.apache.qpidity.client.Session.MESSAGE_FLOW_UNIT_MESSAGE, units);
+                .messageFlow(getMessageActorID(), org.apache.qpidity.client.Session.MESSAGE_FLOW_UNIT_MESSAGE, value);
     }
-    
+
+    /**
+     * Forces the broker to exhaust its credit supply.
+     * <p> The broker's credit will always be zero when
+     * this method completes.
+     */
     private void requestFlush()
     {
         getSession().getQpidSession().messageFlush(getMessageActorID());
     }
-        
+
+    /**
+     * Sync method will block until all outstanding broker
+     * commands
+     * are executed.
+     */
     private void requestSync()
     {
         getSession().getQpidSession().sync();
     }
-    
+
+    /**
+     * Check whether this consumer is closed.
+     *
+     * @throws JMSException If this consumer is closed.
+     */
     private void checkClosed() throws JMSException
     {
-        if(_isStopped)
+        if (_isStopped)
         {
             throw new JMSException("Session is closed");
         }
     }
-    
+
     /**
      * Stop the delivery of messages to this consumer.
      * <p>For asynchronous receiver, this operation blocks until the message listener
@@ -428,10 +439,14 @@
     {
         synchronized (_incomingMessageLock)
         {
-            _isStopped = false;            
+            _isStopped = false;
         }
     }
 
+    /**
+     * This method notifies this consumer that a message has been delivered
+     * @param message The received message.
+     */
     public void onMessage(org.apache.qpidity.api.Message message)
     {
         try
@@ -440,7 +455,7 @@
             if (checkPreConditions(jmsMessage))
             {
                 preApplicationProcessing(jmsMessage);
-            
+
                 if (_messageListener == null)
                 {
                     _queue.offer(jmsMessage);
@@ -453,16 +468,16 @@
                     notifyMessageListener(jmsMessage);
                 }
             }
-        }    
+        }
         catch (Exception e)
         {
             throw new RuntimeException(e.getMessage());
         }
     }
-    
-   
-    public void notifyMessageListener(QpidMessage message)throws RuntimeException
-    {        
+
+
+    public void notifyMessageListener(QpidMessage message) throws RuntimeException
+    {
         try
         {
             _messageAsyncrhonouslyReceived++;
@@ -471,8 +486,7 @@
                 // ask the server for the delivery of MAX_MESSAGE_TRANSFERRED more messages
                 resetAsynchMessageReceived();
             }
-            
-            
+
             // The JMS specs says:
             /* The result of a listener throwing a RuntimeException depends on the session?s
             * acknowledgment mode.
@@ -484,9 +498,9 @@
             *
             * The number of time we try redelivering the message is 0
             **/
-           try
+            try
             {
-                
+
                 _messageListener.onMessage((Message) message);
             }
             catch (RuntimeException re)
@@ -494,14 +508,19 @@
                 // do nothing as this message will not be redelivered
             }
 
-            
+
         }
         catch (Exception e)
         {
             throw new RuntimeException(e.getMessage());
         }
     }
-    
+
+    /**
+     * Check whether this consumer is asynchronous
+     *
+     * @throws javax.jms.IllegalStateException If this consumer is asynchronous.
+     */
     private void checkIfListenerSet() throws javax.jms.IllegalStateException
     {
 
@@ -510,8 +529,14 @@
             throw new javax.jms.IllegalStateException("A listener has already been set.");
         }
     }
-    
-    private void preApplicationProcessing(QpidMessage message)throws Exception
+
+    /**
+     * pre process a received message.
+     *
+     * @param message The message to pre-process.
+     * @throws Exception If the message  cannot be pre-processed due to some internal error.
+     */
+    private void preApplicationProcessing(QpidMessage message) throws Exception
     {
         getSession().preProcessMessage(message);
         // If the session is transacted we need to ack the message first
@@ -522,41 +547,54 @@
         }
         message.afterMessageReceive();
     }
-                    
-    private boolean checkPreConditions(QpidMessage message)throws QpidException
+
+    /**
+     * Check whether a message can be delivered to this consumer.
+     *
+     * @param message The message to be checked.
+     * @return true if the message matches the selector and can be acquired, false otherwise.
+     * @throws QpidException If the message preConditions cannot be checked due to some internal error.
+     */
+    private boolean checkPreConditions(QpidMessage message) throws QpidException
     {
         boolean messageOk = true;
         if (_messageSelector != null)
         {
-            messageOk = _filter.matches((Message) message);     
-            if (!messageOk)
-            {
-                System.out.println("Message not OK, releasing");
-                releaseMessage(message);
-                return false;
-            }
+            messageOk = _filter.matches((Message) message);
+        }
+        if (_logger.isDebugEnabled())
+        {
+            _logger.debug("messageOk " + messageOk);
+            _logger.debug("_preAcquire " + _preAcquire);
         }
-        
-        System.out.println("messageOk " + messageOk);
-        System.out.println("_preAcquire " + _preAcquire);
-        
         if (!messageOk && _preAcquire)
         {
             // this is the case for topics
             // We need to ack this message
-            System.out.println("filterMessage - trying to ack message");                
+            if (_logger.isDebugEnabled())
+            {
+                _logger.debug("filterMessage - trying to ack message");
+            }
             acknowledgeMessage(message);
-            System.out.println("filterMessage - acked message");
+        }
+        else if (!messageOk)
+        {
+            if (_logger.isDebugEnabled())
+            {
+                _logger.debug("Message not OK, releasing");
+            }
+            releaseMessage(message);
         }
         // now we need to acquire this message if needed
         // this is the case of queue with a message selector set
         if (!_preAcquire && messageOk)
         {
-            System.out.println("filterMessage - trying to acquire message");
+            if (_logger.isDebugEnabled())
+            {
+                _logger.debug("filterMessage - trying to acquire message");
+            }
             messageOk = acquireMessage(message);
-            System.out.println("filterMessage - acquired message");
         }
-        
         return messageOk;
     }
 

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/BytesMessageImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/BytesMessageImpl.java?rev=572656&r1=572655&r2=572656&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/BytesMessageImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/BytesMessageImpl.java Tue Sep  4 06:02:58 2007
@@ -836,25 +836,10 @@
         {
             try
             {
-                /*
-                 * messageData.array() throws an UnsupportedOperationException
-                System.out.println("messageData Array : " +messageData.array().length);
-                
-                _dataIn = new DataInputStream(
-                        new ByteArrayInputStream(messageData.array(), messageData.arrayOffset() + messageData.position()
-                                , messageData.remaining()));
-                */
-                
-                // temp hack
-                byte[] b = new byte[messageData.limit()];
-                messageData.get(b);
-                _dataIn = new DataInputStream(
-                        new ByteArrayInputStream(b));
-                                
+                _dataIn = new DataInputStream(asInputStream());
             }
             catch (Exception e)
             {
-                e.printStackTrace();
                 throw new QpidException("Cannot retrieve data from message ", null, e);
             }
         }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/MapMessageImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/MapMessageImpl.java?rev=572656&r1=572655&r2=572656&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/MapMessageImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/MapMessageImpl.java Tue Sep  4 06:02:58 2007
@@ -594,10 +594,7 @@
         {
             try
             {
-                ByteArrayInputStream bais = new ByteArrayInputStream(messageData.array(),
-                                                                     messageData.arrayOffset() + messageData.position(),
-                                                                     messageData.remaining());
-                ObjectInputStream ois = new ObjectInputStream(bais);
+                ObjectInputStream ois = new ObjectInputStream(asInputStream());
                 _map = (Map<String, Object>) ois.readObject();
             }
             catch (IOException ioe)

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/MessageImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/MessageImpl.java?rev=572656&r1=572655&r2=572656&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/MessageImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/MessageImpl.java Tue Sep  4 06:02:58 2007
@@ -23,6 +23,12 @@
 
 import javax.jms.*;
 import java.util.Enumeration;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.CharsetDecoder;
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.CoderResult;
 
 /**
  * Implementation of javax.jms.Message
@@ -56,7 +62,7 @@
     /**
      * Indicate whether the message properties are in writeable status.
      */
-    protected boolean _proertiesReadOnly = false;
+    protected boolean _propertiesReadOnly = false;
 
     /**
      * The message consumer through which this message was received.
@@ -83,7 +89,7 @@
     {
         super(message);
     }
-    
+
     //---- javax.jms.Message interface
     /**
      * Get the message ID.
@@ -506,7 +512,7 @@
     {
         // The properties can now be written
         // Properties are read only when the message is received.
-        _proertiesReadOnly = false;
+        _propertiesReadOnly = false;
         super.clearMessageProperties();
     }
 
@@ -827,7 +833,7 @@
      */
     public void setObjectProperty(String name, Object value) throws JMSException
     {
-        if (_proertiesReadOnly)
+        if (_propertiesReadOnly)
         {
             throw new MessageNotWriteableException("Error the message properties are read only");
         }
@@ -895,7 +901,7 @@
         // recreate a destination object for the encoded ReplyTo destination (if it exists)
         //          _replyTo = // todo
 
-        _proertiesReadOnly = true;
+        _propertiesReadOnly = true;
         _readOnly = true;
     }
 
@@ -924,4 +930,87 @@
         _messageConsumer = messageConsumer;
     }
 
+    /**
+     * Returns an {@link java.io.InputStream} view over this message's data buffer.
+     * Reads go straight to the buffer returned by <code>getMessageData()</code> and advance
+     * its position; <tt>-1</tt> is returned once the position reaches the limit.
+     *
+     * @return An {@link java.io.InputStream} that reads the data from this message buffer.
+     */
+    public InputStream asInputStream()
+    {
+        return new InputStream()
+        {
+            @Override
+            public int available()
+            {
+                return getMessageData().remaining();
+            }
+
+            @Override
+            public synchronized void mark(int readlimit)
+            {
+                getMessageData().mark(); // readlimit is not used: the buffer keeps its mark
+            }
+
+            @Override
+            public boolean markSupported()
+            {
+                return true;
+            }
+
+            @Override
+            public int read()
+            {
+                if (getMessageData().hasRemaining())
+                {
+                    return getMessageData().get() & 0xff; // mask: report the byte as 0..255
+                }
+                else
+                {
+                    return -1;
+                }
+            }
+
+            @Override
+            public int read(byte[] b, int off, int len)
+            {
+                int remaining = getMessageData().remaining();
+                if (remaining > 0)
+                {
+                    int readBytes = Math.min(remaining, len); // never read past the limit
+                    getMessageData().get(b, off, readBytes);
+                    return readBytes;
+                }
+                else
+                {
+                    return -1;
+                }
+            }
+
+            @Override
+            public synchronized void reset()
+            {
+                getMessageData().reset();
+            }
+
+            @Override
+            public long skip(long n)
+            {
+                // InputStream.skip: a non-positive request skips nothing. Without this
+                // guard a negative n would be cast to int and move the position backwards
+                // (or make position() throw when the result drops below zero).
+                if (n <= 0)
+                {
+                    return 0;
+                }
+                // Clamp in long arithmetic so the narrowing cast to int can never overflow.
+                int bytes = (int) Math.min((long) getMessageData().remaining(), n);
+                getMessageData().position(getMessageData().position() + bytes);
+                return bytes;
+            }
+        };
+    }
+
+  
 }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/ObjectMessageImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/ObjectMessageImpl.java?rev=572656&r1=572655&r2=572656&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/ObjectMessageImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/ObjectMessageImpl.java Tue Sep  4 06:02:58 2007
@@ -158,10 +158,7 @@
         {
             try
             {
-                ByteArrayInputStream bais = new ByteArrayInputStream(messageData.array(),
-                                                                     messageData.arrayOffset() + messageData.position(),
-                                                                     messageData.remaining());
-                ObjectInputStream ois = new ObjectInputStream(bais);
+                ObjectInputStream ois = new ObjectInputStream(asInputStream());
                 _object = (Serializable) ois.readObject();
             }
             catch (IOException ioe)

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/TextMessageImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/TextMessageImpl.java?rev=572656&r1=572655&r2=572656&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/TextMessageImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/TextMessageImpl.java Tue Sep  4 06:02:58 2007
@@ -22,6 +22,8 @@
 import javax.jms.TextMessage;
 import javax.jms.JMSException;
 import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.*;
 import java.io.UnsupportedEncodingException;
 
 /**
@@ -119,10 +121,9 @@
         {
             try
             {
-                _messageText = new String(messageData.array(), messageData.arrayOffset() + messageData.position(),
-                                          messageData.remaining(), CHARACTER_ENCODING);
+                _messageText = getString();
             }
-            catch (UnsupportedEncodingException e)
+            catch (Exception e)
             {
                 throw new QpidException("Problem when decoding text", null, e);
             }
@@ -142,6 +143,186 @@
     {
         super.clearBody();
         _messageText = null;
+    }
+
+    /**
+     * Decodes the message data into a string (this method is taken from Mina code).
+     * <p>
+     * Reads a <code>NUL</code>-terminated string from the message buffer using a decoder
+     * for <code>CHARACTER_ENCODING</code>, or up to the buffer limit if no <tt>NUL</tt> is
+     * found. On success the limit is restored and the position is moved past the bytes consumed.
+     *
+     * @return The decoded text; an empty string if the buffer has nothing remaining.
+     * @throws java.nio.charset.CharacterCodingException If the bytes cannot be decoded; the
+     *         buffer position and limit are put back to their previous values first.
+     */
+    public String getString() throws CharacterCodingException
+    {
+        if (!getMessageData().hasRemaining())
+        {
+            return "";
+        }
+        Charset charset = Charset.forName(CHARACTER_ENCODING);
+        CharsetDecoder decoder = charset.newDecoder();
+
+        boolean utf16 = decoder.charset().name().startsWith("UTF-16"); // selects the scan below
+
+        int oldPos = getMessageData().position();
+        int oldLimit = getMessageData().limit();
+        int end = -1; // index where the text stops; -1 until a terminator is found
+        int newPos; // position to leave the buffer at once the text has been read
+
+        if (!utf16)
+        {
+            end = indexOf((byte) 0x00); // look for a single zero byte
+            if (end < 0)
+            {
+                newPos = end = oldLimit; // no terminator: consume up to the limit
+            }
+            else
+            {
+                newPos = end + 1; // step over the terminating zero byte
+            }
+        }
+        else
+        {
+            int i = oldPos;
+            for (; ;) // scan for two consecutive zero bytes
+            {
+                boolean wasZero = getMessageData().get(i) == 0;
+                i++;
+
+                if (i >= oldLimit)
+                {
+                    break;
+                }
+
+                if (getMessageData().get(i) != 0)
+                {
+                    i++;
+                    if (i >= oldLimit)
+                    {
+                        break;
+                    }
+                    else
+                    {
+                        continue;
+                    }
+                }
+
+                if (wasZero)
+                {
+                    end = i - 1; // index of the first of the two zero bytes
+                    break;
+                }
+            }
+
+            if (end < 0)
+            {
+                newPos = end = oldPos + ((oldLimit - oldPos) & 0xFFFFFFFE); // mask keeps an even byte count
+            }
+            else
+            {
+                if (end + 2 <= oldLimit)
+                {
+                    newPos = end + 2; // step over both zero bytes
+                }
+                else
+                {
+                    newPos = end;
+                }
+            }
+        }
+
+        if (oldPos == end) // the text stops at the current position: nothing to decode
+        {
+            getMessageData().position(newPos);
+            return "";
+        }
+
+        getMessageData().limit(end); // temporarily cap the buffer at the end of the text
+        decoder.reset();
+
+        int expectedLength = (int) (getMessageData().remaining() * decoder.averageCharsPerByte()) + 1;
+        CharBuffer out = CharBuffer.allocate(expectedLength); // initial estimate, grown on overflow
+        for (; ;)
+        {
+            CoderResult cr;
+            if (getMessageData().hasRemaining())
+            {
+                cr = decoder.decode(getMessageData(), out, true);
+            }
+            else
+            {
+                cr = decoder.flush(out); // input exhausted: flush the decoder into out
+            }
+
+            if (cr.isUnderflow())
+            {
+                break;
+            }
+
+            if (cr.isOverflow())
+            {
+                CharBuffer o = CharBuffer.allocate(out.capacity() + expectedLength); // larger buffer
+                out.flip();
+                o.put(out); // carry over the characters decoded so far
+                out = o;
+                continue;
+            }
+
+            if (cr.isError())
+            {
+                // Revert the buffer back to the previous state.
+                getMessageData().limit(oldLimit);
+                getMessageData().position(oldPos);
+                cr.throwException();
+            }
+        }
+
+        getMessageData().limit(oldLimit); // undo the temporary cap
+        getMessageData().position(newPos);
+        return out.flip().toString();
+    }
+
+    /**
+     * Returns the buffer index of the first occurrence of the specified byte between the
+     * current position and the current limit of the message data.
+     *
+     * @param b The byte to search for.
+     * @return The index of the first match, or <tt>-1</tt> if the specified byte is not found.
+     */
+    public int indexOf(byte b)
+    {
+        if (getMessageData().hasArray())
+        {
+            int arrayOffset = getMessageData().arrayOffset();
+            int beginPos = arrayOffset + getMessageData().position();
+            int limit = arrayOffset + getMessageData().limit();
+            byte[] array = getMessageData().array();
+
+            for (int i = beginPos; i < limit; i++)
+            {
+                if (array[i] == b)
+                {
+                    return i - arrayOffset; // convert the array index back to a buffer index
+                }
+            }
+        }
+        else
+        {
+            int beginPos = getMessageData().position();
+            int limit = getMessageData().limit();
+
+            for (int i = beginPos; i < limit; i++)
+            {
+                if (getMessageData().get(i) == b) // absolute get: the position is not moved
+                {
+                    return i;
+                }
+            }
+        }
+        return -1;
     }
 }