You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2013/05/31 20:53:48 UTC
svn commit: r1488353 - in /activemq/trunk:
activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/
activemq-client/src/main/java/org/apache/activemq/command/
activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/
Author: chirino
Date: Fri May 31 18:53:48 2013
New Revision: 1488353
URL: http://svn.apache.org/r1488353
Log:
Fix for AMQ-4563: Changes the KahaDB store to use a more consistent key for message ids. MessageId.toString can change depending on how the message was encoded.
Support storing the externally generated message id of a message in the MessageID class so that Selectors can operate against that external message id.
Modified:
activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
activemq/trunk/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQMessage.java
activemq/trunk/activemq-client/src/main/java/org/apache/activemq/command/MessageId.java
activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java
Modified: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java?rev=1488353&r1=1488352&r2=1488353&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java (original)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java Fri May 31 18:53:48 2013
@@ -512,9 +512,10 @@ class AmqpProtocolConverter {
message.setJMSDestination(destination);
}
message.setProducerId(producerId);
- if (message.getMessageId() == null) {
- message.setMessageId(new MessageId(producerId, messageIdGenerator.getNextSequenceId()));
- }
+
+ MessageId messageId = message.getMessageId();
+ messageId.setProducerId(producerId);
+ messageId.setProducerSequenceId(messageIdGenerator.getNextSequenceId());
if (LOG.isTraceEnabled()) {
LOG.trace("Inbound Message:{} from Producer:{}", message.getMessageId(), producerId);
Modified: activemq/trunk/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQMessage.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQMessage.java?rev=1488353&r1=1488352&r2=1488353&view=diff
==============================================================================
--- activemq/trunk/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQMessage.java (original)
+++ activemq/trunk/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQMessage.java Fri May 31 18:53:48 2013
@@ -141,7 +141,7 @@ public class ActiveMQMessage extends Mes
// so lets set the IDs to be 1
MessageId id = new MessageId();
id.setTextView(value);
- this.setMessageId(messageId);
+ this.setMessageId(id);
}
} else {
this.setMessageId(null);
Modified: activemq/trunk/activemq-client/src/main/java/org/apache/activemq/command/MessageId.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-client/src/main/java/org/apache/activemq/command/MessageId.java?rev=1488353&r1=1488352&r2=1488353&view=diff
==============================================================================
--- activemq/trunk/activemq-client/src/main/java/org/apache/activemq/command/MessageId.java (original)
+++ activemq/trunk/activemq-client/src/main/java/org/apache/activemq/command/MessageId.java Fri May 31 18:53:48 2013
@@ -26,6 +26,7 @@ public class MessageId implements DataSt
public static final byte DATA_STRUCTURE_TYPE = CommandTypes.MESSAGE_ID;
+ protected String textView;
protected ProducerId producerId;
protected long producerSequenceId;
protected long brokerSequenceId;
@@ -69,6 +70,8 @@ public class MessageId implements DataSt
if (p >= 0) {
producerSequenceId = Long.parseLong(messageKey.substring(p + 1));
messageKey = messageKey.substring(0, p);
+ } else {
+ throw new NumberFormatException();
}
producerId = new ProducerId(messageKey);
}
@@ -79,9 +82,17 @@ public class MessageId implements DataSt
* accommodate foreign JMS message IDs
*/
public void setTextView(String key) {
+ this.textView = key;
this.key = key;
}
+ /**
+ * @return
+ */
+ public String getTextView() {
+ return textView;
+ }
+
public byte getDataStructureType() {
return DATA_STRUCTURE_TYPE;
}
@@ -105,9 +116,21 @@ public class MessageId implements DataSt
return hashCode;
}
+ public String toProducerKey() {
+ if( textView==null ) {
+ return toString();
+ } else {
+ return producerId.toString() + ":" + producerSequenceId;
+ }
+ }
+
public String toString() {
if (key == null) {
- key = producerId.toString() + ":" + producerSequenceId;
+ if( textView!=null ) {
+ key = textView;
+ } else {
+ key = producerId.toString() + ":" + producerSequenceId;
+ }
}
return key;
}
Modified: activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java?rev=1488353&r1=1488352&r2=1488353&view=diff
==============================================================================
--- activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java (original)
+++ activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java Fri May 31 18:53:48 2013
@@ -423,7 +423,7 @@ public class KahaDBStore extends Message
public void addMessage(ConnectionContext context, Message message) throws IOException {
KahaAddMessageCommand command = new KahaAddMessageCommand();
command.setDestination(dest);
- command.setMessageId(message.getMessageId().toString());
+ command.setMessageId(message.getMessageId().toProducerKey());
command.setTransactionInfo(transactionIdTransformer.transform(message.getTransactionId()));
command.setPriority(message.getPriority());
command.setPrioritySupported(isPrioritizedMessages());
@@ -436,7 +436,7 @@ public class KahaDBStore extends Message
public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
command.setDestination(dest);
- command.setMessageId(ack.getLastMessageId().toString());
+ command.setMessageId(ack.getLastMessageId().toProducerKey());
command.setTransactionInfo(transactionIdTransformer.transform(ack.getTransactionId()));
org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(ack);
@@ -451,7 +451,7 @@ public class KahaDBStore extends Message
}
public Message getMessage(MessageId identity) throws IOException {
- final String key = identity.toString();
+ final String key = identity.toProducerKey();
// Hopefully one day the page file supports concurrent read
// operations... but for now we must
@@ -590,7 +590,7 @@ public class KahaDBStore extends Message
@Override
public void setBatch(MessageId identity) throws IOException {
try {
- final String key = identity.toString();
+ final String key = identity.toProducerKey();
lockAsyncJobQueue();
// Hopefully one day the page file supports concurrent read
@@ -707,7 +707,7 @@ public class KahaDBStore extends Message
KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
command.setDestination(dest);
command.setSubscriptionKey(subscriptionKey);
- command.setMessageId(messageId.toString());
+ command.setMessageId(messageId.toProducerKey());
command.setTransactionInfo(ack != null ? transactionIdTransformer.transform(ack.getTransactionId()) : null);
if (ack != null && ack.isUnmatchedAck()) {
command.setAck(UNMATCHED);
Modified: activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=1488353&r1=1488352&r2=1488353&view=diff
==============================================================================
--- activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java (original)
+++ activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Fri May 31 18:53:48 2013
@@ -2268,7 +2268,7 @@ public abstract class MessageDatabase ex
this.indexLock.writeLock().lock();
try {
for (MessageAck ack : acks) {
- ackedAndPrepared.add(ack.getLastMessageId().toString());
+ ackedAndPrepared.add(ack.getLastMessageId().toProducerKey());
}
} finally {
this.indexLock.writeLock().unlock();
@@ -2280,7 +2280,7 @@ public abstract class MessageDatabase ex
this.indexLock.writeLock().lock();
try {
for (MessageAck ack : acks) {
- ackedAndPrepared.remove(ack.getLastMessageId().toString());
+ ackedAndPrepared.remove(ack.getLastMessageId().toProducerKey());
}
} finally {
this.indexLock.writeLock().unlock();
Modified: activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java?rev=1488353&r1=1488352&r2=1488353&view=diff
==============================================================================
--- activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java (original)
+++ activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java Fri May 31 18:53:48 2013
@@ -135,14 +135,14 @@ public class TempKahaDBStore extends Tem
public void addMessage(ConnectionContext context, Message message) throws IOException {
KahaAddMessageCommand command = new KahaAddMessageCommand();
command.setDestination(dest);
- command.setMessageId(message.getMessageId().toString());
+ command.setMessageId(message.getMessageId().toProducerKey());
processAdd(command, message.getTransactionId(), wireFormat.marshal(message));
}
public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
command.setDestination(dest);
- command.setMessageId(ack.getLastMessageId().toString());
+ command.setMessageId(ack.getLastMessageId().toProducerKey());
processRemove(command, ack.getTransactionId());
}
@@ -153,7 +153,7 @@ public class TempKahaDBStore extends Tem
}
public Message getMessage(MessageId identity) throws IOException {
- final String key = identity.toString();
+ final String key = identity.toProducerKey();
// Hopefully one day the page file supports concurrent read operations... but for now we must
// externally synchronize...
@@ -241,7 +241,7 @@ public class TempKahaDBStore extends Tem
@Override
public void setBatch(MessageId identity) throws IOException {
- final String key = identity.toString();
+ final String key = identity.toProducerKey();
// Hopefully one day the page file supports concurrent read operations... but for now we must
// externally synchronize...
@@ -282,7 +282,7 @@ public class TempKahaDBStore extends Tem
KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
command.setDestination(dest);
command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName));
- command.setMessageId(messageId.toString());
+ command.setMessageId(messageId.toProducerKey());
// We are not passed a transaction info.. so we can't participate in a transaction.
// Looks like a design issue with the TopicMessageStore interface. Also we can't recover the original ack
// to pass back to the XA recover method.