You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2013/05/31 20:53:48 UTC

svn commit: r1488353 - in /activemq/trunk: activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/ activemq-client/src/main/java/org/apache/activemq/command/ activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/

Author: chirino
Date: Fri May 31 18:53:48 2013
New Revision: 1488353

URL: http://svn.apache.org/r1488353
Log:
Fix for AMQ-4563: Changes the KahaDB store to use a more consistent key for message ids.  MessageId.toString can change depending on how the message was encoded.
Support storing the externally generated message id of a message in the MessageID class so that Selectors can operate against that external message id.

Modified:
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
    activemq/trunk/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQMessage.java
    activemq/trunk/activemq-client/src/main/java/org/apache/activemq/command/MessageId.java
    activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
    activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
    activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java

Modified: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java?rev=1488353&r1=1488352&r2=1488353&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java (original)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java Fri May 31 18:53:48 2013
@@ -512,9 +512,10 @@ class AmqpProtocolConverter {
                 message.setJMSDestination(destination);
             }
             message.setProducerId(producerId);
-            if (message.getMessageId() == null) {
-                message.setMessageId(new MessageId(producerId, messageIdGenerator.getNextSequenceId()));
-            }
+
+            MessageId messageId = message.getMessageId();
+            messageId.setProducerId(producerId);
+            messageId.setProducerSequenceId(messageIdGenerator.getNextSequenceId());
 
             if (LOG.isTraceEnabled()) {
                 LOG.trace("Inbound Message:{} from Producer:{}", message.getMessageId(), producerId);

Modified: activemq/trunk/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQMessage.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQMessage.java?rev=1488353&r1=1488352&r2=1488353&view=diff
==============================================================================
--- activemq/trunk/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQMessage.java (original)
+++ activemq/trunk/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQMessage.java Fri May 31 18:53:48 2013
@@ -141,7 +141,7 @@ public class ActiveMQMessage extends Mes
                 // so lets set the IDs to be 1
                 MessageId id = new MessageId();
                 id.setTextView(value);
-                this.setMessageId(messageId);
+                this.setMessageId(id);
             }
         } else {
             this.setMessageId(null);

Modified: activemq/trunk/activemq-client/src/main/java/org/apache/activemq/command/MessageId.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-client/src/main/java/org/apache/activemq/command/MessageId.java?rev=1488353&r1=1488352&r2=1488353&view=diff
==============================================================================
--- activemq/trunk/activemq-client/src/main/java/org/apache/activemq/command/MessageId.java (original)
+++ activemq/trunk/activemq-client/src/main/java/org/apache/activemq/command/MessageId.java Fri May 31 18:53:48 2013
@@ -26,6 +26,7 @@ public class MessageId implements DataSt
 
     public static final byte DATA_STRUCTURE_TYPE = CommandTypes.MESSAGE_ID;
 
+    protected String textView;
     protected ProducerId producerId;
     protected long producerSequenceId;
     protected long brokerSequenceId;
@@ -69,6 +70,8 @@ public class MessageId implements DataSt
         if (p >= 0) {
             producerSequenceId = Long.parseLong(messageKey.substring(p + 1));
             messageKey = messageKey.substring(0, p);
+        } else {
+            throw new NumberFormatException();
         }
         producerId = new ProducerId(messageKey);
     }
@@ -79,9 +82,17 @@ public class MessageId implements DataSt
      * accommodate foreign JMS message IDs
      */
     public void setTextView(String key) {
+        this.textView = key;
         this.key = key;
     }
 
+    /**
+     * @return
+     */
+    public String getTextView() {
+        return textView;
+    }
+
     public byte getDataStructureType() {
         return DATA_STRUCTURE_TYPE;
     }
@@ -105,9 +116,21 @@ public class MessageId implements DataSt
         return hashCode;
     }
 
+    public String toProducerKey() {
+        if( textView==null ) {
+            return toString();
+        } else {
+            return producerId.toString() + ":" + producerSequenceId;
+        }
+    }
+
     public String toString() {
         if (key == null) {
-            key = producerId.toString() + ":" + producerSequenceId;
+            if( textView!=null ) {
+                key = textView;
+            } else {
+                key = producerId.toString() + ":" + producerSequenceId;
+            }
         }
         return key;
     }

Modified: activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java?rev=1488353&r1=1488352&r2=1488353&view=diff
==============================================================================
--- activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java (original)
+++ activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java Fri May 31 18:53:48 2013
@@ -423,7 +423,7 @@ public class KahaDBStore extends Message
         public void addMessage(ConnectionContext context, Message message) throws IOException {
             KahaAddMessageCommand command = new KahaAddMessageCommand();
             command.setDestination(dest);
-            command.setMessageId(message.getMessageId().toString());
+            command.setMessageId(message.getMessageId().toProducerKey());
             command.setTransactionInfo(transactionIdTransformer.transform(message.getTransactionId()));
             command.setPriority(message.getPriority());
             command.setPrioritySupported(isPrioritizedMessages());
@@ -436,7 +436,7 @@ public class KahaDBStore extends Message
         public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
             KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
             command.setDestination(dest);
-            command.setMessageId(ack.getLastMessageId().toString());
+            command.setMessageId(ack.getLastMessageId().toProducerKey());
             command.setTransactionInfo(transactionIdTransformer.transform(ack.getTransactionId()));
 
             org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(ack);
@@ -451,7 +451,7 @@ public class KahaDBStore extends Message
         }
 
         public Message getMessage(MessageId identity) throws IOException {
-            final String key = identity.toString();
+            final String key = identity.toProducerKey();
 
             // Hopefully one day the page file supports concurrent read
             // operations... but for now we must
@@ -590,7 +590,7 @@ public class KahaDBStore extends Message
         @Override
         public void setBatch(MessageId identity) throws IOException {
             try {
-                final String key = identity.toString();
+                final String key = identity.toProducerKey();
                 lockAsyncJobQueue();
 
                 // Hopefully one day the page file supports concurrent read
@@ -707,7 +707,7 @@ public class KahaDBStore extends Message
             KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
             command.setDestination(dest);
             command.setSubscriptionKey(subscriptionKey);
-            command.setMessageId(messageId.toString());
+            command.setMessageId(messageId.toProducerKey());
             command.setTransactionInfo(ack != null ? transactionIdTransformer.transform(ack.getTransactionId()) : null);
             if (ack != null && ack.isUnmatchedAck()) {
                 command.setAck(UNMATCHED);

Modified: activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=1488353&r1=1488352&r2=1488353&view=diff
==============================================================================
--- activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java (original)
+++ activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Fri May 31 18:53:48 2013
@@ -2268,7 +2268,7 @@ public abstract class MessageDatabase ex
         this.indexLock.writeLock().lock();
         try {
             for (MessageAck ack : acks) {
-                ackedAndPrepared.add(ack.getLastMessageId().toString());
+                ackedAndPrepared.add(ack.getLastMessageId().toProducerKey());
             }
         } finally {
             this.indexLock.writeLock().unlock();
@@ -2280,7 +2280,7 @@ public abstract class MessageDatabase ex
             this.indexLock.writeLock().lock();
             try {
                 for (MessageAck ack : acks) {
-                    ackedAndPrepared.remove(ack.getLastMessageId().toString());
+                    ackedAndPrepared.remove(ack.getLastMessageId().toProducerKey());
                 }
             } finally {
                 this.indexLock.writeLock().unlock();

Modified: activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java?rev=1488353&r1=1488352&r2=1488353&view=diff
==============================================================================
--- activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java (original)
+++ activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java Fri May 31 18:53:48 2013
@@ -135,14 +135,14 @@ public class TempKahaDBStore extends Tem
         public void addMessage(ConnectionContext context, Message message) throws IOException {
             KahaAddMessageCommand command = new KahaAddMessageCommand();
             command.setDestination(dest);
-            command.setMessageId(message.getMessageId().toString());
+            command.setMessageId(message.getMessageId().toProducerKey());
             processAdd(command, message.getTransactionId(), wireFormat.marshal(message));
         }
         
         public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
             KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
             command.setDestination(dest);
-            command.setMessageId(ack.getLastMessageId().toString());
+            command.setMessageId(ack.getLastMessageId().toProducerKey());
             processRemove(command, ack.getTransactionId());
         }
 
@@ -153,7 +153,7 @@ public class TempKahaDBStore extends Tem
         }
 
         public Message getMessage(MessageId identity) throws IOException {
-            final String key = identity.toString();
+            final String key = identity.toProducerKey();
             
             // Hopefully one day the page file supports concurrent read operations... but for now we must
             // externally synchronize...
@@ -241,7 +241,7 @@ public class TempKahaDBStore extends Tem
         
         @Override
         public void setBatch(MessageId identity) throws IOException {
-            final String key = identity.toString();
+            final String key = identity.toProducerKey();
             
             // Hopefully one day the page file supports concurrent read operations... but for now we must
             // externally synchronize...
@@ -282,7 +282,7 @@ public class TempKahaDBStore extends Tem
             KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
             command.setDestination(dest);
             command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName));
-            command.setMessageId(messageId.toString());
+            command.setMessageId(messageId.toProducerKey());
             // We are not passed a transaction info.. so we can't participate in a transaction.
             // Looks like a design issue with the TopicMessageStore interface.  Also we can't recover the original ack
             // to pass back to the XA recover method.