Posted to commits@geode.apache.org by nr...@apache.org on 2018/01/30 00:35:15 UTC
[geode] branch develop updated: GEODE-4328: Make backup into a task run by a single thread (#1351)
This is an automated email from the ASF dual-hosted git repository.
nreich pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 52ff5ef GEODE-4328: Make backup into a task run by a single thread (#1351)
52ff5ef is described below
commit 52ff5ef3e541bcbc4a427a7dc35588ff9b6f3b16
Author: Nick Reich <nr...@pivotal.io>
AuthorDate: Mon Jan 29 16:35:11 2018 -0800
GEODE-4328: Make backup into a task run by a single thread (#1351)
* The BackupLock had to acquire and release the underlying lock and keep
  track of who was supposed to be the true owner of the lock, because the
  part of the backup that acquired the locks ran on a different thread than
  the one that released them. Since the backup task is now completed on a
  single thread, we can replace it with a ReentrantLock.
---
.../apache/geode/internal/cache/DiskStoreImpl.java | 15 +-
.../geode/internal/cache/backup/BackupLock.java | 114 -----
.../geode/internal/cache/backup/BackupManager.java | 474 +++------------------
.../backup/{BackupManager.java => BackupTask.java} | 118 +++--
.../geode/internal/cache/backup/PrepareBackup.java | 6 +-
.../cache/backup/PrepareBackupOperation.java | 2 +-
.../cache/backup/PrepareBackupRequest.java | 2 +-
.../cache/backup/TemporaryBackupFiles.java | 3 +-
.../internal/beans/MemberMBeanBridge.java | 6 +-
.../cache/backup/BackupIntegrationTest.java | 15 +-
.../internal/cache/backup/BackupLockTest.java | 105 -----
.../backup/BackupPrepareAndFinishMsgDUnitTest.java | 23 +-
.../backup/IncrementalBackupDistributedTest.java | 2 +-
.../beans/DistributedSystemBridgeJUnitTest.java | 3 +-
.../apache/geode/codeAnalysis/excludedClasses.txt | 1 -
15 files changed, 142 insertions(+), 747 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java
index 511b821..5aa153a 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java
@@ -56,6 +56,8 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
@@ -87,14 +89,12 @@ import org.apache.geode.distributed.internal.membership.InternalDistributedMembe
import org.apache.geode.i18n.StringId;
import org.apache.geode.internal.Version;
import org.apache.geode.internal.cache.ExportDiskRegion.ExportWriter;
-import org.apache.geode.internal.cache.backup.BackupLock;
import org.apache.geode.internal.cache.backup.BackupManager;
import org.apache.geode.internal.cache.entries.DiskEntry;
import org.apache.geode.internal.cache.entries.DiskEntry.Helper.ValueWrapper;
import org.apache.geode.internal.cache.entries.DiskEntry.RecoveredEntry;
import org.apache.geode.internal.cache.eviction.AbstractEvictionController;
import org.apache.geode.internal.cache.eviction.EvictionController;
-import org.apache.geode.internal.cache.eviction.EvictionCounters;
import org.apache.geode.internal.cache.persistence.BytesAndBits;
import org.apache.geode.internal.cache.persistence.DiskRecoveryStore;
import org.apache.geode.internal.cache.persistence.DiskRegionView;
@@ -3954,9 +3954,9 @@ public class DiskStoreImpl implements DiskStore {
* Lock used to synchronize access to the init file. This is a lock rather than a synchronized
* block because the backup tool needs to acquire this lock.
*/
- private final BackupLock backupLock = new BackupLock();
+ private final ReentrantLock backupLock = new ReentrantLock();
- public BackupLock getBackupLock() {
+ public ReentrantLock getBackupLock() {
return backupLock;
}
@@ -4002,14 +4002,17 @@ public class DiskStoreImpl implements DiskStore {
// level operations, we will need to be careful
// to block them *before* they are put in the async
// queue
- getBackupLock().lockForBackup();
+ getBackupLock().lock();
}
/**
* Release the lock that is preventing operations on this disk store during the backup process.
*/
public void releaseBackupLock() {
- getBackupLock().unlockForBackup();
+ ReentrantLock backupLock = getBackupLock();
+ if (backupLock.isHeldByCurrentThread()) {
+ backupLock.unlock();
+ }
}
private int getArrayIndexOfDirectory(File searchDir) {
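One detail worth calling out in the releaseBackupLock() change above: a
ReentrantLock, unlike the removed BackupLock, throws
IllegalMonitorStateException when a thread that does not own it calls
unlock(), hence the isHeldByCurrentThread() guard. A small self-contained
demonstration of that JDK behavior (generic, not Geode code):

    import java.util.concurrent.locks.ReentrantLock;

    class GuardedUnlockDemo {
      public static void main(String[] args) throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        lock.lock(); // owned by the main thread

        Thread other = new Thread(() -> {
          try {
            lock.unlock(); // non-owner unlock is rejected
          } catch (IllegalMonitorStateException expected) {
            System.out.println("non-owner unlock rejected");
          }
          // A caller that may not hold the lock checks ownership first,
          // mirroring the guard in releaseBackupLock() above.
          if (lock.isHeldByCurrentThread()) {
            lock.unlock();
          }
        });
        other.start();
        other.join();

        lock.unlock(); // the owning thread releases normally
      }
    }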
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupLock.java b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupLock.java
deleted file mode 100644
index 29d6976..0000000
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupLock.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.geode.internal.cache.backup;
-
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
-
-/**
- * A lock used for the backup process. This is a reentrant lock that provides a "backup" mode, where
- * the lock is held by a "backup thread" which can be assigned later than the time we lock.
- *
- * <p>
- * We need this because our backup process is two phase. In the first phase we acquire the lock and
- * in the second phase we actually do the backup. During the second phase we need to reenter the
- * lock and release it with a different thread.
- */
-public class BackupLock extends ReentrantLock {
-
- private final ThreadLocal<Boolean> isBackupThread = new ThreadLocal<>();
- private boolean isBackingUp;
- private Condition backupDone = super.newCondition();
-
- // test hook
- private final AtomicReference<BackupLockTestHook> hook = new AtomicReference<>();
-
- public interface BackupLockTestHook {
- /**
- * Test hook called before the wait for backup to complete
- */
- void beforeWaitForBackupCompletion();
- }
-
- public void setBackupLockTestHook(BackupLockTestHook testHook) {
- hook.set(testHook);
- }
-
- public void lockForBackup() {
- super.lock();
- isBackingUp = true;
- super.unlock();
- }
-
- void setBackupThread() {
- isBackupThread.set(true);
- }
-
- public void unlockForBackup() {
- super.lock();
- isBackingUp = false;
- isBackupThread.remove();
- backupDone.signalAll();
- super.unlock();
- }
-
- boolean isCurrentThreadDoingBackup() {
- Boolean result = isBackupThread.get();
- return (result != null) && result;
- }
-
- /**
- * For testing only
- */
- boolean isBackingUp() {
- return isBackingUp;
- }
-
- /**
- * For testing only
- */
- boolean hasThreadLocal() {
- return isBackupThread.get() != null;
- }
-
- @Override
- public void unlock() {
- // The backup thread does not need to unlock this lock since it never gets the lock. It is the
- // only thread that has permission to modify disk files during backup.
- if (!isCurrentThreadDoingBackup()) {
- super.unlock();
- }
- }
-
- /**
- * Acquire this lock, waiting for a backup to finish the first phase.
- */
- @Override
- public void lock() {
- // The backup thread is a noop; it does not need to get the lock since it is the only thread
- // with permission to modify disk files during backup
- if (!isCurrentThreadDoingBackup()) {
- super.lock();
- while (isBackingUp) {
- BackupLockTestHook testHook = hook.get();
- if (testHook != null) {
- testHook.beforeWaitForBackupCompletion();
- }
- backupDone.awaitUninterruptibly();
- }
- }
- }
-}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupManager.java b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupManager.java
index 8503b2e..7f55770 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupManager.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupManager.java
@@ -15,476 +15,108 @@
package org.apache.geode.internal.cache.backup;
import java.io.File;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.HashMap;
import java.util.HashSet;
-import java.util.LinkedList;
import java.util.List;
-import java.util.Map;
import java.util.Set;
-import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.commons.io.FileUtils;
import org.apache.logging.log4j.Logger;
-import org.apache.geode.InternalGemFireError;
-import org.apache.geode.cache.DiskStore;
import org.apache.geode.cache.persistence.PersistentID;
import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.distributed.internal.MembershipListener;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
-import org.apache.geode.internal.cache.DirectoryHolder;
import org.apache.geode.internal.cache.DiskStoreBackup;
import org.apache.geode.internal.cache.DiskStoreImpl;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.InternalCache;
-import org.apache.geode.internal.cache.Oplog;
import org.apache.geode.internal.logging.LogService;
+import org.apache.geode.internal.logging.LoggingThreadGroup;
-/**
- * This class manages the state an logic to backup a single cache.
- */
public class BackupManager {
- private static final Logger logger = LogService.getLogger();
-
- static final String INCOMPLETE_BACKUP_FILE = "INCOMPLETE_BACKUP_FILE";
+ Logger logger = LogService.getLogger();
- private static final String BACKUP_DIR_PREFIX = "dir";
- private static final String DATA_STORES_DIRECTORY = "diskstores";
public static final String DATA_STORES_TEMPORARY_DIRECTORY = "backupTemp_";
- private static final String USER_FILES = "user";
-
+ private final ExecutorService executor;
private final MembershipListener membershipListener = new BackupMembershipListener();
- private final Map<DiskStoreImpl, DiskStoreBackup> backupByDiskStore = new HashMap<>();
- private final RestoreScript restoreScript = new RestoreScript();
- private final InternalDistributedMember sender;
private final InternalCache cache;
- private final CountDownLatch allowDestroys = new CountDownLatch(1);
- private final String memberId;
+ private final InternalDistributedMember sender;
- private volatile boolean isCancelled = false;
- private TemporaryBackupFiles temporaryFiles;
- private BackupFileCopier fileCopier;
+ private BackupTask task;
+ private Future<HashSet<PersistentID>> taskFuture;
- public BackupManager(InternalDistributedMember sender, InternalCache gemFireCache) {
- this.sender = sender;
- this.cache = gemFireCache;
- memberId = getCleanedMemberId();
- }
- public void validateRequestingAdmin() {
- // We need to watch for pure admin guys that depart. this allMembershipListener set
- // looks like it should receive those events.
- Set allIds = getDistributionManager().addAllMembershipListenerAndGetAllIds(membershipListener);
- if (!allIds.contains(sender)) {
- cleanup();
- throw new IllegalStateException("The admin member requesting a backup has already departed");
- }
+ public BackupManager(InternalDistributedMember sender, InternalCache cache) {
+ this.cache = cache;
+ this.sender = sender;
+ executor = createExecutor();
}
- public HashSet<PersistentID> prepareForBackup() {
- HashSet<PersistentID> persistentIds = new HashSet<>();
- for (DiskStore store : cache.listDiskStoresIncludingRegionOwned()) {
- DiskStoreImpl storeImpl = (DiskStoreImpl) store;
+ private ExecutorService createExecutor() {
+ LoggingThreadGroup group = LoggingThreadGroup.createThreadGroup("BackupManager Thread", logger);
+ ThreadFactory threadFactory = new ThreadFactory() {
+ private final AtomicInteger threadId = new AtomicInteger();
- storeImpl.lockStoreBeforeBackup();
- if (storeImpl.hasPersistedData()) {
- persistentIds.add(storeImpl.getPersistentID());
- storeImpl.getStats().startBackup();
+ public Thread newThread(final Runnable command) {
+ Thread thread =
+ new Thread(group, command, "BackupManagerThread" + this.threadId.incrementAndGet());
+ thread.setDaemon(true);
+ return thread;
}
- }
- return persistentIds;
+ };
+ return Executors.newSingleThreadExecutor(threadFactory);
}
- public HashSet<PersistentID> doBackup(File targetDir, File baselineDir, boolean abort)
- throws IOException {
- if (abort) {
- cleanup();
- return new HashSet<>();
- }
-
- try {
- temporaryFiles = TemporaryBackupFiles.create();
- fileCopier = new BackupFileCopier(cache, temporaryFiles);
- File memberBackupDir = new File(targetDir, memberId);
-
- // Make sure our baseline is okay for this member, then create inspector for baseline backup
- baselineDir = checkBaseline(baselineDir);
- BackupInspector inspector =
- (baselineDir == null ? null : BackupInspector.createInspector(baselineDir));
- File storesDir = new File(memberBackupDir, DATA_STORES_DIRECTORY);
- Collection<DiskStore> diskStores = cache.listDiskStoresIncludingRegionOwned();
-
- Map<DiskStoreImpl, DiskStoreBackup> backupByDiskStores =
- startDiskStoreBackups(inspector, storesDir, diskStores);
- allowDestroys.countDown();
- HashSet<PersistentID> persistentIds = finishDiskStoreBackups(backupByDiskStores);
-
- if (!backupByDiskStores.isEmpty()) {
- // TODO: allow different strategies...
- BackupDefinition backupDefinition = fileCopier.getBackupDefinition();
- backupAdditionalFiles(memberBackupDir);
- backupDefinition.setRestoreScript(restoreScript);
- BackupDestination backupDestination =
- new FileSystemBackupDestination(memberBackupDir.toPath());
- backupDestination.backupFiles(backupDefinition);
- }
-
- return persistentIds;
- } finally {
- cleanup();
- }
+ public void startBackup() {
+ task = new BackupTask(cache);
+ taskFuture = executor.submit(task::backup);
}
- private HashSet<PersistentID> finishDiskStoreBackups(
- Map<DiskStoreImpl, DiskStoreBackup> backupByDiskStores) throws IOException {
- HashSet<PersistentID> persistentIds = new HashSet<>();
- for (Map.Entry<DiskStoreImpl, DiskStoreBackup> entry : backupByDiskStores.entrySet()) {
- DiskStoreImpl diskStore = entry.getKey();
- completeBackup(diskStore, entry.getValue());
- diskStore.getStats().endBackup();
- persistentIds.add(diskStore.getPersistentID());
- }
- return persistentIds;
+ public HashSet<PersistentID> getDiskStoreIdsToBackup() throws InterruptedException {
+ return task.awaitLockAcquisition();
}
- private Map<DiskStoreImpl, DiskStoreBackup> startDiskStoreBackups(BackupInspector inspector,
- File storesDir, Collection<DiskStore> diskStores) throws IOException {
- Map<DiskStoreImpl, DiskStoreBackup> backupByDiskStore = new HashMap<>();
+ public HashSet<PersistentID> doBackup(File targetDir, File baselineDir, boolean abort) {
+ task.notifyOtherMembersReady(targetDir, baselineDir, abort);
- for (DiskStore store : diskStores) {
- DiskStoreImpl diskStore = (DiskStoreImpl) store;
- if (diskStore.hasPersistedData()) {
- File diskStoreDir = new File(storesDir, getBackupDirName(diskStore));
- DiskStoreBackup backup = startDiskStoreBackup(diskStore, diskStoreDir, inspector);
- backupByDiskStore.put(diskStore, backup);
- }
- diskStore.releaseBackupLock();
+ HashSet<PersistentID> result;
+ try {
+ result = taskFuture.get();
+ } catch (InterruptedException | ExecutionException e) {
+ result = new HashSet<>();
}
- return backupByDiskStore;
+ return result;
}
- public void abort() {
- cleanup();
+ public void waitForBackup() {
+ task.waitForBackup();
}
- public boolean isCancelled() {
- return isCancelled;
+ public DiskStoreBackup getBackupForDiskStore(DiskStoreImpl diskStore) {
+ return task.getBackupForDiskStore(diskStore);
}
- public void waitForBackup() {
- try {
- allowDestroys.await();
- } catch (InterruptedException e) {
- throw new InternalGemFireError(e);
+ public void validateRequestingAdmin() {
+ // We need to watch for pure admin guys that depart. this allMembershipListener set
+ // looks like it should receive those events.
+ Set allIds = getDistributionManager().addAllMembershipListenerAndGetAllIds(membershipListener);
+ if (!allIds.contains(sender)) {
+ cleanup();
+ throw new IllegalStateException("The admin member requesting a backup has already departed");
}
}
- private DistributionManager getDistributionManager() {
- return cache.getInternalDistributedSystem().getDistributionManager();
- }
-
private void cleanup() {
- isCancelled = true;
- allowDestroys.countDown();
- if (temporaryFiles != null) {
- temporaryFiles.cleanupFiles();
- }
- releaseBackupLocks();
getDistributionManager().removeAllMembershipListener(membershipListener);
cache.clearBackupManager();
}
-
-
- private void releaseBackupLocks() {
- for (DiskStore store : cache.listDiskStoresIncludingRegionOwned()) {
- ((DiskStoreImpl) store).releaseBackupLock();
- }
- }
-
- /**
- * Returns the memberId directory for this member in the baseline. The memberId may have changed
- * if this member has been restarted since the last backup.
- *
- * @param baselineParentDir parent directory of last backup.
- * @return null if the baseline for this member could not be located.
- */
- private File findBaselineForThisMember(File baselineParentDir) {
- File baselineDir = null;
-
- // Find the first matching DiskStoreId directory for this member.
- for (DiskStore diskStore : cache.listDiskStoresIncludingRegionOwned()) {
- File[] matchingFiles = baselineParentDir
- .listFiles((file, name) -> name.endsWith(getBackupDirName((DiskStoreImpl) diskStore)));
- // We found it? Good. Set this member's baseline to the backed up disk store's member dir (two
- // levels up).
- if (null != matchingFiles && matchingFiles.length > 0)
- baselineDir = matchingFiles[0].getParentFile().getParentFile();
- }
- return baselineDir;
- }
-
- /**
- * Performs a sanity check on the baseline directory for incremental backups. If a baseline
- * directory exists for the member and there is no INCOMPLETE_BACKUP_FILE file then return the
- * data stores directory for this member.
- *
- * @param baselineParentDir a previous backup directory. This is used with the incremental backup
- * option. May be null if the user specified a full backup.
- * @return null if the backup is to be a full backup otherwise return the data store directory in
- * the previous backup for this member (if incremental).
- */
- private File checkBaseline(File baselineParentDir) {
- File baselineDir = null;
-
- if (null != baselineParentDir) {
- // Start by looking for this memberId
- baselineDir = new File(baselineParentDir, memberId);
-
- if (!baselineDir.exists()) {
- // hmmm, did this member have a restart?
- // Determine which member dir might be a match for us
- baselineDir = findBaselineForThisMember(baselineParentDir);
- }
-
- if (null != baselineDir) {
- // check for existence of INCOMPLETE_BACKUP_FILE file
- File incompleteBackup = new File(baselineDir, INCOMPLETE_BACKUP_FILE);
- if (incompleteBackup.exists()) {
- baselineDir = null;
- }
- }
- }
-
- return baselineDir;
- }
-
- private void backupAdditionalFiles(File backupDir) throws IOException {
- fileCopier.copyConfigFiles();
-
- Set<File> userFiles = fileCopier.copyUserFiles();
- File userBackupDir = new File(backupDir, USER_FILES);
- for (File file : userFiles) {
- File restoreScriptDestination = new File(userBackupDir, file.getName());
- restoreScript.addUserFile(file, restoreScriptDestination);
- }
-
- Set<File> jars = fileCopier.copyDeployedJars();
- for (File file : jars) {
- File restoreScriptDestination = new File(userBackupDir, file.getName());
- restoreScript.addFile(file, restoreScriptDestination);
- }
- }
-
- /**
- * Copy the oplogs to the backup directory. This is the final step of the backup process. The
- * oplogs we copy are defined in the startDiskStoreBackup method.
- */
- private void completeBackup(DiskStoreImpl diskStore, DiskStoreBackup backup) throws IOException {
- if (backup == null) {
- return;
- }
- try {
- // Wait for oplogs to be unpreblown before backing them up.
- diskStore.waitForDelayedWrites();
-
- // Backup all of the oplogs
- for (Oplog oplog : backup.getPendingBackup()) {
- if (isCancelled()) {
- break;
- }
- oplog.finishKrf();
- fileCopier.copyOplog(diskStore, oplog);
-
- // Allow the oplog to be deleted, and process any pending delete
- backup.backupFinished(oplog);
- }
- } finally {
- backup.cleanup();
- }
- }
-
- /**
- * Returns the dir name used to back up this DiskStore's directories under. The name is a
- * concatenation of the disk store name and id.
- */
- private String getBackupDirName(DiskStoreImpl diskStore) {
- String name = diskStore.getName();
-
- if (name == null) {
- name = GemFireCacheImpl.getDefaultDiskStoreName();
- }
-
- return (name + "_" + diskStore.getDiskStoreID().toString());
- }
-
- /**
- * Start the backup process. This is the second step of the backup process. In this method, we
- * define the data we're backing up by copying the init file and rolling to the next file. After
- * this method returns operations can proceed as normal, except that we don't remove oplogs.
- */
- private DiskStoreBackup startDiskStoreBackup(DiskStoreImpl diskStore, File targetDir,
- BackupInspector baselineInspector) throws IOException {
- diskStore.getBackupLock().setBackupThread();
- DiskStoreBackup backup = null;
- boolean done = false;
- try {
- for (;;) {
- Oplog childOplog = diskStore.getPersistentOplogSet().getChild();
- if (childOplog == null) {
- backup = new DiskStoreBackup(new Oplog[0], targetDir);
- backupByDiskStore.put(diskStore, backup);
- break;
- }
-
- // Get an appropriate lock object for each set of oplogs.
- Object childLock = childOplog.getLock();
-
- // TODO - We really should move this lock into the disk store, but
- // until then we need to do this magic to make sure we're actually
- // locking the latest child for both types of oplogs
-
- // This ensures that all writing to disk is blocked while we are
- // creating the snapshot
- synchronized (childLock) {
- if (diskStore.getPersistentOplogSet().getChild() != childOplog) {
- continue;
- }
-
- if (logger.isDebugEnabled()) {
- logger.debug("snapshotting oplogs for disk store {}", diskStore.getName());
- }
-
- addDiskStoreDirectoriesToRestoreScript(diskStore, targetDir);
-
- restoreScript.addExistenceTest(diskStore.getDiskInitFile().getIFFile());
-
- // Contains all oplogs that will backed up
-
- // Incremental backup so filter out oplogs that have already been
- // backed up
- Oplog[] allOplogs;
- if (null != baselineInspector) {
- allOplogs = filterBaselineOplogs(diskStore, baselineInspector);
- } else {
- allOplogs = diskStore.getAllOplogsForBackup();
- }
-
- // mark all oplogs as being backed up. This will
- // prevent the oplogs from being deleted
- backup = new DiskStoreBackup(allOplogs, targetDir);
- backupByDiskStore.put(diskStore, backup);
-
- fileCopier.copyDiskInitFile(diskStore);
- diskStore.getPersistentOplogSet().forceRoll(null);
-
- if (logger.isDebugEnabled()) {
- logger.debug("done backing up disk store {}", diskStore.getName());
- }
- break;
- }
- }
- done = true;
- } finally {
- if (!done && backup != null) {
- backupByDiskStore.remove(diskStore);
- backup.cleanup();
- }
- }
- return backup;
- }
-
-
-
- private void addDiskStoreDirectoriesToRestoreScript(DiskStoreImpl diskStore, File targetDir) {
- DirectoryHolder[] directories = diskStore.getDirectoryHolders();
- for (int i = 0; i < directories.length; i++) {
- File backupDir = getBackupDirForCurrentMember(targetDir, i);
- restoreScript.addFile(directories[i].getDir(), backupDir);
- }
- }
-
- /**
- * Filters and returns the current set of oplogs that aren't already in the baseline for
- * incremental backup
- *
- * @param baselineInspector the inspector for the previous backup.
- * @return an array of Oplogs to be copied for an incremental backup.
- */
- private Oplog[] filterBaselineOplogs(DiskStoreImpl diskStore, BackupInspector baselineInspector) {
- File baselineDir =
- new File(baselineInspector.getBackupDir(), BackupManager.DATA_STORES_DIRECTORY);
- baselineDir = new File(baselineDir, getBackupDirName(diskStore));
-
- // Find all of the member's diskstore oplogs in the member's baseline
- // diskstore directory structure (*.crf,*.krf,*.drf)
- Collection<File> baselineOplogFiles =
- FileUtils.listFiles(baselineDir, new String[] {"krf", "drf", "crf"}, true);
- // Our list of oplogs to copy (those not already in the baseline)
- List<Oplog> oplogList = new LinkedList<>();
-
- // Total list of member oplogs
- Oplog[] allOplogs = diskStore.getAllOplogsForBackup();
-
- // Loop through operation logs and see if they are already part of the baseline backup.
- for (Oplog log : allOplogs) {
- // See if they are backed up in the current baseline
- Map<File, File> oplogMap = log.mapBaseline(baselineOplogFiles);
-
- // No? Then see if they were backed up in previous baselines
- if (oplogMap.isEmpty() && baselineInspector.isIncremental()) {
- oplogMap = addBaselineOplogToRestoreScript(baselineInspector, log);
- }
-
- if (oplogMap.isEmpty()) {
- // These are fresh operation log files so lets back them up.
- oplogList.add(log);
- } else {
- /*
- * These have been backed up before so lets just add their entries from the previous backup
- * or restore script into the current one.
- */
- restoreScript.addBaselineFiles(oplogMap);
- }
- }
-
- // Convert the filtered oplog list to an array
- return oplogList.toArray(new Oplog[oplogList.size()]);
- }
-
- private Map<File, File> addBaselineOplogToRestoreScript(BackupInspector baselineInspector,
- Oplog log) {
- Map<File, File> oplogMap = new HashMap<>();
- Set<String> matchingOplogs =
- log.gatherMatchingOplogFiles(baselineInspector.getIncrementalOplogFileNames());
- for (String matchingOplog : matchingOplogs) {
- oplogMap.put(new File(baselineInspector.getCopyFromForOplogFile(matchingOplog)),
- new File(baselineInspector.getCopyToForOplogFile(matchingOplog)));
- }
- return oplogMap;
- }
-
- private File getBackupDirForCurrentMember(File targetDir, int index) {
- return new File(targetDir, BACKUP_DIR_PREFIX + index);
- }
-
- private String getCleanedMemberId() {
- InternalDistributedMember memberId =
- cache.getInternalDistributedSystem().getDistributedMember();
- String vmId = memberId.toString();
- return cleanSpecialCharacters(vmId);
- }
-
-
-
- private String cleanSpecialCharacters(String string) {
- return string.replaceAll("[^\\w]+", "_");
- }
-
- public DiskStoreBackup getBackupForDiskStore(DiskStoreImpl diskStore) {
- return backupByDiskStore.get(diskStore);
+ private DistributionManager getDistributionManager() {
+ return cache.getInternalDistributedSystem().getDistributionManager();
}
private class BackupMembershipListener implements MembershipListener {
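The createExecutor() method above relies on Geode's LoggingThreadGroup;
stripped of that, the wiring is a standard single-threaded executor with a
named daemon ThreadFactory. A runnable sketch of just that wiring
(hypothetical class name):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ThreadFactory;
    import java.util.concurrent.atomic.AtomicInteger;

    class BackupExecutorSketch {
      static ExecutorService create() {
        ThreadFactory factory = new ThreadFactory() {
          private final AtomicInteger threadId = new AtomicInteger();

          @Override
          public Thread newThread(Runnable command) {
            // Daemon worker with a stable name prefix, so a pending backup
            // task cannot keep the JVM alive on its own.
            Thread thread =
                new Thread(command, "BackupManagerThread" + threadId.incrementAndGet());
            thread.setDaemon(true);
            return thread;
          }
        };
        return Executors.newSingleThreadExecutor(factory);
      }

      public static void main(String[] args) throws Exception {
        ExecutorService executor = create();
        // Prints BackupManagerThread1: all submitted work shares this one thread.
        System.out.println(executor.submit(() -> Thread.currentThread().getName()).get());
        executor.shutdown();
      }
    }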
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupManager.java b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupTask.java
similarity index 85%
copy from geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupManager.java
copy to geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupTask.java
index 8503b2e..f984c4c 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupManager.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupTask.java
@@ -31,8 +31,6 @@ import org.apache.logging.log4j.Logger;
import org.apache.geode.InternalGemFireError;
import org.apache.geode.cache.DiskStore;
import org.apache.geode.cache.persistence.PersistentID;
-import org.apache.geode.distributed.internal.DistributionManager;
-import org.apache.geode.distributed.internal.MembershipListener;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.cache.DirectoryHolder;
import org.apache.geode.internal.cache.DiskStoreBackup;
@@ -45,61 +43,79 @@ import org.apache.geode.internal.logging.LogService;
/**
* This class manages the state an logic to backup a single cache.
*/
-public class BackupManager {
+public class BackupTask {
private static final Logger logger = LogService.getLogger();
static final String INCOMPLETE_BACKUP_FILE = "INCOMPLETE_BACKUP_FILE";
private static final String BACKUP_DIR_PREFIX = "dir";
private static final String DATA_STORES_DIRECTORY = "diskstores";
- public static final String DATA_STORES_TEMPORARY_DIRECTORY = "backupTemp_";
private static final String USER_FILES = "user";
- private final MembershipListener membershipListener = new BackupMembershipListener();
private final Map<DiskStoreImpl, DiskStoreBackup> backupByDiskStore = new HashMap<>();
private final RestoreScript restoreScript = new RestoreScript();
- private final InternalDistributedMember sender;
private final InternalCache cache;
private final CountDownLatch allowDestroys = new CountDownLatch(1);
private final String memberId;
+ private final CountDownLatch locksAcquired = new CountDownLatch(1);
+ private final CountDownLatch otherMembersReady = new CountDownLatch(1);
+ private final HashSet<PersistentID> diskStoresWithData = new HashSet<>();
+ private volatile File targetDir;
+ private volatile File baselineDir;
private volatile boolean isCancelled = false;
+
private TemporaryBackupFiles temporaryFiles;
private BackupFileCopier fileCopier;
- public BackupManager(InternalDistributedMember sender, InternalCache gemFireCache) {
- this.sender = sender;
+ BackupTask(InternalCache gemFireCache) {
this.cache = gemFireCache;
memberId = getCleanedMemberId();
}
- public void validateRequestingAdmin() {
- // We need to watch for pure admin guys that depart. this allMembershipListener set
- // looks like it should receive those events.
- Set allIds = getDistributionManager().addAllMembershipListenerAndGetAllIds(membershipListener);
- if (!allIds.contains(sender)) {
+ HashSet<PersistentID> awaitLockAcquisition() throws InterruptedException {
+ locksAcquired.await();
+ return diskStoresWithData;
+ }
+
+ void notifyOtherMembersReady(File targetDir, File baselineDir, boolean abort) {
+ this.targetDir = targetDir;
+ this.baselineDir = baselineDir;
+ this.isCancelled = abort;
+ otherMembersReady.countDown();
+ }
+
+ HashSet<PersistentID> backup() throws InterruptedException, IOException {
+ prepareForBackup();
+ locksAcquired.countDown();
+ try {
+ otherMembersReady.await();
+ } catch (InterruptedException e) {
cleanup();
- throw new IllegalStateException("The admin member requesting a backup has already departed");
+ throw e;
}
+ if (isCancelled) {
+ cleanup();
+ return new HashSet<>();
+ }
+
+ return doBackup(targetDir, baselineDir);
}
- public HashSet<PersistentID> prepareForBackup() {
- HashSet<PersistentID> persistentIds = new HashSet<>();
+ private void prepareForBackup() {
for (DiskStore store : cache.listDiskStoresIncludingRegionOwned()) {
DiskStoreImpl storeImpl = (DiskStoreImpl) store;
storeImpl.lockStoreBeforeBackup();
if (storeImpl.hasPersistedData()) {
- persistentIds.add(storeImpl.getPersistentID());
+ diskStoresWithData.add(storeImpl.getPersistentID());
storeImpl.getStats().startBackup();
}
}
- return persistentIds;
}
- public HashSet<PersistentID> doBackup(File targetDir, File baselineDir, boolean abort)
- throws IOException {
- if (abort) {
+ private HashSet<PersistentID> doBackup(File targetDir, File baselineDir) throws IOException {
+ if (isCancelled) {
cleanup();
return new HashSet<>();
}
@@ -155,25 +171,28 @@ public class BackupManager {
for (DiskStore store : diskStores) {
DiskStoreImpl diskStore = (DiskStoreImpl) store;
- if (diskStore.hasPersistedData()) {
- File diskStoreDir = new File(storesDir, getBackupDirName(diskStore));
- DiskStoreBackup backup = startDiskStoreBackup(diskStore, diskStoreDir, inspector);
- backupByDiskStore.put(diskStore, backup);
+ try {
+ if (diskStore.hasPersistedData()) {
+ File diskStoreDir = new File(storesDir, getBackupDirName(diskStore));
+ DiskStoreBackup backup = startDiskStoreBackup(diskStore, diskStoreDir, inspector);
+ backupByDiskStore.put(diskStore, backup);
+ }
+ } finally {
+ diskStore.releaseBackupLock();
}
- diskStore.releaseBackupLock();
}
return backupByDiskStore;
}
- public void abort() {
+ void abort() {
cleanup();
}
- public boolean isCancelled() {
+ boolean isCancelled() {
return isCancelled;
}
- public void waitForBackup() {
+ void waitForBackup() {
try {
allowDestroys.await();
} catch (InterruptedException e) {
@@ -181,10 +200,6 @@ public class BackupManager {
}
}
- private DistributionManager getDistributionManager() {
- return cache.getInternalDistributedSystem().getDistributionManager();
- }
-
private void cleanup() {
isCancelled = true;
allowDestroys.countDown();
@@ -192,12 +207,9 @@ public class BackupManager {
temporaryFiles.cleanupFiles();
}
releaseBackupLocks();
- getDistributionManager().removeAllMembershipListener(membershipListener);
cache.clearBackupManager();
}
-
-
private void releaseBackupLocks() {
for (DiskStore store : cache.listDiskStoresIncludingRegionOwned()) {
((DiskStoreImpl) store).releaseBackupLock();
@@ -327,7 +339,6 @@ public class BackupManager {
*/
private DiskStoreBackup startDiskStoreBackup(DiskStoreImpl diskStore, File targetDir,
BackupInspector baselineInspector) throws IOException {
- diskStore.getBackupLock().setBackupThread();
DiskStoreBackup backup = null;
boolean done = false;
try {
@@ -396,8 +407,6 @@ public class BackupManager {
return backup;
}
-
-
private void addDiskStoreDirectoriesToRestoreScript(DiskStoreImpl diskStore, File targetDir) {
DirectoryHolder[] directories = diskStore.getDirectoryHolders();
for (int i = 0; i < directories.length; i++) {
@@ -414,8 +423,7 @@ public class BackupManager {
* @return an array of Oplogs to be copied for an incremental backup.
*/
private Oplog[] filterBaselineOplogs(DiskStoreImpl diskStore, BackupInspector baselineInspector) {
- File baselineDir =
- new File(baselineInspector.getBackupDir(), BackupManager.DATA_STORES_DIRECTORY);
+ File baselineDir = new File(baselineInspector.getBackupDir(), DATA_STORES_DIRECTORY);
baselineDir = new File(baselineDir, getBackupDirName(diskStore));
// Find all of the member's diskstore oplogs in the member's baseline
@@ -477,37 +485,11 @@ public class BackupManager {
return cleanSpecialCharacters(vmId);
}
-
-
private String cleanSpecialCharacters(String string) {
return string.replaceAll("[^\\w]+", "_");
}
- public DiskStoreBackup getBackupForDiskStore(DiskStoreImpl diskStore) {
+ DiskStoreBackup getBackupForDiskStore(DiskStoreImpl diskStore) {
return backupByDiskStore.get(diskStore);
}
-
- private class BackupMembershipListener implements MembershipListener {
- @Override
- public void memberDeparted(InternalDistributedMember id, boolean crashed) {
- cleanup();
- }
-
- @Override
- public void memberJoined(InternalDistributedMember id) {
- // unused
- }
-
- @Override
- public void quorumLost(Set<InternalDistributedMember> failures,
- List<InternalDistributedMember> remaining) {
- // unused
- }
-
- @Override
- public void memberSuspect(InternalDistributedMember id, InternalDistributedMember whoSuspected,
- String reason) {
- // unused
- }
- }
}
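The two CountDownLatch fields introduced in BackupTask implement a simple
handshake: the task publishes that it holds all disk-store locks, then parks
until the coordinator reports that every member is prepared. A generic,
runnable reduction of that protocol (simplified names, not the Geode classes):

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    class HandshakeSketch {
      public static void main(String[] args) throws Exception {
        CountDownLatch locksAcquired = new CountDownLatch(1);
        CountDownLatch otherMembersReady = new CountDownLatch(1);
        ExecutorService executor = Executors.newSingleThreadExecutor();

        Future<String> result = executor.submit(() -> {
          // phase 1: acquire the locks, then publish that fact
          locksAcquired.countDown();
          // park until the coordinator signals that all members are prepared
          otherMembersReady.await();
          // phase 2: copy the files and report the outcome
          return "backup-complete";
        });

        locksAcquired.await();         // what getDiskStoreIdsToBackup() waits on
        otherMembersReady.countDown(); // what notifyOtherMembersReady() triggers
        System.out.println(result.get()); // what doBackup() collects
        executor.shutdown();
      }
    }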
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/PrepareBackup.java b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/PrepareBackup.java
index de71173..c6130d7 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/PrepareBackup.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/PrepareBackup.java
@@ -31,12 +31,14 @@ class PrepareBackup {
this.cache = cache;
}
- HashSet<PersistentID> run() throws IOException {
+ HashSet<PersistentID> run() throws IOException, InterruptedException {
HashSet<PersistentID> persistentIds;
if (cache == null) {
persistentIds = new HashSet<>();
} else {
- persistentIds = cache.startBackup(member).prepareForBackup();
+ BackupManager backupManager = cache.startBackup(member);
+ backupManager.startBackup();
+ persistentIds = backupManager.getDiskStoreIdsToBackup();
}
return persistentIds;
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/PrepareBackupOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/PrepareBackupOperation.java
index 270a150..b0d92a4 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/PrepareBackupOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/PrepareBackupOperation.java
@@ -58,7 +58,7 @@ class PrepareBackupOperation extends BackupOperation {
void processLocally() {
try {
addToResults(member, prepareBackupFactory.createPrepareBackup(member, cache).run());
- } catch (IOException e) {
+ } catch (IOException | InterruptedException e) {
logger.fatal("Failed to PrepareBackup in " + member, e);
}
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/PrepareBackupRequest.java b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/PrepareBackupRequest.java
index 2bd828b..6a59cfc 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/PrepareBackupRequest.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/PrepareBackupRequest.java
@@ -58,7 +58,7 @@ public class PrepareBackupRequest extends CliLegacyMessage {
try {
persistentIds = prepareBackupFactory
.createPrepareBackup(dm.getDistributionManagerId(), dm.getCache()).run();
- } catch (IOException e) {
+ } catch (IOException | InterruptedException e) {
logger.error(LocalizedMessage.create(LocalizedStrings.CliLegacyMessage_ERROR, getClass()), e);
return AdminFailureResponse.create(getSender(), e);
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/TemporaryBackupFiles.java b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/TemporaryBackupFiles.java
index 85d059e..11099b9 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/backup/TemporaryBackupFiles.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/backup/TemporaryBackupFiles.java
@@ -14,7 +14,6 @@
*/
package org.apache.geode.internal.cache.backup;
-import static org.apache.geode.internal.cache.backup.BackupManager.DATA_STORES_TEMPORARY_DIRECTORY;
import java.io.File;
import java.io.IOException;
@@ -54,7 +53,7 @@ class TemporaryBackupFiles {
*/
static TemporaryBackupFiles create() throws IOException {
long currentTime = System.currentTimeMillis();
- String diskStoreDirectoryName = DATA_STORES_TEMPORARY_DIRECTORY + currentTime;
+ String diskStoreDirectoryName = BackupManager.DATA_STORES_TEMPORARY_DIRECTORY + currentTime;
Path temporaryDirectory = Files.createTempDirectory("backup_" + currentTime);
return new TemporaryBackupFiles(temporaryDirectory, diskStoreDirectoryName);
}
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/beans/MemberMBeanBridge.java b/geode-core/src/main/java/org/apache/geode/management/internal/beans/MemberMBeanBridge.java
index a29bc1a..94bbabc 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/beans/MemberMBeanBridge.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/beans/MemberMBeanBridge.java
@@ -76,7 +76,6 @@ import org.apache.geode.internal.cache.PartitionedRegion;
import org.apache.geode.internal.cache.PartitionedRegionStats;
import org.apache.geode.internal.cache.backup.BackupManager;
import org.apache.geode.internal.cache.control.ResourceManagerStats;
-import org.apache.geode.internal.cache.eviction.EvictionCounters;
import org.apache.geode.internal.cache.execute.FunctionServiceStats;
import org.apache.geode.internal.i18n.LocalizedStrings;
import org.apache.geode.internal.logging.LogService;
@@ -1017,7 +1016,8 @@ public class MemberMBeanBridge {
Set<PersistentID> existingDataStores;
Set<PersistentID> successfulDataStores;
try {
- existingDataStores = manager.prepareForBackup();
+ manager.startBackup();
+ existingDataStores = manager.getDiskStoreIdsToBackup();
abort = false;
} finally {
successfulDataStores = manager.doBackup(targetDir, null/* TODO rishi */, abort);
@@ -1034,7 +1034,7 @@ public class MemberMBeanBridge {
j++;
}
- } catch (IOException e) {
+ } catch (IOException | InterruptedException e) {
throw new ManagementException(e);
}
}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/backup/BackupIntegrationTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/backup/BackupIntegrationTest.java
index 78ab872..3d767f2 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/backup/BackupIntegrationTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/backup/BackupIntegrationTest.java
@@ -188,7 +188,8 @@ public class BackupIntegrationTest {
BackupManager backup =
cache.startBackup(cache.getInternalDistributedSystem().getDistributedMember());
- backup.prepareForBackup();
+ backup.startBackup();
+ backup.getDiskStoreIdsToBackup();
backup.doBackup(backupDir, null, false);
// Put another key to make sure we restore
@@ -237,7 +238,8 @@ public class BackupIntegrationTest {
BackupManager backup =
cache.startBackup(cache.getInternalDistributedSystem().getDistributedMember());
- backup.prepareForBackup();
+ backup.startBackup();
+ backup.getDiskStoreIdsToBackup();
backup.doBackup(backupDir, null, false);
assertEquals("No backup files should have been created", Collections.emptyList(),
Arrays.asList(backupDir.list()));
@@ -253,7 +255,8 @@ public class BackupIntegrationTest {
BackupManager backup =
cache.startBackup(cache.getInternalDistributedSystem().getDistributedMember());
- backup.prepareForBackup();
+ backup.startBackup();
+ backup.getDiskStoreIdsToBackup();
backup.doBackup(backupDir, null, false);
@@ -282,7 +285,8 @@ public class BackupIntegrationTest {
BackupManager backupManager =
cache.startBackup(cache.getInternalDistributedSystem().getDistributedMember());
backupManager.validateRequestingAdmin();
- backupManager.prepareForBackup();
+ backupManager.startBackup();
+ backupManager.getDiskStoreIdsToBackup();
final Region theRegion = region;
final DiskStore theDiskStore = ds;
CompletableFuture.runAsync(() -> destroyAndCompact(theRegion, theDiskStore));
@@ -316,7 +320,8 @@ public class BackupIntegrationTest {
BackupManager backup =
cache.startBackup(cache.getInternalDistributedSystem().getDistributedMember());
- backup.prepareForBackup();
+ backup.startBackup();
+ backup.getDiskStoreIdsToBackup();
backup.doBackup(backupDir, null, false);
Collection<File> fileCollection = FileUtils.listFiles(backupDir,
new RegexFileFilter("BackupIntegrationTest.cache.xml"), DirectoryFileFilter.DIRECTORY);
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/backup/BackupLockTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/backup/BackupLockTest.java
deleted file mode 100644
index 9f1c82b..0000000
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/backup/BackupLockTest.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.geode.internal.cache.backup;
-
-import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.awaitility.Awaitility.await;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import org.apache.geode.test.junit.categories.UnitTest;
-
-@Category(UnitTest.class)
-public class BackupLockTest {
-
- private BackupLock backupLock;
- private ExecutorService executor;
-
- @Before
- public void setUp() throws Exception {
- backupLock = new BackupLock();
- executor = Executors.newSingleThreadExecutor();
- }
-
- @Test
- public void lockShouldBlockUntilLockForBackup() throws Exception {
- backupLock.lockForBackup();
- backupLock.setBackupThread();
-
- AtomicBoolean beforeLock = new AtomicBoolean();
- AtomicBoolean afterLock = new AtomicBoolean();
-
- backupLock.setBackupLockTestHook(() -> beforeLock.set(true));
-
- executor.submit(() -> {
- backupLock.lock(); // beforeLock is set inside lock() method
- afterLock.set(true);
- });
-
- await().atMost(10, SECONDS).until(() -> assertThat(beforeLock).isTrue());
- assertThat(afterLock).isFalse();
-
- backupLock.unlockForBackup();
- await().atMost(10, SECONDS).until(() -> assertThat(afterLock).isTrue());
- }
-
- @Test
- public void otherThreadShouldBeAbleToUnlockForBackup() throws Exception {
- backupLock.lockForBackup();
- backupLock.setBackupThread();
-
- await().atMost(10, SECONDS).until(() -> assertThat(backupLock.isBackingUp()).isTrue());
- assertThat(backupLock.isCurrentThreadDoingBackup()).isTrue();
-
- executor.submit(() -> {
- assertThat(backupLock.isCurrentThreadDoingBackup()).isFalse();
- backupLock.unlockForBackup();
- });
-
- await().atMost(10, SECONDS).until(() -> assertThat(backupLock.isBackingUp()).isFalse());
- }
-
- @Test
- public void isCurrentThreadDoingBackupShouldBeSetAndUnset() throws Exception {
- backupLock.lockForBackup();
- backupLock.setBackupThread();
-
- assertThat(backupLock.isCurrentThreadDoingBackup()).isTrue();
-
- backupLock.unlockForBackup();
-
- assertThat(backupLock.isCurrentThreadDoingBackup()).isFalse();
- }
-
- @Test
- public void threadLocalShouldNotLeak() throws Exception {
- backupLock.lockForBackup();
- backupLock.setBackupThread();
-
- assertThat(backupLock.hasThreadLocal()).isTrue();
-
- backupLock.unlockForBackup();
-
- assertThat(backupLock.hasThreadLocal()).isFalse();
- }
-
-}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/backup/BackupPrepareAndFinishMsgDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/backup/BackupPrepareAndFinishMsgDUnitTest.java
index 7bca61d..56b2e51 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/backup/BackupPrepareAndFinishMsgDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/backup/BackupPrepareAndFinishMsgDUnitTest.java
@@ -31,6 +31,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.awaitility.Awaitility;
@@ -53,6 +54,8 @@ import org.apache.geode.cache30.CacheTestCase;
import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.internal.cache.DiskStoreImpl;
import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.cache.InternalRegion;
+import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.test.junit.categories.DistributedTest;
@Category({DistributedTest.class})
@@ -62,7 +65,6 @@ public abstract class BackupPrepareAndFinishMsgDUnitTest extends CacheTestCase {
// a distributed member (rather than local) because it sends prepare and finish backup messages
private static final String TEST_REGION_NAME = "TestRegion";
private File[] diskDirs = null;
- private int waitingForBackupLockCount = 0;
private Region<Integer, Integer> region;
protected abstract Region<Integer, Integer> createRegion();
@@ -139,10 +141,10 @@ public abstract class BackupPrepareAndFinishMsgDUnitTest extends CacheTestCase {
Future<Void> future = null;
new PrepareBackupOperation(dm, dm.getId(), dm.getCache(), recipients,
new PrepareBackupFactory()).send();
- waitingForBackupLockCount = 0;
+ ReentrantLock backupLock = ((LocalRegion) region).getDiskStore().getBackupLock();
future = CompletableFuture.runAsync(function);
Awaitility.await().atMost(5, TimeUnit.SECONDS)
- .until(() -> assertTrue(waitingForBackupLockCount == 1));
+ .until(() -> assertTrue(backupLock.getQueueLength() > 0));
new FinishBackupOperation(dm, dm.getId(), dm.getCache(), recipients, diskDirs[0], null, false,
new FinishBackupFactory()).send();
future.get(5, TimeUnit.SECONDS);
@@ -153,10 +155,10 @@ public abstract class BackupPrepareAndFinishMsgDUnitTest extends CacheTestCase {
Set recipients = dm.getOtherDistributionManagerIds();
new PrepareBackupOperation(dm, dm.getId(), dm.getCache(), recipients,
new PrepareBackupFactory()).send();
- waitingForBackupLockCount = 0;
+ ReentrantLock backupLock = ((LocalRegion) region).getDiskStore().getBackupLock();
List<CompletableFuture<?>> futureList = doReadActions();
CompletableFuture.allOf(futureList.toArray(new CompletableFuture<?>[futureList.size()]));
- assertTrue(waitingForBackupLockCount == 0);
+ assertTrue(backupLock.getQueueLength() == 0);
new FinishBackupOperation(dm, dm.getId(), dm.getCache(), recipients, diskDirs[0], null, false,
new FinishBackupFactory()).send();
}
@@ -203,16 +205,6 @@ public abstract class BackupPrepareAndFinishMsgDUnitTest extends CacheTestCase {
}
/**
- * Implementation of test hook
- */
- private class BackupLockHook implements BackupLock.BackupLockTestHook {
- @Override
- public void beforeWaitForBackupCompletion() {
- waitingForBackupLockCount++;
- }
- }
-
- /**
* Create a region, installing the test hook in the backup lock
*
* @param shortcut The region shortcut to use to create the region
@@ -224,7 +216,6 @@ public abstract class BackupPrepareAndFinishMsgDUnitTest extends CacheTestCase {
diskDirs = getDiskDirs();
diskStoreFactory.setDiskDirs(diskDirs);
DiskStore diskStore = diskStoreFactory.create(getUniqueName());
- ((DiskStoreImpl) diskStore).getBackupLock().setBackupLockTestHook(new BackupLockHook());
RegionFactory<Integer, Integer> regionFactory = cache.createRegionFactory(shortcut);
regionFactory.setDiskStoreName(diskStore.getName());
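With the BackupLockTestHook gone, the DUnit test above infers that a writer is
blocked from ReentrantLock.getQueueLength(). A minimal illustration of that
probe (plain JDK behavior; the Javadoc describes the value as an estimate,
which is why the test polls it with Awaitility):

    import java.util.concurrent.locks.ReentrantLock;

    class QueueLengthDemo {
      public static void main(String[] args) throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        lock.lock(); // hold the lock so another thread has to queue

        Thread waiter = new Thread(() -> {
          lock.lock();
          lock.unlock();
        });
        waiter.start();

        while (lock.getQueueLength() == 0) {
          Thread.sleep(10); // poll until the waiter has actually queued
        }
        System.out.println("queued waiters: " + lock.getQueueLength()); // 1

        lock.unlock(); // release, letting the waiter proceed
        waiter.join();
      }
    }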
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/backup/IncrementalBackupDistributedTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/backup/IncrementalBackupDistributedTest.java
index 482b91c..960b9d7 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/backup/IncrementalBackupDistributedTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/backup/IncrementalBackupDistributedTest.java
@@ -581,7 +581,7 @@ public class IncrementalBackupDistributedTest extends JUnit4CacheTestCase {
File backupDir = getBackupDirForMember(getBaselineDir(), getMemberId(vm));
assertTrue(backupDir.exists());
- File incomplete = new File(backupDir, BackupManager.INCOMPLETE_BACKUP_FILE);
+ File incomplete = new File(backupDir, BackupTask.INCOMPLETE_BACKUP_FILE);
incomplete.createNewFile();
}
diff --git a/geode-core/src/test/java/org/apache/geode/management/internal/beans/DistributedSystemBridgeJUnitTest.java b/geode-core/src/test/java/org/apache/geode/management/internal/beans/DistributedSystemBridgeJUnitTest.java
index eec3a88..0ae746e 100644
--- a/geode-core/src/test/java/org/apache/geode/management/internal/beans/DistributedSystemBridgeJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/management/internal/beans/DistributedSystemBridgeJUnitTest.java
@@ -72,7 +72,8 @@ public class DistributedSystemBridgeJUnitTest {
InOrder inOrder = inOrder(dm, backupManager);
inOrder.verify(dm).putOutgoing(isA(PrepareBackupRequest.class));
- inOrder.verify(backupManager).prepareForBackup();
+ inOrder.verify(backupManager).startBackup();
+ inOrder.verify(backupManager).getDiskStoreIdsToBackup();
inOrder.verify(dm).putOutgoing(isA(FinishBackupRequest.class));
inOrder.verify(backupManager).doBackup(any(), any(), eq(false));
}
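For readers unfamiliar with the verification style in this test, Mockito's
InOrder fails unless the recorded calls occurred in the stated sequence. A
generic sketch (hypothetical mocks, not the Geode test fixtures):

    import static org.mockito.Mockito.inOrder;
    import static org.mockito.Mockito.mock;

    import org.mockito.InOrder;

    class InOrderSketch {
      public static void main(String[] args) {
        Runnable prepare = mock(Runnable.class);
        Runnable finish = mock(Runnable.class);

        prepare.run();
        finish.run();

        // Passes only because run() happened on prepare before finish.
        InOrder inOrder = inOrder(prepare, finish);
        inOrder.verify(prepare).run();
        inOrder.verify(finish).run();
      }
    }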
diff --git a/geode-core/src/test/resources/org/apache/geode/codeAnalysis/excludedClasses.txt b/geode-core/src/test/resources/org/apache/geode/codeAnalysis/excludedClasses.txt
index abf32d6..da96ce7 100644
--- a/geode-core/src/test/resources/org/apache/geode/codeAnalysis/excludedClasses.txt
+++ b/geode-core/src/test/resources/org/apache/geode/codeAnalysis/excludedClasses.txt
@@ -16,7 +16,6 @@ org/apache/geode/internal/ExitCode
org/apache/geode/internal/JarDeployer
org/apache/geode/internal/ObjIdConcurrentMap
org/apache/geode/internal/ObjIdConcurrentMap$Segment
-org/apache/geode/internal/cache/backup/BackupLock
org/apache/geode/internal/cache/DiskStoreMonitor$DiskState
org/apache/geode/internal/cache/InitialImageOperation$GIITestHook
org/apache/geode/internal/cache/Oplog$OPLOG_TYPE