You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2016/07/05 21:51:38 UTC
[1/2] activemq-artemis git commit: This closes #620
Repository: activemq-artemis
Updated Branches:
refs/heads/master 62f414fd3 -> ee28eac38
This closes #620
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/ee28eac3
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/ee28eac3
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/ee28eac3
Branch: refs/heads/master
Commit: ee28eac38e70c2c33c6d4f0e6d8c613489b3efb3
Parents: 62f414f 8154120
Author: jbertram <jb...@apache.org>
Authored: Tue Jul 5 16:51:23 2016 -0500
Committer: jbertram <jb...@apache.org>
Committed: Tue Jul 5 16:51:23 2016 -0500
----------------------------------------------------------------------
.../core/paging/impl/PagingStoreFactoryNIO.java | 7 +-
.../artemis/core/server/ActiveMQServer.java | 6 +
.../core/server/ActiveMQServerLogger.java | 7 +-
.../core/server/impl/ActiveMQServerImpl.java | 127 +++----
.../core/server/impl/FileMoveManager.java | 218 ++++++++++++
.../impl/SharedNothingBackupActivation.java | 4 +-
.../impl/SharedNothingLiveActivation.java | 20 +-
.../core/server/impl/FileMoveManagerTest.java | 346 +++++++++++++++++++
.../artemis/tests/util/ActiveMQTestBase.java | 13 +-
tests/config/logging.properties.trace | 4 +-
.../cluster/failover/BackupSyncJournalTest.java | 24 +-
.../cluster/failover/FailoverTest.java | 74 ++--
.../failover/LiveToLiveFailoverTest.java | 2 +-
.../failover/ReplicatedFailoverTest.java | 25 +-
14 files changed, 746 insertions(+), 131 deletions(-)
----------------------------------------------------------------------
[2/2] activemq-artemis git commit: ARTEMIS-612 Improving Failback's
max replication
Posted by jb...@apache.org.
ARTEMIS-612 Improving Failback's max replication
The server will always restart now, with older files being removed.
The system will now move current data into ./oldreplica.#, and remove old ones.
All the logic for moving these files is encapsulated at FileMoveManager.
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/81541200
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/81541200
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/81541200
Branch: refs/heads/master
Commit: 8154120027ba2b758ae3632204831247288bcc99
Parents: 62f414f
Author: Clebert Suconic <cl...@apache.org>
Authored: Mon Jun 27 14:52:59 2016 -0400
Committer: jbertram <jb...@apache.org>
Committed: Tue Jul 5 16:51:23 2016 -0500
----------------------------------------------------------------------
.../core/paging/impl/PagingStoreFactoryNIO.java | 7 +-
.../artemis/core/server/ActiveMQServer.java | 6 +
.../core/server/ActiveMQServerLogger.java | 7 +-
.../core/server/impl/ActiveMQServerImpl.java | 127 +++----
.../core/server/impl/FileMoveManager.java | 218 ++++++++++++
.../impl/SharedNothingBackupActivation.java | 4 +-
.../impl/SharedNothingLiveActivation.java | 20 +-
.../core/server/impl/FileMoveManagerTest.java | 346 +++++++++++++++++++
.../artemis/tests/util/ActiveMQTestBase.java | 13 +-
tests/config/logging.properties.trace | 4 +-
.../cluster/failover/BackupSyncJournalTest.java | 24 +-
.../cluster/failover/FailoverTest.java | 74 ++--
.../failover/LiveToLiveFailoverTest.java | 2 +-
.../failover/ReplicatedFailoverTest.java | 25 +-
14 files changed, 746 insertions(+), 131 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/81541200/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java
index 00da382..d81591c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java
@@ -40,6 +40,7 @@ import org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider;
import org.apache.activemq.artemis.core.paging.cursor.impl.PageCursorProviderImpl;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
+import org.apache.activemq.artemis.core.server.impl.FileMoveManager;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.utils.ExecutorFactory;
@@ -147,7 +148,11 @@ public class PagingStoreFactoryNIO implements PagingStoreFactory {
final File addressFile = new File(file, PagingStoreFactoryNIO.ADDRESS_FILE);
if (!addressFile.exists()) {
- ActiveMQServerLogger.LOGGER.pageStoreFactoryNoIdFile(file.toString(), PagingStoreFactoryNIO.ADDRESS_FILE);
+
+ // This means this folder is from a replication copy. There is nothing to worry about, so we just skip it
+ if (!file.getName().contains(FileMoveManager.PREFIX)) {
+ ActiveMQServerLogger.LOGGER.pageStoreFactoryNoIdFile(file.toString(), PagingStoreFactoryNIO.ADDRESS_FILE);
+ }
continue;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/81541200/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
index e416205..af2f7cf 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
@@ -94,6 +94,12 @@ public interface ActiveMQServer extends ActiveMQComponent {
NodeManager getNodeManager();
+ /** It will release the lock held for the activation. */
+ void unlockActivation();
+
+ /** it will hold a lock for the activation. This will prevent the activation from happening. */
+ void lockActivation();
+
/**
* Returns the resource to manage this ActiveMQ Artemis server.
*
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/81541200/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
index ba08b7b..b9ac8a4 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
@@ -998,7 +998,7 @@ public interface ActiveMQServerLogger extends BasicLogger {
@Message(id = 222161, value = "Group Handler timed-out waiting for sendCondition", format = Message.Format.MESSAGE_FORMAT)
void groupHandlerSendTimeout();
- @LogMessage(level = Logger.Level.WARN)
+ @LogMessage(level = Logger.Level.INFO)
@Message(id = 222162, value = "Moving data directory {0} to {1}", format = Message.Format.MESSAGE_FORMAT)
void backupMovingDataAway(String oldPath, String newPath);
@@ -1219,6 +1219,11 @@ public interface ActiveMQServerLogger extends BasicLogger {
format = Message.Format.MESSAGE_FORMAT)
void sslHandshakeFailed(String clientAddress, String cause);
+ @LogMessage(level = Logger.Level.INFO)
+ @Message(id = 222209, value = "There were too many old replicated folders upon startup, removing {0}",
+ format = Message.Format.MESSAGE_FORMAT)
+ void removingBackupData(String path);
+
@LogMessage(level = Logger.Level.ERROR)
@Message(id = 224000, value = "Failure in initialisation", format = Message.Format.MESSAGE_FORMAT)
void initializationError(@Cause Throwable e);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/81541200/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index d2d7783..8acdc11 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -19,7 +19,7 @@ package org.apache.activemq.artemis.core.server.impl;
import javax.management.MBeanServer;
import javax.security.cert.X509Certificate;
import java.io.File;
-import java.io.FilenameFilter;
+import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.lang.management.ManagementFactory;
@@ -39,6 +39,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
@@ -253,6 +254,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
private final Map<String, ServerSession> sessions = new ConcurrentHashMap<>();
+ private final Semaphore activationLock = new Semaphore(1);
/**
* This class here has the same principle of CountDownLatch but you can reuse the counters.
* It's based on the same super classes of {@code CountDownLatch}
@@ -436,7 +438,10 @@ public class ActiveMQServerImpl implements ActiveMQServer {
activation = haPolicy.createActivation(this, wasLive, activationParams, shutdownOnCriticalIO);
}
- backupActivationThread = new Thread(activation, ActiveMQMessageBundle.BUNDLE.activationForServer(this));
+ if (logger.isTraceEnabled()) {
+ logger.trace("starting backupActivation");
+ }
+ backupActivationThread = new ActivationThread(activation, ActiveMQMessageBundle.BUNDLE.activationForServer(this));
backupActivationThread.start();
}
else {
@@ -453,6 +458,21 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
@Override
+ public void unlockActivation() {
+ activationLock.release();
+ }
+
+ @Override
+ public void lockActivation() {
+ try {
+ activationLock.acquire();
+ }
+ catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ }
+ }
+
+ @Override
protected final void finalize() throws Throwable {
if (state != SERVER_STATE.STOPPED) {
ActiveMQServerLogger.LOGGER.serverFinalisedWIthoutBeingSTopped();
@@ -510,6 +530,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
@Override
public void setHAPolicy(HAPolicy haPolicy) {
+ if (logger.isTraceEnabled()) {
+ logger.tracef("XXX @@@ Setting %s, isBackup=%s at %s", haPolicy, haPolicy.isBackup(), this);
+ }
this.haPolicy = haPolicy;
}
@@ -707,6 +730,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
* @param criticalIOError whether we have encountered an IO error with the journal etc
*/
void stop(boolean failoverOnServerShutdown, final boolean criticalIOError, boolean restarting) {
+
synchronized (this) {
if (state == SERVER_STATE.STOPPED || state == SERVER_STATE.STOPPING) {
return;
@@ -2202,7 +2226,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
/**
* Check if journal directory exists or create it (if configured to do so)
*/
- void checkJournalDirectory() {
+ public void checkJournalDirectory() {
File journalDir = configuration.getJournalLocation();
if (!journalDir.exists() && configuration.isPersistenceEnabled()) {
@@ -2269,86 +2293,18 @@ public class ActiveMQServerImpl implements ActiveMQServer {
return scaledDownNodeIDs.contains(scaledDownNodeId);
}
- int countNumberOfCopiedJournals() {
- //will use the main journal to check for how many backups have been kept
- File journalDir = new File(configuration.getJournalDirectory());
- final String fileName = journalDir.getName();
- int numberOfbackupsSaved = 0;
- //fine if it doesn't exist, we aren't using file based persistence so it's no issue
- if (journalDir.exists()) {
- File parentFile = new File(journalDir.getParent());
- String[] backupJournals = parentFile.list(new FilenameFilter() {
- @Override
- public boolean accept(File dir, String name) {
- return name.startsWith(fileName) && !name.matches(fileName);
- }
- });
- numberOfbackupsSaved = backupJournals != null ? backupJournals.length : 0;
- }
- return numberOfbackupsSaved;
- }
-
/**
* Move data away before starting data synchronization for fail-back.
* <p>
* Use case is a server, upon restarting, finding a former backup running in its place. It will
* move any older data away and log a warning about it.
*/
- void moveServerData() {
+ void moveServerData(int maxSavedReplicated) throws IOException {
File[] dataDirs = new File[]{configuration.getBindingsLocation(), configuration.getJournalLocation(), configuration.getPagingLocation(), configuration.getLargeMessagesLocation()};
- boolean allEmpty = true;
- int lowestSuffixForMovedData = 1;
- boolean redo = true;
-
- while (redo) {
- redo = false;
- for (File fDir : dataDirs) {
- if (fDir.exists()) {
- if (!fDir.isDirectory()) {
- throw ActiveMQMessageBundle.BUNDLE.journalDirIsFile(fDir);
- }
-
- if (fDir.list().length > 0)
- allEmpty = false;
- }
-
- String sanitizedPath = fDir.getPath();
- while (new File(sanitizedPath + lowestSuffixForMovedData).exists()) {
- lowestSuffixForMovedData++;
- redo = true;
- }
- }
- }
- if (allEmpty)
- return;
-
- for (File dir : dataDirs) {
- File newPath = new File(dir.getPath() + lowestSuffixForMovedData);
- if (dir.exists()) {
- if (!dir.renameTo(newPath)) {
- throw ActiveMQMessageBundle.BUNDLE.couldNotMoveJournal(dir);
- }
-
- ActiveMQServerLogger.LOGGER.backupMovingDataAway(dir.getAbsolutePath(), newPath.getPath());
- }
- /*
- * sometimes OS's can hold on to file handles for a while so we need to check this actually qorks and then wait
- * a while and try again if it doesn't
- * */
-
- int count = 0;
- while (!dir.exists() && !dir.mkdir()) {
- try {
- Thread.sleep(1000);
- }
- catch (InterruptedException e) {
- }
- count++;
- if (count == 5) {
- throw ActiveMQMessageBundle.BUNDLE.cannotCreateDir(dir.getPath());
- }
- }
+ for (File data : dataDirs) {
+ FileMoveManager moveManager = new FileMoveManager(data, maxSavedReplicated);
+ moveManager.doMove();
}
}
@@ -2371,4 +2327,25 @@ public class ActiveMQServerImpl implements ActiveMQServer {
return new Date().getTime() - startDate.getTime();
}
+
+
+ private final class ActivationThread extends Thread {
+ final Runnable runnable;
+
+ ActivationThread(Runnable runnable, String name) {
+ super(name);
+ this.runnable = runnable;
+ }
+
+ public void run() {
+ lockActivation();
+ try {
+ runnable.run();
+ }
+ finally {
+ unlockActivation();
+ }
+ }
+
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/81541200/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/FileMoveManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/FileMoveManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/FileMoveManager.java
new file mode 100644
index 0000000..efe1bb2
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/FileMoveManager.java
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.server.impl;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.Arrays;
+
+import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
+import org.jboss.logging.Logger;
+
+/**
+ * Used to move files away.
+ * Each time a backup starts its former data will be moved to a backup folder called oldreplica.1, oldreplica.2, ... etc
+ * We may control the maximum number of folders so we remove old ones.
+ */
+public class FileMoveManager {
+ private static final Logger logger = Logger.getLogger(FileMoveManager.class);
+
+ private final File folder;
+ private int maxFolders;
+ public static final String PREFIX = "oldreplica.";
+
+ private static final FilenameFilter isPrefix = new FilenameFilter() {
+ @Override
+ public boolean accept(File dir, String name) {
+ boolean prefixed = name.contains(PREFIX);
+
+ if (prefixed) {
+ try {
+ Integer.parseInt(name.substring(PREFIX.length()));
+ }
+ catch (NumberFormatException e) {
+ // This function is not really used a lot
+ // so I don't really mind about performance here
+ // this is good enough for what we need
+ prefixed = false;
+ }
+ }
+
+ return prefixed;
+ }
+ };
+
+ private static final FilenameFilter notPrefix = new FilenameFilter() {
+ @Override
+ public boolean accept(File dir, String name) {
+ return !isPrefix.accept(dir, name);
+ }
+ };
+
+
+ public FileMoveManager(File folder) {
+ this(folder, -1);
+ }
+
+ public FileMoveManager(File folder, int maxFolders) {
+ this.folder = folder;
+ this.maxFolders = maxFolders;
+ }
+
+ public int getMaxFolders() {
+ return maxFolders;
+ }
+
+ public FileMoveManager setMaxFolders(int maxFolders) {
+ this.maxFolders = maxFolders;
+ return this;
+ }
+
+ public void doMove() throws IOException {
+ String[] files = getFiles();
+
+ if (files == null || files.length == 0) {
+ // if no files, nothing to be done, no backup, no deletes... nothing!
+ return;
+ }
+
+ // Since we will create one folder, we are already taking that one into consideration
+ internalCheckOldFolders(1);
+
+ int whereToMove = getMaxID() + 1;
+
+ File folderTo = getFolder(whereToMove);
+ folderTo.mkdirs();
+
+ ActiveMQServerLogger.LOGGER.backupMovingDataAway(folder.getPath(), folderTo.getPath());
+
+ for (String fileMove : files) {
+ File fileFrom = new File(folder, fileMove);
+ File fileTo = new File(folderTo, fileMove);
+ logger.tracef("doMove:: moving %s as %s", fileFrom, fileTo);
+ Files.move(fileFrom.toPath(), fileTo.toPath());
+ }
+
+ }
+
+ public void checkOldFolders() {
+ internalCheckOldFolders(0);
+ }
+
+ private void internalCheckOldFolders(int creating) {
+ if (maxFolders > 0) {
+ int folders = getNumberOfFolders();
+
+ // We are counting the next one to be created
+ int foldersToDelete = folders + creating - maxFolders;
+
+ if (foldersToDelete > 0) {
+ logger.tracef("There are %d folders to delete", foldersToDelete);
+ int[] ids = getIDlist();
+ for (int i = 0; i < foldersToDelete; i++) {
+ File file = getFolder(ids[i]);
+ ActiveMQServerLogger.LOGGER.removingBackupData(file.getPath());
+ deleteTree(file);
+ }
+ }
+ }
+ }
+
+ /**
+ * It will return the names of the files and folders that are not backup folders
+ */
+ public String[] getFiles() {
+ return folder.list(notPrefix);
+ }
+
+
+ public int getNumberOfFolders() {
+ return getFolders().length;
+ }
+
+ public String[] getFolders() {
+ String[] list = folder.list(isPrefix);
+
+ if (list == null) {
+ list = new String[0];
+ }
+
+
+ return list;
+ }
+
+
+ public int getMinID() {
+ int[] list = getIDlist();
+
+ if (list.length == 0) {
+ return 0;
+ }
+
+ return list[0];
+ }
+
+ public int getMaxID() {
+ int[] list = getIDlist();
+
+ if (list.length == 0) {
+ return 0;
+ }
+
+ return list[list.length - 1];
+ }
+
+
+ public int[] getIDlist() {
+ String[] list = getFolders();
+ int[] ids = new int[list.length];
+ for (int i = 0; i < ids.length; i++) {
+ ids[i] = getID(list[i]);
+ }
+
+ Arrays.sort(ids);
+
+ return ids;
+ }
+
+ public int getID(String folderName) {
+ return Integer.parseInt(folderName.substring(PREFIX.length()));
+ }
+
+
+ public File getFolder(int id) {
+ return new File(folder, PREFIX + id);
+ }
+
+
+ private void deleteTree(File file) {
+ File[] files = file.listFiles();
+
+ if (files != null) {
+ for (File fileDelete : files) {
+ deleteTree(fileDelete);
+ }
+ }
+
+ file.delete();
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/81541200/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java
index e2adc1f..d279864 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java
@@ -104,7 +104,7 @@ public final class SharedNothingBackupActivation extends Activation {
}
// move all data away:
activeMQServer.getNodeManager().stop();
- activeMQServer.moveServerData();
+ activeMQServer.moveServerData(replicaPolicy.getMaxSavedReplicatedJournalsSize());
activeMQServer.getNodeManager().start();
synchronized (this) {
if (closed)
@@ -311,7 +311,7 @@ public final class SharedNothingBackupActivation extends Activation {
}
if (logger.isTraceEnabled()) {
- logger.trace("setReplicaPolicy::" + replicaPolicy);
+ logger.trace("@@@ setReplicaPolicy::" + replicaPolicy);
}
replicaPolicy.getReplicatedPolicy().setReplicaPolicy(replicaPolicy);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/81541200/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java
index 3e0d812..6b222fb 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java
@@ -90,11 +90,16 @@ public class SharedNothingLiveActivation extends LiveActivation {
try {
if (replicatedPolicy.isCheckForLiveServer() && isNodeIdUsed()) {
//set for when we failback
+ if (logger.isTraceEnabled()) {
+ logger.tracef("@@@ setting up replicatedPolicy.getReplicaPolicy for back start, replicaPolicy::%s, isBackup=%s, server=%s", replicatedPolicy.getReplicaPolicy(), replicatedPolicy.isBackup(), activeMQServer);
+ }
replicatedPolicy.getReplicaPolicy().setReplicatedPolicy(replicatedPolicy);
activeMQServer.setHAPolicy(replicatedPolicy.getReplicaPolicy());
return;
}
+ logger.trace("@@@ did not do it now");
+
activeMQServer.initialisePart1(false);
activeMQServer.initialisePart2(false);
@@ -175,16 +180,11 @@ public class SharedNothingLiveActivation extends LiveActivation {
clusterConnection.addClusterTopologyListener(listener1);
if (listener1.waitForBackup()) {
//if we have to many backups kept or are not configured to restart just stop, otherwise restart as a backup
- if (!replicatedPolicy.getReplicaPolicy().isRestartBackup() && activeMQServer.countNumberOfCopiedJournals() >= replicatedPolicy.getReplicaPolicy().getMaxSavedReplicatedJournalsSize() && replicatedPolicy.getReplicaPolicy().getMaxSavedReplicatedJournalsSize() >= 0) {
- activeMQServer.stop(true);
- ActiveMQServerLogger.LOGGER.stopReplicatedBackupAfterFailback();
- }
- else {
- activeMQServer.stop(true);
- ActiveMQServerLogger.LOGGER.restartingReplicatedBackupAfterFailback();
- activeMQServer.setHAPolicy(replicatedPolicy.getReplicaPolicy());
- activeMQServer.start();
- }
+ activeMQServer.stop(true);
+ ActiveMQServerLogger.LOGGER.restartingReplicatedBackupAfterFailback();
+// activeMQServer.moveServerData(replicatedPolicy.getReplicaPolicy().getMaxSavedReplicatedJournalsSize());
+ activeMQServer.setHAPolicy(replicatedPolicy.getReplicaPolicy());
+ activeMQServer.start();
}
else {
ActiveMQServerLogger.LOGGER.failbackMissedBackupAnnouncement();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/81541200/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/FileMoveManagerTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/FileMoveManagerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/FileMoveManagerTest.java
new file mode 100644
index 0000000..0935c38
--- /dev/null
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/FileMoveManagerTest.java
@@ -0,0 +1,346 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.server.impl;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.PrintWriter;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.paging.PagingStore;
+import org.apache.activemq.artemis.core.paging.impl.PagingManagerImpl;
+import org.apache.activemq.artemis.core.paging.impl.PagingStoreFactoryNIO;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
+import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
+import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
+import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.core.settings.impl.HierarchicalObjectRepository;
+import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
+import org.apache.activemq.artemis.tests.util.ThreadLeakCheckRule;
+import org.apache.activemq.artemis.utils.OrderedExecutorFactory;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class FileMoveManagerTest {
+
+ @Rule
+ public TemporaryFolder temporaryFolder;
+
+ @Rule
+ public ThreadLeakCheckRule leakCheckRule = new ThreadLeakCheckRule();
+
+ private File dataLocation;
+ private FileMoveManager manager;
+
+
+ @Before
+ public void setUp() {
+ dataLocation = new File(temporaryFolder.getRoot(), "data");
+ dataLocation.mkdirs();
+ manager = new FileMoveManager(dataLocation, 10);
+ }
+
+
+ public FileMoveManagerTest() {
+ File parent = new File("./target/tmp");
+ parent.mkdirs();
+ temporaryFolder = new TemporaryFolder(parent);
+ }
+
+ @Test
+ public void testBackupFiles() {
+ int[] originalFiles = new int[12];
+ int count = 0;
+
+ // It will fake folders creation
+ for (int i = 0; i < 12; i++) {
+ originalFiles[count++] = i;
+ File bkp = new File(dataLocation, FileMoveManager.PREFIX + i);
+ bkp.mkdirs();
+ }
+
+ Assert.assertEquals(12, manager.getFolders().length);
+ Assert.assertEquals(12, manager.getNumberOfFolders());
+
+
+ assertIDs(originalFiles, manager.getIDlist());
+ }
+
+ @Test
+ public void testMinMax() {
+ int[] originalFiles = new int[12];
+ int count = 0;
+
+ // It will fake folders creation
+ for (int i = 0; i < 5; i++) {
+ originalFiles[count++] = i;
+ File bkp = new File(dataLocation, FileMoveManager.PREFIX + i);
+ bkp.mkdirs();
+ }
+
+ // simulates a hole where someone removed a folder by hand
+
+ // It will fake folders creation
+ for (int i = 7; i < 14; i++) {
+ originalFiles[count++] = i;
+ File bkp = new File(dataLocation, FileMoveManager.PREFIX + i);
+ bkp.mkdirs();
+ }
+
+ Assert.assertEquals(12, manager.getFolders().length);
+ Assert.assertEquals(12, manager.getNumberOfFolders());
+
+ int[] ids = manager.getIDlist();
+
+ assertIDs(originalFiles, ids);
+
+ Assert.assertEquals(0, manager.getMinID());
+ Assert.assertEquals(13, manager.getMaxID());
+
+ manager.setMaxFolders(3).checkOldFolders();
+
+ Assert.assertEquals(3, manager.getNumberOfFolders());
+ Assert.assertEquals(13, manager.getMaxID());
+ Assert.assertEquals(11, manager.getMinID());
+
+ }
+
+ @Test
+ public void testGarbageCreated() {
+ // I'm pretending an admin created a folder here
+ File garbage = new File(dataLocation, "bkp.zzz");
+ garbage.mkdirs();
+
+ testMinMax();
+
+ resetTmp();
+ // the admin renamed a folder maybe
+ garbage = new File(dataLocation, "bkp.001.old");
+ garbage.mkdirs();
+
+ resetTmp();
+
+ // the admin renamed a folder maybe
+ garbage = new File(dataLocation, "bkp.1.5");
+ garbage.mkdirs();
+
+ testMinMax();
+ }
+
+
+ @Test
+ public void testNoFolders() {
+ Assert.assertEquals(0, manager.getFolders().length);
+ Assert.assertEquals(0, manager.getNumberOfFolders());
+
+ Assert.assertTrue(dataLocation.delete());
+
+ Assert.assertEquals(0, manager.getFolders().length);
+ Assert.assertEquals(0, manager.getNumberOfFolders());
+ }
+
+
+ @Test
+ public void testNoFiles() throws Exception {
+ // nothing to be moved, so there is no reason to do a backup
+ manager.doMove();
+
+ Assert.assertEquals(0, manager.getNumberOfFolders());
+ }
+
+ /**
+ * Runs ten move cycles of one hundred plain files each while at most three
+ * backup folders are allowed, validating the content of every moved file, and then
+ * exercises {@code checkOldFolders()} with different limits.
+ */
+ @Test
+ public void testMoveFiles() throws Exception {
+ final int cycles = 10;
+ final int filesPerCycle = 100;
+
+ manager.setMaxFolders(3);
+
+ for (int cycle = 1; cycle <= cycles; cycle++) {
+ for (int fileId = 0; fileId < filesPerCycle; fileId++) {
+ createFile(dataLocation, fileId);
+ }
+
+ manager.doMove();
+
+ // the folder count follows the cycle number until it reaches the configured maximum
+ Assert.assertEquals(Math.min(cycle, manager.getMaxFolders()), manager.getNumberOfFolders());
+
+ File movedTo = manager.getFolder(cycle);
+
+ // a second manager pointed at the backup folder is used only to list what landed there
+ String[] movedFiles = new FileMoveManager(movedTo, 10).getFiles();
+
+ for (int idx = 0; idx < movedFiles.length; idx++) {
+ checkFile(movedTo, movedFiles[idx]);
+ }
+ }
+
+ Assert.assertEquals(manager.getMaxFolders(), manager.getNumberOfFolders());
+
+ // with a limit of 0 the three existing folders are expected to survive the check
+ manager.setMaxFolders(0).checkOldFolders();
+ Assert.assertEquals(3, manager.getNumberOfFolders());
+
+ // with a limit of 1 only the folder of the last cycle is expected to remain
+ manager.setMaxFolders(1).checkOldFolders();
+ Assert.assertEquals(1, manager.getNumberOfFolders());
+
+ Assert.assertEquals(10, manager.getMaxID());
+ Assert.assertEquals(10, manager.getMinID());
+ }
+
+
+ /**
+ * Same scenario as the plain-file move test, but the data consists of ten
+ * sub-folders per cycle; sub-folder {@code f} holds {@code 10 + f} files so that
+ * every sub-folder can be told apart after the move.
+ */
+ @Test
+ public void testMoveFolders() throws Exception {
+ manager.setMaxFolders(3);
+
+ final int folderCount = 10;
+ final int baseFilesPerFolder = 10;
+
+ for (int cycle = 1; cycle <= 10; cycle++) {
+ for (int folderIdx = 0; folderIdx < folderCount; folderIdx++) {
+ File subFolder = new File(dataLocation, "folder" + folderIdx);
+ subFolder.mkdirs();
+
+ // the amount of files grows with the folder index, making each folder unique
+ int filesInThisFolder = baseFilesPerFolder + folderIdx;
+ for (int fileId = 0; fileId < filesInThisFolder; fileId++) {
+ createFile(subFolder, fileId);
+ }
+ }
+
+ manager.doMove();
+
+ // the folder count follows the cycle number until it reaches the configured maximum
+ Assert.assertEquals(Math.min(cycle, manager.getMaxFolders()), manager.getNumberOfFolders());
+
+ File movedTo = manager.getFolder(cycle);
+
+ for (int folderIdx = 0; folderIdx < folderCount; folderIdx++) {
+ File movedSubFolder = new File(movedTo, "folder" + folderIdx);
+
+ String[] movedFiles = movedSubFolder.list();
+
+ Assert.assertEquals(baseFilesPerFolder + folderIdx, movedFiles.length);
+
+ for (int idx = 0; idx < movedFiles.length; idx++) {
+ checkFile(movedSubFolder, movedFiles[idx]);
+ }
+ }
+ }
+
+ Assert.assertEquals(manager.getMaxFolders(), manager.getNumberOfFolders());
+
+ // with a limit of 0 the three existing folders are expected to survive the check
+ manager.setMaxFolders(0).checkOldFolders();
+ Assert.assertEquals(3, manager.getNumberOfFolders());
+
+ // with a limit of 1 only the folder of the last cycle is expected to remain
+ manager.setMaxFolders(1).checkOldFolders();
+ Assert.assertEquals(1, manager.getNumberOfFolders());
+
+ Assert.assertEquals(10, manager.getMaxID());
+ Assert.assertEquals(10, manager.getMinID());
+ }
+
+ /**
+ * Starts and stops a paging store under {@code dataLocation} ten times, moving the
+ * data away after every round, and fails if the captured log output contains
+ * the text "address.txt".
+ */
+ @Test
+ public void testMoveOverPaging() throws Exception {
+ AssertionLoggerHandler.startCapture();
+
+ ExecutorService executor = Executors.newCachedThreadPool();
+ try {
+ manager.setMaxFolders(3);
+ for (int round = 1; round <= 10; round++) {
+ // every address pages when full
+ AddressSettings pageSettings = new AddressSettings();
+ pageSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+
+ HierarchicalRepository<AddressSettings> repository = new HierarchicalObjectRepository<>();
+ repository.setDefault(pageSettings);
+
+ final StorageManager nullStorage = new NullStorageManager();
+
+ PagingStoreFactoryNIO nioFactory = new PagingStoreFactoryNIO(nullStorage, dataLocation, 100, null, new OrderedExecutorFactory(executor), true, null);
+
+ PagingManagerImpl pagingManager = new PagingManagerImpl(nioFactory, repository);
+ pagingManager.start();
+
+ PagingStore pagingStore = pagingManager.getPageStore(new SimpleString("simple-test"));
+ pagingStore.startPaging();
+ pagingStore.stop();
+
+ pagingManager.stop();
+
+ // move whatever the paging store left behind into a backup folder
+ manager.doMove();
+
+ Assert.assertEquals(Math.min(round, manager.getMaxFolders()), manager.getNumberOfFolders());
+ }
+
+ Assert.assertFalse("The loggers are complaining about address.txt", AssertionLoggerHandler.findText("address.txt"));
+ }
+ finally {
+ // always release the log capture and the thread pool, even when an assertion failed
+ AssertionLoggerHandler.stopCapture();
+ executor.shutdown();
+ }
+ }
+
+
+ /**
+ * Asserts that {@code ids} has the same length and the same elements, in the same
+ * order, as {@code originalFiles}.
+ *
+ * @param originalFiles the expected IDs
+ * @param ids the IDs actually obtained
+ */
+ private void assertIDs(int[] originalFiles, int[] ids) {
+ // assertArrayEquals checks length and every element, and names the first differing index on failure
+ Assert.assertArrayEquals(originalFiles, ids);
+ }
+
+ /**
+ * Wipes the temporary folder, recreates its root directory and asserts that the
+ * manager no longer reports any backup folder.
+ */
+ private void resetTmp() {
+ // NOTE(review): temporaryFolder is presumably a JUnit TemporaryFolder rule - confirm at its declaration
+ temporaryFolder.delete();
+ temporaryFolder.getRoot().mkdirs();
+ Assert.assertEquals(0, manager.getNumberOfFolders());
+ }
+
+ /**
+ * Creates the file {@code i + ".jrn"} inside {@code folder}; its content is the
+ * decimal text of {@code i}.
+ *
+ * @param folder directory that receives the file
+ * @param i number used both for the file name and for the file content
+ * @throws FileNotFoundException if the file cannot be opened for writing
+ */
+ private void createFile(File folder, int i) throws FileNotFoundException {
+ File dataFile = new File(folder, i + ".jrn");
+ // try-with-resources closes the writer, and the stream under it, even if writing fails
+ try (PrintWriter outData = new PrintWriter(new FileOutputStream(dataFile))) {
+ outData.print(i);
+ }
+ }
+
+ /**
+ * Reads the first line of {@code file} inside {@code bkpFolder} and asserts that it
+ * is the number encoded in the file name (the part before the first {@code '.'}).
+ *
+ * @param bkpFolder directory holding the file
+ * @param file name of the file, expected to look like {@code <number>.<extension>}
+ * @throws IOException if the file cannot be opened or read
+ */
+ private void checkFile(File bkpFolder, String file) throws IOException {
+ File fileRead = new File(bkpFolder, file);
+ String valueRead;
+ // close the reader right after use so no file handle is left open on the backup folder
+ try (BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(fileRead)))) {
+ valueRead = reader.readLine();
+ }
+ int id = Integer.parseInt(file.substring(0, file.indexOf('.')));
+ Assert.assertEquals("content of the file wasn't the expected", id, Integer.parseInt(valueRead));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/81541200/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
index 6a9f729..f9a9535 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
@@ -1297,6 +1297,10 @@ public abstract class ActiveMQTestBase extends Assert {
}
protected void waitForServerToStart(ActiveMQServer server) throws InterruptedException {
+ waitForServerToStart(server, true);
+ }
+
+ protected void waitForServerToStart(ActiveMQServer server, boolean activation) throws InterruptedException {
if (server == null)
return;
final long wait = 5000;
@@ -1310,9 +1314,12 @@ public abstract class ActiveMQTestBase extends Assert {
fail("server didn't start: " + server);
}
- if (!server.getHAPolicy().isBackup()) {
- if (!server.waitForActivation(wait, TimeUnit.MILLISECONDS))
- fail("Server didn't initialize: " + server);
+
+ if (activation) {
+ if (!server.getHAPolicy().isBackup()) {
+ if (!server.waitForActivation(wait, TimeUnit.MILLISECONDS))
+ fail("Server didn't initialize: " + server);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/81541200/tests/config/logging.properties.trace
----------------------------------------------------------------------
diff --git a/tests/config/logging.properties.trace b/tests/config/logging.properties.trace
index aa23850..cd6e364 100644
--- a/tests/config/logging.properties.trace
+++ b/tests/config/logging.properties.trace
@@ -51,7 +51,7 @@ handler.TEST.formatter=PATTERN
# Formatter pattern configuration
formatter.PATTERN=org.jboss.logmanager.formatters.PatternFormatter
formatter.PATTERN.properties=pattern
-formatter.PATTERN.pattern=[%t] %d{HH:mm:ss,SSS} %-5p [%c] %s%E%n
+#formatter.PATTERN.pattern=[%t] %d{HH:mm:ss,SSS} %-5p [%c] %s%E%n
# Alternate format useful when debugging
-#formatter.PATTERN.pattern=*** [%t] ***\n%d{HH:mm:ss,SSS} %-5p [%c] %s%E%n\n
+formatter.PATTERN.pattern=*** [%t] ***\n%d{HH:mm:ss,SSS} %-5p [%c] %s%E%n\n
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/81541200/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/BackupSyncJournalTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/BackupSyncJournalTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/BackupSyncJournalTest.java
index fa520c9..c32ebc1 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/BackupSyncJournalTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/BackupSyncJournalTest.java
@@ -41,15 +41,20 @@ import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.persistence.impl.journal.DescribeJournal;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.impl.FileMoveManager;
import org.apache.activemq.artemis.tests.integration.cluster.util.BackupSyncDelay;
import org.apache.activemq.artemis.tests.integration.cluster.util.TestableServer;
import org.apache.activemq.artemis.tests.util.TransportConfigurationUtils;
import org.apache.activemq.artemis.utils.UUID;
+import org.jboss.logging.Logger;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class BackupSyncJournalTest extends FailoverTestBase {
+ private static final Logger logger = Logger.getLogger(BackupSyncJournalTest.class);
+
protected static final int BACKUP_WAIT_TIME = 60;
private ServerLocatorInternal locator;
protected ClientSessionFactoryInternal sessionFactory;
@@ -283,17 +288,28 @@ public class BackupSyncJournalTest extends FailoverTestBase {
sendMessages(session, producer, 2 * n_msgs);
assertFalse("must NOT be a backup", liveServer.getServer().getHAPolicy().isBackup());
adaptLiveConfigForReplicatedFailBack(liveServer);
- liveServer.start();
+ FileMoveManager liveMoveManager = new FileMoveManager(liveServer.getServer().getConfiguration().getJournalLocation(), -1);
+ liveServer.getServer().lockActivation();
+ try {
+ liveServer.start();
+ assertTrue("must have become a backup", liveServer.getServer().getHAPolicy().isBackup());
+ Assert.assertEquals(0, liveMoveManager.getNumberOfFolders());
+ }
+ finally {
+ liveServer.getServer().unlockActivation();
+ }
waitForServerToStart(liveServer.getServer());
- assertTrue("must have become a backup", liveServer.getServer().getHAPolicy().isBackup());
+ liveServer.getServer().waitForActivation(10, TimeUnit.SECONDS);
+ Assert.assertEquals(1, liveMoveManager.getNumberOfFolders());
+ assertTrue("must be active now", !liveServer.getServer().getHAPolicy().isBackup());
assertTrue("Fail-back must initialize live!", liveServer.getServer().waitForActivation(15, TimeUnit.SECONDS));
assertFalse("must be LIVE!", liveServer.getServer().getHAPolicy().isBackup());
int i = 0;
- while (backupServer.isStarted() && i++ < 100) {
+ while (!backupServer.isStarted() && i++ < 100) {
Thread.sleep(100);
}
- assertFalse("Backup should stop!", backupServer.getServer().isStarted());
+ assertTrue(backupServer.getServer().isStarted());
assertTrue(liveServer.getServer().isStarted());
receiveMsgsInRange(0, 2 * n_msgs);
assertNoMoreMessages();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/81541200/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java
index ea9cb8e..f915a31 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java
@@ -44,8 +44,6 @@ import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.MessageHandler;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
-import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
-import org.apache.activemq.artemis.tests.integration.cluster.util.TestableServer;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
import org.apache.activemq.artemis.core.server.cluster.ha.BackupPolicy;
import org.apache.activemq.artemis.core.server.cluster.ha.HAPolicy;
@@ -53,9 +51,12 @@ import org.apache.activemq.artemis.core.server.cluster.ha.ReplicaPolicy;
import org.apache.activemq.artemis.core.server.cluster.ha.ReplicatedPolicy;
import org.apache.activemq.artemis.core.server.cluster.ha.SharedStoreMasterPolicy;
import org.apache.activemq.artemis.core.server.cluster.ha.SharedStoreSlavePolicy;
+import org.apache.activemq.artemis.core.server.impl.FileMoveManager;
import org.apache.activemq.artemis.core.server.impl.InVMNodeManager;
import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
import org.apache.activemq.artemis.jms.client.ActiveMQTextMessage;
+import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
+import org.apache.activemq.artemis.tests.integration.cluster.util.TestableServer;
import org.apache.activemq.artemis.tests.util.CountDownSessionFailureListener;
import org.apache.activemq.artemis.tests.util.TransportConfigurationUtils;
import org.apache.activemq.artemis.utils.RandomUtil;
@@ -518,10 +519,12 @@ public class FailoverTest extends FailoverTestBase {
boolean doFailBack = true;
HAPolicy haPolicy = backupServer.getServer().getHAPolicy();
if (haPolicy instanceof ReplicaPolicy) {
- ((ReplicaPolicy) haPolicy).setMaxSavedReplicatedJournalsSize(0);
+ ((ReplicaPolicy) haPolicy).setMaxSavedReplicatedJournalsSize(1);
}
- simpleReplication(doFailBack);
+ simpleFailover(haPolicy instanceof ReplicaPolicy, doFailBack);
+ tearDown();
+ setUp();
}
@Test
@@ -571,9 +574,10 @@ public class FailoverTest extends FailoverTestBase {
}
@Test
- public void testSimpleReplication() throws Exception {
- boolean doFailBack = false;
- simpleReplication(doFailBack);
+ public void testSimpleFailover() throws Exception {
+ HAPolicy haPolicy = backupServer.getServer().getHAPolicy();
+
+ simpleFailover(haPolicy instanceof ReplicaPolicy, false);
}
@Test
@@ -628,7 +632,7 @@ public class FailoverTest extends FailoverTestBase {
* @param doFailBack
* @throws Exception
*/
- private void simpleReplication(boolean doFailBack) throws Exception {
+ private void simpleFailover(boolean isReplicated, boolean doFailBack) throws Exception {
locator.setFailoverOnInitialConnection(true);
createSessionFactory();
ClientSession session = createSessionAndQueue();
@@ -660,10 +664,16 @@ public class FailoverTest extends FailoverTestBase {
liveServer.start();
Assert.assertTrue("live initialized...", liveServer.getServer().waitForActivation(40, TimeUnit.SECONDS));
int i = 0;
- while (backupServer.isStarted() && i++ < 100) {
+ while (!backupServer.isStarted() && i++ < 100) {
Thread.sleep(100);
}
- Assert.assertFalse("Backup should stop!", backupServer.isStarted());
+ liveServer.getServer().waitForActivation(5, TimeUnit.SECONDS);
+ Assert.assertTrue(backupServer.isStarted());
+
+ if (isReplicated) {
+ FileMoveManager moveManager = new FileMoveManager(backupServer.getServer().getConfiguration().getJournalLocation(), 0);
+ Assert.assertEquals(1, moveManager.getNumberOfFolders());
+ }
}
else {
backupServer.stop();
@@ -886,35 +896,49 @@ public class FailoverTest extends FailoverTestBase {
@Test
public void testTransactedMessagesNotSentSoNoRollback() throws Exception {
- createSessionFactory();
+ try {
+ createSessionFactory();
- ClientSession session = createSessionAndQueue();
+ ClientSession session = createSessionAndQueue();
- ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
+ ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
- sendMessagesSomeDurable(session, producer);
+ sendMessagesSomeDurable(session, producer);
- session.commit();
+ session.commit();
- crash(session);
+ crash(session);
- // committing again should work since didn't send anything since last commit
+ // committing again should work since didn't send anything since last commit
- Assert.assertFalse(session.isRollbackOnly());
+ Assert.assertFalse(session.isRollbackOnly());
- session.commit();
+ session.commit();
- ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
+ ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
- session.start();
+ session.start();
- receiveDurableMessages(consumer);
+ receiveDurableMessages(consumer);
- Assert.assertNull(consumer.receiveImmediate());
+ Assert.assertNull(consumer.receiveImmediate());
- session.commit();
+ session.commit();
- session.close();
+ session.close();
+ }
+ finally {
+ try {
+ liveServer.getServer().stop();
+ }
+ catch (Throwable ignored) {
+ }
+ try {
+ backupServer.getServer().stop();
+ }
+ catch (Throwable ignored) {
+ }
+ }
}
@Test
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/81541200/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/LiveToLiveFailoverTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/LiveToLiveFailoverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/LiveToLiveFailoverTest.java
index 68f65a4..66c48b5 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/LiveToLiveFailoverTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/LiveToLiveFailoverTest.java
@@ -320,7 +320,7 @@ public class LiveToLiveFailoverTest extends FailoverTest {
}
@Override
- public void testSimpleReplication() throws Exception {
+ public void testSimpleFailover() throws Exception {
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/81541200/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ReplicatedFailoverTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ReplicatedFailoverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ReplicatedFailoverTest.java
index f03326e..8dcf905 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ReplicatedFailoverTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ReplicatedFailoverTest.java
@@ -16,6 +16,8 @@
*/
package org.apache.activemq.artemis.tests.integration.cluster.failover;
+import java.util.concurrent.TimeUnit;
+
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.core.config.ha.ReplicaPolicyConfiguration;
import org.apache.activemq.artemis.core.config.ha.ReplicatedPolicyConfiguration;
@@ -98,22 +100,31 @@ public class ReplicatedFailoverTest extends FailoverTest {
liveServer.start();
- waitForRemoteBackupSynchronization(liveServer.getServer());
-
waitForServerToStart(liveServer.getServer());
- //this will give the backup time to stop fully
- waitForServerToStop(backupServer.getServer());
+ backupServer.getServer().waitForActivation(5, TimeUnit.SECONDS);
+
+ waitForRemoteBackupSynchronization(liveServer.getServer());
+
+ waitForServerToStart(backupServer.getServer());
- assertFalse(backupServer.getServer().isStarted());
+ assertTrue(backupServer.getServer().isStarted());
- //the server wouldnt have reset to backup
- assertFalse(backupServer.getServer().getHAPolicy().isBackup());
}
finally {
if (sf != null) {
sf.close();
}
+ try {
+ liveServer.getServer().stop();
+ }
+ catch (Throwable ignored) {
+ }
+ try {
+ backupServer.getServer().stop();
+ }
+ catch (Throwable ignored) {
+ }
}
}