Posted to commits@geode.apache.org by kl...@apache.org on 2017/08/19 00:10:35 UTC

[40/51] [abbrv] geode git commit: GEODE-3169: Decoupling of DiskStore and backups

GEODE-3169: Decoupling of DiskStore and backups
This closes #715
  * move backup logic away from DiskStore and into BackupManager
  * refactor code into smaller methods
  * improve test code clarity


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/3bb6a221
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/3bb6a221
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/3bb6a221

Branch: refs/heads/feature/GEODE-1279
Commit: 3bb6a2214d02fcb339ecba0d0645457d3926ab12
Parents: f38dff9
Author: Nick Reich <nr...@pivotal.io>
Authored: Tue Aug 8 11:30:17 2017 -0700
Committer: Anil <ag...@pivotal.io>
Committed: Fri Aug 18 09:52:24 2017 -0700

----------------------------------------------------------------------
 .../admin/internal/FinishBackupRequest.java     |   2 +-
 .../admin/internal/PrepareBackupRequest.java    |   4 +-
 .../geode/internal/cache/BackupManager.java     | 603 +++++++++++++++++++
 .../geode/internal/cache/DiskStoreBackup.java   |   9 +-
 .../internal/cache/DiskStoreFactoryImpl.java    |   1 -
 .../geode/internal/cache/DiskStoreImpl.java     | 224 +------
 .../geode/internal/cache/GemFireCacheImpl.java  |   5 +-
 .../geode/internal/cache/InternalCache.java     |   1 -
 .../org/apache/geode/internal/cache/Oplog.java  |   1 +
 .../cache/PartitionedRegionDataStore.java       |   1 -
 .../cache/persistence/BackupManager.java        | 389 ------------
 .../internal/cache/xmlcache/CacheCreation.java  |   2 +-
 .../internal/beans/MemberMBeanBridge.java       |   6 +-
 .../geode/internal/cache/BackupDUnitTest.java   | 176 +++---
 .../geode/internal/cache/BackupJUnitTest.java   | 145 +++--
 .../cache/IncrementalBackupDUnitTest.java       |   3 +-
 .../BackupPrepareAndFinishMsgDUnitTest.java     | 548 ++++-------------
 ...ionedBackupPrepareAndFinishMsgDUnitTest.java |  28 +
 ...icateBackupPrepareAndFinishMsgDUnitTest.java |  28 +
 .../beans/DistributedSystemBridgeJUnitTest.java |   8 +-
 20 files changed, 935 insertions(+), 1249 deletions(-)
----------------------------------------------------------------------
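
The functional change is that the backup lifecycle now lives in the new BackupManager instead of being spread across DiskStoreImpl. As a rough sketch of the reshaped flow (the cache, the requesting admin member, and the target/baseline directories are assumed to be in hand; the method names are the ones introduced in this commit):

  // Sketch only: "cache" is an InternalCache, "sender" the requesting admin member.
  BackupManager manager = cache.startBackup(sender);            // registers the manager on the cache
  HashSet<PersistentID> prepared = manager.prepareForBackup();  // phase 1: lock stores, report ids
  HashSet<PersistentID> completed =                             // phase 2: copy oplogs, write the
      manager.doBackup(targetDir, baselineDir, false);          // restore script (throws IOException)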


http://git-wip-us.apache.org/repos/asf/geode/blob/3bb6a221/geode-core/src/main/java/org/apache/geode/admin/internal/FinishBackupRequest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/admin/internal/FinishBackupRequest.java b/geode-core/src/main/java/org/apache/geode/admin/internal/FinishBackupRequest.java
index f01666d..88f67bd 100644
--- a/geode-core/src/main/java/org/apache/geode/admin/internal/FinishBackupRequest.java
+++ b/geode-core/src/main/java/org/apache/geode/admin/internal/FinishBackupRequest.java
@@ -99,7 +99,7 @@ public class FinishBackupRequest extends CliLegacyMessage {
       persistentIds = new HashSet<PersistentID>();
     } else {
       try {
-        persistentIds = cache.getBackupManager().finishBackup(targetDir, baselineDir, abort);
+        persistentIds = cache.getBackupManager().doBackup(targetDir, baselineDir, abort);
       } catch (IOException e) {
         logger.error(
             LocalizedMessage.create(LocalizedStrings.CliLegacyMessage_ERROR, this.getClass()), e);

http://git-wip-us.apache.org/repos/asf/geode/blob/3bb6a221/geode-core/src/main/java/org/apache/geode/admin/internal/PrepareBackupRequest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/admin/internal/PrepareBackupRequest.java b/geode-core/src/main/java/org/apache/geode/admin/internal/PrepareBackupRequest.java
index 0c096f9..ede70c1 100644
--- a/geode-core/src/main/java/org/apache/geode/admin/internal/PrepareBackupRequest.java
+++ b/geode-core/src/main/java/org/apache/geode/admin/internal/PrepareBackupRequest.java
@@ -37,7 +37,7 @@ import org.apache.geode.internal.admin.remote.AdminResponse;
 import org.apache.geode.internal.admin.remote.CliLegacyMessage;
 import org.apache.geode.internal.cache.GemFireCacheImpl;
 import org.apache.geode.internal.cache.InternalCache;
-import org.apache.geode.internal.cache.persistence.BackupManager;
+import org.apache.geode.internal.cache.BackupManager;
 import org.apache.geode.internal.i18n.LocalizedStrings;
 import org.apache.geode.internal.logging.LogService;
 import org.apache.geode.internal.logging.log4j.LocalizedMessage;
@@ -87,7 +87,7 @@ public class PrepareBackupRequest extends CliLegacyMessage {
     } else {
       try {
         BackupManager manager = cache.startBackup(getSender());
-        persistentIds = manager.prepareBackup();
+        persistentIds = manager.prepareForBackup();
       } catch (IOException e) {
         logger.error(
             LocalizedMessage.create(LocalizedStrings.CliLegacyMessage_ERROR, this.getClass()), e);

http://git-wip-us.apache.org/repos/asf/geode/blob/3bb6a221/geode-core/src/main/java/org/apache/geode/internal/cache/BackupManager.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BackupManager.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BackupManager.java
new file mode 100644
index 0000000..b7e0e47
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BackupManager.java
@@ -0,0 +1,603 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.URL;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.InternalGemFireError;
+import org.apache.geode.cache.DiskStore;
+import org.apache.geode.cache.persistence.PersistentID;
+import org.apache.geode.distributed.DistributedSystem;
+import org.apache.geode.distributed.internal.DM;
+import org.apache.geode.distributed.internal.DistributionConfig;
+import org.apache.geode.distributed.internal.MembershipListener;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.ClassPathLoader;
+import org.apache.geode.internal.DeployedJar;
+import org.apache.geode.internal.JarDeployer;
+import org.apache.geode.internal.cache.persistence.BackupInspector;
+import org.apache.geode.internal.cache.persistence.RestoreScript;
+import org.apache.geode.internal.i18n.LocalizedStrings;
+import org.apache.geode.internal.logging.LogService;
+
+/**
+ * This class manages the state and logic to back up a single cache.
+ */
+public class BackupManager implements MembershipListener {
+  private static final Logger logger = LogService.getLogger(BackupManager.class);
+
+  static final String INCOMPLETE_BACKUP_FILE = "INCOMPLETE_BACKUP_FILE";
+
+  private static final String BACKUP_DIR_PREFIX = "dir";
+  private static final String README_FILE = "README_FILE.txt";
+  private static final String DATA_STORES_DIRECTORY = "diskstores";
+  private static final String USER_FILES = "user";
+  private static final String CONFIG_DIRECTORY = "config";
+
+  private final Map<DiskStoreImpl, DiskStoreBackup> backupByDiskStore = new HashMap<>();
+  private final RestoreScript restoreScript = new RestoreScript();
+  private final InternalDistributedMember sender;
+  private final InternalCache cache;
+  private final CountDownLatch allowDestroys = new CountDownLatch(1);
+  private volatile boolean isCancelled = false;
+
+  public BackupManager(InternalDistributedMember sender, InternalCache gemFireCache) {
+    this.sender = sender;
+    this.cache = gemFireCache;
+  }
+
+  public void validateRequestingAdmin() {
+    // We need to watch for pure admin members that depart. This allMembershipListener set
+    // looks like it should receive those events.
+    Set allIds = getDistributionManager().addAllMembershipListenerAndGetAllIds(this);
+    if (!allIds.contains(sender)) {
+      cleanup();
+      throw new IllegalStateException("The admin member requesting a backup has already departed");
+    }
+  }
+
+  public HashSet<PersistentID> prepareForBackup() {
+    HashSet<PersistentID> persistentIds = new HashSet<>();
+    for (DiskStore store : cache.listDiskStoresIncludingRegionOwned()) {
+      DiskStoreImpl storeImpl = (DiskStoreImpl) store;
+      storeImpl.lockStoreBeforeBackup();
+      if (storeImpl.hasPersistedData()) {
+        persistentIds.add(storeImpl.getPersistentID());
+        storeImpl.getStats().startBackup();
+      }
+    }
+    return persistentIds;
+  }
+
+  public HashSet<PersistentID> doBackup(File targetDir, File baselineDir, boolean abort)
+      throws IOException {
+    try {
+      if (abort) {
+        return new HashSet<>();
+      }
+      HashSet<PersistentID> persistentIds = new HashSet<>();
+      File backupDir = getBackupDir(targetDir);
+
+      // Make sure our baseline is okay for this member
+      baselineDir = checkBaseline(baselineDir);
+
+      // Create an inspector for the baseline backup
+      BackupInspector inspector =
+          (baselineDir == null ? null : BackupInspector.createInspector(baselineDir));
+
+      File storesDir = new File(backupDir, DATA_STORES_DIRECTORY);
+      Collection<DiskStore> diskStores = cache.listDiskStoresIncludingRegionOwned();
+      Map<DiskStoreImpl, DiskStoreBackup> backupByDiskStore = new HashMap<>();
+
+      boolean foundPersistentData = false;
+      for (DiskStore store : diskStores) {
+        DiskStoreImpl diskStore = (DiskStoreImpl) store;
+        if (diskStore.hasPersistedData()) {
+          if (!foundPersistentData) {
+            createBackupDir(backupDir);
+            foundPersistentData = true;
+          }
+          File diskStoreDir = new File(storesDir, getBackupDirName(diskStore));
+          diskStoreDir.mkdir();
+          DiskStoreBackup backup = startDiskStoreBackup(diskStore, diskStoreDir, inspector);
+          backupByDiskStore.put(diskStore, backup);
+        }
+        diskStore.releaseBackupLock();
+      }
+
+      allowDestroys.countDown();
+
+      for (Map.Entry<DiskStoreImpl, DiskStoreBackup> entry : backupByDiskStore.entrySet()) {
+        DiskStoreImpl diskStore = entry.getKey();
+        completeBackup(diskStore, entry.getValue());
+        diskStore.getStats().endBackup();
+        persistentIds.add(diskStore.getPersistentID());
+      }
+
+      if (!backupByDiskStore.isEmpty()) {
+        completeRestoreScript(backupDir);
+      }
+
+      return persistentIds;
+
+    } finally {
+      cleanup();
+    }
+  }
+
+  public void abort() {
+    cleanup();
+  }
+
+  private DM getDistributionManager() {
+    return cache.getInternalDistributedSystem().getDistributionManager();
+  }
+
+  private void cleanup() {
+    isCancelled = true;
+    allowDestroys.countDown();
+    releaseBackupLocks();
+    getDistributionManager().removeAllMembershipListener(this);
+    cache.clearBackupManager();
+  }
+
+  private void releaseBackupLocks() {
+    for (DiskStore store : cache.listDiskStoresIncludingRegionOwned()) {
+      ((DiskStoreImpl) store).releaseBackupLock();
+    }
+  }
+
+  /**
+   * Returns the memberId directory for this member in the baseline. The memberId may have changed
+   * if this member has been restarted since the last backup.
+   * 
+   * @param baselineParentDir parent directory of last backup.
+   * @return null if the baseline for this member could not be located.
+   */
+  private File findBaselineForThisMember(File baselineParentDir) {
+    File baselineDir = null;
+
+    /*
+     * Find the first matching DiskStoreId directory for this member.
+     */
+    for (DiskStore diskStore : cache.listDiskStoresIncludingRegionOwned()) {
+      File[] matchingFiles = baselineParentDir
+          .listFiles((file, name) -> name.endsWith(getBackupDirName((DiskStoreImpl) diskStore)));
+      // We found it? Good. Set this member's baseline to the backed up disk store's member dir (two
+      // levels up).
+      if (null != matchingFiles && matchingFiles.length > 0)
+        baselineDir = matchingFiles[0].getParentFile().getParentFile();
+    }
+    return baselineDir;
+  }
+
+  /**
+   * Performs a sanity check on the baseline directory for incremental backups. If a baseline
+   * directory exists for the member and there is no INCOMPLETE_BACKUP_FILE marker, then return
+   * the data stores directory for this member.
+   * 
+   * @param baselineParentDir a previous backup directory. This is used with the incremental backup
+   *        option. May be null if the user specified a full backup.
+   * @return null if the backup is to be a full backup; otherwise the data store directory in
+   *         the previous backup for this member (if incremental).
+   */
+  private File checkBaseline(File baselineParentDir) throws IOException {
+    File baselineDir = null;
+
+    if (null != baselineParentDir) {
+      // Start by looking for this memberId
+      baselineDir = getBackupDir(baselineParentDir);
+
+      if (!baselineDir.exists()) {
+        // hmmm, did this member have a restart?
+        // Determine which member dir might be a match for us
+        baselineDir = findBaselineForThisMember(baselineParentDir);
+      }
+
+      if (null != baselineDir) {
+        // check for existence of INCOMPLETE_BACKUP_FILE file
+        File incompleteBackup = new File(baselineDir, INCOMPLETE_BACKUP_FILE);
+        if (incompleteBackup.exists()) {
+          baselineDir = null;
+        }
+      }
+    }
+
+    return baselineDir;
+  }
+
+  private void completeRestoreScript(File backupDir) throws IOException {
+    backupConfigFiles(restoreScript, backupDir);
+    backupUserFiles(restoreScript, backupDir);
+    backupDeployedJars(restoreScript, backupDir);
+    restoreScript.generate(backupDir);
+    File incompleteFile = new File(backupDir, INCOMPLETE_BACKUP_FILE);
+    if (!incompleteFile.delete()) {
+      throw new IOException("Could not delete file " + INCOMPLETE_BACKUP_FILE);
+    }
+  }
+
+  /**
+   * Copy the oplogs to the backup directory. This is the final step of the backup process. The
+   * oplogs we copy are defined in the startDiskStoreBackup method.
+   */
+  private void completeBackup(DiskStoreImpl diskStore, DiskStoreBackup backup) throws IOException {
+    if (backup == null) {
+      return;
+    }
+    try {
+      // Wait for oplogs to be unpreblown before backing them up.
+      diskStore.waitForDelayedWrites();
+
+      // Backup all of the oplogs
+      for (Oplog oplog : backup.getPendingBackup()) {
+        if (isCancelled()) {
+          break;
+        }
+        // Copy the oplog to the destination directory
+        int index = oplog.getDirectoryHolder().getArrayIndex();
+        File backupDir = getBackupDir(backup.getTargetDir(), index);
+        // TODO prpersist - We could probably optimize this to *move* the files
+        // that we know are supposed to be deleted.
+        oplog.copyTo(backupDir);
+
+        // Allow the oplog to be deleted, and process any pending delete
+        backup.backupFinished(oplog);
+      }
+    } finally {
+      backup.cleanup();
+    }
+  }
+
+  /**
+   * Returns the directory name under which this DiskStore's directories are backed up. The
+   * name is a concatenation of the disk store name and id.
+   */
+  private String getBackupDirName(DiskStoreImpl diskStore) {
+    String name = diskStore.getName();
+
+    if (name == null) {
+      name = GemFireCacheImpl.getDefaultDiskStoreName();
+    }
+
+    return (name + "_" + diskStore.getDiskStoreID().toString());
+  }
+
+  /**
+   * Start the backup process. This is the second step of the backup process. In this method, we
+   * define the data we're backing up by copying the init file and rolling to the next file. After
+   * this method returns, operations can proceed as normal, except that oplogs are not removed.
+   */
+  private DiskStoreBackup startDiskStoreBackup(DiskStoreImpl diskStore, File targetDir,
+      BackupInspector baselineInspector) throws IOException {
+    diskStore.getBackupLock().setBackupThread();
+    DiskStoreBackup backup = null;
+    boolean done = false;
+    try {
+      for (;;) {
+        Oplog childOplog = diskStore.getPersistentOplogSet().getChild();
+        if (childOplog == null) {
+          backup = new DiskStoreBackup(new Oplog[0], targetDir);
+          backupByDiskStore.put(diskStore, backup);
+          break;
+        }
+
+        // Get an appropriate lock object for each set of oplogs.
+        Object childLock = childOplog.lock;
+
+        // TODO - We really should move this lock into the disk store, but
+        // until then we need to do this magic to make sure we're actually
+        // locking the latest child for both types of oplogs
+
+        // This ensures that all writing to disk is blocked while we are
+        // creating the snapshot
+        synchronized (childLock) {
+          if (diskStore.getPersistentOplogSet().getChild() != childOplog) {
+            continue;
+          }
+
+          if (logger.isDebugEnabled()) {
+            logger.debug("snapshotting oplogs for disk store {}", diskStore.getName());
+          }
+
+          createDiskStoreBackupDirs(diskStore, targetDir);
+
+          restoreScript.addExistenceTest(diskStore.getDiskInitFile().getIFFile());
+
+          // Contains all oplogs that will be backed up
+          Oplog[] allOplogs = null;
+
+          // Incremental backup so filter out oplogs that have already been
+          // backed up
+          if (null != baselineInspector) {
+            Map<File, File> baselineCopyMap = new HashMap<>();
+            allOplogs = filterBaselineOplogs(diskStore, baselineInspector, baselineCopyMap);
+            restoreScript.addBaselineFiles(baselineCopyMap);
+          } else {
+            allOplogs = diskStore.getAllOplogsForBackup();
+          }
+
+          // mark all oplogs as being backed up. This will
+          // prevent the oplogs from being deleted
+          backup = new DiskStoreBackup(allOplogs, targetDir);
+          backupByDiskStore.put(diskStore, backup);
+
+          // copy the init file
+          File firstDir = getBackupDir(targetDir, diskStore.getInforFileDirIndex());
+          diskStore.getDiskInitFile().copyTo(firstDir);
+          diskStore.getPersistentOplogSet().forceRoll(null);
+
+          if (logger.isDebugEnabled()) {
+            logger.debug("done snaphotting for disk store {}", diskStore.getName());
+          }
+          break;
+        }
+      }
+      done = true;
+    } finally {
+      if (!done) {
+        if (backup != null) {
+          backupByDiskStore.remove(diskStore);
+          backup.cleanup();
+        }
+      }
+    }
+    return backup;
+  }
+
+  private void createDiskStoreBackupDirs(DiskStoreImpl diskStore, File targetDir)
+      throws IOException {
+    // Create the directories for this disk store
+    DirectoryHolder[] directories = diskStore.getDirectoryHolders();
+    for (int i = 0; i < directories.length; i++) {
+      File dir = getBackupDir(targetDir, i);
+      if (!dir.mkdirs()) {
+        throw new IOException("Could not create directory " + dir);
+      }
+      restoreScript.addFile(directories[i].getDir(), dir);
+    }
+  }
+
+  /**
+   * Filters and returns the current set of oplogs that aren't already in the baseline for
+   * incremental backup.
+   *
+   * @param baselineInspector the inspector for the previous backup.
+   * @param baselineCopyMap this will be populated with the baseline oplog Files that will be used in
+   *        the restore script.
+   * @return an array of Oplogs to be copied for an incremental backup.
+   */
+  private Oplog[] filterBaselineOplogs(DiskStoreImpl diskStore, BackupInspector baselineInspector,
+      Map<File, File> baselineCopyMap) throws IOException {
+    File baselineDir =
+        new File(baselineInspector.getBackupDir(), BackupManager.DATA_STORES_DIRECTORY);
+    baselineDir = new File(baselineDir, getBackupDirName(diskStore));
+
+    // Find all of the member's diskstore oplogs in the member's baseline
+    // diskstore directory structure (*.crf,*.krf,*.drf)
+    Collection<File> baselineOplogFiles =
+        FileUtils.listFiles(baselineDir, new String[] {"krf", "drf", "crf"}, true);
+    // Our list of oplogs to copy (those not already in the baseline)
+    List<Oplog> oplogList = new LinkedList<>();
+
+    // Total list of member oplogs
+    Oplog[] allOplogs = diskStore.getAllOplogsForBackup();
+
+    /*
+     * Loop through operation logs and see if they are already part of the baseline backup.
+     */
+    for (Oplog log : allOplogs) {
+      // See if they are backed up in the current baseline
+      Map<File, File> oplogMap = log.mapBaseline(baselineOplogFiles);
+
+      // No? Then see if they were backed up in previous baselines
+      if (oplogMap.isEmpty() && baselineInspector.isIncremental()) {
+        Set<String> matchingOplogs =
+            log.gatherMatchingOplogFiles(baselineInspector.getIncrementalOplogFileNames());
+        if (!matchingOplogs.isEmpty()) {
+          for (String matchingOplog : matchingOplogs) {
+            oplogMap.put(new File(baselineInspector.getCopyFromForOplogFile(matchingOplog)),
+                new File(baselineInspector.getCopyToForOplogFile(matchingOplog)));
+          }
+        }
+      }
+
+      if (oplogMap.isEmpty()) {
+        /*
+         * These are fresh operation log files, so let's back them up.
+         */
+        oplogList.add(log);
+      } else {
+        /*
+         * These have been backed up before, so let's just add their entries from the previous
+         * backup or restore script into the current one.
+         */
+        baselineCopyMap.putAll(oplogMap);
+      }
+    }
+
+    // Convert the filtered oplog list to an array
+    return oplogList.toArray(new Oplog[oplogList.size()]);
+  }
+
+  private File getBackupDir(File targetDir, int index) {
+    return new File(targetDir, BACKUP_DIR_PREFIX + index);
+  }
+
+  private void backupConfigFiles(RestoreScript restoreScript, File backupDir) throws IOException {
+    File configBackupDir = new File(backupDir, CONFIG_DIRECTORY);
+    configBackupDir.mkdirs();
+    URL url = cache.getCacheXmlURL();
+    if (url != null) {
+      File cacheXMLBackup =
+          new File(configBackupDir, DistributionConfig.DEFAULT_CACHE_XML_FILE.getName());
+      FileUtils.copyFile(new File(cache.getCacheXmlURL().getFile()), cacheXMLBackup);
+    }
+
+    URL propertyURL = DistributedSystem.getPropertiesFileURL();
+    if (propertyURL != null) {
+      File propertyBackup =
+          new File(configBackupDir, DistributionConfig.GEMFIRE_PREFIX + "properties");
+      FileUtils.copyFile(new File(DistributedSystem.getPropertiesFile()), propertyBackup);
+    }
+
+    // TODO: should the gfsecurity.properties file be backed up?
+  }
+
+  private void backupUserFiles(RestoreScript restoreScript, File backupDir) throws IOException {
+    List<File> backupFiles = cache.getBackupFiles();
+    File userBackupDir = new File(backupDir, USER_FILES);
+    if (!userBackupDir.exists()) {
+      userBackupDir.mkdir();
+    }
+    for (File original : backupFiles) {
+      if (original.exists()) {
+        original = original.getAbsoluteFile();
+        File dest = new File(userBackupDir, original.getName());
+        if (original.isDirectory()) {
+          FileUtils.copyDirectory(original, dest);
+        } else {
+          FileUtils.copyFile(original, dest);
+        }
+        restoreScript.addExistenceTest(original);
+        restoreScript.addFile(original, dest);
+      }
+    }
+  }
+
+  /**
+   * Copies user deployed jars to the backup directory.
+   * 
+   * @param restoreScript Used to restore from this backup.
+   * @param backupDir The backup directory for this member.
+   * @throws IOException if one or more of the jars could not be copied.
+   */
+  private void backupDeployedJars(RestoreScript restoreScript, File backupDir) throws IOException {
+    JarDeployer deployer = null;
+
+    try {
+      /*
+       * Suspend any user deployed jar file updates during this backup.
+       */
+      deployer = ClassPathLoader.getLatest().getJarDeployer();
+      deployer.suspendAll();
+
+      List<DeployedJar> jarList = deployer.findDeployedJars();
+      if (!jarList.isEmpty()) {
+        File userBackupDir = new File(backupDir, USER_FILES);
+        if (!userBackupDir.exists()) {
+          userBackupDir.mkdir();
+        }
+
+        for (DeployedJar loader : jarList) {
+          File source = new File(loader.getFileCanonicalPath());
+          File dest = new File(userBackupDir, source.getName());
+          if (source.isDirectory()) {
+            FileUtils.copyDirectory(source, dest);
+          } else {
+            FileUtils.copyFile(source, dest);
+          }
+          restoreScript.addFile(source, dest);
+        }
+      }
+    } finally {
+      /*
+       * Re-enable user deployed jar file updates.
+       */
+      if (null != deployer) {
+        deployer.resumeAll();
+      }
+    }
+  }
+
+  private File getBackupDir(File targetDir) throws IOException {
+    InternalDistributedMember memberId =
+        cache.getInternalDistributedSystem().getDistributedMember();
+    String vmId = memberId.toString();
+    vmId = cleanSpecialCharacters(vmId);
+    return new File(targetDir, vmId);
+  }
+
+  private void createBackupDir(File backupDir) throws IOException {
+    if (backupDir.exists()) {
+      throw new IOException("Backup directory " + backupDir.getAbsolutePath() + " already exists.");
+    }
+
+    if (!backupDir.mkdirs()) {
+      throw new IOException("Could not create directory: " + backupDir);
+    }
+
+    File incompleteFile = new File(backupDir, INCOMPLETE_BACKUP_FILE);
+    if (!incompleteFile.createNewFile()) {
+      throw new IOException("Could not create file: " + incompleteFile);
+    }
+
+    File readme = new File(backupDir, README_FILE);
+    FileOutputStream fos = new FileOutputStream(readme);
+
+    try {
+      String text = LocalizedStrings.BackupManager_README.toLocalizedString();
+      fos.write(text.getBytes());
+    } finally {
+      fos.close();
+    }
+  }
+
+  private String cleanSpecialCharacters(String string) {
+    return string.replaceAll("[^\\w]+", "_");
+  }
+
+  public void memberDeparted(InternalDistributedMember id, boolean crashed) {
+    cleanup();
+  }
+
+  public void memberJoined(InternalDistributedMember id) {}
+
+  public void quorumLost(Set<InternalDistributedMember> failures,
+      List<InternalDistributedMember> remaining) {}
+
+  public void memberSuspect(InternalDistributedMember id, InternalDistributedMember whoSuspected,
+      String reason) {}
+
+  public void waitForBackup() {
+    try {
+      allowDestroys.await();
+    } catch (InterruptedException e) {
+      throw new InternalGemFireError(e);
+    }
+  }
+
+  public boolean isCancelled() {
+    return isCancelled;
+  }
+
+  public DiskStoreBackup getBackupForDiskStore(DiskStoreImpl diskStore) {
+    return backupByDiskStore.get(diskStore);
+  }
+}
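
The allowDestroys latch above is the handshake that holds region destroys back until every disk store's oplogs have been pinned into a DiskStoreBackup: doBackup counts it down once all stores are registered, and waitForBackup parks on it. A toy model of that one-shot pattern, independent of the Geode types:

  import java.util.concurrent.CountDownLatch;

  // Minimal sketch of the allowDestroys handshake; not Geode code.
  class AllowDestroysSketch {
    private final CountDownLatch allowDestroys = new CountDownLatch(1);

    void backupThread() {
      // ... pin each disk store's oplogs into a DiskStoreBackup ...
      allowDestroys.countDown(); // destroys may proceed from here on
    }

    void destroyThread() throws InterruptedException {
      allowDestroys.await(); // mirrors BackupManager.waitForBackup()
      // ... perform the destroy; deletes of pinned oplogs are deferred ...
    }
  }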

http://git-wip-us.apache.org/repos/asf/geode/blob/3bb6a221/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreBackup.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreBackup.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreBackup.java
index 309dea3..53c5ca1 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreBackup.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreBackup.java
@@ -25,17 +25,16 @@ import org.apache.geode.internal.cache.persistence.BackupInspector;
  * This class manages the state of the backup of an individual disk store. It holds the list of
  * oplogs that still need to be backed up, along with the lists of oplog files that should be
  * deleted when the oplog is backed up. See
- * {@link DiskStoreImpl#startBackup(File, BackupInspector, org.apache.geode.internal.cache.persistence.RestoreScript)}
  */
 public class DiskStoreBackup {
 
   private final Set<Oplog> pendingBackup;
-  private final Set<Oplog> deferredCrfDeletes = new HashSet<Oplog>();
-  private final Set<Oplog> deferredDrfDeletes = new HashSet<Oplog>();
+  private final Set<Oplog> deferredCrfDeletes = new HashSet<>();
+  private final Set<Oplog> deferredDrfDeletes = new HashSet<>();
   private final File targetDir;
 
   public DiskStoreBackup(Oplog[] allOplogs, File targetDir) {
-    this.pendingBackup = new HashSet<Oplog>(Arrays.asList(allOplogs));
+    this.pendingBackup = new HashSet<>(Arrays.asList(allOplogs));
     this.targetDir = targetDir;
   }
 
@@ -70,7 +69,7 @@ public class DiskStoreBackup {
   }
 
   public synchronized Set<Oplog> getPendingBackup() {
-    return new HashSet<Oplog>(pendingBackup);
+    return new HashSet<>(pendingBackup);
   }
 
   public synchronized void backupFinished(Oplog oplog) {
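
The intended call order around this class is: pin a fixed set of oplogs at construction, iterate a defensive copy from getPendingBackup(), and call backupFinished(oplog) after each copy so any deferred crf/drf deletes can be replayed. A hedged driver built only from the signatures in this diff (copyOplogFiles is a hypothetical stand-in for Oplog.copyTo):

  DiskStoreBackup backup = new DiskStoreBackup(allOplogs, targetDir);
  for (Oplog oplog : backup.getPendingBackup()) { // defensive copy of the pinned set
    copyOplogFiles(oplog, targetDir);             // hypothetical copy step
    backup.backupFinished(oplog);                 // unpin; replay any deferred deletes
  }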

http://git-wip-us.apache.org/repos/asf/geode/blob/3bb6a221/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreFactoryImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreFactoryImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreFactoryImpl.java
index 0288ef1..d6d55d6 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreFactoryImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreFactoryImpl.java
@@ -21,7 +21,6 @@ import org.apache.geode.GemFireIOException;
 import org.apache.geode.cache.DiskStoreFactory;
 import org.apache.geode.cache.DiskStore;
 import org.apache.geode.distributed.internal.ResourceEvent;
-import org.apache.geode.internal.cache.persistence.BackupManager;
 import org.apache.geode.internal.cache.xmlcache.CacheCreation;
 import org.apache.geode.internal.cache.xmlcache.CacheXml;
 import org.apache.geode.internal.cache.xmlcache.DiskStoreAttributesCreation;

http://git-wip-us.apache.org/repos/asf/geode/blob/3bb6a221/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java
index 94d1253..a8a8a53 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java
@@ -33,8 +33,6 @@ import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
@@ -60,7 +58,6 @@ import java.util.regex.Pattern;
 
 import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
 import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
-import org.apache.commons.io.FileUtils;
 import org.apache.logging.log4j.Logger;
 
 import org.apache.geode.CancelCriterion;
@@ -86,8 +83,6 @@ import org.apache.geode.internal.cache.DiskEntry.RecoveredEntry;
 import org.apache.geode.internal.cache.ExportDiskRegion.ExportWriter;
 import org.apache.geode.internal.cache.lru.LRUAlgorithm;
 import org.apache.geode.internal.cache.lru.LRUStatistics;
-import org.apache.geode.internal.cache.persistence.BackupInspector;
-import org.apache.geode.internal.cache.persistence.BackupManager;
 import org.apache.geode.internal.cache.persistence.BytesAndBits;
 import org.apache.geode.internal.cache.persistence.DiskRecoveryStore;
 import org.apache.geode.internal.cache.persistence.DiskRegionView;
@@ -97,7 +92,6 @@ import org.apache.geode.internal.cache.persistence.OplogType;
 import org.apache.geode.internal.cache.persistence.PRPersistentConfig;
 import org.apache.geode.internal.cache.persistence.PersistentMemberID;
 import org.apache.geode.internal.cache.persistence.PersistentMemberPattern;
-import org.apache.geode.internal.cache.persistence.RestoreScript;
 import org.apache.geode.internal.cache.snapshot.GFSnapshot;
 import org.apache.geode.internal.cache.snapshot.GFSnapshot.SnapshotWriter;
 import org.apache.geode.internal.cache.snapshot.SnapshotPacket.SnapshotRecord;
@@ -126,8 +120,6 @@ import org.apache.geode.pdx.internal.PeerTypeRegistration;
 public class DiskStoreImpl implements DiskStore {
   private static final Logger logger = LogService.getLogger();
 
-  private static final String BACKUP_DIR_PREFIX = "dir";
-
   public static final boolean KRF_DEBUG = Boolean.getBoolean("disk.KRF_DEBUG");
 
   public static final int MAX_OPEN_INACTIVE_OPLOGS =
@@ -302,8 +294,6 @@ public class DiskStoreImpl implements DiskStore {
 
   private DiskInitFile initFile = null;
 
-  private volatile DiskStoreBackup diskStoreBackup = null;
-
   private final ReentrantReadWriteLock compactorLock = new ReentrantReadWriteLock();
 
   private final WriteLock compactorWriteLock = compactorLock.writeLock();
@@ -672,6 +662,10 @@ public class DiskStoreImpl implements DiskStore {
     }
   }
 
+  public PersistentOplogSet getPersistentOplogSet() {
+    return persistentOplogs;
+  }
+
   PersistentOplogSet getPersistentOplogSet(DiskRegionView drv) {
     assert drv.isBackup();
     return persistentOplogs;
@@ -2031,6 +2025,10 @@ public class DiskStoreImpl implements DiskStore {
     return this.directories[this.infoFileDirIndex];
   }
 
+  int getInforFileDirIndex() {
+    return this.infoFileDirIndex;
+  }
+
   /**
    * returns the size of the biggest directory available to the region
    */
@@ -2692,84 +2690,9 @@ public class DiskStoreImpl implements DiskStore {
   }
 
   /**
-   * Returns the dir name used to back up this DiskStore's directories under. The name is a
-   * concatenation of the disk store name and id.
-   */
-  public String getBackupDirName() {
-    String name = getName();
-
-    if (name == null) {
-      name = GemFireCacheImpl.getDefaultDiskStoreName();
-    }
-
-    return (name + "_" + getDiskStoreID().toString());
-  }
-
-  /**
-   * Filters and returns the current set of oplogs that aren't already in the baseline for
-   * incremental backup
-   * 
-   * @param baselineInspector the inspector for the previous backup.
-   * @param baselineCopyMap this will be populated with baseline oplogs Files that will be used in
-   *        the restore script.
-   * @return an array of Oplogs to be copied for an incremental backup.
-   */
-  private Oplog[] filterBaselineOplogs(BackupInspector baselineInspector,
-      Map<File, File> baselineCopyMap) throws IOException {
-    File baselineDir = new File(baselineInspector.getBackupDir(), BackupManager.DATA_STORES);
-    baselineDir = new File(baselineDir, getBackupDirName());
-
-    // Find all of the member's diskstore oplogs in the member's baseline
-    // diskstore directory structure (*.crf,*.krf,*.drf)
-    Collection<File> baselineOplogFiles =
-        FileUtils.listFiles(baselineDir, new String[] {"krf", "drf", "crf"}, true);
-    // Our list of oplogs to copy (those not already in the baseline)
-    List<Oplog> oplogList = new LinkedList<Oplog>();
-
-    // Total list of member oplogs
-    Oplog[] allOplogs = getAllOplogsForBackup();
-
-    /*
-     * Loop through operation logs and see if they are already part of the baseline backup.
-     */
-    for (Oplog log : allOplogs) {
-      // See if they are backed up in the current baseline
-      Map<File, File> oplogMap = log.mapBaseline(baselineOplogFiles);
-
-      // No? Then see if they were backed up in previous baselines
-      if (oplogMap.isEmpty() && baselineInspector.isIncremental()) {
-        Set<String> matchingOplogs =
-            log.gatherMatchingOplogFiles(baselineInspector.getIncrementalOplogFileNames());
-        if (!matchingOplogs.isEmpty()) {
-          for (String matchingOplog : matchingOplogs) {
-            oplogMap.put(new File(baselineInspector.getCopyFromForOplogFile(matchingOplog)),
-                new File(baselineInspector.getCopyToForOplogFile(matchingOplog)));
-          }
-        }
-      }
-
-      if (oplogMap.isEmpty()) {
-        /*
-         * These are fresh operation log files so lets back them up.
-         */
-        oplogList.add(log);
-      } else {
-        /*
-         * These have been backed up before so lets just add their entries from the previous backup
-         * or restore script into the current one.
-         */
-        baselineCopyMap.putAll(oplogMap);
-      }
-    }
-
-    // Convert the filtered oplog list to an array
-    return oplogList.toArray(new Oplog[oplogList.size()]);
-  }
-
-  /**
    * Get all of the oplogs
    */
-  private Oplog[] getAllOplogsForBackup() {
+  Oplog[] getAllOplogsForBackup() {
     return persistentOplogs.getAllOplogs();
   }
 
@@ -4066,124 +3989,6 @@ public class DiskStoreImpl implements DiskStore {
     getBackupLock().unlockForBackup();
   }
 
-  /**
-   * Start the backup process. This is the second step of the backup process. In this method, we
-   * define the data we're backing up by copying the init file and rolling to the next file. After
-   * this method returns operations can proceed as normal, except that we don't remove oplogs.
-   */
-  public void startBackup(File targetDir, BackupInspector baselineInspector,
-      RestoreScript restoreScript) throws IOException {
-    getBackupLock().setBackupThread();
-    boolean done = false;
-    try {
-      for (;;) {
-        Oplog childOplog = persistentOplogs.getChild();
-        if (childOplog == null) {
-          this.diskStoreBackup = new DiskStoreBackup(new Oplog[0], targetDir);
-          break;
-        }
-
-        // Get an appropriate lock object for each set of oplogs.
-        Object childLock = childOplog.lock;
-
-        // TODO - We really should move this lock into the disk store, but
-        // until then we need to do this magic to make sure we're actually
-        // locking the latest child for both types of oplogs
-
-        // This ensures that all writing to disk is blocked while we are
-        // creating the snapshot
-        synchronized (childLock) {
-          if (persistentOplogs.getChild() != childOplog) {
-            continue;
-          }
-
-          if (logger.isDebugEnabled()) {
-            logger.debug("snapshotting oplogs for disk store {}", getName());
-          }
-
-          // Create the directories for this disk store
-          for (int i = 0; i < directories.length; i++) {
-            File dir = getBackupDir(targetDir, i);
-            if (!dir.mkdirs()) {
-              throw new IOException("Could not create directory " + dir);
-            }
-            restoreScript.addFile(directories[i].getDir(), dir);
-          }
-
-          restoreScript.addExistenceTest(this.initFile.getIFFile());
-
-          // Contains all oplogs that will backed up
-          Oplog[] allOplogs = null;
-
-          // Incremental backup so filter out oplogs that have already been
-          // backed up
-          if (null != baselineInspector) {
-            Map<File, File> baselineCopyMap = new HashMap<File, File>();
-            allOplogs = filterBaselineOplogs(baselineInspector, baselineCopyMap);
-            restoreScript.addBaselineFiles(baselineCopyMap);
-          } else {
-            allOplogs = getAllOplogsForBackup();
-          }
-
-          // mark all oplogs as being backed up. This will
-          // prevent the oplogs from being deleted
-          this.diskStoreBackup = new DiskStoreBackup(allOplogs, targetDir);
-
-          // copy the init file
-          File firstDir = getBackupDir(targetDir, infoFileDirIndex);
-          initFile.copyTo(firstDir);
-          persistentOplogs.forceRoll(null);
-
-          if (logger.isDebugEnabled()) {
-            logger.debug("done snaphotting for disk store {}", getName());
-          }
-          break;
-        }
-      }
-      done = true;
-    } finally {
-      if (!done) {
-        clearBackup();
-      }
-    }
-  }
-
-  private File getBackupDir(File targetDir, int index) {
-    return new File(targetDir, BACKUP_DIR_PREFIX + index);
-  }
-
-  /**
-   * Copy the oplogs to the backup directory. This is the final step of the backup process. The
-   * oplogs we copy are defined in the startBackup method.
-   */
-  public void finishBackup(BackupManager backupManager) throws IOException {
-    if (diskStoreBackup == null) {
-      return;
-    }
-    try {
-      // Wait for oplogs to be unpreblown before backing them up.
-      waitForDelayedWrites();
-
-      // Backup all of the oplogs
-      for (Oplog oplog : this.diskStoreBackup.getPendingBackup()) {
-        if (backupManager.isCancelled()) {
-          break;
-        }
-        // Copy theoplog to the destination directory
-        int index = oplog.getDirectoryHolder().getArrayIndex();
-        File backupDir = getBackupDir(this.diskStoreBackup.getTargetDir(), index);
-        // TODO prpersist - We could probably optimize this to *move* the files
-        // that we know are supposed to be deleted.
-        oplog.copyTo(backupDir);
-
-        // Allow the oplog to be deleted, and process any pending delete
-        this.diskStoreBackup.backupFinished(oplog);
-      }
-    } finally {
-      clearBackup();
-    }
-  }
-
   private int getArrayIndexOfDirectory(File searchDir) {
     for (DirectoryHolder holder : directories) {
       if (holder.getDir().equals(searchDir)) {
@@ -4197,16 +4002,9 @@ public class DiskStoreImpl implements DiskStore {
     return this.directories;
   }
 
-  private void clearBackup() {
-    DiskStoreBackup backup = this.diskStoreBackup;
-    if (backup != null) {
-      this.diskStoreBackup = null;
-      backup.cleanup();
-    }
-  }
-
   public DiskStoreBackup getInProgressBackup() {
-    return diskStoreBackup;
+    BackupManager backupManager = cache.getBackupManager();
+    return backupManager == null ? null : backupManager.getBackupForDiskStore(this);
   }
 
   public Collection<DiskRegionView> getKnown() {
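
With the per-store diskStoreBackup field gone, the disk-store-to-backup mapping lives in BackupManager.backupByDiskStore, and getInProgressBackup becomes a null-safe lookup through the cache. A condensed view of the two sides, assuming the methods shown in this commit:

  // Producer (BackupManager.startDiskStoreBackup): record the pinned oplogs.
  backupByDiskStore.put(diskStore, new DiskStoreBackup(allOplogs, targetDir));

  // Consumer (DiskStoreImpl.getInProgressBackup): look the entry back up.
  BackupManager manager = cache.getBackupManager();
  DiskStoreBackup inProgress =
      manager == null ? null : manager.getBackupForDiskStore(diskStore);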

http://git-wip-us.apache.org/repos/asf/geode/blob/3bb6a221/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
index 67c8add..6d250d9 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
@@ -79,6 +79,8 @@ import com.sun.jna.Platform;
 import org.apache.commons.lang.StringUtils;
 import org.apache.logging.log4j.Logger;
 
+import org.apache.geode.internal.cache.event.EventTrackerExpiryTask;
+import org.apache.geode.internal.security.SecurityServiceFactory;
 import org.apache.geode.CancelCriterion;
 import org.apache.geode.CancelException;
 import org.apache.geode.ForcedDisconnectException;
@@ -184,7 +186,6 @@ import org.apache.geode.internal.cache.locks.TXLockService;
 import org.apache.geode.internal.cache.lru.HeapEvictor;
 import org.apache.geode.internal.cache.lru.OffHeapEvictor;
 import org.apache.geode.internal.cache.partitioned.RedundancyAlreadyMetException;
-import org.apache.geode.internal.cache.persistence.BackupManager;
 import org.apache.geode.internal.cache.persistence.PersistentMemberID;
 import org.apache.geode.internal.cache.persistence.PersistentMemberManager;
 import org.apache.geode.internal.cache.snapshot.CacheSnapshotServiceImpl;
@@ -4351,7 +4352,7 @@ public class GemFireCacheImpl implements InternalCache, InternalClientCache, Has
     if (!this.backupManager.compareAndSet(null, manager)) {
       throw new IOException("Backup already in progress");
     }
-    manager.start();
+    manager.validateRequestingAdmin();
     return manager;
   }
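
The compareAndSet above is what makes a second concurrent backup request fail fast rather than interleave with the first. The guard in isolation (a toy class, assuming an AtomicReference field as GemFireCacheImpl uses):

  import java.io.IOException;
  import java.util.concurrent.atomic.AtomicReference;

  // Sketch of the single-backup guard; Object stands in for BackupManager here.
  class SingleBackupGuard {
    private final AtomicReference<Object> backupManager = new AtomicReference<>();

    Object startBackup(Object manager) throws IOException {
      if (!backupManager.compareAndSet(null, manager)) {
        throw new IOException("Backup already in progress");
      }
      return manager; // caller then invokes validateRequestingAdmin()
    }
  }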
 

http://git-wip-us.apache.org/repos/asf/geode/blob/3bb6a221/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCache.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCache.java b/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCache.java
index d162010..84aa66e 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCache.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCache.java
@@ -55,7 +55,6 @@ import org.apache.geode.internal.cache.control.InternalResourceManager;
 import org.apache.geode.internal.cache.control.ResourceAdvisor;
 import org.apache.geode.internal.cache.event.EventTrackerExpiryTask;
 import org.apache.geode.internal.cache.extension.Extensible;
-import org.apache.geode.internal.cache.persistence.BackupManager;
 import org.apache.geode.internal.cache.persistence.PersistentMemberManager;
 import org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier;
 import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;

http://git-wip-us.apache.org/repos/asf/geode/blob/3bb6a221/geode-core/src/main/java/org/apache/geode/internal/cache/Oplog.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/Oplog.java b/geode-core/src/main/java/org/apache/geode/internal/cache/Oplog.java
index 80f19b5..860db98 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/Oplog.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/Oplog.java
@@ -5702,6 +5702,7 @@ public class Oplog implements CompactableOplog, Flushable {
 
   public void deleteCRF() {
     oplogSet.crfDelete(this.oplogId);
+    BackupManager backupManager = getInternalCache().getBackupManager();
     DiskStoreBackup inProgressBackup = getParent().getInProgressBackup();
     if (inProgressBackup == null || !inProgressBackup.deferCrfDelete(this)) {
       deleteCRFFileOnly();
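
The check above is the consumer end of the deferred-delete machinery: while a backup has this oplog pinned, deferCrfDelete queues the delete instead of executing it. Spelled out, with the replay path noted:

  DiskStoreBackup inProgressBackup = getParent().getInProgressBackup();
  if (inProgressBackup == null || !inProgressBackup.deferCrfDelete(this)) {
    deleteCRFFileOnly(); // no backup pinning this oplog: delete immediately
  }
  // otherwise: the queued crf delete replays when BackupManager calls
  // backupFinished(oplog) after the copy completes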

http://git-wip-us.apache.org/repos/asf/geode/blob/3bb6a221/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java
index 893ca6b..3d9ac18 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java
@@ -42,7 +42,6 @@ import org.apache.geode.internal.cache.execute.PartitionedRegionFunctionResultSe
 import org.apache.geode.internal.cache.execute.RegionFunctionContextImpl;
 import org.apache.geode.internal.cache.partitioned.*;
 import org.apache.geode.internal.cache.partitioned.RemoveBucketMessage.RemoveBucketResponse;
-import org.apache.geode.internal.cache.persistence.BackupManager;
 import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
 import org.apache.geode.internal.cache.tier.sockets.ServerConnection;
 import org.apache.geode.internal.cache.wan.AbstractGatewaySender;

http://git-wip-us.apache.org/repos/asf/geode/blob/3bb6a221/geode-core/src/main/java/org/apache/geode/internal/cache/persistence/BackupManager.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/persistence/BackupManager.java b/geode-core/src/main/java/org/apache/geode/internal/cache/persistence/BackupManager.java
deleted file mode 100644
index f464e0d..0000000
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/persistence/BackupManager.java
+++ /dev/null
@@ -1,389 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.geode.internal.cache.persistence;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.FilenameFilter;
-import java.io.IOException;
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.commons.io.FileUtils;
-
-import org.apache.geode.InternalGemFireError;
-import org.apache.geode.cache.DiskStore;
-import org.apache.geode.cache.persistence.PersistentID;
-import org.apache.geode.distributed.DistributedSystem;
-import org.apache.geode.distributed.internal.DM;
-import org.apache.geode.distributed.internal.DistributionConfig;
-import org.apache.geode.distributed.internal.MembershipListener;
-import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
-import org.apache.geode.internal.ClassPathLoader;
-import org.apache.geode.internal.DeployedJar;
-import org.apache.geode.internal.JarDeployer;
-import org.apache.geode.internal.cache.DiskStoreImpl;
-import org.apache.geode.internal.cache.InternalCache;
-import org.apache.geode.internal.i18n.LocalizedStrings;
-
-/**
- * This class manages the state an logic to backup a single cache.
- */
-public class BackupManager implements MembershipListener {
-
-  // TODO prpersist internationalize this.
-  public static final String INCOMPLETE_BACKUP = "INCOMPLETE_BACKUP";
-  public static final String README = "README.txt";
-  public static final String DATA_STORES = "diskstores";
-  public static final String USER_FILES = "user";
-  public static final String CONFIG = "config";
-  private InternalDistributedMember sender;
-  private InternalCache cache;
-  private CountDownLatch allowDestroys = new CountDownLatch(1);
-  private volatile boolean isCancelled = false;
-
-  public BackupManager(InternalDistributedMember sender, InternalCache gemFireCache) {
-    this.sender = sender;
-    this.cache = gemFireCache;
-  }
-
-  public void start() {
-    final DM distributionManager = cache.getInternalDistributedSystem().getDistributionManager();
-    // We need to watch for pure admin guys that depart. this allMembershipListener set
-    // looks like it should receive those events.
-    Set allIds = distributionManager.addAllMembershipListenerAndGetAllIds(this);
-    if (!allIds.contains(sender)) {
-      cleanup();
-      throw new IllegalStateException("The admin member requesting a backup has already departed");
-    }
-  }
-
-  private void cleanup() {
-    isCancelled = true;
-    allowDestroys.countDown();
-    Collection<DiskStore> diskStores = cache.listDiskStoresIncludingRegionOwned();
-    for (DiskStore store : diskStores) {
-      ((DiskStoreImpl) store).releaseBackupLock();
-    }
-    final DM distributionManager = cache.getInternalDistributedSystem().getDistributionManager();
-    distributionManager.removeAllMembershipListener(this);
-    cache.clearBackupManager();
-  }
-
-  public HashSet<PersistentID> prepareBackup() {
-    HashSet<PersistentID> persistentIds = new HashSet<PersistentID>();
-    Collection<DiskStore> diskStores = cache.listDiskStoresIncludingRegionOwned();
-    for (DiskStore store : diskStores) {
-      DiskStoreImpl storeImpl = (DiskStoreImpl) store;
-      storeImpl.lockStoreBeforeBackup();
-      if (storeImpl.hasPersistedData()) {
-        persistentIds.add(storeImpl.getPersistentID());
-        storeImpl.getStats().startBackup();
-      }
-    }
-    return persistentIds;
-  }
-
-  /**
-   * Returns the memberId directory for this member in the baseline. The memberId may have changed
-   * if this member has been restarted since the last backup.
-   * 
-   * @param baselineParentDir parent directory of last backup.
-   * @return null if the baseline for this member could not be located.
-   */
-  private File findBaselineForThisMember(File baselineParentDir) {
-    File baselineDir = null;
-
-    /*
-     * Find the first matching DiskStoreId directory for this member.
-     */
-    for (DiskStore diskStore : cache.listDiskStoresIncludingRegionOwned()) {
-      File[] matchingFiles = baselineParentDir.listFiles(new FilenameFilter() {
-        Pattern pattern =
-            Pattern.compile(".*" + ((DiskStoreImpl) diskStore).getBackupDirName() + "$");
-
-        public boolean accept(File dir, String name) {
-          Matcher m = pattern.matcher(name);
-          return m.find();
-        }
-      });
-      // We found it? Good. Set this member's baseline to the backed up disk store's member dir (two
-      // levels up).
-      if (null != matchingFiles && matchingFiles.length > 0)
-        baselineDir = matchingFiles[0].getParentFile().getParentFile();
-    }
-    return baselineDir;
-  }
-
-  /**
-   * Performs a sanity check on the baseline directory for incremental backups. If a baseline
-   * directory exists for the member and there is no INCOMPLETE_BACKUP file then return the data
-   * stores directory for this member.
-   * 
-   * @param baselineParentDir a previous backup directory. This is used with the incremental backup
-   *        option. May be null if the user specified a full backup.
-   * @return null if the backup is to be a full backup otherwise return the data store directory in
-   *         the previous backup for this member (if incremental).
-   */
-  private File checkBaseline(File baselineParentDir) throws IOException {
-    File baselineDir = null;
-
-    if (null != baselineParentDir) {
-      // Start by looking for this memberId
-      baselineDir = getBackupDir(baselineParentDir);
-
-      if (!baselineDir.exists()) {
-        // hmmm, did this member have a restart?
-        // Determine which member dir might be a match for us
-        baselineDir = findBaselineForThisMember(baselineParentDir);
-      }
-
-      if (null != baselineDir) {
-        // check for existence of INCOMPLETE_BACKUP file
-        File incompleteBackup = new File(baselineDir, INCOMPLETE_BACKUP);
-        if (incompleteBackup.exists()) {
-          baselineDir = null;
-        }
-      }
-    }
-
-    return baselineDir;
-  }
-
-  public HashSet<PersistentID> finishBackup(File targetDir, File baselineDir, boolean abort)
-      throws IOException {
-    try {
-      if (abort) {
-        return new HashSet<PersistentID>();
-      }
-
-      File backupDir = getBackupDir(targetDir);
-
-      // Make sure our baseline is okay for this member
-      baselineDir = checkBaseline(baselineDir);
-
-      // Create an inspector for the baseline backup
-      BackupInspector inspector =
-          (baselineDir == null ? null : BackupInspector.createInspector(baselineDir));
-
-      File storesDir = new File(backupDir, DATA_STORES);
-      RestoreScript restoreScript = new RestoreScript();
-      HashSet<PersistentID> persistentIds = new HashSet<PersistentID>();
-      Collection<DiskStore> diskStores =
-          new ArrayList<DiskStore>(cache.listDiskStoresIncludingRegionOwned());
-
-      boolean foundPersistentData = false;
-      for (Iterator<DiskStore> itr = diskStores.iterator(); itr.hasNext();) {
-        DiskStoreImpl store = (DiskStoreImpl) itr.next();
-        if (store.hasPersistedData()) {
-          if (!foundPersistentData) {
-            createBackupDir(backupDir);
-            foundPersistentData = true;
-          }
-          File diskStoreDir = new File(storesDir, store.getBackupDirName());
-          diskStoreDir.mkdir();
-          store.startBackup(diskStoreDir, inspector, restoreScript);
-        } else {
-          itr.remove();
-        }
-        store.releaseBackupLock();
-      }
-
-      allowDestroys.countDown();
-
-      for (DiskStore store : diskStores) {
-        DiskStoreImpl storeImpl = (DiskStoreImpl) store;
-        storeImpl.finishBackup(this);
-        storeImpl.getStats().endBackup();
-        persistentIds.add(storeImpl.getPersistentID());
-      }
-
-      if (foundPersistentData) {
-        backupConfigFiles(restoreScript, backupDir);
-        backupUserFiles(restoreScript, backupDir);
-        backupDeployedJars(restoreScript, backupDir);
-        restoreScript.generate(backupDir);
-        File incompleteFile = new File(backupDir, INCOMPLETE_BACKUP);
-        if (!incompleteFile.delete()) {
-          throw new IOException("Could not delete file " + INCOMPLETE_BACKUP);
-        }
-      }
-
-      return persistentIds;
-
-    } finally {
-      cleanup();
-    }
-  }
-
-  public void abort() {
-    cleanup();
-  }
-
-  private void backupConfigFiles(RestoreScript restoreScript, File backupDir) throws IOException {
-    File configBackupDir = new File(backupDir, CONFIG);
-    configBackupDir.mkdirs();
-    URL url = cache.getCacheXmlURL();
-    if (url != null) {
-      File cacheXMLBackup =
-          new File(configBackupDir, DistributionConfig.DEFAULT_CACHE_XML_FILE.getName());
-      FileUtils.copyFile(new File(cache.getCacheXmlURL().getFile()), cacheXMLBackup);
-    }
-
-    URL propertyURL = DistributedSystem.getPropertiesFileURL();
-    if (propertyURL != null) {
-      File propertyBackup =
-          new File(configBackupDir, DistributionConfig.GEMFIRE_PREFIX + "properties");
-      FileUtils.copyFile(new File(DistributedSystem.getPropertiesFile()), propertyBackup);
-    }
-
-    // TODO: should the gfsecurity.properties file be backed up?
-  }
-
-  private void backupUserFiles(RestoreScript restoreScript, File backupDir) throws IOException {
-    List<File> backupFiles = cache.getBackupFiles();
-    File userBackupDir = new File(backupDir, USER_FILES);
-    if (!userBackupDir.exists()) {
-      userBackupDir.mkdir();
-    }
-    for (File original : backupFiles) {
-      if (original.exists()) {
-        original = original.getAbsoluteFile();
-        File dest = new File(userBackupDir, original.getName());
-        if (original.isDirectory()) {
-          FileUtils.copyDirectory(original, dest);
-        } else {
-          FileUtils.copyFile(original, dest);
-        }
-        restoreScript.addExistenceTest(original);
-        restoreScript.addFile(original, dest);
-      }
-    }
-  }
-
-  /**
-   * Copies user deployed jars to the backup directory.
-   * 
-   * @param restoreScript Used to restore from this backup.
-   * @param backupDir The backup directory for this member.
-   * @throws IOException one or more of the jars did not successfully copy.
-   */
-  private void backupDeployedJars(RestoreScript restoreScript, File backupDir) throws IOException {
-    JarDeployer deployer = null;
-
-    try {
-      /*
-       * Suspend any user deployed jar file updates during this backup.
-       */
-      deployer = ClassPathLoader.getLatest().getJarDeployer();
-      deployer.suspendAll();
-
-      List<DeployedJar> jarList = deployer.findDeployedJars();
-      if (!jarList.isEmpty()) {
-        File userBackupDir = new File(backupDir, USER_FILES);
-        if (!userBackupDir.exists()) {
-          userBackupDir.mkdir();
-        }
-
-        for (DeployedJar loader : jarList) {
-          File source = new File(loader.getFileCanonicalPath());
-          File dest = new File(userBackupDir, source.getName());
-          if (source.isDirectory()) {
-            FileUtils.copyDirectory(source, dest);
-          } else {
-            FileUtils.copyFile(source, dest);
-          }
-          restoreScript.addFile(source, dest);
-        }
-      }
-    } finally {
-      /*
-       * Re-enable user deployed jar file updates.
-       */
-      if (null != deployer) {
-        deployer.resumeAll();
-      }
-    }
-  }
-
-  private File getBackupDir(File targetDir) throws IOException {
-    InternalDistributedMember memberId =
-        cache.getInternalDistributedSystem().getDistributedMember();
-    String vmId = memberId.toString();
-    vmId = cleanSpecialCharacters(vmId);
-    return new File(targetDir, vmId);
-  }
-
-  private void createBackupDir(File backupDir) throws IOException {
-    if (backupDir.exists()) {
-      throw new IOException("Backup directory " + backupDir.getAbsolutePath() + " already exists.");
-    }
-
-    if (!backupDir.mkdirs()) {
-      throw new IOException("Could not create directory: " + backupDir);
-    }
-
-    File incompleteFile = new File(backupDir, INCOMPLETE_BACKUP);
-    if (!incompleteFile.createNewFile()) {
-      throw new IOException("Could not create file: " + incompleteFile);
-    }
-
-    File readme = new File(backupDir, README);
-    FileOutputStream fos = new FileOutputStream(readme);
-
-    try {
-      String text = LocalizedStrings.BackupManager_README.toLocalizedString();
-      fos.write(text.getBytes());
-    } finally {
-      fos.close();
-    }
-  }
-
-  private String cleanSpecialCharacters(String string) {
-    return string.replaceAll("[^\\w]+", "_");
-  }
-
-  public void memberDeparted(InternalDistributedMember id, boolean crashed) {
-    cleanup();
-  }
-
-  public void memberJoined(InternalDistributedMember id) {}
-
-  public void quorumLost(Set<InternalDistributedMember> failures,
-      List<InternalDistributedMember> remaining) {}
-
-  public void memberSuspect(InternalDistributedMember id, InternalDistributedMember whoSuspected,
-      String reason) {}
-
-  public void waitForBackup() {
-    try {
-      allowDestroys.await();
-    } catch (InterruptedException e) {
-      throw new InternalGemFireError(e);
-    }
-  }
-
-  public boolean isCancelled() {
-    return isCancelled;
-  }
-}

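The method pair removed above encodes a strict ordering that the new BackupManager preserves under the names prepareForBackup() and doBackup() (see the MemberMBeanBridge hunk below): prepare locks every disk store, the backup phase marks the backup point under those locks and releases them, and only then are region destroys allowed to proceed while the slow file copies run. A minimal, self-contained sketch of that ordering, using illustrative stand-in types rather than Geode's real classes:

    import java.util.List;
    import java.util.concurrent.CountDownLatch;

    class BackupOrderingSketch {
      // Illustrative stand-in for DiskStoreImpl's backup hooks; not Geode's API.
      interface Store {
        void lockStoreBeforeBackup();   // prepare: freeze writes to the store
        void startBackup();             // mark the backup point, still locked
        void releaseBackupLock();       // writes resume; destroys still held
        void finishBackup();            // copy the marked files (slow part)
      }

      private final CountDownLatch allowDestroys = new CountDownLatch(1);

      void prepareForBackup(List<Store> stores) {
        stores.forEach(Store::lockStoreBeforeBackup);
      }

      void doBackup(List<Store> stores) {
        for (Store store : stores) {
          store.startBackup();
          store.releaseBackupLock();
        }
        allowDestroys.countDown();      // waiters in waitForBackup() proceed
        stores.forEach(Store::finishBackup);
      }

      void waitForBackup() throws InterruptedException {
        allowDestroys.await();          // destroys block until countDown above
      }
    }

The CountDownLatch mirrors allowDestroys in the removed code: waitForBackup() holds destroy operations back until every store's backup lock has been released.
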
http://git-wip-us.apache.org/repos/asf/geode/blob/3bb6a221/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheCreation.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheCreation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheCreation.java
index a7f2a11..e5e372d 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheCreation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheCreation.java
@@ -108,7 +108,7 @@ import org.apache.geode.internal.cache.extension.Extensible;
 import org.apache.geode.internal.cache.extension.ExtensionPoint;
 import org.apache.geode.internal.cache.extension.SimpleExtensionPoint;
 import org.apache.geode.internal.cache.ha.HARegionQueue;
-import org.apache.geode.internal.cache.persistence.BackupManager;
+import org.apache.geode.internal.cache.BackupManager;
 import org.apache.geode.internal.cache.persistence.PersistentMemberManager;
 import org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier;
 import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;

http://git-wip-us.apache.org/repos/asf/geode/blob/3bb6a221/geode-core/src/main/java/org/apache/geode/management/internal/beans/MemberMBeanBridge.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/beans/MemberMBeanBridge.java b/geode-core/src/main/java/org/apache/geode/management/internal/beans/MemberMBeanBridge.java
index dd905eb..5105c3d 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/beans/MemberMBeanBridge.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/beans/MemberMBeanBridge.java
@@ -77,7 +77,7 @@ import org.apache.geode.internal.cache.PartitionedRegionStats;
 import org.apache.geode.internal.cache.control.ResourceManagerStats;
 import org.apache.geode.internal.cache.execute.FunctionServiceStats;
 import org.apache.geode.internal.cache.lru.LRUStatistics;
-import org.apache.geode.internal.cache.persistence.BackupManager;
+import org.apache.geode.internal.cache.BackupManager;
 import org.apache.geode.internal.i18n.LocalizedStrings;
 import org.apache.geode.internal.logging.LogService;
 import org.apache.geode.internal.logging.log4j.LocalizedMessage;
@@ -1037,10 +1037,10 @@ public class MemberMBeanBridge {
         Set<PersistentID> existingDataStores;
         Set<PersistentID> successfulDataStores;
         try {
-          existingDataStores = manager.prepareBackup();
+          existingDataStores = manager.prepareForBackup();
           abort = false;
         } finally {
-          successfulDataStores = manager.finishBackup(targetDir, null/* TODO rishi */, abort);
+          successfulDataStores = manager.doBackup(targetDir, null/* TODO rishi */, abort);
         }
         diskBackUpResult = new DiskBackupResult[existingDataStores.size()];
         int j = 0;

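The hunk above keeps the existing try/finally idiom around the renamed calls: abort starts out true and is cleared only after prepareForBackup() returns, so a failed prepare still drives doBackup(..., abort = true), which, as the removed finishBackup() shows, returns an empty set and merely cleans up. A hedged sketch of that idiom; the Manager interface and Set<String> (in place of Set<PersistentID>) are stand-ins for self-containment, not the real API:

    import java.io.File;
    import java.io.IOException;
    import java.util.Collections;
    import java.util.Set;

    class AbortFlagSketch {
      interface Manager {
        Set<String> prepareForBackup() throws IOException;
        Set<String> doBackup(File targetDir, File baselineDir, boolean abort)
            throws IOException;
      }

      Set<String> backup(Manager manager, File targetDir) throws IOException {
        Set<String> existing = Collections.emptySet();
        Set<String> successful;
        boolean abort = true; // pessimistic: stays true if prepare throws
        try {
          existing = manager.prepareForBackup();
          abort = false;
        } finally {
          // abort == true -> doBackup() just unwinds and returns an empty set
          successful = manager.doBackup(targetDir, null, abort);
        }
        return successful;
      }
    }
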
http://git-wip-us.apache.org/repos/asf/geode/blob/3bb6a221/geode-core/src/test/java/org/apache/geode/internal/cache/BackupDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/BackupDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/BackupDUnitTest.java
index f2cee71..338c712 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/BackupDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/BackupDUnitTest.java
@@ -46,11 +46,13 @@ import org.apache.geode.test.dunit.DUnitEnv;
 import org.apache.geode.test.dunit.Host;
 import org.apache.geode.test.dunit.IgnoredException;
 import org.apache.geode.test.dunit.Invoke;
-import org.apache.geode.test.dunit.LogWriterUtils;
 import org.apache.geode.test.dunit.SerializableCallable;
 import org.apache.geode.test.dunit.SerializableRunnable;
 import org.apache.geode.test.dunit.VM;
 import org.apache.geode.test.junit.categories.DistributedTest;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -67,34 +69,38 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Set;
 import java.util.TreeSet;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Pattern;
 
 @Category(DistributedTest.class)
 public class BackupDUnitTest extends PersistentPartitionedRegionTestBase {
+  private static final Logger logger = LogManager.getLogger(BackupDUnitTest.class);
 
-  private static final long MAX_WAIT = 30 * 1000;
+  private static final long MAX_WAIT_SECONDS = 30;
+  private VM vm0;
+  private VM vm1;
 
   @Override
   public final void preTearDownCacheTestCase() throws Exception {
     StringBuilder failures = new StringBuilder();
     delete(getBackupDir(), failures);
     if (failures.length() > 0) {
-      LogWriterUtils.getLogWriter().error(failures.toString());
+      logger.error(failures.toString());
     }
   }
 
   @Test
   public void testBackupPR() throws Throwable {
     Host host = Host.getHost(0);
-    VM vm0 = host.getVM(0);
-    VM vm1 = host.getVM(1);
+    vm0 = host.getVM(0);
+    vm1 = host.getVM(1);
     VM vm2 = host.getVM(2);
 
-    LogWriterUtils.getLogWriter().info("Creating region in VM0");
+    logger.info("Creating region in VM0");
     createPersistentRegion(vm0);
-    LogWriterUtils.getLogWriter().info("Creating region in VM1");
+    logger.info("Creating region in VM1");
     createPersistentRegion(vm1);
 
     long lm0 = setBackupFiles(vm0);
@@ -107,7 +113,6 @@ public class BackupDUnitTest extends PersistentPartitionedRegionTestBase {
     assertEquals(2, status.getBackedUpDiskStores().size());
     assertEquals(Collections.emptySet(), status.getOfflineDiskStores());
 
-    Pattern pattern = Pattern.compile(".*my.txt.*");
     Collection<File> files = FileUtils.listFiles(getBackupDir(), new String[] {"txt"}, true);
     assertEquals(4, files.size());
     deleteOldUserUserFile(vm0);
@@ -136,13 +141,7 @@ public class BackupDUnitTest extends PersistentPartitionedRegionTestBase {
 
     restoreBackup(2);
 
-    LogWriterUtils.getLogWriter().info("Creating region in VM0");
-    AsyncInvocation async0 = createPersistentRegionAsync(vm0);
-    LogWriterUtils.getLogWriter().info("Creating region in VM1");
-    AsyncInvocation async1 = createPersistentRegionAsync(vm1);
-
-    async0.getResult(MAX_WAIT);
-    async1.getResult(MAX_WAIT);
+    createPersistentRegionsAsync();
 
     checkData(vm0, 0, 5, "A", "region1");
     checkData(vm0, 0, 5, "B", "region2");
@@ -156,12 +155,12 @@ public class BackupDUnitTest extends PersistentPartitionedRegionTestBase {
   @Test
   public void testBackupFromMemberWithDiskStore() throws Throwable {
     Host host = Host.getHost(0);
-    VM vm0 = host.getVM(0);
-    VM vm1 = host.getVM(1);
+    vm0 = host.getVM(0);
+    vm1 = host.getVM(1);
 
-    LogWriterUtils.getLogWriter().info("Creating region in VM0");
+    logger.info("Creating region in VM0");
     createPersistentRegion(vm0);
-    LogWriterUtils.getLogWriter().info("Creating region in VM1");
+    logger.info("Creating region in VM1");
     createPersistentRegion(vm1);
 
     createData(vm0, 0, 5, "A", "region1");
@@ -192,25 +191,21 @@ public class BackupDUnitTest extends PersistentPartitionedRegionTestBase {
 
     restoreBackup(2);
 
-    LogWriterUtils.getLogWriter().info("Creating region in VM0");
-    AsyncInvocation async0 = createPersistentRegionAsync(vm0);
-    LogWriterUtils.getLogWriter().info("Creating region in VM1");
-    AsyncInvocation async1 = createPersistentRegionAsync(vm1);
-
-    async0.getResult(MAX_WAIT);
-    async1.getResult(MAX_WAIT);
+    createPersistentRegionsAsync();
 
     checkData(vm0, 0, 5, "A", "region1");
     checkData(vm0, 0, 5, "B", "region2");
   }
 
-  // public void testLoop() throws Throwable {
-  // for(int i =0 ;i < 100; i++) {
-  // testBackupWhileBucketIsCreated();
-  // setUp();
-  // tearDown();
-  // }
-  // }
+  private void createPersistentRegionsAsync()
+      throws ExecutionException, InterruptedException, TimeoutException {
+    logger.info("Creating region in VM0");
+    AsyncInvocation async0 = createPersistentRegionAsync(vm0);
+    logger.info("Creating region in VM1");
+    AsyncInvocation async1 = createPersistentRegionAsync(vm1);
+    async0.get(MAX_WAIT_SECONDS, TimeUnit.SECONDS);
+    async1.get(MAX_WAIT_SECONDS, TimeUnit.SECONDS);
+  }
 
   /**
    * Test for bug 42419
@@ -218,40 +213,27 @@ public class BackupDUnitTest extends PersistentPartitionedRegionTestBase {
   @Test
   public void testBackupWhileBucketIsCreated() throws Throwable {
     Host host = Host.getHost(0);
-    final VM vm0 = host.getVM(0);
-    VM vm1 = host.getVM(1);
+    vm0 = host.getVM(0);
+    vm1 = host.getVM(1);
     final VM vm2 = host.getVM(2);
 
-    LogWriterUtils.getLogWriter().info("Creating region in VM0");
+    logger.info("Creating region in VM0");
     createPersistentRegion(vm0);
 
     // create a bucket on vm0
     createData(vm0, 0, 1, "A", "region1");
 
     // create the pr on vm1, which won't have any buckets
-    LogWriterUtils.getLogWriter().info("Creating region in VM1");
+    logger.info("Creating region in VM1");
     createPersistentRegion(vm1);
 
-    final AtomicReference<BackupStatus> statusRef = new AtomicReference<BackupStatus>();
-    Thread thread1 = new Thread() {
-      public void run() {
+    CompletableFuture<BackupStatus> backupStatusFuture =
+        CompletableFuture.supplyAsync(() -> backup(vm2));
+    CompletableFuture<Void> createDataFuture =
+        CompletableFuture.runAsync(() -> createData(vm0, 1, 5, "A", "region1"));
+    // Block until both the backup and the concurrent puts have finished;
+    // without join() the allOf future is discarded and nothing waits.
+    CompletableFuture.allOf(backupStatusFuture, createDataFuture).join();
 
-        BackupStatus status = backup(vm2);
-        statusRef.set(status);
-
-      }
-    };
-    thread1.start();
-    Thread thread2 = new Thread() {
-      public void run() {
-        createData(vm0, 1, 5, "A", "region1");
-      }
-    };
-    thread2.start();
-    thread1.join();
-    thread2.join();
-
-    BackupStatus status = statusRef.get();
+    BackupStatus status = backupStatusFuture.get();
     assertEquals(2, status.getBackedUpDiskStores().size());
     assertEquals(Collections.emptySet(), status.getOfflineDiskStores());
 
@@ -278,13 +260,7 @@ public class BackupDUnitTest extends PersistentPartitionedRegionTestBase {
 
     restoreBackup(2);
 
-    LogWriterUtils.getLogWriter().info("Creating region in VM0");
-    AsyncInvocation async0 = createPersistentRegionAsync(vm0);
-    LogWriterUtils.getLogWriter().info("Creating region in VM1");
-    AsyncInvocation async1 = createPersistentRegionAsync(vm1);
-
-    async0.getResult(MAX_WAIT);
-    async1.getResult(MAX_WAIT);
+    createPersistentRegionsAsync();
 
     checkData(vm0, 0, 1, "A", "region1");
   }
@@ -296,8 +272,6 @@ public class BackupDUnitTest extends PersistentPartitionedRegionTestBase {
 
     DistributionMessageObserver observer = new SerializableDistributionMessageObserver() {
       private volatile boolean done;
-      private AtomicInteger count = new AtomicInteger();
-      private volatile int replyId = -0xBAD;
 
       @Override
       public void beforeSendMessage(DistributionManager dm, DistributionMessage msg) {
@@ -316,8 +290,8 @@ public class BackupDUnitTest extends PersistentPartitionedRegionTestBase {
   @Test
   public void testBackupWhileBucketIsMovedBackupAfterSendDestroy() throws Throwable {
     Host host = Host.getHost(0);
-    final VM vm0 = host.getVM(0);
-    VM vm1 = host.getVM(1);
+    vm0 = host.getVM(0);
+    vm1 = host.getVM(1);
     final VM vm2 = host.getVM(2);
 
     DistributionMessageObserver observer = new SerializableDistributionMessageObserver() {
@@ -407,12 +381,11 @@ public class BackupDUnitTest extends PersistentPartitionedRegionTestBase {
    * 
    * @param observer - a message observer that triggers at the backup at the correct time.
    */
-  public void backupWhileBucketIsMoved(final DistributionMessageObserver observer)
+  private void backupWhileBucketIsMoved(final DistributionMessageObserver observer)
       throws Throwable {
     Host host = Host.getHost(0);
-    final VM vm0 = host.getVM(0);
-    VM vm1 = host.getVM(1);
-    final VM vm2 = host.getVM(2);
+    vm0 = host.getVM(0);
+    vm1 = host.getVM(1);
 
     vm0.invoke(new SerializableRunnable("Add listener to invoke backup") {
 
@@ -428,14 +401,14 @@ public class BackupDUnitTest extends PersistentPartitionedRegionTestBase {
     });
     try {
 
-      LogWriterUtils.getLogWriter().info("Creating region in VM0");
+      logger.info("Creating region in VM0");
       createPersistentRegion(vm0);
 
       // create two buckets on vm0
       createData(vm0, 0, 2, "A", "region1");
 
       // create the pr on vm1, which won't have any buckets
-      LogWriterUtils.getLogWriter().info("Creating region in VM1");
+      logger.info("Creating region in VM1");
 
       createPersistentRegion(vm1);
 
@@ -476,13 +449,7 @@ public class BackupDUnitTest extends PersistentPartitionedRegionTestBase {
 
       restoreBackup(2);
 
-      LogWriterUtils.getLogWriter().info("Creating region in VM0");
-      AsyncInvocation async0 = createPersistentRegionAsync(vm0);
-      LogWriterUtils.getLogWriter().info("Creating region in VM1");
-      AsyncInvocation async1 = createPersistentRegionAsync(vm1);
-
-      async0.getResult(MAX_WAIT);
-      async1.getResult(MAX_WAIT);
+      createPersistentRegionsAsync();
 
       checkData(vm0, 0, 2, "A", "region1");
     } finally {
@@ -502,13 +469,13 @@ public class BackupDUnitTest extends PersistentPartitionedRegionTestBase {
   @Test
   public void testBackupOverflow() throws Throwable {
     Host host = Host.getHost(0);
-    VM vm0 = host.getVM(0);
-    VM vm1 = host.getVM(1);
+    vm0 = host.getVM(0);
+    vm1 = host.getVM(1);
     VM vm2 = host.getVM(2);
 
-    LogWriterUtils.getLogWriter().info("Creating region in VM0");
+    logger.info("Creating region in VM0");
     createPersistentRegion(vm0);
-    LogWriterUtils.getLogWriter().info("Creating region in VM1");
+    logger.info("Creating region in VM1");
     createOverflowRegion(vm1);
 
     createData(vm0, 0, 5, "A", "region1");
@@ -526,16 +493,16 @@ public class BackupDUnitTest extends PersistentPartitionedRegionTestBase {
   @Test
   public void testBackupPRWithOfflineMembers() throws Throwable {
     Host host = Host.getHost(0);
-    VM vm0 = host.getVM(0);
-    VM vm1 = host.getVM(1);
+    vm0 = host.getVM(0);
+    vm1 = host.getVM(1);
     VM vm2 = host.getVM(2);
     VM vm3 = host.getVM(3);
 
-    LogWriterUtils.getLogWriter().info("Creating region in VM0");
+    logger.info("Creating region in VM0");
     createPersistentRegion(vm0);
-    LogWriterUtils.getLogWriter().info("Creating region in VM1");
+    logger.info("Creating region in VM1");
     createPersistentRegion(vm1);
-    LogWriterUtils.getLogWriter().info("Creating region in VM2");
+    logger.info("Creating region in VM2");
     createPersistentRegion(vm2);
 
     createData(vm0, 0, 5, "A", "region1");
@@ -562,11 +529,11 @@ public class BackupDUnitTest extends PersistentPartitionedRegionTestBase {
     assertTrue(files.length == 0);
   }
 
-  protected void createPersistentRegion(VM vm) throws Throwable {
+  private void createPersistentRegion(VM vm) throws Throwable {
     AsyncInvocation future = createPersistentRegionAsync(vm);
-    future.join(MAX_WAIT);
+    future.get(MAX_WAIT_SECONDS, TimeUnit.SECONDS);
     if (future.isAlive()) {
-      fail("Region not created within" + MAX_WAIT);
+      fail("Region not created within" + MAX_WAIT_SECONDS);
     }
     if (future.exceptionOccurred()) {
       throw new RuntimeException(future.getException());
@@ -576,9 +543,8 @@ public class BackupDUnitTest extends PersistentPartitionedRegionTestBase {
   private void deleteOldUserUserFile(final VM vm) {
     SerializableRunnable validateUserFileBackup = new SerializableRunnable("set user backups") {
       public void run() {
-        final int pid = vm.getPid();
         try {
-          FileUtils.deleteDirectory(new File("userbackup_" + pid));
+          FileUtils.deleteDirectory(new File("userbackup_" + vm.getPid()));
         } catch (IOException e) {
           fail(e.getMessage());
         }
@@ -587,7 +553,7 @@ public class BackupDUnitTest extends PersistentPartitionedRegionTestBase {
     vm.invoke(validateUserFileBackup);
   }
 
-  protected long setBackupFiles(final VM vm) {
+  private long setBackupFiles(final VM vm) {
     SerializableCallable setUserBackups = new SerializableCallable("set user backups") {
       public Object call() {
         final int pid = DUnitEnv.get().getPid();
@@ -595,7 +561,7 @@ public class BackupDUnitTest extends PersistentPartitionedRegionTestBase {
         File test1 = new File(vmdir, "test1");
         File test2 = new File(test1, "test2");
         File mytext = new File(test2, "my.txt");
-        final ArrayList<File> backuplist = new ArrayList<File>();
+        final ArrayList<File> backuplist = new ArrayList<>();
         test2.mkdirs();
         PrintStream ps = null;
         try {
@@ -619,7 +585,7 @@ public class BackupDUnitTest extends PersistentPartitionedRegionTestBase {
     return (long) vm.invoke(setUserBackups);
   }
 
-  protected void verifyUserFileRestored(VM vm, final long lm) {
+  private void verifyUserFileRestored(VM vm, final long lm) {
     vm.invoke(new SerializableRunnable() {
       public void run() {
         final int pid = DUnitEnv.get().getPid();
@@ -640,8 +606,6 @@ public class BackupDUnitTest extends PersistentPartitionedRegionTestBase {
           BufferedReader bin = new BufferedReader(fr);
           String content = bin.readLine();
           assertTrue(content.equals("" + pid));
-        } catch (FileNotFoundException e) {
-          fail(e.getMessage());
         } catch (IOException e) {
           fail(e.getMessage());
         }
@@ -649,7 +613,7 @@ public class BackupDUnitTest extends PersistentPartitionedRegionTestBase {
     });
   }
 
-  protected AsyncInvocation createPersistentRegionAsync(final VM vm) {
+  private AsyncInvocation createPersistentRegionAsync(final VM vm) {
     SerializableRunnable createRegion = new SerializableRunnable("Create persistent region") {
       public void run() {
         Cache cache = getCache();
@@ -670,7 +634,7 @@ public class BackupDUnitTest extends PersistentPartitionedRegionTestBase {
         dsf = cache.createDiskStoreFactory();
         dsf.setDiskDirs(getDiskDirs(getUniqueName() + 2));
         dsf.setMaxOplogSize(1);
-        ds = dsf.create(getUniqueName() + 2);
+        dsf.create(getUniqueName() + 2);
         rf.setDiskStoreName(getUniqueName() + 2);
         rf.create("region2");
       }
@@ -678,7 +642,7 @@ public class BackupDUnitTest extends PersistentPartitionedRegionTestBase {
     return vm.invokeAsync(createRegion);
   }
 
-  protected void createOverflowRegion(final VM vm) {
+  private void createOverflowRegion(final VM vm) {
     SerializableRunnable createRegion = new SerializableRunnable("Create persistent region") {
       public void run() {
         Cache cache = getCache();
@@ -760,14 +724,14 @@ public class BackupDUnitTest extends PersistentPartitionedRegionTestBase {
       public Object call() throws Exception {
         Cache cache = getCache();
         PartitionedRegion region = (PartitionedRegion) cache.getRegion(regionName);
-        return new TreeSet<Integer>(region.getDataStore().getAllLocalBucketIds());
+        return new TreeSet<>(region.getDataStore().getAllLocalBucketIds());
       }
     };
 
     return (Set<Integer>) vm0.invoke(getBuckets);
   }
 
-  public File[] getDiskDirs(String dsName) {
+  private File[] getDiskDirs(String dsName) {
     File[] dirs = getDiskDirs();
     File[] diskStoreDirs = new File[1];
     diskStoreDirs[0] = new File(dirs[0], dsName);
@@ -775,7 +739,7 @@ public class BackupDUnitTest extends PersistentPartitionedRegionTestBase {
     return diskStoreDirs;
   }
 
-  protected DataPolicy getDataPolicy() {
+  private DataPolicy getDataPolicy() {
     return DataPolicy.PERSISTENT_PARTITION;
   }