You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ds...@apache.org on 2017/08/28 22:21:55 UTC

geode git commit: GEODE-3448: Implement and expose parallel snapshot import

Repository: geode
Updated Branches:
  refs/heads/develop 4a5c56eb8 -> ca2f20b86


GEODE-3448: Implement and expose parallel snapshot import

This closes #721


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/ca2f20b8
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/ca2f20b8
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/ca2f20b8

Branch: refs/heads/develop
Commit: ca2f20b86cb574a9fbc67146efd40c8246d8f685
Parents: 4a5c56e
Author: Nick Reich <nr...@pivotal.io>
Authored: Thu Aug 17 16:29:45 2017 -0700
Committer: Darrel Schneider <ds...@pivotal.io>
Committed: Mon Aug 28 15:21:11 2017 -0700

----------------------------------------------------------------------
 .../geode/cache/snapshot/SnapshotIterator.java  |   2 +-
 .../snapshot/CacheSnapshotServiceImpl.java      |   2 +-
 .../snapshot/ParallelSnapshotFileMapper.java    |   3 +-
 .../snapshot/RegionSnapshotServiceImpl.java     |  50 ++++-----
 .../cache/snapshot/CacheSnapshotJUnitTest.java  |  22 ++--
 .../snapshot/ParallelSnapshotDUnitTest.java     |  98 +++++++++++------
 .../cache/snapshot/RegionSnapshotJUnitTest.java | 109 ++++++++-----------
 .../geode/cache/snapshot/SnapshotTestCase.java  |  56 ++++------
 .../cache/snapshot/TestSnapshotFileMapper.java  |  14 +--
 .../cache/snapshot/WanSnapshotJUnitTest.java    |  17 +--
 .../ParallelSnapshotFileMapperTest.java         |   7 +-
 11 files changed, 185 insertions(+), 195 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/ca2f20b8/geode-core/src/main/java/org/apache/geode/cache/snapshot/SnapshotIterator.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/cache/snapshot/SnapshotIterator.java b/geode-core/src/main/java/org/apache/geode/cache/snapshot/SnapshotIterator.java
index 767e63d..7022531 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/snapshot/SnapshotIterator.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/snapshot/SnapshotIterator.java
@@ -29,7 +29,7 @@ import java.util.Map.Entry;
  * 
  * @since GemFire 7.0
  */
-public interface SnapshotIterator<K, V> {
+public interface SnapshotIterator<K, V> extends AutoCloseable {
   /**
    * Returns true if there are more elements in the iteration.
    * 

http://git-wip-us.apache.org/repos/asf/geode/blob/ca2f20b8/geode-core/src/main/java/org/apache/geode/internal/cache/snapshot/CacheSnapshotServiceImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/snapshot/CacheSnapshotServiceImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/snapshot/CacheSnapshotServiceImpl.java
index c35b413..b3c920c 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/snapshot/CacheSnapshotServiceImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/snapshot/CacheSnapshotServiceImpl.java
@@ -42,7 +42,7 @@ public class CacheSnapshotServiceImpl implements CacheSnapshotService {
 
   @Override
   public SnapshotOptions<Object, Object> createOptions() {
-    return new SnapshotOptionsImpl<Object, Object>();
+    return new SnapshotOptionsImpl<>();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/geode/blob/ca2f20b8/geode-core/src/main/java/org/apache/geode/internal/cache/snapshot/ParallelSnapshotFileMapper.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/snapshot/ParallelSnapshotFileMapper.java b/geode-core/src/main/java/org/apache/geode/internal/cache/snapshot/ParallelSnapshotFileMapper.java
index 8e7a4c2..e86dcc2 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/snapshot/ParallelSnapshotFileMapper.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/snapshot/ParallelSnapshotFileMapper.java
@@ -38,8 +38,7 @@ public class ParallelSnapshotFileMapper implements SnapshotFileMapper {
 
   @Override
   public File[] mapImportPath(DistributedMember member, File snapshot) {
-    // parallel import is not yet supported
-    throw new UnsupportedOperationException();
+    return new File[] {snapshot};
   }
 
   private String getBaseName(File snapshot) {

http://git-wip-us.apache.org/repos/asf/geode/blob/ca2f20b8/geode-core/src/main/java/org/apache/geode/internal/cache/snapshot/RegionSnapshotServiceImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/snapshot/RegionSnapshotServiceImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/snapshot/RegionSnapshotServiceImpl.java
index c5e67ac..fb21594 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/snapshot/RegionSnapshotServiceImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/snapshot/RegionSnapshotServiceImpl.java
@@ -46,7 +46,6 @@ import org.apache.geode.internal.cache.snapshot.SnapshotPacket.SnapshotRecord;
 import org.apache.geode.internal.i18n.LocalizedStrings;
 
 import java.io.File;
-import java.io.FileFilter;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.io.Serializable;
@@ -60,6 +59,8 @@ import java.util.concurrent.Future;
 
 import static org.apache.geode.distributed.internal.InternalDistributedSystem.getLoggerI18n;
 
+import org.apache.logging.log4j.LogManager;
+
 /**
  * Provides an implementation for region snapshots.
  * 
@@ -90,12 +91,7 @@ public class RegionSnapshotServiceImpl<K, V> implements RegionSnapshotService<K,
         return new File[] {snapshot};
       }
 
-      return snapshot.listFiles(new FileFilter() {
-        @Override
-        public boolean accept(File pathname) {
-          return !pathname.isDirectory();
-        }
-      });
+      return snapshot.listFiles(pathname -> !pathname.isDirectory());
     }
   };
 
@@ -141,7 +137,7 @@ public class RegionSnapshotServiceImpl<K, V> implements RegionSnapshotService<K,
 
   @Override
   public SnapshotOptions<K, V> createOptions() {
-    return new SnapshotOptionsImpl<K, V>();
+    return new SnapshotOptionsImpl<>();
   }
 
   @Override
@@ -158,7 +154,7 @@ public class RegionSnapshotServiceImpl<K, V> implements RegionSnapshotService<K,
     }
 
     if (shouldRunInParallel(options)) {
-      snapshotInParallel(new ParallelArgs<K, V>(snapshot, format, options),
+      snapshotInParallel(new ParallelArgs<>(snapshot, format, options),
           new ParallelExportFunction<K, V>());
     } else {
       exportOnMember(snapshot, format, options);
@@ -176,9 +172,8 @@ public class RegionSnapshotServiceImpl<K, V> implements RegionSnapshotService<K,
       throws IOException, ClassNotFoundException {
 
     if (shouldRunInParallel(options)) {
-      snapshotInParallel(new ParallelArgs<K, V>(snapshot, format, options),
-          new ParallelImportFunction<K, V>());
-      return;
+      snapshotInParallel(new ParallelArgs<>(snapshot, format, options),
+          new ParallelImportFunction<>());
 
     } else {
       importOnMember(snapshot, format, options);
@@ -241,12 +236,12 @@ public class RegionSnapshotServiceImpl<K, V> implements RegionSnapshotService<K,
 
     // Would be interesting to use a PriorityQueue ordered on isDone()
     // but this is probably close enough in practice.
-    LinkedList<Future<?>> puts = new LinkedList<Future<?>>();
+    LinkedList<Future<?>> puts = new LinkedList<>();
     GFSnapshotImporter in = new GFSnapshotImporter(snapshot);
 
     try {
       int bufferSize = 0;
-      Map<K, V> buffer = new HashMap<K, V>();
+      Map<K, V> buffer = new HashMap<>();
 
       SnapshotRecord record;
       while ((record = in.readSnapshotRecord()) != null) {
@@ -286,14 +281,10 @@ public class RegionSnapshotServiceImpl<K, V> implements RegionSnapshotService<K,
               puts.removeFirst().get();
             }
 
-            final Map<K, V> copy = new HashMap<K, V>(buffer);
+            final Map<K, V> copy = new HashMap<>(buffer);
             Future<?> f = GemFireCacheImpl.getExisting("Importing region from snapshot")
-                .getDistributionManager().getWaitingThreadPool().submit(new Runnable() {
-                  @Override
-                  public void run() {
-                    local.basicImportPutAll(copy, !options.shouldInvokeCallbacks());
-                  }
-                });
+                .getDistributionManager().getWaitingThreadPool().submit((Runnable) () -> local
+                    .basicImportPutAll(copy, !options.shouldInvokeCallbacks()));
 
             puts.addLast(f);
             buffer.clear();
@@ -400,12 +391,12 @@ public class RegionSnapshotServiceImpl<K, V> implements RegionSnapshotService<K,
   static <K, V> Exporter<K, V> createExporter(Region<?, ?> region, SnapshotOptions<K, V> options) {
     String pool = region.getAttributes().getPoolName();
     if (pool != null) {
-      return new ClientExporter<K, V>(PoolManager.find(pool));
+      return new ClientExporter<>(PoolManager.find(pool));
 
     } else if (InternalDistributedSystem.getAnyInstance().isLoner()
         || region.getAttributes().getDataPolicy().equals(DataPolicy.NORMAL)
         || region.getAttributes().getDataPolicy().equals(DataPolicy.PRELOADED)
-        || region instanceof LocalDataSet || (((SnapshotOptionsImpl<K, V>) options).isParallelMode()
+        || region instanceof LocalDataSet || (options.isParallelMode()
             && region.getAttributes().getDataPolicy().withPartitioning())) {
 
       // Avoid function execution:
@@ -413,10 +404,10 @@ public class RegionSnapshotServiceImpl<K, V> implements RegionSnapshotService<K,
       // for NORMAL/PRELOAD since they don't support fn execution
       // for LocalDataSet since we're already running a fn
       // for parallel ops since we're already running a fn
-      return new LocalExporter<K, V>();
+      return new LocalExporter<>();
     }
 
-    return new WindowedExporter<K, V>();
+    return new WindowedExporter<>();
   }
 
   static LocalRegion getLocalRegion(Region<?, ?> region) {
@@ -566,11 +557,12 @@ public class RegionSnapshotServiceImpl<K, V> implements RegionSnapshotService<K,
 
         if (files != null) {
           for (File f : files) {
-            if (f.isDirectory() || !f.exists()) {
-              throw new IOException(
-                  LocalizedStrings.Snapshot_INVALID_IMPORT_FILE.toLocalizedString(f));
+            if (f.exists()) {
+              local.getSnapshotService().load(f, args.getFormat(), args.getOptions());
+            } else {
+              LogManager.getLogger(RegionSnapshotServiceImpl.class)
+                  .info("Nothing to import as location does not exist: " + f.getAbsolutePath());
             }
-            local.getSnapshotService().load(f, args.getFormat(), args.getOptions());
           }
         }
         context.getResultSender().lastResult(Boolean.TRUE);

http://git-wip-us.apache.org/repos/asf/geode/blob/ca2f20b8/geode-core/src/test/java/org/apache/geode/cache/snapshot/CacheSnapshotJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/cache/snapshot/CacheSnapshotJUnitTest.java b/geode-core/src/test/java/org/apache/geode/cache/snapshot/CacheSnapshotJUnitTest.java
index e310042..8999cda 100644
--- a/geode-core/src/test/java/org/apache/geode/cache/snapshot/CacheSnapshotJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/cache/snapshot/CacheSnapshotJUnitTest.java
@@ -35,14 +35,14 @@ public class CacheSnapshotJUnitTest extends SnapshotTestCase {
   public void testExportAndImport() throws Exception {
     for (final RegionType rt : RegionType.values()) {
       for (final SerializationType st : SerializationType.values()) {
-        Region<Integer, MyObject> region =
-            rgen.createRegion(cache, ds.getName(), rt, "test-" + rt.name() + "-" + st.name());
+        Region<Integer, MyObject> region = regionGenerator.createRegion(cache, diskStore.getName(),
+            rt, "test-" + rt.name() + "-" + st.name());
         region.putAll(createExpected(st));
       }
     }
 
     // save all regions
-    cache.getSnapshotService().save(snaps, SnapshotFormat.GEMFIRE);
+    cache.getSnapshotService().save(getSnapshotDirectory(), SnapshotFormat.GEMFIRE);
 
     for (final RegionType rt : RegionType.values()) {
       for (final SerializationType st : SerializationType.values()) {
@@ -51,12 +51,12 @@ public class CacheSnapshotJUnitTest extends SnapshotTestCase {
         Region<Integer, MyObject> region = cache.getRegion(name);
         region.destroyRegion();
 
-        rgen.createRegion(cache, ds.getName(), rt, name);
+        regionGenerator.createRegion(cache, diskStore.getName(), rt, name);
       }
     }
 
     // load all regions
-    cache.getSnapshotService().load(snaps, SnapshotFormat.GEMFIRE);
+    cache.getSnapshotService().load(getSnapshotDirectory(), SnapshotFormat.GEMFIRE);
 
     for (final RegionType rt : RegionType.values()) {
       for (final SerializationType st : SerializationType.values()) {
@@ -73,8 +73,8 @@ public class CacheSnapshotJUnitTest extends SnapshotTestCase {
   public void testFilter() throws Exception {
     for (final RegionType rt : RegionType.values()) {
       for (final SerializationType st : SerializationType.values()) {
-        Region<Integer, MyObject> region =
-            rgen.createRegion(cache, ds.getName(), rt, "test-" + rt.name() + "-" + st.name());
+        Region<Integer, MyObject> region = regionGenerator.createRegion(cache, diskStore.getName(),
+            rt, "test-" + rt.name() + "-" + st.name());
         region.putAll(createExpected(st));
       }
     }
@@ -88,18 +88,20 @@ public class CacheSnapshotJUnitTest extends SnapshotTestCase {
     // save even entries
     CacheSnapshotService css = cache.getSnapshotService();
     SnapshotOptions<Object, Object> options = css.createOptions().setFilter(even);
-    cache.getSnapshotService().save(snaps, SnapshotFormat.GEMFIRE, options);
+    cache.getSnapshotService().save(getSnapshotDirectory(), SnapshotFormat.GEMFIRE, options);
 
     for (final RegionType rt : RegionType.values()) {
       for (final SerializationType st : SerializationType.values()) {
         Region region = cache.getRegion("test-" + rt.name() + "-" + st.name());
         region.destroyRegion();
-        rgen.createRegion(cache, ds.getName(), rt, "test-" + rt.name() + "-" + st.name());
+        regionGenerator.createRegion(cache, diskStore.getName(), rt,
+            "test-" + rt.name() + "-" + st.name());
       }
     }
 
     // load odd entries
-    File[] snapshots = snaps.listFiles(pathname -> pathname.getName().startsWith("snapshot-"));
+    File[] snapshots =
+        getSnapshotDirectory().listFiles(pathname -> pathname.getName().startsWith("snapshot-"));
 
     options = css.createOptions().setFilter(odd);
     css.load(snapshots, SnapshotFormat.GEMFIRE, options);

http://git-wip-us.apache.org/repos/asf/geode/blob/ca2f20b8/geode-core/src/test/java/org/apache/geode/cache/snapshot/ParallelSnapshotDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/cache/snapshot/ParallelSnapshotDUnitTest.java b/geode-core/src/test/java/org/apache/geode/cache/snapshot/ParallelSnapshotDUnitTest.java
index 541a603..5b59674 100644
--- a/geode-core/src/test/java/org/apache/geode/cache/snapshot/ParallelSnapshotDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/cache/snapshot/ParallelSnapshotDUnitTest.java
@@ -17,6 +17,8 @@ package org.apache.geode.cache.snapshot;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import org.junit.Before;
+import org.junit.Rule;
 import org.junit.experimental.categories.Category;
 import org.junit.Test;
 
@@ -28,6 +30,7 @@ import java.io.IOException;
 import java.util.Arrays;
 
 import com.examples.snapshot.MyPdxSerializer;
+
 import org.apache.geode.cache.Cache;
 import org.apache.geode.cache.CacheFactory;
 import org.apache.geode.cache.Region;
@@ -36,43 +39,79 @@ import org.apache.geode.cache.snapshot.SnapshotOptions.SnapshotFormat;
 import org.apache.geode.internal.cache.snapshot.SnapshotOptionsImpl;
 import org.apache.geode.test.dunit.Host;
 import org.apache.geode.test.dunit.SerializableCallable;
+import org.apache.geode.test.junit.rules.serializable.SerializableTemporaryFolder;
 
 @Category(DistributedTest.class)
 public class ParallelSnapshotDUnitTest extends JUnit4CacheTestCase {
   private static final byte[] ffff = new byte[] {0xf, 0xf, 0xf, 0xf};
   private static final byte[] eeee = new byte[] {0xe, 0xe, 0xe, 0xe};
+  private static final int DATA_POINTS = 100;
+
+  private File directory;
+
+  @Rule
+  public SerializableTemporaryFolder temporaryFolder = new SerializableTemporaryFolder();
+
+  @Before
+  public void setup() throws IOException {
+    directory = temporaryFolder.newFolder();
+  }
 
   @Test
   public void testExportImport() throws Exception {
+    loadCache();
     doExport(false);
     doImport(false);
   }
 
   @Test
   public void testExportWithSequentialImport() throws Exception {
+    loadCache();
     doExport(false);
     doSequentialImport();
   }
 
   @Test
   public void testExportImportErrors() throws Exception {
+    loadCache();
     try {
       doExport(true);
-      fail();
+      fail("Expected exception not thrown");
     } catch (Exception e) {
+      // do nothing on expected exception from test
     }
 
     doExport(false);
     try {
       doImport(true);
-      fail();
+      fail("Expected exception not thrown");
     } catch (Exception e) {
+      // do nothing on expected exception from test
     }
   }
 
+  /**
+   * This test ensures that parallel import succeeds even when each node does not have a file to
+   * import (import cluster larger than export one)
+   * 
+   * @throws Exception
+   */
+  @Test
+  public void testImportOnLargerCluster() throws Exception {
+    loadCache(2);
+    doExport(false, 2);
+    getCache().getRegion("test").destroyRegion();
+    loadCache();
+    doImport(false);
+  }
+
   private void doExport(boolean explode) throws Exception {
+    doExport(explode, Host.getHost(0).getVMCount());
+  }
+
+  private void doExport(boolean explode, int nodes) throws Exception {
     Region region = getCache().getRegion("test");
-    for (int i = 0; i < 1000; i++) {
+    for (int i = 0; i < DATA_POINTS; i++) {
       region.put(i, ffff);
     }
 
@@ -85,7 +124,7 @@ public class ParallelSnapshotDUnitTest extends JUnit4CacheTestCase {
     opt.setParallelMode(true);
     opt.setMapper(mapper);
 
-    File f = new File("mysnap.gfd").getAbsoluteFile();
+    File f = new File(directory, "mysnap.gfd").getAbsoluteFile();
     rss.save(f, SnapshotFormat.GEMFIRE, opt);
 
     mapper.setShouldExplode(false);
@@ -100,7 +139,7 @@ public class ParallelSnapshotDUnitTest extends JUnit4CacheTestCase {
       }
     };
 
-    forEachVm(check, true);
+    forEachVm(check, true, nodes);
   }
 
   private void doImport(boolean explode) throws ClassNotFoundException, IOException {
@@ -114,14 +153,12 @@ public class ParallelSnapshotDUnitTest extends JUnit4CacheTestCase {
     opt.setParallelMode(true);
     opt.setMapper(mapper);
 
-    final File f = new File("mysnap.gfd").getAbsoluteFile();
-
-    for (int i = 0; i < 1000; i++) {
+    for (int i = 0; i < DATA_POINTS; i++) {
       region.put(i, eeee);
     }
 
-    rss.load(f, SnapshotFormat.GEMFIRE, opt);
-    for (int i = 0; i < 1000; i++) {
+    rss.load(directory, SnapshotFormat.GEMFIRE, opt);
+    for (int i = 0; i < DATA_POINTS; i++) {
       assertTrue(Arrays.equals(ffff, (byte[]) region.get(i)));
     }
   }
@@ -132,48 +169,43 @@ public class ParallelSnapshotDUnitTest extends JUnit4CacheTestCase {
     SnapshotOptionsImpl opt = (SnapshotOptionsImpl) rss.createOptions();
 
 
-    for (int i = 0; i < 1000; i++) {
+    for (int i = 0; i < DATA_POINTS; i++) {
       region.put(i, eeee);
     }
-
-    final File file = new File("").getAbsoluteFile();
-    rss.load(file, SnapshotFormat.GEMFIRE, opt);
-    for (int i = 0; i < 1000; i++) {
+    int vmCount = Host.getHost(0).getVMCount();
+    for (int i = 0; i <= vmCount; i++) {
+      rss.load(new File(directory, Integer.toString(i)), SnapshotFormat.GEMFIRE, opt);
+    }
+    for (int i = 0; i < DATA_POINTS; i++) {
       assertTrue(Arrays.equals(ffff, (byte[]) region.get(i)));
     }
   }
 
-  public Object forEachVm(SerializableCallable call, boolean local) throws Exception {
+  private void forEachVm(SerializableCallable call, boolean local) throws Exception {
+    this.forEachVm(call, local, Integer.MAX_VALUE);
+  }
+
+  private void forEachVm(SerializableCallable call, boolean local, int maxNodes) throws Exception {
     Host host = Host.getHost(0);
-    int vms = host.getVMCount();
+    int vms = Math.min(host.getVMCount(), maxNodes);
 
     for (int i = 0; i < vms; ++i) {
       host.getVM(i).invoke(call);
     }
 
     if (local) {
-      return call.call();
+      call.call();
     }
-    return null;
   }
 
   @Override
-  public final void postSetUp() throws Exception {
-    loadCache();
-  }
-
-  @Override
-  public final void postTearDownCacheTestCase() throws Exception {
-    File[] snaps = new File(".").listFiles((dir, name) -> name.startsWith("mysnap"));
+  public final void postSetUp() throws Exception {}
 
-    if (snaps != null) {
-      for (File f : snaps) {
-        f.delete();
-      }
-    }
+  private void loadCache() throws Exception {
+    this.loadCache(Integer.MAX_VALUE);
   }
 
-  public void loadCache() throws Exception {
+  private void loadCache(int maxNodes) throws Exception {
     SerializableCallable setup = new SerializableCallable() {
       @Override
       public Object call() throws Exception {
@@ -187,6 +219,6 @@ public class ParallelSnapshotDUnitTest extends JUnit4CacheTestCase {
       }
     };
 
-    forEachVm(setup, true);
+    forEachVm(setup, true, maxNodes);
   }
 }

http://git-wip-us.apache.org/repos/asf/geode/blob/ca2f20b8/geode-core/src/test/java/org/apache/geode/cache/snapshot/RegionSnapshotJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/cache/snapshot/RegionSnapshotJUnitTest.java b/geode-core/src/test/java/org/apache/geode/cache/snapshot/RegionSnapshotJUnitTest.java
index 706067a..46c491c 100644
--- a/geode-core/src/test/java/org/apache/geode/cache/snapshot/RegionSnapshotJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/cache/snapshot/RegionSnapshotJUnitTest.java
@@ -40,29 +40,27 @@ import static org.junit.Assert.*;
 
 @Category(IntegrationTest.class)
 public class RegionSnapshotJUnitTest extends SnapshotTestCase {
-  private File f;
+  private File snapshotFile;
 
   @Test
   public void testExportAndReadSnapshot() throws Exception {
-    for (final RegionType rt : RegionType.values()) {
+    for (final RegionType type : RegionType.values()) {
       for (final SerializationType st : SerializationType.values()) {
-        String name = "test-" + rt.name() + "-" + st.name();
-        Region<Integer, MyObject> region = rgen.createRegion(cache, ds.getName(), rt, name);
+        String name = "test-" + type.name() + "-" + st.name();
+        Region<Integer, MyObject> region =
+            regionGenerator.createRegion(cache, diskStore.getName(), type, name);
         final Map<Integer, MyObject> expected = createExpected(st);
 
         region.putAll(expected);
-        region.getSnapshotService().save(f, SnapshotFormat.GEMFIRE);
+        region.getSnapshotService().save(snapshotFile, SnapshotFormat.GEMFIRE);
 
-        final Map<Integer, Object> read = new HashMap<Integer, Object>();
-        SnapshotIterator<Integer, Object> iter = SnapshotReader.read(f);
-        try {
+        final Map<Integer, Object> read = new HashMap<>();
+        try (SnapshotIterator<Integer, Object> iter = SnapshotReader.read(snapshotFile)) {
           while (iter.hasNext()) {
             Entry<Integer, Object> entry = iter.next();
             read.put(entry.getKey(), entry.getValue());
           }
-          assertEquals("Comparison failure for " + rt.name() + "/" + st.name(), expected, read);
-        } finally {
-          iter.close();
+          assertEquals("Comparison failure for " + type.name() + "/" + st.name(), expected, read);
         }
       }
     }
@@ -73,14 +71,15 @@ public class RegionSnapshotJUnitTest extends SnapshotTestCase {
     for (final RegionType rt : RegionType.values()) {
       for (final SerializationType st : SerializationType.values()) {
         String name = "test-" + rt.name() + "-" + st.name();
-        Region<Integer, MyObject> region = rgen.createRegion(cache, ds.getName(), rt, name);
+        Region<Integer, MyObject> region =
+            regionGenerator.createRegion(cache, diskStore.getName(), rt, name);
         final Map<Integer, MyObject> expected = createExpected(st);
 
         region.putAll(expected);
-        region.getSnapshotService().save(f, SnapshotFormat.GEMFIRE);
+        region.getSnapshotService().save(snapshotFile, SnapshotFormat.GEMFIRE);
 
         region.destroyRegion();
-        region = rgen.createRegion(cache, ds.getName(), rt, name);
+        region = regionGenerator.createRegion(cache, diskStore.getName(), rt, name);
 
         region.getAttributesMutator().setCacheWriter(new CacheWriterAdapter<Integer, MyObject>() {
           @Override
@@ -98,7 +97,7 @@ public class RegionSnapshotJUnitTest extends SnapshotTestCase {
               }
             });
 
-        region.getSnapshotService().load(f, SnapshotFormat.GEMFIRE);
+        region.getSnapshotService().load(snapshotFile, SnapshotFormat.GEMFIRE);
 
         assertEquals("Comparison failure for " + rt.name() + "/" + st.name(), expected.entrySet(),
             region.entrySet());
@@ -109,37 +108,30 @@ public class RegionSnapshotJUnitTest extends SnapshotTestCase {
 
   @Test
   public void testFilter() throws Exception {
-    SnapshotFilter<Integer, MyObject> even = new SnapshotFilter<Integer, MyObject>() {
-      @Override
-      public boolean accept(Entry<Integer, MyObject> entry) {
-        return entry.getKey() % 2 == 0;
-      }
-    };
+    SnapshotFilter<Integer, MyObject> even =
+        (SnapshotFilter<Integer, MyObject>) entry -> entry.getKey() % 2 == 0;
 
-    SnapshotFilter<Integer, MyObject> odd = new SnapshotFilter<Integer, MyObject>() {
-      @Override
-      public boolean accept(Entry<Integer, MyObject> entry) {
-        return entry.getKey() % 2 == 1;
-      }
-    };
+    SnapshotFilter<Integer, MyObject> odd =
+        (SnapshotFilter<Integer, MyObject>) entry -> entry.getKey() % 2 == 1;
 
     for (final RegionType rt : RegionType.values()) {
       for (final SerializationType st : SerializationType.values()) {
         String name = "test-" + rt.name() + "-" + st.name();
-        Region<Integer, MyObject> region = rgen.createRegion(cache, ds.getName(), rt, name);
+        Region<Integer, MyObject> region =
+            regionGenerator.createRegion(cache, diskStore.getName(), rt, name);
         final Map<Integer, MyObject> expected = createExpected(st);
 
         region.putAll(expected);
         RegionSnapshotService<Integer, MyObject> rss = region.getSnapshotService();
         SnapshotOptions<Integer, MyObject> options = rss.createOptions().setFilter(even);
-        rss.save(f, SnapshotFormat.GEMFIRE, options);
+        rss.save(snapshotFile, SnapshotFormat.GEMFIRE, options);
 
         region.destroyRegion();
-        region = rgen.createRegion(cache, ds.getName(), rt, name);
+        region = regionGenerator.createRegion(cache, diskStore.getName(), rt, name);
 
         rss = region.getSnapshotService();
         options = rss.createOptions().setFilter(odd);
-        rss.load(f, SnapshotFormat.GEMFIRE, options);
+        rss.load(snapshotFile, SnapshotFormat.GEMFIRE, options);
 
         assertEquals("Comparison failure for " + rt.name() + "/" + st.name(), 0, region.size());
       }
@@ -148,17 +140,15 @@ public class RegionSnapshotJUnitTest extends SnapshotTestCase {
 
   @Test
   public void testFilterExportException() throws Exception {
-    SnapshotFilter<Integer, MyObject> oops = new SnapshotFilter<Integer, MyObject>() {
-      @Override
-      public boolean accept(Entry<Integer, MyObject> entry) {
-        throw new RuntimeException();
-      }
+    SnapshotFilter<Integer, MyObject> oops = (SnapshotFilter<Integer, MyObject>) entry -> {
+      throw new RuntimeException();
     };
 
     for (final RegionType rt : RegionType.values()) {
       for (final SerializationType st : SerializationType.values()) {
         String name = "test-" + rt.name() + "-" + st.name();
-        Region<Integer, MyObject> region = rgen.createRegion(cache, ds.getName(), rt, name);
+        Region<Integer, MyObject> region =
+            regionGenerator.createRegion(cache, diskStore.getName(), rt, name);
         final Map<Integer, MyObject> expected = createExpected(st);
 
         region.putAll(expected);
@@ -167,17 +157,17 @@ public class RegionSnapshotJUnitTest extends SnapshotTestCase {
 
         boolean caughtException = false;
         try {
-          rss.save(f, SnapshotFormat.GEMFIRE, options);
+          rss.save(snapshotFile, SnapshotFormat.GEMFIRE, options);
         } catch (RuntimeException e) {
           caughtException = true;
         }
         assertTrue(caughtException);
 
         region.destroyRegion();
-        region = rgen.createRegion(cache, ds.getName(), rt, name);
+        region = regionGenerator.createRegion(cache, diskStore.getName(), rt, name);
 
         rss = region.getSnapshotService();
-        rss.load(f, SnapshotFormat.GEMFIRE, options);
+        rss.load(snapshotFile, SnapshotFormat.GEMFIRE, options);
 
         assertEquals("Comparison failure for " + rt.name() + "/" + st.name(), 0, region.size());
       }
@@ -186,32 +176,30 @@ public class RegionSnapshotJUnitTest extends SnapshotTestCase {
 
   @Test
   public void testFilterImportException() throws Exception {
-    SnapshotFilter<Integer, MyObject> oops = new SnapshotFilter<Integer, MyObject>() {
-      @Override
-      public boolean accept(Entry<Integer, MyObject> entry) {
-        throw new RuntimeException();
-      }
+    SnapshotFilter<Integer, MyObject> oops = (SnapshotFilter<Integer, MyObject>) entry -> {
+      throw new RuntimeException();
     };
 
     for (final RegionType rt : RegionType.values()) {
       for (final SerializationType st : SerializationType.values()) {
         String name = "test-" + rt.name() + "-" + st.name();
-        Region<Integer, MyObject> region = rgen.createRegion(cache, ds.getName(), rt, name);
+        Region<Integer, MyObject> region =
+            regionGenerator.createRegion(cache, diskStore.getName(), rt, name);
         final Map<Integer, MyObject> expected = createExpected(st);
 
         region.putAll(expected);
         RegionSnapshotService<Integer, MyObject> rss = region.getSnapshotService();
-        rss.save(f, SnapshotFormat.GEMFIRE);
+        rss.save(snapshotFile, SnapshotFormat.GEMFIRE);
 
         region.destroyRegion();
-        region = rgen.createRegion(cache, ds.getName(), rt, name);
+        region = regionGenerator.createRegion(cache, diskStore.getName(), rt, name);
 
         rss = region.getSnapshotService();
         SnapshotOptions<Integer, MyObject> options = rss.createOptions().setFilter(oops);
 
         boolean caughtException = false;
         try {
-          rss.load(f, SnapshotFormat.GEMFIRE, options);
+          rss.load(snapshotFile, SnapshotFormat.GEMFIRE, options);
         } catch (RuntimeException e) {
           caughtException = true;
         }
@@ -225,14 +213,15 @@ public class RegionSnapshotJUnitTest extends SnapshotTestCase {
   @Test
   public void testInvalidate() throws Exception {
     Region<Integer, MyObject> region =
-        rgen.createRegion(cache, ds.getName(), RegionType.REPLICATE, "test");
-    MyObject obj = rgen.createData(SerializationType.SERIALIZABLE, 1, "invalidated value");
+        regionGenerator.createRegion(cache, diskStore.getName(), RegionType.REPLICATE, "test");
+    MyObject obj =
+        regionGenerator.createData(SerializationType.SERIALIZABLE, 1, "invalidated value");
 
     region.put(1, obj);
     region.invalidate(1);
 
-    region.getSnapshotService().save(f, SnapshotFormat.GEMFIRE);
-    region.getSnapshotService().load(f, SnapshotFormat.GEMFIRE);
+    region.getSnapshotService().save(snapshotFile, SnapshotFormat.GEMFIRE);
+    region.getSnapshotService().load(snapshotFile, SnapshotFormat.GEMFIRE);
 
     assertTrue(region.containsKey(1));
     assertFalse(region.containsValueForKey(1));
@@ -251,11 +240,12 @@ public class RegionSnapshotJUnitTest extends SnapshotTestCase {
     SerializationType st = SerializationType.PDX_SERIALIZER;
 
     String name = "test-" + rt.name() + "-" + st.name() + "-dsid";
-    Region<Integer, MyObject> region = rgen.createRegion(cache, ds.getName(), rt, name);
+    Region<Integer, MyObject> region =
+        regionGenerator.createRegion(cache, diskStore.getName(), rt, name);
     final Map<Integer, MyObject> expected = createExpected(st);
 
     region.putAll(expected);
-    region.getSnapshotService().save(f, SnapshotFormat.GEMFIRE);
+    region.getSnapshotService().save(snapshotFile, SnapshotFormat.GEMFIRE);
 
     cache.close();
 
@@ -264,22 +254,19 @@ public class RegionSnapshotJUnitTest extends SnapshotTestCase {
         .setPdxSerializer(new MyPdxSerializer()).set(DISTRIBUTED_SYSTEM_ID, "100");
     cache = cf2.create();
 
-    final Map<Integer, Object> read = new HashMap<Integer, Object>();
-    SnapshotIterator<Integer, Object> iter = SnapshotReader.read(f);
-    try {
+    final Map<Integer, Object> read = new HashMap<>();
+    try (SnapshotIterator<Integer, Object> iter = SnapshotReader.read(snapshotFile)) {
       while (iter.hasNext()) {
         Entry<Integer, Object> entry = iter.next();
         read.put(entry.getKey(), entry.getValue());
       }
       assertEquals("Comparison failure for " + rt.name() + "/" + st.name(), expected, read);
-    } finally {
-      iter.close();
     }
   }
 
   @Before
   public void setUp() throws Exception {
     super.setUp();
-    f = new File(snaps, "test.snapshot.gfd");
+    snapshotFile = new File(getSnapshotDirectory(), "test.snapshot.gfd");
   }
 }

http://git-wip-us.apache.org/repos/asf/geode/blob/ca2f20b8/geode-core/src/test/java/org/apache/geode/cache/snapshot/SnapshotTestCase.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/cache/snapshot/SnapshotTestCase.java b/geode-core/src/test/java/org/apache/geode/cache/snapshot/SnapshotTestCase.java
index 3898925..d0040f7 100644
--- a/geode-core/src/test/java/org/apache/geode/cache/snapshot/SnapshotTestCase.java
+++ b/geode-core/src/test/java/org/apache/geode/cache/snapshot/SnapshotTestCase.java
@@ -21,69 +21,55 @@ import org.apache.geode.cache.Cache;
 import org.apache.geode.cache.CacheFactory;
 import org.apache.geode.cache.DiskStore;
 import org.apache.geode.cache.snapshot.RegionGenerator.SerializationType;
+
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
 
 import java.io.File;
-import java.io.FilenameFilter;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Random;
 
 import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
 
 public class SnapshotTestCase {
-  protected File store;
-  protected File snaps;
+  private File snapshotDirectory;
   protected Cache cache;
-  protected RegionGenerator rgen;
-  protected DiskStore ds;
+  RegionGenerator regionGenerator;
+  DiskStore diskStore;
+
+  @Rule
+  public TemporaryFolder baseDir = new TemporaryFolder();
 
   @Before
   public void setUp() throws Exception {
-    store = new File("store-" + Math.abs(new Random().nextInt()));
-    store.mkdir();
-
-    snaps = new File("snapshots-" + Math.abs(new Random().nextInt()));
-    snaps.mkdir();
+    File storeDirectory = baseDir.newFolder("store");
+    snapshotDirectory = baseDir.newFolder("snapshots");
 
-    rgen = new RegionGenerator();
+    regionGenerator = new RegionGenerator();
 
     CacheFactory cf = new CacheFactory().set(MCAST_PORT, "0").set(LOG_LEVEL, "error");
     cache = cf.create();
 
-    ds = cache.createDiskStoreFactory().setMaxOplogSize(1).setDiskDirs(new File[] {store})
-        .create("snapshotTest");
+    diskStore = cache.createDiskStoreFactory().setMaxOplogSize(1)
+        .setDiskDirs(new File[] {storeDirectory}).create("snapshotTest");
   }
 
   @After
   public void tearDown() throws Exception {
     cache.close();
-    deleteFiles(store);
-    deleteFiles(snaps);
   }
 
-  public Map<Integer, MyObject> createExpected(SerializationType type) {
-    Map<Integer, MyObject> expected = new HashMap<Integer, MyObject>();
-    for (int i = 0; i < 1000; i++) {
-      expected.put(i, rgen.createData(type, i, "The number is " + i));
-    }
-    return expected;
+  File getSnapshotDirectory() {
+    return snapshotDirectory;
   }
 
-  public static void deleteFiles(File dir) {
-    File[] deletes = dir.listFiles(new FilenameFilter() {
-      @Override
-      public boolean accept(File dir, String name) {
-        return true;
-      }
-    });
-
-    if (deletes != null) {
-      for (File f : deletes) {
-        f.delete();
-      }
+  Map<Integer, MyObject> createExpected(SerializationType type) {
+    Map<Integer, MyObject> expected = new HashMap<>();
+    for (int i = 0; i < 1000; i++) {
+      expected.put(i, regionGenerator.createData(type, i, "The number is " + i));
     }
-    dir.delete();
+    return expected;
   }
 }

http://git-wip-us.apache.org/repos/asf/geode/blob/ca2f20b8/geode-core/src/test/java/org/apache/geode/cache/snapshot/TestSnapshotFileMapper.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/cache/snapshot/TestSnapshotFileMapper.java b/geode-core/src/test/java/org/apache/geode/cache/snapshot/TestSnapshotFileMapper.java
index 953721f..ba89630 100644
--- a/geode-core/src/test/java/org/apache/geode/cache/snapshot/TestSnapshotFileMapper.java
+++ b/geode-core/src/test/java/org/apache/geode/cache/snapshot/TestSnapshotFileMapper.java
@@ -32,7 +32,9 @@ public class TestSnapshotFileMapper implements SnapshotFileMapper {
     if (shouldExplode) {
       throw new RuntimeException();
     }
-    return new File(snapshot.getParentFile(), mapFilename(snapshot));
+    File directory = new File(snapshot.getParent(), Integer.toString(1 + VM.getCurrentVMNum()));
+    directory.mkdirs();
+    return new File(directory, mapFilename(snapshot));
   }
 
   @Override
@@ -40,15 +42,11 @@ public class TestSnapshotFileMapper implements SnapshotFileMapper {
     if (shouldExplode) {
       throw new RuntimeException();
     }
-
-    File f = new File(snapshot.getParentFile(), mapFilename(snapshot));
-    return new File[] {f};
+    File directory = new File(snapshot, Integer.toString(1 + VM.getCurrentVMNum()));
+    return new File[] {directory};
   }
 
   private String mapFilename(File snapshot) {
-    String filename = snapshot.getName();
-    int suffixLocation = filename.indexOf(RegionSnapshotService.SNAPSHOT_FILE_EXTENSION);
-    return filename.substring(0, suffixLocation) + "-" + VM.getCurrentVMNum()
-        + RegionSnapshotService.SNAPSHOT_FILE_EXTENSION;
+    return snapshot.getName();
   }
 }

http://git-wip-us.apache.org/repos/asf/geode/blob/ca2f20b8/geode-core/src/test/java/org/apache/geode/cache/snapshot/WanSnapshotJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/cache/snapshot/WanSnapshotJUnitTest.java b/geode-core/src/test/java/org/apache/geode/cache/snapshot/WanSnapshotJUnitTest.java
index 46245c3..b62b7bf 100644
--- a/geode-core/src/test/java/org/apache/geode/cache/snapshot/WanSnapshotJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/cache/snapshot/WanSnapshotJUnitTest.java
@@ -15,12 +15,13 @@
 package org.apache.geode.cache.snapshot;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
 
 import java.io.File;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.awaitility.Awaitility;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -37,7 +38,6 @@ import org.apache.geode.test.junit.categories.IntegrationTest;
 public class WanSnapshotJUnitTest extends SnapshotTestCase {
   private Region<Integer, MyObject> region;
   private WanListener wan;
-  private static final long MAX_WAIT = 5 * 60 * 1000; // 6 minutes
 
   @Test
   public void testWanCallback() throws Exception {
@@ -46,18 +46,12 @@ public class WanSnapshotJUnitTest extends SnapshotTestCase {
       region.put(i, new MyObject(i, "clienttest " + i));
     }
 
-    File snapshot = new File("wan.snapshot.gfd");
+    File snapshot = new File(getSnapshotDirectory(), "wan.snapshot.gfd");
     region.getSnapshotService().save(snapshot, SnapshotFormat.GEMFIRE);
     region.clear();
 
-    long start = System.currentTimeMillis();
-    // wait for the events to drain out
-    while (!wan.ticker.compareAndSet(count, 0)) {
-      Thread.sleep(100);
-      if (System.currentTimeMillis() - start > MAX_WAIT) {
-        fail("Event did not drain in 5 minutes");
-      }
-    }
+    Awaitility.await().atMost(30, TimeUnit.SECONDS).pollInterval(100, TimeUnit.MILLISECONDS)
+        .until(() -> wan.ticker.compareAndSet(count, 0));
 
     region.getSnapshotService().load(snapshot, SnapshotFormat.GEMFIRE);
 
@@ -92,4 +86,3 @@ public class WanSnapshotJUnitTest extends SnapshotTestCase {
   }
 
 }
-

http://git-wip-us.apache.org/repos/asf/geode/blob/ca2f20b8/geode-core/src/test/java/org/apache/geode/internal/cache/snapshot/ParallelSnapshotFileMapperTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/snapshot/ParallelSnapshotFileMapperTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/snapshot/ParallelSnapshotFileMapperTest.java
index 2f045fc..371fc6e 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/snapshot/ParallelSnapshotFileMapperTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/snapshot/ParallelSnapshotFileMapperTest.java
@@ -69,9 +69,10 @@ public class ParallelSnapshotFileMapperTest {
   }
 
   @Test
-  public void mapImportPathIsUnsupported() throws Exception {
-    thrown.expect(UnsupportedOperationException.class);
-    mapper.mapImportPath(null, null);
+  public void mapImportReturnsUnchangedPath() {
+    File file = new File(BASE_LOCATION + FILE_TYPE);
+    File[] mappedFiles = mapper.mapImportPath(null, file);
+    assertEquals(file, mappedFiles[0]);
   }
 
   @Test