You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2016/05/18 17:47:16 UTC
[2/4] activemq-artemis git commit: ARTEMIS-524 Paging could lose data
eventually after crashes
ARTEMIS-524 Paging could lose data eventually after crashes
https://issues.apache.org/jira/browse/ARTEMIS-524
I am keeping all the debug and tracing statements I added while debugging this issue;
for that reason this commit may look longer than expected.
The fix is highlighted by the tests added to org.apache.activemq.artemis.tests.integration.client.PagingTest
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/3e2adf12
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/3e2adf12
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/3e2adf12
Branch: refs/heads/master
Commit: 3e2adf123b96c3dfade3d1584ea0ddf65a876941
Parents: ec52693
Author: Clebert Suconic <cl...@apache.org>
Authored: Tue May 17 15:06:02 2016 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Tue May 17 20:28:40 2016 -0400
----------------------------------------------------------------------
.../core/client/impl/ClientConsumerImpl.java | 90 ++++++-
.../core/impl/ActiveMQConsumerContext.java | 7 +
.../core/protocol/core/impl/ChannelImpl.java | 3 +
.../artemis/utils/SoftValueHashMap.java | 1 +
.../jms/client/ActiveMQMessageConsumer.java | 2 +
.../artemis/jms/client/JMSExceptionHelper.java | 10 +
.../jms/client/JMSMessageListenerWrapper.java | 3 +
.../artemis/core/paging/PagingStoreFactory.java | 5 +
.../core/paging/cursor/NonExistentPage.java | 43 ++++
.../core/paging/cursor/PageCursorProvider.java | 3 +
.../core/paging/cursor/PageSubscription.java | 2 +-
.../paging/cursor/impl/LivePageCacheImpl.java | 6 +-
.../cursor/impl/PageCursorProviderImpl.java | 91 ++++---
.../cursor/impl/PageSubscriptionImpl.java | 53 +++-
.../activemq/artemis/core/paging/impl/Page.java | 4 +-
.../paging/impl/PageTransactionInfoImpl.java | 20 ++
.../core/paging/impl/PagingStoreFactoryNIO.java | 7 +
.../core/paging/impl/PagingStoreImpl.java | 11 +-
.../impl/journal/JournalStorageManager.java | 6 +-
.../impl/journal/LargeServerMessageImpl.java | 15 +-
.../postoffice/impl/DuplicateIDCacheImpl.java | 2 +-
.../server/impl/RemotingServiceImpl.java | 2 +
.../core/server/impl/ActiveMQServerImpl.java | 8 +-
.../artemis/core/server/impl/RefsOperation.java | 5 +
.../core/server/impl/ServerConsumerImpl.java | 8 +
.../tests/integration/client/PagingTest.java | 254 +++++++++++++++++++
.../core/paging/impl/PagingStoreImplTest.java | 14 +-
27 files changed, 605 insertions(+), 70 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3e2adf12/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java
index 7b72188..57bb869 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java
@@ -171,6 +171,10 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
this.contextClassLoader = contextClassLoader;
this.flowControlExecutor = flowControlExecutor;
+
+ if (logger.isTraceEnabled()) {
+ logger.trace(this + ":: being created at", new Exception("trace"));
+ }
}
// ClientConsumer implementation
@@ -182,9 +186,16 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
}
private ClientMessage receive(final long timeout, final boolean forcingDelivery) throws ActiveMQException {
+ if (logger.isTraceEnabled()) {
+ logger.trace(this + "::receive(" + timeout + ", " + forcingDelivery + ")");
+ }
+
checkClosed();
if (largeMessageReceived != null) {
+ if (logger.isTraceEnabled()) {
+ logger.trace(this + "::receive(" + timeout + ", " + forcingDelivery + ") -> discard LargeMessage body for " + largeMessageReceived);
+ }
// Check if there are pending packets to be received
largeMessageReceived.discardBody();
largeMessageReceived = null;
@@ -195,10 +206,16 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
}
if (handler != null) {
+ if (logger.isTraceEnabled()) {
+ logger.trace(this + "::receive(" + timeout + ", " + forcingDelivery + ") -> throwing messageHandlerSet");
+ }
throw ActiveMQClientMessageBundle.BUNDLE.messageHandlerSet();
}
if (clientWindowSize == 0) {
+ if (logger.isTraceEnabled()) {
+ logger.trace(this + "::receive(" + timeout + ", " + forcingDelivery + ") -> start slowConsumer");
+ }
startSlowConsumer();
}
@@ -235,6 +252,10 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
}
}
+ if ( m != null) {
+ session.workDone();
+ }
+
try {
wait(toWait);
}
@@ -256,6 +277,10 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
if (failedOver) {
if (m == null) {
+ if (logger.isTraceEnabled()) {
+ logger.trace(this + "::receive(" + timeout + ", " + forcingDelivery + ") -> m == null and failover");
+ }
+
// if failed over and the buffer is null, we reset the state and try it again
failedOver = false;
deliveryForced = false;
@@ -263,13 +288,16 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
continue;
}
else {
+ if (logger.isTraceEnabled()) {
+ logger.trace(this + "::receive(" + timeout + ", " + forcingDelivery + ") -> failedOver, but m != null, being " + m);
+ }
failedOver = false;
}
}
if (callForceDelivery) {
if (logger.isTraceEnabled()) {
- logger.trace("Forcing delivery");
+ logger.trace(this + "::Forcing delivery");
}
// JBPAPP-6030 - Calling forceDelivery outside of the lock to avoid distributed dead locks
sessionContext.forceDelivery(this, forceDeliveryCount++);
@@ -291,14 +319,14 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
resetIfSlowConsumer();
if (logger.isTraceEnabled()) {
- logger.trace("There was nothing on the queue, leaving it now:: returning null");
+ logger.trace(this + "::There was nothing on the queue, leaving it now:: returning null");
}
return null;
}
else {
if (logger.isTraceEnabled()) {
- logger.trace("Ignored force delivery answer as it belonged to another call");
+ logger.trace(this + "::Ignored force delivery answer as it belonged to another call");
}
// Ignore the message
continue;
@@ -331,14 +359,14 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
}
if (logger.isTraceEnabled()) {
- logger.trace("Returning " + m);
+ logger.trace(this + "::Returning " + m);
}
return m;
}
else {
if (logger.isTraceEnabled()) {
- logger.trace("Returning null");
+ logger.trace(this + "::Returning null");
}
resetIfSlowConsumer();
return null;
@@ -352,12 +380,23 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
@Override
public ClientMessage receive(final long timeout) throws ActiveMQException {
+
+ if (logger.isTraceEnabled()) {
+ logger.trace(this + ":: receive(" + timeout + ")");
+ }
ClientMessage msg = receive(timeout, false);
if (msg == null && !closed) {
+ if (logger.isTraceEnabled()) {
+ logger.trace(this + ":: receive(" + timeout + ") -> null, trying again with receive(0)");
+ }
msg = receive(0, true);
}
+ if (logger.isTraceEnabled()) {
+ logger.trace(this + ":: returning " + msg);
+ }
+
return msg;
}
@@ -471,6 +510,9 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
@Override
public void clearAtFailover() {
+ if (logger.isTraceEnabled()) {
+ logger.trace(this + "::ClearAtFailover");
+ }
clearBuffer();
// failover will issue a start later
@@ -647,7 +689,7 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
}
if (currentLargeMessageController == null) {
if (logger.isTraceEnabled()) {
- logger.trace("Sending back credits for largeController = null " + flowControlSize);
+ logger.trace(this + "::Sending back credits for largeController = null " + flowControlSize);
}
flowControl(flowControlSize, false);
}
@@ -722,12 +764,23 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
individualAcknowledge(message);
}
else {
+
ackBytes += message.getEncodeSize();
+ if (logger.isTraceEnabled()) {
+ logger.trace(this + "::acknowledge ackBytes=" + ackBytes + " and ackBatchSize=" + ackBatchSize + ", encodeSize=" + message.getEncodeSize());
+ }
+
if (ackBytes >= ackBatchSize) {
+ if (logger.isTraceEnabled()) {
+ logger.trace(this + ":: acknowledge acking " + cmi);
+ }
doAck(cmi);
}
else {
+ if (logger.isTraceEnabled()) {
+ logger.trace(this + ":: acknowledge setting lastAckedMessage = " + cmi);
+ }
lastAckedMessage = cmi;
}
}
@@ -745,6 +798,9 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
@Override
public void flushAcks() throws ActiveMQException {
if (lastAckedMessage != null) {
+ if (logger.isTraceEnabled()) {
+ logger.trace(this + "::FlushACK acking lastMessage::" + lastAckedMessage);
+ }
doAck(lastAckedMessage);
}
}
@@ -763,7 +819,7 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
if (creditsToSend >= clientWindowSize) {
if (clientWindowSize == 0 && discountSlowConsumer) {
if (logger.isTraceEnabled()) {
- logger.trace("FlowControl::Sending " + creditsToSend + " -1, for slow consumer");
+ logger.trace(this + "::FlowControl::Sending " + creditsToSend + " -1, for slow consumer");
}
// sending the credits - 1 initially send to fire the slow consumer, or the slow consumer would be
@@ -810,7 +866,7 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
*/
private void startSlowConsumer() {
if (logger.isTraceEnabled()) {
- logger.trace("Sending 1 credit to start delivering of one message to slow consumer");
+ logger.trace(this + "::Sending 1 credit to start delivering of one message to slow consumer");
}
sendCredits(1);
try {
@@ -855,7 +911,7 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
private void queueExecutor() {
if (logger.isTraceEnabled()) {
- logger.trace("Adding Runner on Executor for delivery");
+ logger.trace(this + "::Adding Runner on Executor for delivery");
}
sessionExecutor.execute(runner);
@@ -946,7 +1002,7 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
if (!expired) {
if (logger.isTraceEnabled()) {
- logger.trace("Calling handler.onMessage");
+ logger.trace(this + "::Calling handler.onMessage");
}
final ClassLoader originalLoader = AccessController.doPrivileged(new PrivilegedAction<ClassLoader>() {
@Override
@@ -981,7 +1037,7 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
}
if (logger.isTraceEnabled()) {
- logger.trace("Handler.onMessage done");
+ logger.trace(this + "::Handler.onMessage done");
}
if (message.isLargeMessage()) {
@@ -1065,9 +1121,21 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
lastAckedMessage = null;
+ if (logger.isTraceEnabled()) {
+ logger.trace(this + "::Acking message " + message);
+ }
+
session.acknowledge(this, message);
}
+ @Override
+ public String toString() {
+ return super.toString() + "{" +
+ "consumerContext=" + consumerContext +
+ ", queueName=" + queueName +
+ '}';
+ }
+
// Inner classes
// --------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3e2adf12/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQConsumerContext.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQConsumerContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQConsumerContext.java
index 08abb91..65540ee 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQConsumerContext.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQConsumerContext.java
@@ -46,6 +46,13 @@ public class ActiveMQConsumerContext extends ConsumerContext {
}
@Override
+ public String toString() {
+ return "ActiveMQConsumerContext{" +
+ "id=" + id +
+ '}';
+ }
+
+ @Override
public int hashCode() {
return (int) (id ^ (id >>> 32));
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3e2adf12/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
index 957a3a9..b4ac75d 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
@@ -363,6 +363,9 @@ public final class ChannelImpl implements Channel {
checkReconnectID(reconnectID);
+ if (logger.isTraceEnabled()) {
+ logger.trace("Sending blocking " + packet);
+ }
connection.getTransportConnection().write(buffer, false, false);
long toWait = connection.getBlockingCallTimeout();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3e2adf12/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/SoftValueHashMap.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/SoftValueHashMap.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/SoftValueHashMap.java
index 6428c8a..b499910 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/SoftValueHashMap.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/SoftValueHashMap.java
@@ -316,6 +316,7 @@ public class SoftValueHashMap<K, V extends SoftValueHashMap.ValueCache> implemen
private void processQueue() {
AggregatedSoftReference ref = null;
while ((ref = (AggregatedSoftReference) this.refQueue.poll()) != null) {
+ logger.tracef("Removing reference through processQueue:: %s", ref.get());
mapDelegate.remove(ref.key);
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3e2adf12/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageConsumer.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageConsumer.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageConsumer.java
index 04e4f41..8929fa5 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageConsumer.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageConsumer.java
@@ -27,12 +27,14 @@ import javax.jms.Topic;
import javax.jms.TopicSubscriber;
import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.MessageHandler;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSConstants;
import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
+import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
/**
* ActiveMQ Artemis implementation of a JMS MessageConsumer.
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3e2adf12/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/JMSExceptionHelper.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/JMSExceptionHelper.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/JMSExceptionHelper.java
index 666fa9d..1a8456f 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/JMSExceptionHelper.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/JMSExceptionHelper.java
@@ -22,9 +22,19 @@ import javax.jms.JMSException;
import javax.jms.JMSSecurityException;
import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
public final class JMSExceptionHelper {
+ public static JMSException convertFromActiveMQException(final ActiveMQInterruptedException me) {
+ JMSException je = new javax.jms.IllegalStateException(me.getMessage());
+
+ je.setStackTrace(me.getStackTrace());
+
+ je.initCause(me);
+ return je;
+ }
+
public static JMSException convertFromActiveMQException(final ActiveMQException me) {
JMSException je;
switch (me.getType()) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3e2adf12/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/JMSMessageListenerWrapper.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/JMSMessageListenerWrapper.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/JMSMessageListenerWrapper.java
index ab62dbc..af5b158 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/JMSMessageListenerWrapper.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/JMSMessageListenerWrapper.java
@@ -24,6 +24,7 @@ import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.MessageHandler;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSConstants;
+import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
public class JMSMessageListenerWrapper implements MessageHandler {
@@ -83,6 +84,7 @@ public class JMSMessageListenerWrapper implements MessageHandler {
message.acknowledge();
}
catch (ActiveMQException e) {
+ ((ClientSessionInternal)session.getCoreSession()).markRollbackOnly();
ActiveMQJMSClientLogger.LOGGER.errorProcessingMessage(e);
}
}
@@ -122,6 +124,7 @@ public class JMSMessageListenerWrapper implements MessageHandler {
}
}
catch (ActiveMQException e) {
+ ((ClientSessionInternal)session.getCoreSession()).markRollbackOnly();
ActiveMQJMSClientLogger.LOGGER.errorProcessingMessage(e);
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3e2adf12/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStoreFactory.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStoreFactory.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStoreFactory.java
index 8c2d11a..7c52c63 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStoreFactory.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStoreFactory.java
@@ -17,9 +17,12 @@
package org.apache.activemq.artemis.core.paging;
import java.util.List;
+import java.util.concurrent.Executor;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
+import org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
@@ -30,6 +33,8 @@ public interface PagingStoreFactory {
PagingStore newStore(SimpleString address, AddressSettings addressSettings);
+ PageCursorProvider newCursorProvider(PagingStore store, StorageManager storageManager, AddressSettings addressSettings, Executor executor);
+
void stop() throws InterruptedException;
void setPagingManager(PagingManager manager);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3e2adf12/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/NonExistentPage.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/NonExistentPage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/NonExistentPage.java
new file mode 100644
index 0000000..73a22ce
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/NonExistentPage.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.paging.cursor;
+
+/** This is an internal exception.
+ * In certain cases AfterCommit could try to decrease the reference counting on large messages.
+ * But if the whole page is cleaned an exception could happen, which is ok on that path, and we need to identify it. */
+public class NonExistentPage extends RuntimeException {
+
+ public NonExistentPage() {
+ }
+
+ public NonExistentPage(String message) {
+ super(message);
+ }
+
+ public NonExistentPage(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public NonExistentPage(Throwable cause) {
+ super(cause);
+ }
+
+ public NonExistentPage(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
+ super(message, cause, enableSuppression, writableStackTrace);
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3e2adf12/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCursorProvider.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCursorProvider.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCursorProvider.java
index 951b83c..b2a6aff 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCursorProvider.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCursorProvider.java
@@ -24,6 +24,9 @@ import org.apache.activemq.artemis.core.paging.PagedMessage;
*/
public interface PageCursorProvider {
+ /** Used on tests, to simulate a scenario where the VM cleared space */
+ void clearCache();
+
PageCache getPageCache(long pageNr);
PagedReference newReference(final PagePosition pos, final PagedMessage msg, PageSubscription sub);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3e2adf12/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java
index df2ccc3..89c6d44 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java
@@ -93,7 +93,7 @@ public interface PageSubscription {
*/
void reloadACK(PagePosition position);
- void reloadPageCompletion(PagePosition position);
+ void reloadPageCompletion(PagePosition position) throws Exception;
void reloadPageInfo(long pageNr);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3e2adf12/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java
index 29d990a..b964b56 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java
@@ -23,13 +23,16 @@ import org.apache.activemq.artemis.core.paging.PagedMessage;
import org.apache.activemq.artemis.core.paging.cursor.LivePageCache;
import org.apache.activemq.artemis.core.paging.impl.Page;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
+import org.jboss.logging.Logger;
/**
* This is the same as PageCache, however this is for the page that's being currently written.
*/
public class LivePageCacheImpl implements LivePageCache {
- private final List<PagedMessage> messages = new LinkedList<>();
+ private static final Logger logger = Logger.getLogger(LivePageCacheImpl.class);
+
+ private final List<PagedMessage> messages = new LinkedList<PagedMessage>();
private final Page page;
@@ -82,6 +85,7 @@ public class LivePageCacheImpl implements LivePageCache {
@Override
public synchronized void close() {
+ logger.tracef("Closing %s", this);
this.isLive = false;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3e2adf12/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
index 9862a1f..4f3a6a5 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
@@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.paging.PagedMessage;
import org.apache.activemq.artemis.core.paging.PagingStore;
+import org.apache.activemq.artemis.core.paging.cursor.NonExistentPage;
import org.apache.activemq.artemis.core.paging.cursor.PageCache;
import org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider;
import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
@@ -58,20 +59,20 @@ public class PageCursorProviderImpl implements PageCursorProvider {
/**
* As an optimization, avoid subsequent schedules as they are unnecessary
*/
- private final AtomicInteger scheduledCleanup = new AtomicInteger(0);
+ protected final AtomicInteger scheduledCleanup = new AtomicInteger(0);
- private volatile boolean cleanupEnabled = true;
+ protected volatile boolean cleanupEnabled = true;
- private final PagingStore pagingStore;
+ protected final PagingStore pagingStore;
- private final StorageManager storageManager;
+ protected final StorageManager storageManager;
// This is the same executor used at the PageStoreImpl. One Executor per pageStore
private final Executor executor;
private final SoftValueHashMap<Long, PageCache> softCache;
- private final ConcurrentMap<Long, PageSubscription> activeCursors = new ConcurrentHashMap<>();
+ private final ConcurrentMap<Long, PageSubscription> activeCursors = new ConcurrentHashMap<Long, PageSubscription>();
// Static --------------------------------------------------------
@@ -115,7 +116,7 @@ public class PageCursorProviderImpl implements PageCursorProvider {
if (cache == null || pos.getMessageNr() >= cache.getNumberOfMessages()) {
// sanity check, this should never happen unless there's a bug
- throw new IllegalStateException("Invalid messageNumber passed = " + pos + " on " + cache);
+ throw new NonExistentPage("Invalid messageNumber passed = " + pos + " on " + cache);
}
return cache.getMessage(pos.getMessageNr());
@@ -146,9 +147,7 @@ public class PageCursorProviderImpl implements PageCursorProvider {
cache = createPageCache(pageId);
// anyone reading from this cache will have to wait reading to finish first
// we also want only one thread reading this cache
- if (logger.isTraceEnabled()) {
- logger.trace("adding " + pageId + " into cursor = " + this.pagingStore.getAddress());
- }
+ logger.tracef("adding pageCache pageNr=%d into cursor = %s", pageId, this.pagingStore.getAddress());
readPage((int) pageId, cache);
softCache.put(pageId, cache);
}
@@ -186,6 +185,7 @@ public class PageCursorProviderImpl implements PageCursorProvider {
@Override
public void addPageCache(PageCache cache) {
+ logger.tracef("Add page cache %s", cache);
synchronized (softCache) {
softCache.put(cache.getPageId(), cache);
}
@@ -203,6 +203,7 @@ public class PageCursorProviderImpl implements PageCursorProvider {
}
}
+ @Override
public void clearCache() {
synchronized (softCache) {
softCache.clear();
@@ -273,6 +274,9 @@ public class PageCursorProviderImpl implements PageCursorProvider {
@Override
public void scheduleCleanup() {
+ if (logger.isTraceEnabled()) {
+ logger.trace("scheduling cleanup", new Exception("trace"));
+ }
if (!cleanupEnabled || scheduledCleanup.intValue() > 2) {
// Scheduled cleanup was already scheduled before.. never mind!
// or we have cleanup disabled
@@ -286,7 +290,9 @@ public class PageCursorProviderImpl implements PageCursorProvider {
public void run() {
storageManager.setContext(storageManager.newSingleThreadContext());
try {
- cleanup();
+ if (cleanupEnabled) {
+ cleanup();
+ }
}
finally {
storageManager.clearContext();
@@ -336,7 +342,10 @@ public class PageCursorProviderImpl implements PageCursorProvider {
@Override
public void cleanup() {
- ArrayList<Page> depagedPages = new ArrayList<>();
+
+ logger.tracef("performing page cleanup %s", this);
+
+ ArrayList<Page> depagedPages = new ArrayList<Page>();
while (true) {
if (pagingStore.lock(100)) {
@@ -346,6 +355,8 @@ public class PageCursorProviderImpl implements PageCursorProvider {
return;
}
+ logger.tracef("%s locked", this);
+
synchronized (this) {
try {
if (!pagingStore.isStarted()) {
@@ -356,14 +367,12 @@ public class PageCursorProviderImpl implements PageCursorProvider {
return;
}
- if (logger.isDebugEnabled()) {
- logger.debug("Asserting cleanup for address " + this.pagingStore.getAddress());
- }
-
ArrayList<PageSubscription> cursorList = cloneSubscriptions();
long minPage = checkMinPage(cursorList);
+ logger.debugf("Asserting cleanup for address %s, firstPage=%d", pagingStore.getAddress(), minPage);
+
// if the current page is being written...
// on that case we need to move to verify it in a different way
if (minPage == pagingStore.getCurrentWritingPage() && pagingStore.getCurrentPage().getNumberOfMessages() > 0) {
@@ -376,18 +385,7 @@ public class PageCursorProviderImpl implements PageCursorProvider {
// All the pages on the cursor are complete.. so we will cleanup everything and store a bookmark
if (complete) {
- if (logger.isDebugEnabled()) {
- logger.debug("Address " + pagingStore.getAddress() +
- " is leaving page mode as all messages are consumed and acknowledged from the page store");
- }
-
- pagingStore.forceAnotherPage();
-
- Page currentPage = pagingStore.getCurrentPage();
-
- storeBookmark(cursorList, currentPage);
-
- pagingStore.stopPaging();
+ cleanupComplete(cursorList);
}
}
@@ -423,7 +421,30 @@ public class PageCursorProviderImpl implements PageCursorProvider {
pagingStore.unlock();
}
}
+ finishCleanup(depagedPages);
+
+
+ }
+
+ // Protected as a way to inject testing
+ protected void cleanupComplete(ArrayList<PageSubscription> cursorList) throws Exception {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Address " + pagingStore.getAddress() +
+ " is leaving page mode as all messages are consumed and acknowledged from the page store");
+ }
+
+ pagingStore.forceAnotherPage();
+
+ Page currentPage = pagingStore.getCurrentPage();
+ storeBookmark(cursorList, currentPage);
+
+ pagingStore.stopPaging();
+ }
+
+ // Protected as a way to inject testing
+ protected void finishCleanup(ArrayList<Page> depagedPages) {
+ logger.tracef("this(%s) finishing cleanup on %s", this, depagedPages);
try {
for (Page depagedPage : depagedPages) {
PageCache cache;
@@ -433,7 +454,7 @@ public class PageCursorProviderImpl implements PageCursorProvider {
}
if (logger.isTraceEnabled()) {
- logger.trace("Removing page " + depagedPage.getPageId() + " from page-cache");
+ logger.trace("Removing pageNr=" + depagedPage.getPageId() + " from page-cache");
}
if (cache == null) {
@@ -479,12 +500,15 @@ public class PageCursorProviderImpl implements PageCursorProvider {
}
private boolean checkPageCompletion(ArrayList<PageSubscription> cursorList, long minPage) {
+
+ logger.tracef("checkPageCompletion(%d)", minPage);
+
boolean complete = true;
for (PageSubscription cursor : cursorList) {
if (!cursor.isComplete(minPage)) {
if (logger.isDebugEnabled()) {
- logger.debug("Cursor " + cursor + " was considered incomplete at page " + minPage);
+ logger.debug("Cursor " + cursor + " was considered incomplete at pageNr=" + minPage);
}
complete = false;
@@ -492,7 +516,7 @@ public class PageCursorProviderImpl implements PageCursorProvider {
}
else {
if (logger.isDebugEnabled()) {
- logger.debug("Cursor " + cursor + "was considered **complete** at page " + minPage);
+ logger.debug("Cursor " + cursor + " was considered **complete** at pageNr=" + minPage);
}
}
}
@@ -545,6 +569,13 @@ public class PageCursorProviderImpl implements PageCursorProvider {
}
}
+ @Override
+ public String toString() {
+ // Identifies this provider by its paging store; the trace messages in this class print "this".
+ final String storeDescription = String.valueOf(pagingStore);
+ return "PageCursorProviderImpl{pagingStore=" + storeDescription + '}';
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3e2adf12/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
index 57b4efe..440f845 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
@@ -191,7 +191,11 @@ final class PageSubscriptionImpl implements PageSubscription {
* cursor/subscription.
*/
@Override
- public void reloadPageCompletion(PagePosition position) {
+ public void reloadPageCompletion(PagePosition position) throws Exception {
+ // if the current page is complete, we must move it out of the way
+ if (pageStore.getCurrentPage().getPageId() == position.getPageNr()) {
+ pageStore.forceAnotherPage();
+ }
PageCursorInfo info = new PageCursorInfo(position.getPageNr(), position.getMessageNr(), null);
info.setCompleteInfo(position);
synchronized (consumedPages) {
@@ -202,6 +206,9 @@ final class PageSubscriptionImpl implements PageSubscription {
@Override
public void scheduleCleanupCheck() {
if (autoCleanup) {
+ if (logger.isTraceEnabled()) {
+ logger.trace("Scheduling cleanup", new Exception("trace"));
+ }
if (scheduledCleanupCount.get() > 2) {
return;
}
@@ -212,7 +219,9 @@ final class PageSubscriptionImpl implements PageSubscription {
@Override
public void run() {
try {
- cleanupEntries(false);
+ if (autoCleanup) {
+ cleanupEntries(false);
+ }
}
catch (Exception e) {
ActiveMQServerLogger.LOGGER.problemCleaningCursorPages(e);
@@ -242,6 +251,9 @@ final class PageSubscriptionImpl implements PageSubscription {
if (completeDelete) {
counter.delete();
}
+ if (logger.isTraceEnabled()) {
+ logger.trace("cleanupEntries", new Exception("trace"));
+ }
Transaction tx = new TransactionImpl(store);
boolean persist = false;
@@ -564,17 +576,23 @@ final class PageSubscriptionImpl implements PageSubscription {
@Override
public boolean isComplete(long page) {
+ logger.tracef("%s isComplete %d", this, page); // trace entry: which subscription is asked about which page
synchronized (consumedPages) {
if (empty && consumedPages.isEmpty()) {
+ if (logger.isTraceEnabled()) {
+ logger.tracef("isComplete(%d)::Subscription %s has empty=%s, consumedPages.isEmpty=%s", (Object)page, this, empty, consumedPages.isEmpty()); // four specifiers need four arguments; "empty" was missing
+ }
return true;
}
PageCursorInfo info = consumedPages.get(page);
if (info == null && empty) {
+ logger.tracef("isComplete(%d)::::Couldn't find info and it is empty", page);
return true;
}
else {
+ logger.tracef("isComplete(%d)::calling is %s", (Object)page, this); // only two specifiers, so the former third argument was never printed
return info != null && info.isDone();
}
}
@@ -731,18 +749,18 @@ final class PageSubscriptionImpl implements PageSubscription {
@Override
public void reloadPageInfo(long pageNr) {
- getPageInfo(pageNr, true);
+ getPageInfo(pageNr);
}
private PageCursorInfo getPageInfo(final PagePosition pos) {
- return getPageInfo(pos.getPageNr(), true);
+ return getPageInfo(pos.getPageNr());
}
- private PageCursorInfo getPageInfo(final long pageNr, boolean create) {
+ private PageCursorInfo getPageInfo(final long pageNr) {
synchronized (consumedPages) {
PageCursorInfo pageInfo = consumedPages.get(pageNr);
- if (create && pageInfo == null) {
+ if (pageInfo == null) {
PageCache cache = cursorProvider.getPageCache(pageNr);
if (cache == null) {
return null;
@@ -814,7 +832,11 @@ final class PageSubscriptionImpl implements PageSubscription {
tx.setContainsPersistent();
}
- getPageInfo(position).remove(position);
+ PageCursorInfo info = getPageInfo(position);
+
+ logger.tracef("InstallTXCallback looking up pagePosition %s, result=%s", position, info);
+
+ info.remove(position);
PageCursorTX cursorTX = (PageCursorTX) tx.getProperty(TransactionPropertyIndexes.PAGE_CURSOR_POSITIONS);
@@ -897,16 +919,17 @@ final class PageSubscriptionImpl implements PageSubscription {
@Override
public String toString() {
try {
- return "PageCursorInfo::PageID=" + pageId +
+ return "PageCursorInfo::pageNr=" + pageId +
" numberOfMessage = " +
numberOfMessages +
", confirmed = " +
confirmed +
", isDone=" +
- this.isDone();
+ this.isDone() +
+ " wasLive = " + wasLive;
}
catch (Exception e) {
- return "PageCursorInfo::PageID=" + pageId +
+ return "PageCursorInfo::pageNr=" + pageId +
" numberOfMessage = " +
numberOfMessages +
", confirmed = " +
@@ -917,6 +940,7 @@ final class PageSubscriptionImpl implements PageSubscription {
}
public PageCursorInfo(final long pageId, final int numberOfMessages, final PageCache cache) {
+ logger.tracef("Created PageCursorInfo for pageNr=%d, numberOfMessages=%d, cache=%s", pageId, numberOfMessages, cache);
this.pageId = pageId;
this.numberOfMessages = numberOfMessages;
if (cache != null) {
@@ -932,6 +956,7 @@ final class PageSubscriptionImpl implements PageSubscription {
* @param completePage
*/
public void setCompleteInfo(final PagePosition completePage) {
+ logger.tracef("Setting up complete page %s on cursor %s on subscription %s", completePage, this, PageSubscriptionImpl.this);
this.completePage = completePage;
}
@@ -940,6 +965,10 @@ final class PageSubscriptionImpl implements PageSubscription {
}
public boolean isDone() {
+ if (logger.isTraceEnabled()) { // trace-only: logs each input of the completion test returned below
+ logger.trace(PageSubscriptionImpl.this + "::PageCursorInfo(" + pageId + ")::isDone checking with completePage!=null->" + (completePage != null) + " getNumberOfMessages=" + getNumberOfMessages() + ", confirmed=" + confirmed.get() + " and pendingTX=" + pendingTX.get());
+
+ }
return completePage != null || (getNumberOfMessages() == confirmed.get() && pendingTX.get() == 0);
}
@@ -983,7 +1012,7 @@ final class PageSubscriptionImpl implements PageSubscription {
" confirmed = " +
(confirmed.get() + 1) +
" pendingTX = " + pendingTX +
- ", page = " +
+ ", pageNr = " +
pageId + " posACK = " + posACK);
}
catch (Throwable ignored) {
@@ -1189,7 +1218,7 @@ final class PageSubscriptionImpl implements PageSubscription {
ignored = true;
}
- PageCursorInfo info = getPageInfo(message.getPosition().getPageNr(), false);
+ PageCursorInfo info = getPageInfo(message.getPosition().getPageNr());
if (info != null && (info.isRemoved(message.getPosition()) || info.getCompleteInfo() != null)) {
continue;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3e2adf12/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java
index 83a6c53..0888416 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java
@@ -251,7 +251,7 @@ public final class Page implements Comparable<Page> {
}
if (logger.isDebugEnabled()) {
- logger.debug("Deleting pageId=" + pageId + " on store " + storeName);
+ logger.debug("Deleting pageNr=" + pageId + " on store " + storeName);
}
if (messages != null) {
@@ -294,7 +294,7 @@ public final class Page implements Comparable<Page> {
@Override
public String toString() {
- return "Page::pageID=" + this.pageId + ", file=" + this.file;
+ return "Page::pageNr=" + this.pageId + ", file=" + this.file;
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3e2adf12/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageTransactionInfoImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageTransactionInfoImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageTransactionInfoImpl.java
index 55569b2..1502855 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageTransactionInfoImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageTransactionInfoImpl.java
@@ -34,12 +34,15 @@ import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes;
import org.apache.activemq.artemis.utils.DataConstants;
+import org.jboss.logging.Logger;
public final class PageTransactionInfoImpl implements PageTransactionInfo {
// Constants -----------------------------------------------------
// Attributes ----------------------------------------------------
+ private static final Logger logger = Logger.getLogger(PageTransactionInfoImpl.class);
+
private long transactionID;
private volatile long recordID = -1;
@@ -239,19 +242,36 @@ public final class PageTransactionInfoImpl implements PageTransactionInfo {
public synchronized boolean deliverAfterCommit(PageIterator iterator,
PageSubscription cursor,
PagePosition cursorPos) {
+
+ if (logger.isTraceEnabled()) {
+ logger.trace("deliver after commit on " + cursor + ", position=" + cursorPos);
+ }
+
if (committed && useRedelivery) {
+ if (logger.isTraceEnabled()) {
+ logger.trace("commit & useRedelivery on " + cursor + ", position=" + cursorPos);
+ }
cursor.addPendingDelivery(cursorPos);
cursor.redeliver(iterator, cursorPos);
return true;
}
else if (committed) {
+ if (logger.isTraceEnabled()) {
+ logger.trace("committed on " + cursor + ", position=" + cursorPos + ", ignoring position");
+ }
return false;
}
else if (rolledback) {
+ if (logger.isTraceEnabled()) {
+ logger.trace("rolled back, position ignored on " + cursor + ", position=" + cursorPos);
+ }
cursor.positionIgnored(cursorPos);
return true;
}
else {
+ if (logger.isTraceEnabled()) {
+ logger.trace("deliverAftercommit/else, marking useRedelivery on " + cursor + ", position " + cursorPos);
+ }
useRedelivery = true;
if (lateDeliveries == null) {
lateDeliveries = new LinkedList<>();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3e2adf12/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java
index 0f36a31..00da382 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java
@@ -26,6 +26,7 @@ import java.io.OutputStreamWriter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.activemq.artemis.api.core.SimpleString;
@@ -35,6 +36,8 @@ import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.paging.PagingStoreFactory;
+import org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider;
+import org.apache.activemq.artemis.core.paging.cursor.impl.PageCursorProviderImpl;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
@@ -92,6 +95,10 @@ public class PagingStoreFactoryNIO implements PagingStoreFactory {
}
@Override
+ public PageCursorProvider newCursorProvider(PagingStore store, StorageManager storageManager, AddressSettings addressSettings, Executor executor) {
+ return new PageCursorProviderImpl(store, storageManager, executor, addressSettings.getPageCacheMaxSize());
+ }
+
public synchronized PagingStore newStore(final SimpleString address, final AddressSettings settings) {
return new PagingStoreImpl(address, scheduledExecutor, syncTimeout, pagingManager, storageManager, null, this, address, settings, executorFactory.getExecutor(), syncNonTransactional);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3e2adf12/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
index a7baf84..8fec06c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
@@ -47,7 +47,6 @@ import org.apache.activemq.artemis.core.paging.PagingStoreFactory;
import org.apache.activemq.artemis.core.paging.cursor.LivePageCache;
import org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider;
import org.apache.activemq.artemis.core.paging.cursor.impl.LivePageCacheImpl;
-import org.apache.activemq.artemis.core.paging.cursor.impl.PageCursorProviderImpl;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.replication.ReplicationManager;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
@@ -69,7 +68,7 @@ import org.jboss.logging.Logger;
*/
public class PagingStoreImpl implements PagingStore {
- private static final Logger logger = Logger.getLogger(Page.class);
+ private static final Logger logger = Logger.getLogger(PagingStoreImpl.class);
private final SimpleString address;
@@ -173,7 +172,7 @@ public class PagingStoreImpl implements PagingStore {
this.syncTimer = null;
}
- this.cursorProvider = new PageCursorProviderImpl(this, this.storageManager, executor, addressSettings.getPageCacheMaxSize());
+ this.cursorProvider = storeFactory.newCursorProvider(this, this.storageManager, addressSettings, executor);
}
@@ -831,7 +830,7 @@ public class PagingStoreImpl implements PagingStore {
if (logger.isTraceEnabled()) {
logger.trace("Paging message " + pagedMessage + " on pageStore " + this.getStoreName() +
- " pageId=" + currentPage.getPageId());
+ " pageNr=" + currentPage.getPageId());
}
return true;
@@ -1021,6 +1020,10 @@ public class PagingStoreImpl implements PagingStore {
int tmpCurrentPageId = currentPageId + 1;
+ if (logger.isTraceEnabled()) {
+ logger.trace("new pageNr=" + tmpCurrentPageId, new Exception("trace"));
+ }
+
if (currentPage != null) {
currentPage.close(true);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3e2adf12/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
index acdf57b..1379308 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
@@ -543,13 +543,15 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
}
bindingsJournal = new ReplicatedJournal(((byte) 0), originalBindingsJournal, replicator);
messageJournal = new ReplicatedJournal((byte) 1, originalMessageJournal, replicator);
+
+ // We need to send the list while locking otherwise part of the body might get sent too soon
+ // it will send a list of IDs that we are allocating
+ replicator.sendLargeMessageIdListMessage(pendingLargeMessages);
}
finally {
storageManagerLock.writeLock().unlock();
}
- // it will send a list of IDs that we are allocating
- replicator.sendLargeMessageIdListMessage(pendingLargeMessages);
sendJournalFile(messageFiles, JournalContent.MESSAGES);
sendJournalFile(bindingsFiles, JournalContent.BINDINGS);
sendLargeMessageFiles(pendingLargeMessages);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3e2adf12/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
index be193eb..578db6b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
@@ -340,11 +340,22 @@ public final class LargeServerMessageImpl extends ServerMessageImpl implements L
@Override
public String toString() {
- return "LargeServerMessage[messageID=" + messageID + ",priority=" + this.getPriority() +
- ",expiration=[" + (this.getExpiration() != 0 ? new java.util.Date(this.getExpiration()) : "null") + "]" +
+ return "LargeServerMessage[messageID=" + messageID + ",userID=" + getUserID() + ",priority=" + this.getPriority() + // durability is printed once, by the ", durable=" segment below (presumably the same value as isDurable())
+ ", timestamp=" + toDate(getTimestamp()) + ",expiration=" + toDate(getExpiration()) +
", durable=" + durable + ", address=" + getAddress() + ",properties=" + properties.toString() + "]@" + System.identityHashCode(this);
}
+ /**
+ * Formats a millisecond timestamp for toString(): 0 is rendered as "0", any other value via java.util.Date.
+ */
+ private static String toDate(long timestamp) {
+ if (timestamp != 0) {
+ return new java.util.Date(timestamp).toString();
+ }
+ return "0";
+ }
+
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3e2adf12/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCacheImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCacheImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCacheImpl.java
index 29774d6..7f35638 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCacheImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCacheImpl.java
@@ -222,7 +222,7 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache {
}
else {
if (logger.isTraceEnabled()) {
- logger.trace("DuplicateIDCacheImpl(" + this.address + ")::addToCache Adding duplicateID TX operation for " + describeID(duplID, recordID));
+ logger.trace("DuplicateIDCacheImpl(" + this.address + ")::addToCache Adding duplicateID TX operation for " + describeID(duplID, recordID) + ", tx=" + tx);
}
// For a tx, it's important that the entry is not added to the cache until commit
// since if the client fails then resends them tx we don't want it to get rejected
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3e2adf12/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
index 9b3329a..3672fe2 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
@@ -176,6 +176,8 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif
return;
}
+ logger.tracef("Starting remoting service %s", this);
+
paused = false;
// The remoting service maintains it's own thread pool for handling remoting traffic
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3e2adf12/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index 60235de..50437a1 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -1671,9 +1671,13 @@ public class ActiveMQServerImpl implements ActiveMQServer {
this.queueFactory = factory;
}
- private PagingManager createPagingManager() {
+ protected PagingManager createPagingManager() { // widened from private; presumably so a subclass (e.g. a test server) can override it - confirm
- return new PagingManagerImpl(new PagingStoreFactoryNIO(storageManager, configuration.getPagingLocation(), configuration.getJournalBufferTimeout_NIO(), scheduledPool, executorFactory, configuration.isJournalSyncNonTransactional(), shutdownOnCriticalIO), addressSettingsRepository);
+ return new PagingManagerImpl(getPagingStoreFactory(), addressSettingsRepository); // the factory now comes from the overridable method below
+ }
+
+ protected PagingStoreFactoryNIO getPagingStoreFactory() { // builds the NIO store factory with the same arguments the removed inline construction used
+ return new PagingStoreFactoryNIO(storageManager, configuration.getPagingLocation(), configuration.getJournalBufferTimeout_NIO(), scheduledPool, executorFactory, configuration.isJournalSyncNonTransactional(), shutdownOnCriticalIO);
}
/**
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3e2adf12/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
index 3cdaa66..1f5c74c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
@@ -22,6 +22,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import org.apache.activemq.artemis.core.paging.cursor.NonExistentPage;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.MessageReference;
@@ -165,6 +166,10 @@ public class RefsOperation extends TransactionOperationAbstract {
try {
refmsg.getMessage().decrementRefCount();
}
+ catch (NonExistentPage e) {
+ // This can happen after commit, since the page file may already have been deleted by another thread
+ logger.debug(e);
+ }
catch (Exception e) {
ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3e2adf12/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index 0224c7d..ae1f5b1 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -854,7 +854,15 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
boolean startedTransaction = false;
+
+ if (logger.isTraceEnabled()) {
+ logger.trace("individualACK messageID=" + messageID);
+ }
+
if (tx == null) {
+ if (logger.isTraceEnabled()) {
+ logger.trace("individualACK starting new TX");
+ }
startedTransaction = true;
tx = new TransactionImpl(storageManager);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3e2adf12/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/PagingTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/PagingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/PagingTest.java
index 4f64f42..f658fae 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/PagingTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/PagingTest.java
@@ -21,6 +21,7 @@ import javax.transaction.xa.Xid;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
+import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
@@ -29,6 +30,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -58,7 +60,11 @@ import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider;
+import org.apache.activemq.artemis.core.paging.cursor.impl.PageCursorProviderImpl;
+import org.apache.activemq.artemis.core.paging.cursor.impl.PagePositionImpl;
+import org.apache.activemq.artemis.core.paging.impl.PagingStoreFactoryNIO;
import org.apache.activemq.artemis.core.persistence.OperationContext;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.impl.journal.AckDescribe;
import org.apache.activemq.artemis.core.persistence.impl.journal.DescribeJournal;
import org.apache.activemq.artemis.core.persistence.impl.journal.DescribeJournal.ReferenceDescribe;
@@ -70,14 +76,18 @@ import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
+import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManagerImpl;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.jboss.logging.Logger;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class PagingTest extends ActiveMQTestBase {
+ private static final Logger logger = Logger.getLogger(PagingTest.class);
+
private ServerLocator locator;
private ActiveMQServer server;
private ClientSessionFactory sf;
@@ -2914,6 +2924,250 @@ public class PagingTest extends ActiveMQTestBase {
session.close();
}
+
+ @Test
+ public void testRollbackOnSendThenSendMore() throws Exception {
+ clearDataRecreateServerDirs();
+ // Scenario: 20 committed messages, then 4 more that are sent and rolled back; a filtered consumer must still receive 19.
+ Configuration config = createDefaultInVMConfig();
+
+ server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, new HashMap<String, AddressSettings>());
+
+ server.start();
+
+ locator.setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true);
+
+ sf = createSessionFactory(locator);
+ ClientSession session = sf.createSession(null, null, false, false, true, false, 0);
+
+ session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
+
+ Queue queue = server.locateQueue(ADDRESS);
+ // Explicitly start paging on the queue's store before anything is produced.
+ queue.getPageSubscription().getPagingStore().startPaging();
+
+ ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+ ClientMessage message;
+ // Phase 1: ids 0..19, each one committed and followed by forceAnotherPage().
+ for (int i = 0; i < 20; i++) {
+ message = session.createMessage(true);
+
+ ActiveMQBuffer bodyLocal = message.getBodyBuffer();
+
+ bodyLocal.writeBytes(new byte[100 * 4]);
+
+ message.putIntProperty(new SimpleString("id"), i);
+
+ producer.send(message);
+ session.commit();
+ queue.getPageSubscription().getPagingStore().forceAnotherPage();
+
+ }
+ // Phase 2: ids 20..23 are sent without a commit and then rolled back.
+ for (int i = 20; i < 24; i++) {
+ message = session.createMessage(true);
+
+ ActiveMQBuffer bodyLocal = message.getBodyBuffer();
+
+ bodyLocal.writeBytes(new byte[100 * 4]);
+
+ message.putIntProperty(new SimpleString("id"), i);
+
+ producer.send(message);
+ }
+
+ session.rollback();
+
+ ClientSession consumerSession = sf.createSession(false, false);
+
+ // disableCleanup() is called first; cleanup() is then invoked by hand from this method.
+ queue.getPageSubscription().getPagingStore().disableCleanup();
+
+ queue.getPageSubscription().getPagingStore().getCursorProvider().cleanup();
+ // The filter "id > 0" excludes id 0, so 19 of the 20 committed messages are expected below.
+ consumerSession.start();
+ ClientConsumer consumer = consumerSession.createConsumer(ADDRESS, SimpleString.toSimpleString("id > 0"));
+ for (int i = 0; i < 19; i++) {
+ ClientMessage messageRec = consumer.receive(5000);
+ System.err.println("msg::" + messageRec); // NOTE(review): debug output, printed before the null check; consider the class logger instead
+ Assert.assertNotNull(messageRec);
+ messageRec.acknowledge();
+ consumerSession.commit();
+
+ // cleanup() is called directly here only because that makes problems easier to debug:
+ // if cleanup misbehaves, it will surface in this loop, where it can be stepped through.
+ queue.getPageSubscription().getPagingStore().getCursorProvider().cleanup();
+ }
+ queue.getPageSubscription().getPagingStore().enableCleanup();
+
+ consumerSession.close();
+
+
+ session.close();
+ sf.close();
+
+
+ server.stop();
+ }
+
+ // Simulates a server that crashed after its pages were marked complete but before the page files were deleted.
+ @Test
+ public void testRestartWithComplete() throws Exception {
+ clearDataRecreateServerDirs();
+
+ Configuration config = createDefaultInVMConfig();
+
+ final AtomicBoolean mainCleanup = new AtomicBoolean(true);
+ // While mainCleanup is true cleanup() delegates to the real implementation; while false it only calls pagingStore.unlock().
+ class InterruptedCursorProvider extends PageCursorProviderImpl {
+
+ public InterruptedCursorProvider(PagingStore pagingStore,
+ StorageManager storageManager,
+ Executor executor,
+ int maxCacheSize) {
+ super(pagingStore, storageManager, executor, maxCacheSize);
+ }
+
+ @Override
+ public void cleanup() {
+ if (mainCleanup.get()) {
+ super.cleanup();
+ }
+ else {
+ try {
+ pagingStore.unlock();
+ }
+ catch (Throwable ignored) { // deliberately swallowed: nothing is done if unlock() fails
+ }
+ }
+ }
+ }
+ // Server whose paging store factory returns InterruptedCursorProvider from newCursorProvider().
+ server = new ActiveMQServerImpl(config, ManagementFactory.getPlatformMBeanServer(), new ActiveMQSecurityManagerImpl()) {
+ @Override
+ protected PagingStoreFactoryNIO getPagingStoreFactory() {
+ return new PagingStoreFactoryNIO(this.getStorageManager(), this.getConfiguration().getPagingLocation(), this.getConfiguration().getJournalBufferTimeout_NIO(), this.getScheduledPool(), this.getExecutorFactory(), this.getConfiguration().isJournalSyncNonTransactional(), null) {
+ @Override
+ public PageCursorProvider newCursorProvider(PagingStore store, StorageManager storageManager, AddressSettings addressSettings, Executor executor) {
+ return new InterruptedCursorProvider(store, storageManager, executor, addressSettings.getPageCacheMaxSize());
+ }
+ };
+ }
+
+ };
+
+ addServer(server);
+
+ AddressSettings defaultSetting = new AddressSettings().setPageSizeBytes( PagingTest.PAGE_SIZE).setMaxSizeBytes(PagingTest.PAGE_MAX).setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+
+ server.getAddressSettingsRepository().addMatch("#", defaultSetting);
+
+ server.start();
+
+ locator.setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true);
+
+ sf = createSessionFactory(locator);
+ ClientSession session = sf.createSession(true, true, 0);
+ session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
+
+ Queue queue = server.locateQueue(ADDRESS);
+
+ queue.getPageSubscription().getPagingStore().startPaging();
+
+ ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+ ClientMessage message;
+ // 20 committed messages ("idi" = 0..19); forceAnotherPage() follows every one except the last.
+ for (int i = 0; i < 20; i++) {
+ message = session.createMessage(true);
+
+ ActiveMQBuffer bodyLocal = message.getBodyBuffer();
+
+ bodyLocal.writeBytes(new byte[100 * 4]);
+
+ message.putIntProperty(new SimpleString("idi"), i);
+
+ producer.send(message);
+ session.commit();
+ if (i < 19) {
+ queue.getPageSubscription().getPagingStore().forceAnotherPage();
+ }
+
+ }
+
+ Assert.assertEquals(20, queue.getPageSubscription().getPagingStore().getCurrentWritingPage());
+
+ // Mark pages 1..20 as complete for this queue so that they get cleaned up. When restarting, the server needs to check whether the
+ // current page is complete; if it is, it must move on to another page to avoid racing with cleanup,
+ // which could otherwise happen during a crash / restart.
+ long tx = server.getStorageManager().generateID();
+ for (int i = 1; i <= 20; i++) {
+ server.getStorageManager().storePageCompleteTransactional(tx, queue.getID(), new PagePositionImpl(i, 1));
+ }
+
+ server.getStorageManager().commit(tx);
+
+ session.close();
+ sf.close();
+
+ server.stop();
+ mainCleanup.set(false);
+ // From here until mainCleanup is set back to true, cleanup() only calls pagingStore.unlock().
+ logger.trace("Server restart");
+
+ server.start();
+
+ queue = server.locateQueue(ADDRESS);
+
+ locator = createInVMNonHALocator();
+ sf = createSessionFactory(locator);
+ session = sf.createSession(null, null, false, false, true, false, 0);
+ producer = session.createProducer(PagingTest.ADDRESS);
+ // After the restart: 10 new messages ("newid" = 0..9); forceAnotherPage() is called once, after newid 5.
+ for (int i = 0; i < 10; i++) {
+ message = session.createMessage(true);
+
+ ActiveMQBuffer bodyLocal = message.getBodyBuffer();
+
+ bodyLocal.writeBytes(new byte[100 * 4]);
+
+ message.putIntProperty(new SimpleString("newid"), i);
+
+ producer.send(message);
+ session.commit();
+
+ if (i == 5) {
+ queue.getPageSubscription().getPagingStore().forceAnotherPage();
+ }
+ }
+
+ // Switch the real cleanup back on and run it by hand before consuming.
+ mainCleanup.set(true);
+
+ queue = server.locateQueue(ADDRESS);
+ queue.getPageSubscription().cleanupEntries(false);
+ queue.getPageSubscription().getPagingStore().getCursorProvider().cleanup();
+
+ // The loop below expects the first 10 received messages to be the post-restart ones, in send order (newid 0..9).
+ ClientConsumer consumer = session.createConsumer(ADDRESS);
+ session.start();
+
+ for (int i = 0; i < 10; i++) {
+ message = consumer.receive(5000);
+ Assert.assertNotNull(message);
+ Assert.assertEquals(i, message.getIntProperty("newid").intValue());
+ message.acknowledge();
+ }
+
+ server.stop();
+
+ // Thread.sleep(5000);
+
+
+
+ }
+
@Test
public void testCommitOnSend() throws Exception {
clearDataRecreateServerDirs();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3e2adf12/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java
index d16da9f..498beb4 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java
@@ -42,6 +42,8 @@ import org.apache.activemq.artemis.core.paging.PagedMessage;
import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.paging.PagingStoreFactory;
+import org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider;
+import org.apache.activemq.artemis.core.paging.cursor.impl.PageCursorProviderImpl;
import org.apache.activemq.artemis.core.paging.impl.Page;
import org.apache.activemq.artemis.core.paging.impl.PageTransactionInfoImpl;
import org.apache.activemq.artemis.core.paging.impl.PagingStoreImpl;
@@ -105,7 +107,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
public void testDoubleStart() throws Exception {
SequentialFileFactory factory = new FakeSequentialFileFactory();
- PagingStore storeImpl = new PagingStoreImpl(PagingStoreImplTest.destinationTestName, null, 100, createMockManager(), createStorageManagerMock(), factory, null, PagingStoreImplTest.destinationTestName, new AddressSettings().setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE), getExecutorFactory().getExecutor(), true);
+ PagingStore storeImpl = new PagingStoreImpl(PagingStoreImplTest.destinationTestName, null, 100, createMockManager(), createStorageManagerMock(), factory, new FakeStoreFactory(factory), PagingStoreImplTest.destinationTestName, new AddressSettings().setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE), getExecutorFactory().getExecutor(), true);
storeImpl.start();
@@ -160,7 +162,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
storeImpl.sync();
- storeImpl = new PagingStoreImpl(PagingStoreImplTest.destinationTestName, null, 100, createMockManager(), createStorageManagerMock(), factory, null, PagingStoreImplTest.destinationTestName, addressSettings, getExecutorFactory().getExecutor(), true);
+ storeImpl = new PagingStoreImpl(PagingStoreImplTest.destinationTestName, null, 100, createMockManager(), createStorageManagerMock(), factory, storeFactory, PagingStoreImplTest.destinationTestName, addressSettings, getExecutorFactory().getExecutor(), true);
storeImpl.start();
@@ -809,6 +811,14 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
}
@Override
+ public PageCursorProvider newCursorProvider(PagingStore store,
+ StorageManager storageManager,
+ AddressSettings addressSettings,
+ Executor executor) {
+ return new PageCursorProviderImpl(store, storageManager, executor, addressSettings.getPageCacheMaxSize()); // cache size is taken from the address settings
+ }
+
+ @Override
public void setPagingManager(final PagingManager manager) {
}