You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2010/07/08 16:23:23 UTC
svn commit: r961783 [1/2] - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/ main/java/org/apache/activemq/broker/
main/java/org/apache/activemq/broker/region/
main/java/org/apache/activemq/command/ main/java/org/apache/activemq/ope...
Author: gtully
Date: Thu Jul 8 14:23:21 2010
New Revision: 961783
URL: http://svn.apache.org/viewvc?rev=961783&view=rev
Log:
resolve https://issues.apache.org/activemq/browse/AMQ-2800, https://issues.apache.org/activemq/browse/AMQ-2542, https://issues.apache.org/activemq/browse/AMQ-2803 - implement duplicate checker in transport for a failover: reconnect, uses last seqid from store. Implemented for kahaDB and JDBC
Added:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAuditNoSync.java (with props)
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAudit.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConnectionInfo.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v6/ConnectionInfoMarshaller.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/PersistenceAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/Visitor.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BitArray.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BitArrayBin.java
activemq/trunk/activemq-core/src/main/proto/journal-data.proto
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/PerDestinationStoreLimitTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v6/ConnectionInfoTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/StoreOrderTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapterTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCStoreOrderTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/BitArrayBinTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/SocketProxy.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAudit.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAudit.java?rev=961783&r1=961782&r2=961783&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAudit.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAudit.java Thu Jul 8 14:23:21 2010
@@ -16,35 +16,23 @@
*/
package org.apache.activemq;
-import javax.jms.JMSException;
-import javax.jms.Message;
-
-import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.command.MessageId;
-import org.apache.activemq.command.ProducerId;
-import org.apache.activemq.util.BitArrayBin;
-import org.apache.activemq.util.IdGenerator;
-import org.apache.activemq.util.LRUCache;
/**
* Provides basic audit functions for Messages
*
* @version $Revision: 1.1.1.1 $
*/
-public class ActiveMQMessageAudit {
+public class ActiveMQMessageAudit extends ActiveMQMessageAuditNoSync {
- public static final int DEFAULT_WINDOW_SIZE = 2048;
- public static final int MAXIMUM_PRODUCER_COUNT = 64;
- private int auditDepth;
- private int maximumNumberOfProducersToTrack;
- private LRUCache<Object, BitArrayBin> map;
+ private static final long serialVersionUID = 1L;
/**
* Default Constructor windowSize = 2048, maximumNumberOfProducersToTrack =
* 64
*/
public ActiveMQMessageAudit() {
- this(DEFAULT_WINDOW_SIZE, MAXIMUM_PRODUCER_COUNT);
+ super();
}
/**
@@ -55,198 +43,41 @@ public class ActiveMQMessageAudit {
* the system
*/
public ActiveMQMessageAudit(int auditDepth, final int maximumNumberOfProducersToTrack) {
- this.auditDepth = auditDepth;
- this.maximumNumberOfProducersToTrack=maximumNumberOfProducersToTrack;
- this.map = new LRUCache<Object, BitArrayBin>(0, maximumNumberOfProducersToTrack, 0.75f, true);
+ super(auditDepth, maximumNumberOfProducersToTrack);
}
- /**
- * @return the auditDepth
- */
- public int getAuditDepth() {
- return auditDepth;
- }
-
- /**
- * @param auditDepth the auditDepth to set
- */
- public void setAuditDepth(int auditDepth) {
- this.auditDepth = auditDepth;
- }
-
- /**
- * @return the maximumNumberOfProducersToTrack
- */
- public int getMaximumNumberOfProducersToTrack() {
- return maximumNumberOfProducersToTrack;
- }
-
- /**
- * @param maximumNumberOfProducersToTrack the maximumNumberOfProducersToTrack to set
- */
- public void setMaximumNumberOfProducersToTrack(
- int maximumNumberOfProducersToTrack) {
- this.maximumNumberOfProducersToTrack = maximumNumberOfProducersToTrack;
- this.map.setMaxCacheSize(maximumNumberOfProducersToTrack);
- }
-
- /**
- * Checks if this message has been seen before
- *
- * @param message
- * @return true if the message is a duplicate
- * @throws JMSException
- */
- public boolean isDuplicate(Message message) throws JMSException {
- return isDuplicate(message.getJMSMessageID());
- }
-
- /**
- * checks whether this messageId has been seen before and adds this
- * messageId to the list
- *
- * @param id
- * @return true if the message is a duplicate
- */
- public synchronized boolean isDuplicate(String id) {
- boolean answer = false;
- String seed = IdGenerator.getSeedFromId(id);
- if (seed != null) {
- BitArrayBin bab = map.get(seed);
- if (bab == null) {
- bab = new BitArrayBin(auditDepth);
- map.put(seed, bab);
- }
- long index = IdGenerator.getSequenceFromId(id);
- if (index >= 0) {
- answer = bab.setBit(index, true);
- }
+ @Override
+ public boolean isDuplicate(String id) {
+ synchronized (this) {
+ return super.isDuplicate(id);
}
- return answer;
}
- /**
- * Checks if this message has been seen before
- *
- * @param message
- * @return true if the message is a duplicate
- */
- public boolean isDuplicate(final MessageReference message) {
- MessageId id = message.getMessageId();
- return isDuplicate(id);
- }
-
- /**
- * Checks if this messageId has been seen before
- *
- * @param id
- * @return true if the message is a duplicate
- */
- public synchronized boolean isDuplicate(final MessageId id) {
- boolean answer = false;
-
- if (id != null) {
- ProducerId pid = id.getProducerId();
- if (pid != null) {
- BitArrayBin bab = map.get(pid);
- if (bab == null) {
- bab = new BitArrayBin(auditDepth);
- map.put(pid, bab);
- }
- answer = bab.setBit(id.getProducerSequenceId(), true);
- }
+ @Override
+ public boolean isDuplicate(final MessageId id) {
+ synchronized (this) {
+ return super.isDuplicate(id);
}
- return answer;
}
- /**
- * mark this message as being received
- *
- * @param message
- */
- public void rollback(final MessageReference message) {
- MessageId id = message.getMessageId();
- rollback(id);
- }
-
- /**
- * mark this message as being received
- *
- * @param id
- */
- public synchronized void rollback(final MessageId id) {
- if (id != null) {
- ProducerId pid = id.getProducerId();
- if (pid != null) {
- BitArrayBin bab = map.get(pid);
- if (bab != null) {
- bab.setBit(id.getProducerSequenceId(), false);
- }
- }
+ @Override
+ public void rollback(final MessageId id) {
+ synchronized (this) {
+ super.rollback(id);
}
}
- /**
- * Check the message is in order
- * @param msg
- * @return
- * @throws JMSException
- */
- public boolean isInOrder(Message msg) throws JMSException {
- return isInOrder(msg.getJMSMessageID());
- }
-
- /**
- * Check the message id is in order
- * @param id
- * @return
- */
- public synchronized boolean isInOrder(final String id) {
- boolean answer = true;
-
- if (id != null) {
- String seed = IdGenerator.getSeedFromId(id);
- if (seed != null) {
- BitArrayBin bab = map.get(seed);
- if (bab != null) {
- long index = IdGenerator.getSequenceFromId(id);
- answer = bab.isInOrder(index);
- }
-
- }
+ @Override
+ public boolean isInOrder(final String id) {
+ synchronized (this) {
+ return super.isInOrder(id);
}
- return answer;
}
- /**
- * Check the MessageId is in order
- * @param message
- * @return
- */
- public synchronized boolean isInOrder(final MessageReference message) {
- return isInOrder(message.getMessageId());
- }
-
- /**
- * Check the MessageId is in order
- * @param id
- * @return
- */
- public synchronized boolean isInOrder(final MessageId id) {
- boolean answer = false;
-
- if (id != null) {
- ProducerId pid = id.getProducerId();
- if (pid != null) {
- BitArrayBin bab = map.get(pid);
- if (bab == null) {
- bab = new BitArrayBin(auditDepth);
- map.put(pid, bab);
- }
- answer = bab.isInOrder(id.getProducerSequenceId());
-
- }
+ @Override
+ public boolean isInOrder(final MessageId id) {
+ synchronized (this) {
+ return super.isInOrder(id); // delegate to the unsynchronized base implementation; an unqualified call re-enters this override and recurses until StackOverflowError
}
- return answer;
}
}
Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAuditNoSync.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAuditNoSync.java?rev=961783&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAuditNoSync.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAuditNoSync.java Thu Jul 8 14:23:21 2010
@@ -0,0 +1,265 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import java.io.Serializable;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.ProducerId;
+import org.apache.activemq.util.BitArrayBin;
+import org.apache.activemq.util.IdGenerator;
+import org.apache.activemq.util.LRUCache;
+
+/**
+ * Provides basic audit functions for Messages without sync
+ *
+ * @version $Revision$
+ */
+public class ActiveMQMessageAuditNoSync implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ public static final int DEFAULT_WINDOW_SIZE = 2048;
+ public static final int MAXIMUM_PRODUCER_COUNT = 64;
+ private int auditDepth;
+ private int maximumNumberOfProducersToTrack;
+ private LRUCache<Object, BitArrayBin> map;
+
+ /**
+ * Default Constructor windowSize = 2048, maximumNumberOfProducersToTrack =
+ * 64
+ */
+ public ActiveMQMessageAuditNoSync() {
+ this(DEFAULT_WINDOW_SIZE, MAXIMUM_PRODUCER_COUNT);
+ }
+
+ /**
+ * Construct a MessageAudit
+ *
+ * @param auditDepth range of ids to track
+ * @param maximumNumberOfProducersToTrack number of producers expected in
+ * the system
+ */
+ public ActiveMQMessageAuditNoSync(int auditDepth, final int maximumNumberOfProducersToTrack) {
+ this.auditDepth = auditDepth;
+ this.maximumNumberOfProducersToTrack=maximumNumberOfProducersToTrack;
+ this.map = new LRUCache<Object, BitArrayBin>(0, maximumNumberOfProducersToTrack, 0.75f, true);
+ }
+
+ /**
+ * @return the auditDepth
+ */
+ public int getAuditDepth() {
+ return auditDepth;
+ }
+
+ /**
+ * @param auditDepth the auditDepth to set
+ */
+ public void setAuditDepth(int auditDepth) {
+ this.auditDepth = auditDepth;
+ }
+
+ /**
+ * @return the maximumNumberOfProducersToTrack
+ */
+ public int getMaximumNumberOfProducersToTrack() {
+ return maximumNumberOfProducersToTrack;
+ }
+
+ /**
+ * @param maximumNumberOfProducersToTrack the maximumNumberOfProducersToTrack to set
+ */
+ public void setMaximumNumberOfProducersToTrack(
+ int maximumNumberOfProducersToTrack) {
+ this.maximumNumberOfProducersToTrack = maximumNumberOfProducersToTrack;
+ this.map.setMaxCacheSize(maximumNumberOfProducersToTrack);
+ }
+
+ /**
+ * Checks if this message has been seen before
+ *
+ * @param message
+ * @return true if the message is a duplicate
+ * @throws JMSException
+ */
+ public boolean isDuplicate(Message message) throws JMSException {
+ return isDuplicate(message.getJMSMessageID());
+ }
+
+ /**
+ * checks whether this messageId has been seen before and adds this
+ * messageId to the list
+ *
+ * @param id
+ * @return true if the message is a duplicate
+ */
+ public boolean isDuplicate(String id) {
+ boolean answer = false;
+ String seed = IdGenerator.getSeedFromId(id);
+ if (seed != null) {
+ BitArrayBin bab = map.get(seed);
+ if (bab == null) {
+ bab = new BitArrayBin(auditDepth);
+ map.put(seed, bab);
+ }
+ long index = IdGenerator.getSequenceFromId(id);
+ if (index >= 0) {
+ answer = bab.setBit(index, true);
+ }
+ }
+ return answer;
+ }
+
+ /**
+ * Checks if this message has been seen before
+ *
+ * @param message
+ * @return true if the message is a duplicate
+ */
+ public boolean isDuplicate(final MessageReference message) {
+ MessageId id = message.getMessageId();
+ return isDuplicate(id);
+ }
+
+ /**
+ * Checks if this messageId has been seen before
+ *
+ * @param id
+ * @return true if the message is a duplicate
+ */
+ public boolean isDuplicate(final MessageId id) {
+ boolean answer = false;
+
+ if (id != null) {
+ ProducerId pid = id.getProducerId();
+ if (pid != null) {
+ BitArrayBin bab = map.get(pid);
+ if (bab == null) {
+ bab = new BitArrayBin(auditDepth);
+ map.put(pid, bab);
+ }
+ answer = bab.setBit(id.getProducerSequenceId(), true);
+ }
+ }
+ return answer;
+ }
+
+ /**
+ * roll back the audit entry for this message so it is no longer marked as received
+ *
+ * @param message
+ */
+ public void rollback(final MessageReference message) {
+ MessageId id = message.getMessageId();
+ rollback(id);
+ }
+
+ /**
+ * roll back the audit entry for this message id by clearing its bit for the producer
+ *
+ * @param id
+ */
+ public void rollback(final MessageId id) {
+ if (id != null) {
+ ProducerId pid = id.getProducerId();
+ if (pid != null) {
+ BitArrayBin bab = map.get(pid);
+ if (bab != null) {
+ bab.setBit(id.getProducerSequenceId(), false);
+ }
+ }
+ }
+ }
+
+ /**
+ * Check the message is in order
+ * @param msg
+ * @return
+ * @throws JMSException
+ */
+ public boolean isInOrder(Message msg) throws JMSException {
+ return isInOrder(msg.getJMSMessageID());
+ }
+
+ /**
+ * Check the message id is in order
+ * @param id
+ * @return
+ */
+ public boolean isInOrder(final String id) {
+ boolean answer = true;
+
+ if (id != null) {
+ String seed = IdGenerator.getSeedFromId(id);
+ if (seed != null) {
+ BitArrayBin bab = map.get(seed);
+ if (bab != null) {
+ long index = IdGenerator.getSequenceFromId(id);
+ answer = bab.isInOrder(index);
+ }
+
+ }
+ }
+ return answer;
+ }
+
+ /**
+ * Check the MessageId is in order
+ * @param message
+ * @return
+ */
+ public boolean isInOrder(final MessageReference message) {
+ return isInOrder(message.getMessageId());
+ }
+
+ /**
+ * Check the MessageId is in order
+ * @param id
+ * @return
+ */
+ public boolean isInOrder(final MessageId id) {
+ boolean answer = false;
+
+ if (id != null) {
+ ProducerId pid = id.getProducerId();
+ if (pid != null) {
+ BitArrayBin bab = map.get(pid);
+ if (bab == null) {
+ bab = new BitArrayBin(auditDepth);
+ map.put(pid, bab);
+ }
+ answer = bab.isInOrder(id.getProducerSequenceId());
+
+ }
+ }
+ return answer;
+ }
+
+ public long getLastSeqId(ProducerId id) {
+ long result = -1;
+ BitArrayBin bab = map.get(id.toString() + ":");
+ if (bab != null) {
+ result = bab.getLastSetIndex();
+ }
+ return result;
+ }
+}
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAuditNoSync.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAuditNoSync.java
------------------------------------------------------------------------------
svn:executable = *
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAuditNoSync.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java?rev=961783&r1=961782&r2=961783&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java Thu Jul 8 14:23:21 2010
@@ -47,7 +47,7 @@ public class ConnectionContext {
private ConnectionId connectionId;
private String clientId;
private String userName;
- private boolean haAware;
+ private boolean reconnect;
private WireFormatInfo wireFormatInfo;
private Object longTermStoreContext;
private boolean producerFlowControl = true;
@@ -86,7 +86,7 @@ public class ConnectionContext {
rc.connectionId = this.connectionId;
rc.clientId = this.clientId;
rc.userName = this.userName;
- rc.haAware = this.haAware;
+ rc.reconnect = this.reconnect;
rc.wireFormatInfo = this.wireFormatInfo;
rc.longTermStoreContext = this.longTermStoreContext;
rc.producerFlowControl = this.producerFlowControl;
@@ -212,12 +212,12 @@ public class ConnectionContext {
this.clientId = clientId;
}
- public boolean isHaAware() {
- return haAware;
+ public boolean isReconnect() {
+ return reconnect;
}
- public void setHaAware(boolean haAware) {
- this.haAware = haAware;
+ public void setReconnect(boolean reconnect) {
+ this.reconnect = reconnect;
}
public WireFormatInfo getWireFormatInfo() {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java?rev=961783&r1=961782&r2=961783&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java Thu Jul 8 14:23:21 2010
@@ -18,7 +18,10 @@ package org.apache.activemq.broker;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.Region;
+import org.apache.activemq.command.Message;
import org.apache.activemq.state.ProducerState;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
/**
* Holds internal state in the broker for a MessageProducer
@@ -27,11 +30,13 @@ import org.apache.activemq.state.Produce
*/
public class ProducerBrokerExchange {
+ private static final Log LOG = LogFactory.getLog(ProducerBrokerExchange.class);
private ConnectionContext connectionContext;
private Destination regionDestination;
private Region region;
private ProducerState producerState;
private boolean mutable = true;
+ private long lastSendSequenceNumber = -1;
public ProducerBrokerExchange() {
}
@@ -117,4 +122,25 @@ public class ProducerBrokerExchange {
this.producerState = producerState;
}
+ /**
+ * Enforce duplicate suppression using info from persistence adapter
+ * @param messageSend
+ * @return false if message should be ignored as a duplicate
+ */
+ public boolean canDispatch(Message messageSend) {
+ boolean canDispatch = true;
+ if (lastSendSequenceNumber > 0) {
+ if (messageSend.getMessageId().getProducerSequenceId() <= lastSendSequenceNumber) {
+ canDispatch = false;
+ LOG.debug("suppressing duplicate message send [" + messageSend.getMessageId() + "] with producerSequenceId ["
+ + messageSend.getMessageId().getProducerSequenceId() + "] less than last stored: " + lastSendSequenceNumber);
+ }
+ }
+ return canDispatch;
+ }
+
+ public void setLastStoredSequenceId(long l) {
+ lastSendSequenceNumber = l;
+ LOG.debug("last stored sequence id set: " + l);
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?rev=961783&r1=961782&r2=961783&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java Thu Jul 8 14:23:21 2010
@@ -453,7 +453,9 @@ public class TransportConnection impleme
public Response processMessage(Message messageSend) throws Exception {
ProducerId producerId = messageSend.getProducerId();
ProducerBrokerExchange producerExchange = getProducerBrokerExchange(producerId);
- broker.send(producerExchange, messageSend);
+ if (producerExchange.canDispatch(messageSend)) {
+ broker.send(producerExchange, messageSend);
+ }
return null;
}
@@ -680,6 +682,7 @@ public class TransportConnection impleme
context.setTransactions(new ConcurrentHashMap<TransactionId, Transaction>());
context.setUserName(info.getUserName());
context.setWireFormatInfo(wireFormatInfo);
+ context.setReconnect(info.isFailoverReconnect());
this.manageable = info.isManageable();
state.setContext(context);
state.setConnection(this);
@@ -1249,13 +1252,16 @@ public class TransportConnection impleme
}
}
- private ProducerBrokerExchange getProducerBrokerExchange(ProducerId id) {
+ private ProducerBrokerExchange getProducerBrokerExchange(ProducerId id) throws IOException {
ProducerBrokerExchange result = producerExchanges.get(id);
if (result == null) {
synchronized (producerExchanges) {
result = new ProducerBrokerExchange();
- TransportConnectionState state = lookupConnectionState(id);
+ TransportConnectionState state = lookupConnectionState(id);
context = state.getContext();
+ if (context.isReconnect()) {
+ result.setLastStoredSequenceId(broker.getBrokerService().getPersistenceAdapter().getLastProducerSequenceId(id));
+ }
result.setConnectionContext(context);
SessionState ss = state.getSessionState(id.getParentId());
if (ss != null) {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java?rev=961783&r1=961782&r2=961783&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java Thu Jul 8 14:23:21 2010
@@ -374,6 +374,7 @@ public abstract class AbstractRegion imp
LOG.warn("Ack for non existent subscription, ack:" + ack);
throw new IllegalArgumentException("The subscription does not exist: " + ack.getConsumerId());
} else {
+ LOG.debug("Ack for non existent subscription in recovery, ack:" + ack);
return;
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConnectionInfo.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConnectionInfo.java?rev=961783&r1=961782&r2=961783&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConnectionInfo.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConnectionInfo.java Thu Jul 8 14:23:21 2010
@@ -37,6 +37,7 @@ public class ConnectionInfo extends Base
protected boolean clientMaster = true;
protected boolean faultTolerant = false;
protected transient Object transportContext;
+ private boolean failoverReconnect;
public ConnectionInfo() {
}
@@ -216,4 +217,15 @@ public class ConnectionInfo extends Base
this.faultTolerant = faultTolerant;
}
+ /**
+ * @openwire:property version=6 cache=false
+ * @return failoverReconnect true if this is a reconnect
+ */
+ public boolean isFailoverReconnect() {
+ return this.failoverReconnect;
+ }
+
+ public void setFailoverReconnect(boolean failoverReconnect) {
+ this.failoverReconnect = failoverReconnect;
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v6/ConnectionInfoMarshaller.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v6/ConnectionInfoMarshaller.java?rev=961783&r1=961782&r2=961783&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v6/ConnectionInfoMarshaller.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v6/ConnectionInfoMarshaller.java Thu Jul 8 14:23:21 2010
@@ -86,6 +86,7 @@ public class ConnectionInfoMarshaller ex
info.setManageable(bs.readBoolean());
info.setClientMaster(bs.readBoolean());
info.setFaultTolerant(bs.readBoolean());
+ info.setFailoverReconnect(bs.readBoolean());
}
@@ -107,6 +108,7 @@ public class ConnectionInfoMarshaller ex
bs.writeBoolean(info.isManageable());
bs.writeBoolean(info.isClientMaster());
bs.writeBoolean(info.isFaultTolerant());
+ bs.writeBoolean(info.isFailoverReconnect());
return rc + 0;
}
@@ -131,6 +133,7 @@ public class ConnectionInfoMarshaller ex
bs.readBoolean();
bs.readBoolean();
bs.readBoolean();
+ bs.readBoolean();
}
@@ -165,6 +168,7 @@ public class ConnectionInfoMarshaller ex
info.setManageable(dataIn.readBoolean());
info.setClientMaster(dataIn.readBoolean());
info.setFaultTolerant(dataIn.readBoolean());
+ info.setFailoverReconnect(dataIn.readBoolean());
}
@@ -186,6 +190,7 @@ public class ConnectionInfoMarshaller ex
dataOut.writeBoolean(info.isManageable());
dataOut.writeBoolean(info.isClientMaster());
dataOut.writeBoolean(info.isFaultTolerant());
+ dataOut.writeBoolean(info.isFailoverReconnect());
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java?rev=961783&r1=961782&r2=961783&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java Thu Jul 8 14:23:21 2010
@@ -140,6 +140,7 @@ public class ConnectionStateTracker exte
// Restore the connections.
for (Iterator<ConnectionState> iter = connectionStates.values().iterator(); iter.hasNext();) {
ConnectionState connectionState = iter.next();
+ connectionState.getInfo().setFailoverReconnect(true);
if (LOG.isDebugEnabled()) {
LOG.debug("conn: " + connectionState.getInfo().getConnectionId());
}
@@ -156,6 +157,9 @@ public class ConnectionStateTracker exte
}
//now flush messages
for (Message msg:messageCache.values()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("message: " + msg.getMessageId());
+ }
transport.oneway(msg);
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/PersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/PersistenceAdapter.java?rev=961783&r1=961782&r2=961783&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/PersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/PersistenceAdapter.java Thu Jul 8 14:23:21 2010
@@ -26,6 +26,7 @@ import org.apache.activemq.broker.region
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.ProducerId;
import org.apache.activemq.usage.SystemUsage;
/**
@@ -157,4 +158,13 @@ public interface PersistenceAdapter exte
* @return disk space used in bytes of 0 if not implemented
*/
long size();
+
+ /**
+ * return the last stored producer sequenceId for this producer Id
+ * used to suppress duplicate sends on failover reconnect at the transport
+ * when a reconnect occurs
+ * @param id the producerId to find a sequenceId for
+ * @return the last stored sequence id or -1 if no suppression needed
+ */
+ long getLastProducerSequenceId(ProducerId id) throws IOException;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java?rev=961783&r1=961782&r2=961783&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java Thu Jul 8 14:23:21 2010
@@ -43,6 +43,7 @@ import org.apache.activemq.command.Journ
import org.apache.activemq.command.JournalTrace;
import org.apache.activemq.command.JournalTransaction;
import org.apache.activemq.command.Message;
+import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
import org.apache.activemq.kaha.impl.async.AsyncDataManager;
@@ -1117,4 +1118,10 @@ public class AMQPersistenceAdapter imple
+ ".DisableLocking",
"false"));
}
+
+
+ public long getLastProducerSequenceId(ProducerId id) {
+ // reference store send has adequate duplicate suppression
+ return -1;
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java?rev=961783&r1=961782&r2=961783&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java Thu Jul 8 14:23:21 2010
@@ -21,6 +21,7 @@ import java.sql.SQLException;
import java.util.Set;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.SubscriptionInfo;
/**
@@ -60,7 +61,7 @@ public interface JDBCAdapter {
SubscriptionInfo doGetSubscriberEntry(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriptionName) throws SQLException, IOException;
- long getStoreSequenceId(TransactionContext c, MessageId messageID) throws SQLException, IOException;
+ long getStoreSequenceId(TransactionContext c, ActiveMQDestination destination, MessageId messageID) throws SQLException, IOException;
void doRemoveAllMessages(TransactionContext c, ActiveMQDestination destinationName) throws SQLException, IOException;
@@ -85,4 +86,6 @@ public interface JDBCAdapter {
long doGetLastAckedDurableSubscriberMessageId(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriberName) throws SQLException, IOException;
void doMessageIdScan(TransactionContext c, int limit, JDBCMessageIdScanListener listener) throws SQLException, IOException;
+
+ long doGetLastProducerSequenceId(TransactionContext c, ProducerId id) throws SQLException, IOException;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java?rev=961783&r1=961782&r2=961783&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java Thu Jul 8 14:23:21 2010
@@ -285,7 +285,7 @@ public class JDBCMessageStore extends Ab
long result = -1;
TransactionContext c = persistenceAdapter.getTransactionContext();
try {
- result = adapter.getStoreSequenceId(c, messageId);
+ result = adapter.getStoreSequenceId(c, destination, messageId);
} catch (SQLException e) {
JDBCPersistenceAdapter.log("JDBC Failure: ", e);
throw IOExceptionSupport.create("Failed to get store sequenceId for messageId: " + messageId +", on: " + destination + ". Reason: " + e, e);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java?rev=961783&r1=961782&r2=961783&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java Thu Jul 8 14:23:21 2010
@@ -37,6 +37,7 @@ import org.apache.activemq.command.Activ
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.ProducerId;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
@@ -93,7 +94,7 @@ public class JDBCPersistenceAdapter exte
protected int maxProducersToAudit=1024;
protected int maxAuditDepth=1000;
- protected boolean enableAudit=true;
+ protected boolean enableAudit=false;
protected int auditRecoveryDepth = 1024;
protected ActiveMQMessageAudit audit;
@@ -245,6 +246,19 @@ public class JDBCPersistenceAdapter exte
c.close();
}
}
+
+ public long getLastProducerSequenceId(ProducerId id) throws IOException {
+ TransactionContext c = getTransactionContext();
+ try {
+ return getAdapter().doGetLastProducerSequenceId(c, id);
+ } catch (SQLException e) {
+ JDBCPersistenceAdapter.log("JDBC Failure: ", e);
+ throw IOExceptionSupport.create("Failed to get last broker message id: " + e, e);
+ } finally {
+ c.close();
+ }
+ }
+
public void start() throws Exception {
getAdapter().setUseExternalMessageReferences(isUseExternalMessageReferences());
@@ -699,6 +713,5 @@ public class JDBCPersistenceAdapter exte
synchronized(sequenceGenerator) {
return sequenceGenerator.getNextSequenceId();
}
- }
-
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java?rev=961783&r1=961782&r2=961783&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java Thu Jul 8 14:23:21 2010
@@ -49,7 +49,7 @@ public class JDBCTopicMessageStore exten
// Get a connection and insert the message into the DB.
TransactionContext c = persistenceAdapter.getTransactionContext(context);
try {
- long seq = adapter.getStoreSequenceId(c, messageId);
+ long seq = adapter.getStoreSequenceId(c, destination, messageId);
adapter.doSetLastAck(c, destination, clientId, subscriptionName, seq);
} catch (SQLException e) {
JDBCPersistenceAdapter.log("JDBC Failure: ", e);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java?rev=961783&r1=961782&r2=961783&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java Thu Jul 8 14:23:21 2010
@@ -67,6 +67,7 @@ public class Statements {
private String findNextMessagesStatement;
private boolean useLockCreateWhereClause;
private String findAllMessageIdsStatement;
+ private String lastProducerSequenceIdStatement;
public String[] getCreateSchemaStatements() {
if (createSchemaStatements == null) {
@@ -128,7 +129,7 @@ public class Statements {
public String getFindMessageSequenceIdStatement() {
if (findMessageSequenceIdStatement == null) {
findMessageSequenceIdStatement = "SELECT ID FROM " + getFullMessageTableName()
- + " WHERE MSGID_PROD=? AND MSGID_SEQ=?";
+ + " WHERE MSGID_PROD=? AND MSGID_SEQ=? AND CONTAINER=?";
}
return findMessageSequenceIdStatement;
}
@@ -172,6 +173,15 @@ public class Statements {
return findLastSequenceIdInMsgsStatement;
}
+ public String getLastProducerSequenceIdStatement() {
+ if (lastProducerSequenceIdStatement == null) {
+ lastProducerSequenceIdStatement = "SELECT MAX(MSGID_SEQ) FROM " + getFullMessageTableName()
+ + " WHERE MSGID_PROD=?";
+ }
+ return lastProducerSequenceIdStatement;
+ }
+
+
public String getFindLastSequenceIdInAcksStatement() {
if (findLastSequenceIdInAcksStatement == null) {
findLastSequenceIdInAcksStatement = "SELECT MAX(LAST_ACKED_ID) FROM " + getFullAckTableName();
@@ -656,4 +666,9 @@ public class Statements {
this.lastAckedDurableSubscriberMessageStatement = lastAckedDurableSubscriberMessageStatement;
}
+
+ public void setLastProducerSequenceIdStatement(String lastProducerSequenceIdStatement) {
+ this.lastProducerSequenceIdStatement = lastProducerSequenceIdStatement;
+ }
+
}
\ No newline at end of file
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java?rev=961783&r1=961782&r2=961783&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java Thu Jul 8 14:23:21 2010
@@ -28,6 +28,7 @@ import java.util.Set;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.store.jdbc.JDBCAdapter;
import org.apache.activemq.store.jdbc.JDBCMessageIdScanListener;
@@ -246,13 +247,14 @@ public class DefaultJDBCAdapter implemen
}
}
- public long getStoreSequenceId(TransactionContext c, MessageId messageID) throws SQLException, IOException {
+ public long getStoreSequenceId(TransactionContext c, ActiveMQDestination destination, MessageId messageID) throws SQLException, IOException {
PreparedStatement s = null;
ResultSet rs = null;
try {
s = c.getConnection().prepareStatement(this.statements.getFindMessageSequenceIdStatement());
s.setString(1, messageID.getProducerId().toString());
s.setLong(2, messageID.getProducerSequenceId());
+ s.setString(3, destination.getQualifiedName());
rs = s.executeQuery();
if (!rs.next()) {
return 0;
@@ -819,4 +821,23 @@ public class DefaultJDBCAdapter implemen
* try { s.close(); } catch (Throwable ignore) {} } }
*/
+ public long doGetLastProducerSequenceId(TransactionContext c, ProducerId id)
+ throws SQLException, IOException {
+ PreparedStatement s = null;
+ ResultSet rs = null;
+ try {
+ s = c.getConnection().prepareStatement(this.statements.getLastProducerSequenceIdStatement());
+ s.setString(1, id.toString());
+ rs = s.executeQuery();
+ long seq = -1;
+ if (rs.next()) {
+ seq = rs.getLong(1);
+ }
+ return seq;
+ } finally {
+ close(rs);
+ close(s);
+ }
+ }
+
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java?rev=961783&r1=961782&r2=961783&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java Thu Jul 8 14:23:21 2010
@@ -50,6 +50,7 @@ import org.apache.activemq.command.Journ
import org.apache.activemq.command.JournalTransaction;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.ProducerId;
import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.store.MessageStore;
@@ -745,4 +746,8 @@ public class JournalPersistenceAdapter i
}
}
+ public long getLastProducerSequenceId(ProducerId id) {
+ return -1;
+ }
+
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java?rev=961783&r1=961782&r2=961783&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java Thu Jul 8 14:23:21 2010
@@ -32,6 +32,7 @@ import org.apache.activemq.command.Activ
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.ProducerId;
import org.apache.activemq.kaha.CommandMarshaller;
import org.apache.activemq.kaha.ContainerId;
import org.apache.activemq.kaha.ListContainer;
@@ -369,6 +370,11 @@ public class KahaPersistenceAdapter impl
public void setBrokerService(BrokerService brokerService) {
this.brokerService = brokerService;
}
+
+ public long getLastProducerSequenceId(ProducerId id) {
+ // reference store send has adequate duplicate suppression
+ return -1;
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java?rev=961783&r1=961782&r2=961783&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java Thu Jul 8 14:23:21 2010
@@ -26,6 +26,7 @@ import org.apache.activemq.broker.Connec
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.ProducerId;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.TopicMessageStore;
@@ -124,6 +125,10 @@ public class KahaDBPersistenceAdapter im
return this.letter.getLastMessageBrokerSequenceId();
}
+ public long getLastProducerSequenceId(ProducerId id) throws IOException {
+ return this.letter.getLastProducerSequenceId(id);
+ }
+
/**
* @param destination
* @see org.apache.activemq.store.PersistenceAdapter#removeQueueMessageStore(org.apache.activemq.command.ActiveMQQueue)
@@ -209,6 +214,29 @@ public class KahaDBPersistenceAdapter im
}
/**
+ * Set the max number of producers (LRU cache) to track for duplicate sends
+ */
+ public void setMaxFailoverProducersToTrack(int maxFailoverProducersToTrack) {
+ this.letter.setMaxFailoverProducersToTrack(maxFailoverProducersToTrack);
+ }
+
+ public int getMaxFailoverProducersToTrack() {
+ return this.letter.getMaxFailoverProducersToTrack();
+ }
+
+ /**
+ * set the audit window depth for duplicate suppression (should exceed the max transaction
+ * batch)
+ */
+ public void setFailoverProducersAuditDepth(int failoverProducersAuditDepth) {
+ this.letter.setFailoverProducersAuditDepth(failoverProducersAuditDepth);
+ }
+
+ public int getFailoverProducersAuditDepth() {
+ return this.letter.getFailoverProducersAuditDepth();
+ }
+
+ /**
* Get the checkpointInterval
*
* @return the checkpointInterval
@@ -477,4 +505,5 @@ public class KahaDBPersistenceAdapter im
String path = getDirectory() != null ? getDirectory().toString() : "DIRECTORY_NOT_SET";
return "KahaDBPersistenceAdapter[" + path + "]";
}
+
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java?rev=961783&r1=961782&r2=961783&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java Thu Jul 8 14:23:21 2010
@@ -48,6 +48,7 @@ import org.apache.activemq.command.Local
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.XATransactionId;
@@ -363,6 +364,7 @@ public class KahaDBStore extends Message
org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message);
command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
store(command, isEnableJournalDiskSyncs() && message.isResponseRequired(), null, null);
+
}
public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
@@ -901,6 +903,15 @@ public class KahaDBStore extends Message
public long getLastMessageBrokerSequenceId() throws IOException {
return 0;
}
+
+ public long getLastProducerSequenceId(ProducerId id) {
+ indexLock.readLock().lock();
+ try {
+ return metadata.producerSequenceIdTracker.getLastSeqId(id);
+ } finally {
+ indexLock.readLock().unlock();
+ }
+ }
public long size() {
if (!isStarted()) {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=961783&r1=961782&r2=961783&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Thu Jul 8 14:23:21 2010
@@ -16,11 +16,15 @@
*/
package org.apache.activemq.store.kahadb;
+import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataOutput;
+import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
@@ -35,22 +39,25 @@ import java.util.TreeMap;
import java.util.TreeSet;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.activemq.ActiveMQMessageAuditNoSync;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.LocalTransactionId;
+import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.XATransactionId;
+import org.apache.activemq.protobuf.Buffer;
import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
import org.apache.activemq.store.kahadb.data.KahaDestination;
import org.apache.activemq.store.kahadb.data.KahaEntryType;
import org.apache.activemq.store.kahadb.data.KahaLocalTransactionId;
import org.apache.activemq.store.kahadb.data.KahaPrepareCommand;
+import org.apache.activemq.store.kahadb.data.KahaProducerAuditCommand;
import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
import org.apache.activemq.store.kahadb.data.KahaRollbackCommand;
@@ -93,9 +100,9 @@ public class MessageDatabase extends Ser
private static final Log LOG = LogFactory.getLog(MessageDatabase.class);
private static final int DEFAULT_DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
- static final int CLOSED_STATE = 1;
- static final int OPEN_STATE = 2;
- static final long NOT_ACKED = -1;
+ static final int CLOSED_STATE = 1;
+ static final int OPEN_STATE = 2;
+ static final long NOT_ACKED = -1;
protected class Metadata {
@@ -104,6 +111,8 @@ public class MessageDatabase extends Ser
protected BTreeIndex<String, StoredDestination> destinations;
protected Location lastUpdate;
protected Location firstInProgressTransactionLocation;
+ protected Location producerSequenceIdTrackerLocation = null;
+ protected transient ActiveMQMessageAuditNoSync producerSequenceIdTracker = new ActiveMQMessageAuditNoSync();
public void read(DataInput is) throws IOException {
state = is.readInt();
@@ -118,6 +127,14 @@ public class MessageDatabase extends Ser
} else {
firstInProgressTransactionLocation = null;
}
+ try {
+ if (is.readBoolean()) {
+ producerSequenceIdTrackerLocation = LocationMarshaller.INSTANCE.readPayload(is);
+ } else {
+ producerSequenceIdTrackerLocation = null;
+ }
+ } catch (EOFException expectedOnUpgrade) {
+ }
}
public void write(DataOutput os) throws IOException {
@@ -137,6 +154,13 @@ public class MessageDatabase extends Ser
} else {
os.writeBoolean(false);
}
+
+ if (producerSequenceIdTrackerLocation != null) {
+ os.writeBoolean(true);
+ LocationMarshaller.INSTANCE.writePayload(producerSequenceIdTrackerLocation, os);
+ } else {
+ os.writeBoolean(false);
+ }
}
}
@@ -154,7 +178,7 @@ public class MessageDatabase extends Ser
protected PageFile pageFile;
protected Journal journal;
- protected Metadata metadata = new Metadata();
+ protected Metadata metadata = new Metadata();
protected MetadataMarshaller metadataMarshaller = new MetadataMarshaller();
@@ -171,7 +195,8 @@ public class MessageDatabase extends Ser
int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
boolean enableIndexWriteAsync = false;
- int setIndexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
+ int setIndexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
+
protected AtomicBoolean opened = new AtomicBoolean();
private LockFile lockFile;
@@ -381,15 +406,15 @@ public class MessageDatabase extends Ser
private Location getFirstInProgressTxLocation() {
Location l = null;
synchronized (inflightTransactions) {
- if (!inflightTransactions.isEmpty()) {
- l = inflightTransactions.values().iterator().next().get(0).getLocation();
- }
- if (!preparedTransactions.isEmpty()) {
- Location t = preparedTransactions.values().iterator().next().get(0).getLocation();
- if (l==null || t.compareTo(l) <= 0) {
- l = t;
+ if (!inflightTransactions.isEmpty()) {
+ l = inflightTransactions.values().iterator().next().get(0).getLocation();
+ }
+ if (!preparedTransactions.isEmpty()) {
+ Location t = preparedTransactions.values().iterator().next().get(0).getLocation();
+ if (l==null || t.compareTo(l) <= 0) {
+ l = t;
+ }
}
- }
}
return l;
}
@@ -407,21 +432,25 @@ public class MessageDatabase extends Ser
try {
long start = System.currentTimeMillis();
- Location recoveryPosition = getRecoveryPosition();
- if( recoveryPosition!=null ) {
- int redoCounter = 0;
- LOG.info("Recoverying from the journal ...");
- while (recoveryPosition != null) {
- JournalCommand message = load(recoveryPosition);
- metadata.lastUpdate = recoveryPosition;
- process(message, recoveryPosition);
- redoCounter++;
- recoveryPosition = journal.getNextLocation(recoveryPosition);
- }
- long end = System.currentTimeMillis();
- LOG.info("Recovery replayed " + redoCounter + " operations from the journal in " + ((end - start) / 1000.0f) + " seconds.");
+ Location producerAuditPosition = recoverProducerAudit();
+ Location lastIndoubtPosition = getRecoveryPosition();
+
+ Location recoveryPosition = minimum(producerAuditPosition, lastIndoubtPosition);
+
+ if (recoveryPosition != null) {
+ int redoCounter = 0;
+ LOG.info("Recoverying from the journal ...");
+ while (recoveryPosition != null) {
+ JournalCommand<?> message = load(recoveryPosition);
+ metadata.lastUpdate = recoveryPosition;
+ process(message, recoveryPosition, lastIndoubtPosition);
+ redoCounter++;
+ recoveryPosition = journal.getNextLocation(recoveryPosition);
+ }
+ long end = System.currentTimeMillis();
+ LOG.info("Recovery replayed " + redoCounter + " operations from the journal in " + ((end - start) / 1000.0f) + " seconds.");
}
-
+
// We may have to undo some index updates.
pageFile.tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
@@ -433,7 +462,39 @@ public class MessageDatabase extends Ser
}
}
- protected void recoverIndex(Transaction tx) throws IOException {
+ private Location minimum(Location producerAuditPosition,
+ Location lastIndoubtPosition) {
+ Location min = null;
+ if (producerAuditPosition != null) {
+ min = producerAuditPosition;
+ if (lastIndoubtPosition != null && lastIndoubtPosition.compareTo(producerAuditPosition) < 0) {
+ min = lastIndoubtPosition;
+ }
+ } else {
+ min = lastIndoubtPosition;
+ }
+ return min;
+ }
+
+ private Location recoverProducerAudit() throws IOException {
+ if (metadata.producerSequenceIdTrackerLocation != null) {
+ KahaProducerAuditCommand audit = (KahaProducerAuditCommand) load(metadata.producerSequenceIdTrackerLocation);
+ try {
+ ObjectInputStream objectIn = new ObjectInputStream(audit.getAudit().newInput());
+ metadata.producerSequenceIdTracker = (ActiveMQMessageAuditNoSync) objectIn.readObject();
+ } catch (ClassNotFoundException cfe) {
+ IOException ioe = new IOException("Failed to read producerAudit: " + cfe);
+ ioe.initCause(cfe);
+ throw ioe;
+ }
+ return journal.getNextLocation(metadata.producerSequenceIdTrackerLocation);
+ } else {
+ // no audit stored, so it has to be recreated by replaying from the start of the journal
+ return journal.getNextLocation(null);
+ }
+ }
+
+ protected void recoverIndex(Transaction tx) throws IOException {
long start = System.currentTimeMillis();
// It is possible index updates got applied before the journal updates..
// in that case we need to removed references to messages that are not in the journal
@@ -457,6 +518,7 @@ public class MessageDatabase extends Ser
MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
sd.locationIndex.remove(tx, keys.location);
sd.messageIdIndex.remove(tx, keys.messageId);
+ metadata.producerSequenceIdTracker.rollback(new MessageId(keys.messageId));
undoCounter++;
// TODO: do we need to modify the ack positions for the pub sub case?
}
@@ -588,7 +650,7 @@ public class MessageDatabase extends Ser
while (nextRecoveryPosition != null) {
lastRecoveryPosition = nextRecoveryPosition;
metadata.lastUpdate = lastRecoveryPosition;
- JournalCommand message = load(lastRecoveryPosition);
+ JournalCommand<?> message = load(lastRecoveryPosition);
process(message, lastRecoveryPosition);
nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
}
@@ -601,8 +663,8 @@ public class MessageDatabase extends Ser
return metadata.lastUpdate;
}
- private Location getRecoveryPosition() throws IOException {
-
+ private Location getRecoveryPosition() throws IOException {
+
// If we need to recover the transactions..
if (metadata.firstInProgressTransactionLocation != null) {
return metadata.firstInProgressTransactionLocation;
@@ -613,7 +675,7 @@ public class MessageDatabase extends Ser
// Start replay at the record after the last one recorded in the index file.
return journal.getNextLocation(metadata.lastUpdate);
}
-
+
// This loads the first position.
return journal.getNextLocation(null);
}
@@ -658,7 +720,7 @@ public class MessageDatabase extends Ser
// /////////////////////////////////////////////////////////////////
// Methods call by the broker to update and query the store.
// /////////////////////////////////////////////////////////////////
- public Location store(JournalCommand data) throws IOException {
+ public Location store(JournalCommand<?> data) throws IOException {
return store(data, false, null,null);
}
@@ -669,7 +731,7 @@ public class MessageDatabase extends Ser
* during a recovery process.
* @param done
*/
- public Location store(JournalCommand data, boolean sync, Runnable before,Runnable after) throws IOException {
+ public Location store(JournalCommand<?> data, boolean sync, Runnable before,Runnable after) throws IOException {
if (before != null) {
before.run();
}
@@ -716,7 +778,7 @@ public class MessageDatabase extends Ser
* @return
* @throws IOException
*/
- public JournalCommand load(Location location) throws IOException {
+ public JournalCommand<?> load(Location location) throws IOException {
ByteSequence data = journal.read(location);
DataByteArrayInputStream is = new DataByteArrayInputStream(data);
byte readByte = is.readByte();
@@ -724,10 +786,30 @@ public class MessageDatabase extends Ser
if( type == null ) {
throw new IOException("Could not load journal record. Invalid location: "+location);
}
- JournalCommand message = (JournalCommand)type.createMessage();
+ JournalCommand<?> message = (JournalCommand<?>)type.createMessage();
message.mergeFramed(is);
return message;
}
+
+ /**
+ * do minimal recovery till we reach the last inDoubtLocation
+ * @param data
+ * @param location
+ * @param inDoubtlocation
+ * @throws IOException
+ */
+ void process(JournalCommand<?> data, final Location location, final Location inDoubtlocation) throws IOException {
+ if (inDoubtlocation != null && location.compareTo(inDoubtlocation) >= 0) {
+ process(data, location);
+ } else {
+ // just recover producer audit
+ data.visit(new Visitor() {
+ public void visit(KahaAddMessageCommand command) throws IOException {
+ metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId());
+ }
+ });
+ }
+ }
// /////////////////////////////////////////////////////////////////
// Journaled record processing methods. Once the record is journaled,
@@ -735,7 +817,7 @@ public class MessageDatabase extends Ser
// from the recovery method too so they need to be idempotent
// /////////////////////////////////////////////////////////////////
- void process(JournalCommand data, final Location location) throws IOException {
+ void process(JournalCommand<?> data, final Location location) throws IOException {
data.visit(new Visitor() {
@Override
public void visit(KahaAddMessageCommand command) throws IOException {
@@ -911,7 +993,7 @@ public class MessageDatabase extends Ser
if( previous == null ) {
previous = sd.messageIdIndex.put(tx, command.getMessageId(), id);
if( previous == null ) {
- sd.orderIndex.put(tx, id, new MessageKeys(command.getMessageId(), location));
+ sd.orderIndex.put(tx, id, new MessageKeys(command.getMessageId(), location));
} else {
// If the message ID as indexed, then the broker asked us to store a DUP
// message. Bad BOY! Don't do it, and log a warning.
@@ -927,7 +1009,8 @@ public class MessageDatabase extends Ser
// TODO: consider just rolling back the tx.
sd.locationIndex.put(tx, location, previous);
}
-
+ // record this id in any event, initial send or recovery
+ metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId());
}
void updateIndex(Transaction tx, KahaRemoveMessageCommand command, Location ackLocation) throws IOException {
@@ -1025,6 +1108,7 @@ public class MessageDatabase extends Ser
LOG.debug("Checkpoint started.");
metadata.state = OPEN_STATE;
+ metadata.producerSequenceIdTrackerLocation = checkpointProducerAudit();
metadata.firstInProgressTransactionLocation = getFirstInProgressTxLocation();
tx.store(metadata.page, metadataMarshaller, true);
pageFile.flush();
@@ -1111,6 +1195,15 @@ public class MessageDatabase extends Ser
LOG.debug("Checkpoint done.");
}
+ private Location checkpointProducerAudit() throws IOException { // snapshots the producer audit; the checkpoint keeps the returned Location in metadata
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ ObjectOutputStream oout = new ObjectOutputStream(baos);
+ oout.writeObject(metadata.producerSequenceIdTracker); // Java serialization of the whole tracker object
+ oout.flush();
+ oout.close();
+ return store(new KahaProducerAuditCommand().setAudit(new Buffer(baos.toByteArray()))); // serialized bytes travel as the command's audit field
+ }
+
public HashSet<Integer> getJournalFilesBeingReplicated() {
return journalFilesBeingReplicated;
}
@@ -1580,6 +1673,22 @@ public class MessageDatabase extends Ser
return journalMaxFileLength;
}
+ public void setMaxFailoverProducersToTrack(int maxFailoverProducersToTrack) { // applied straight to the tracker held in metadata; no local copy is kept
+ this.metadata.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(maxFailoverProducersToTrack);
+ }
+
+ public int getMaxFailoverProducersToTrack() { // reads the current value back from the tracker held in metadata
+ return this.metadata.producerSequenceIdTracker.getMaximumNumberOfProducersToTrack();
+ }
+
+ public void setFailoverProducersAuditDepth(int failoverProducersAuditDepth) { // applied straight to the tracker held in metadata; no local copy is kept
+ this.metadata.producerSequenceIdTracker.setAuditDepth(failoverProducersAuditDepth);
+ }
+
+ public int getFailoverProducersAuditDepth() { // reads the current value back from the tracker held in metadata
+ return this.metadata.producerSequenceIdTracker.getAuditDepth();
+ }
+
public PageFile getPageFile() {
if (pageFile == null) {
pageFile = createPageFile();
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java?rev=961783&r1=961782&r2=961783&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java Thu Jul 8 14:23:21 2010
@@ -33,6 +33,7 @@ import org.apache.activemq.command.Activ
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.XATransactionId;
@@ -569,5 +570,9 @@ public class TempKahaDBStore extends Tem
throw new IllegalArgumentException("Not in the valid destination format");
}
}
+
+ public long getLastProducerSequenceId(ProducerId id) { // the id argument is ignored by this store
+ return -1; // NOTE(review): -1 presumably means "no sequence id known" - confirm against the PersistenceAdapter contract
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/Visitor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/Visitor.java?rev=961783&r1=961782&r2=961783&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/Visitor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/Visitor.java Thu Jul 8 14:23:21 2010
@@ -21,6 +21,7 @@ import java.io.IOException;
import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
import org.apache.activemq.store.kahadb.data.KahaPrepareCommand;
+import org.apache.activemq.store.kahadb.data.KahaProducerAuditCommand;
import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
import org.apache.activemq.store.kahadb.data.KahaRollbackCommand;
@@ -52,5 +53,8 @@ public class Visitor {
public void visit(KahaSubscriptionCommand kahaUpdateSubscriptionCommand) throws IOException {
}
+
+ public void visit(KahaProducerAuditCommand kahaProducerAuditCommand) throws IOException { // intentionally empty default, like the other visit overloads
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java?rev=961783&r1=961782&r2=961783&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java Thu Jul 8 14:23:21 2010
@@ -27,6 +27,7 @@ import org.apache.activemq.broker.Connec
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.ProducerId;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.ProxyMessageStore;
@@ -201,4 +202,9 @@ public class MemoryPersistenceAdapter im
createTransactionStore();
}
}
+
+ public long getLastProducerSequenceId(ProducerId id) { // the id argument is ignored by this adapter
+ // memory map does duplicate suppression
+ return -1; // NOTE(review): -1 presumably means "no sequence id known" - confirm against the PersistenceAdapter contract
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java?rev=961783&r1=961782&r2=961783&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java Thu Jul 8 14:23:21 2010
@@ -727,7 +727,7 @@ public class FailoverTransport implement
for (Iterator<Command> iter2 = tmpMap.values().iterator(); iter2.hasNext();) {
Command command = iter2.next();
if (LOG.isTraceEnabled()) {
- LOG.trace("restore, replay: " + command);
+ LOG.trace("restore requestMap, replay: " + command);
}
t.oneway(command);
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BitArray.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BitArray.java?rev=961783&r1=961782&r2=961783&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BitArray.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BitArray.java Thu Jul 8 14:23:21 2010
@@ -19,6 +19,7 @@ package org.apache.activemq.util;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.io.Serializable;
/**
* Simple BitArray to enable setting multiple boolean values efficently Used
@@ -27,7 +28,10 @@ import java.io.IOException;
*
* @version $Revision: 1.1.1.1 $
*/
-public class BitArray {
+public class BitArray implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
static final int LONG_SIZE = 64;
static final int INT_SIZE = 32;
static final int SHORT_SIZE = 16;
@@ -113,6 +117,14 @@ public class BitArray {
this.bits = bits;
}
+ private void writeObject(java.io.ObjectOutputStream out) throws IOException { // Java serialization hook; defaultWriteObject() is deliberately not called
+ writeToStream(out); // reuse the class's own stream encoding as the serialized form
+ }
+
+ private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException { // Java deserialization hook, mirror of writeObject
+ readFromStream(in); // only the state that readFromStream restores is populated; defaultReadObject() is not called
+ }
+
/**
* write the bits to an output stream
*
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BitArrayBin.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BitArrayBin.java?rev=961783&r1=961782&r2=961783&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BitArrayBin.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BitArrayBin.java Thu Jul 8 14:23:21 2010
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.util;
+import java.io.Serializable;
import java.util.LinkedList;
/**
@@ -23,8 +24,9 @@ import java.util.LinkedList;
*
* @version $Revision: 1.1.1.1 $
*/
-public class BitArrayBin {
+public class BitArrayBin implements Serializable {
+ private static final long serialVersionUID = 1L;
private LinkedList<BitArray> list;
private int maxNumberOfArrays;
private int firstIndex = -1;
@@ -162,4 +164,22 @@ public class BitArrayBin {
}
return answer;
}
+
+ public long getLastSetIndex() { // returns -1 while firstIndex is still negative (its initial value)
+ long result = -1;
+
+ if (firstIndex >=0) {
+ result = firstIndex; // start from the base index of the first slot
+ BitArray last = null;
+ for (int lastBitArrayIndex = maxNumberOfArrays -1; lastBitArrayIndex >= 0; lastBitArrayIndex--) { // scan slots from the end for the last non-null BitArray
+ last = list.get(lastBitArrayIndex); // NOTE(review): indexed get on a LinkedList is linear per call; assumes list holds maxNumberOfArrays entries - confirm
+ if (last != null) {
+ result += last.length() -1; // NOTE(review): assumes length() is one past the highest used bit - confirm in BitArray
+ result += lastBitArrayIndex * BitArray.LONG_SIZE; // offset of that slot, LONG_SIZE bits per slot
+ break;
+ }
+ }
+ }
+ return result; // stays at firstIndex when every slot is null
+ }
}
Modified: activemq/trunk/activemq-core/src/main/proto/journal-data.proto
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/proto/journal-data.proto?rev=961783&r1=961782&r2=961783&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/proto/journal-data.proto (original)
+++ activemq/trunk/activemq-core/src/main/proto/journal-data.proto Thu Jul 8 14:23:21 2010
@@ -29,6 +29,7 @@ enum KahaEntryType {
KAHA_ROLLBACK_COMMAND = 5;
KAHA_REMOVE_DESTINATION_COMMAND = 6;
KAHA_SUBSCRIPTION_COMMAND = 7;
+ KAHA_PRODUCER_AUDIT_COMMAND = 8;
}
message KahaTraceCommand {
@@ -109,6 +110,18 @@ message KahaSubscriptionCommand {
optional bytes subscriptionInfo = 4;
}
+message KahaProducerAuditCommand {
+ // We make use of the wonky comment style below because the following options
+ // are not valid for protoc, but they are valid for the ActiveMQ proto compiler.
+ // In the ActiveMQ proto compiler, comments terminate with the pipe character: |
+
+ //| option java_implments = "org.apache.activemq.store.kahadb.JournalCommand<KahaProducerAuditCommand>";
+ //| option java_visitor = "org.apache.activemq.store.kahadb.Visitor:void:java.io.IOException";
+ //| option java_type_method = "KahaEntryType";
+
+ required bytes audit = 1;
+}
+
message KahaDestination {
enum DestinationType {
QUEUE = 0;
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/PerDestinationStoreLimitTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/PerDestinationStoreLimitTest.java?rev=961783&r1=961782&r2=961783&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/PerDestinationStoreLimitTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/PerDestinationStoreLimitTest.java Thu Jul 8 14:23:21 2010
@@ -152,6 +152,7 @@ public class PerDestinationStoreLimitTes
Thread.sleep(1000);
// the producer is blocked once the done flag stays true
if (done.get()) {
+ LOG.info("Blocked....");
break;
}
done.set(true);
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v6/ConnectionInfoTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v6/ConnectionInfoTest.java?rev=961783&r1=961782&r2=961783&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v6/ConnectionInfoTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v6/ConnectionInfoTest.java Thu Jul 8 14:23:21 2010
@@ -66,5 +66,6 @@ public class ConnectionInfoTest extends
info.setManageable(false);
info.setClientMaster(true);
info.setFaultTolerant(false);
+ info.setFailoverReconnect(true);
}
}
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/StoreOrderTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/StoreOrderTest.java?rev=961783&r1=961782&r2=961783&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/StoreOrderTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/StoreOrderTest.java Thu Jul 8 14:23:21 2010
@@ -17,6 +17,7 @@
package org.apache.activemq.store;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -120,6 +121,23 @@ public abstract class StoreOrderTest {
}
@Test
+ public void testCompositeSendReceiveAfterRestart() throws Exception { // after a broker restart, a message must be receivable from each of the two queues
+ destination = new ActiveMQQueue("StoreOrderTest,SecondStoreOrderTest"); // comma-separated name covering both queues read below
+ enqueueOneMessage();
+
+ LOG.info("restart broker");
+ stopBroker();
+ broker = createRestartedBroker();
+ dumpMessages();
+ initConnection();
+ destination = new ActiveMQQueue("StoreOrderTest"); // switch to the first component queue
+ assertNotNull("got one message from first dest", receiveOne());
+ dumpMessages();
+ destination = new ActiveMQQueue("SecondStoreOrderTest"); // then the second component queue
+ assertNotNull("got one message from second dest", receiveOne());
+ }
+
+ @Test
public void validateUnorderedTxCommit() throws Exception {
Executor executor = Executors.newCachedThreadPool();
@@ -247,6 +265,7 @@ public abstract class StoreOrderTest {
PolicyEntry defaultEntry = new PolicyEntry();
defaultEntry.setMemoryLimit(1024*3);
defaultEntry.setCursorMemoryHighWaterMark(68);
+ defaultEntry.setExpireMessagesPeriod(0);
map.setDefaultEntry(defaultEntry);
brokerService.setDestinationPolicy(map);
}
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapterTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapterTest.java?rev=961783&r1=961782&r2=961783&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapterTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapterTest.java Thu Jul 8 14:23:21 2010
@@ -28,6 +28,11 @@ public class JDBCPersistenceAdapterTest
protected PersistenceAdapter createPersistenceAdapter(boolean delete) throws IOException {
JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter();
+
+ // explicitly enable audit as it is now off by default
+ // due to org.apache.activemq.broker.ProducerBrokerExchange.canDispatch(Message)
+ jdbc.setEnableAudit(true);
+
brokerService.setSchedulerSupport(false);
brokerService.setPersistenceAdapter(jdbc);
jdbc.setBrokerService(brokerService);
@@ -56,6 +61,5 @@ public class JDBCPersistenceAdapterTest
if (!failed) {
fail("Should have failed with audit turned off");
}
- }
-
+ }
}