You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ds...@apache.org on 2017/08/28 22:21:55 UTC
geode git commit: GEODE-3448: Implement and expose parallel snapshot import
Repository: geode
Updated Branches:
refs/heads/develop 4a5c56eb8 -> ca2f20b86
GEODE-3448: Implement and expose parallel snapshot import
This closes #721
Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/ca2f20b8
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/ca2f20b8
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/ca2f20b8
Branch: refs/heads/develop
Commit: ca2f20b86cb574a9fbc67146efd40c8246d8f685
Parents: 4a5c56e
Author: Nick Reich <nr...@pivotal.io>
Authored: Thu Aug 17 16:29:45 2017 -0700
Committer: Darrel Schneider <ds...@pivotal.io>
Committed: Mon Aug 28 15:21:11 2017 -0700
----------------------------------------------------------------------
.../geode/cache/snapshot/SnapshotIterator.java | 2 +-
.../snapshot/CacheSnapshotServiceImpl.java | 2 +-
.../snapshot/ParallelSnapshotFileMapper.java | 3 +-
.../snapshot/RegionSnapshotServiceImpl.java | 50 ++++-----
.../cache/snapshot/CacheSnapshotJUnitTest.java | 22 ++--
.../snapshot/ParallelSnapshotDUnitTest.java | 98 +++++++++++------
.../cache/snapshot/RegionSnapshotJUnitTest.java | 109 ++++++++-----------
.../geode/cache/snapshot/SnapshotTestCase.java | 56 ++++------
.../cache/snapshot/TestSnapshotFileMapper.java | 14 +--
.../cache/snapshot/WanSnapshotJUnitTest.java | 17 +--
.../ParallelSnapshotFileMapperTest.java | 7 +-
11 files changed, 185 insertions(+), 195 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/geode/blob/ca2f20b8/geode-core/src/main/java/org/apache/geode/cache/snapshot/SnapshotIterator.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/cache/snapshot/SnapshotIterator.java b/geode-core/src/main/java/org/apache/geode/cache/snapshot/SnapshotIterator.java
index 767e63d..7022531 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/snapshot/SnapshotIterator.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/snapshot/SnapshotIterator.java
@@ -29,7 +29,7 @@ import java.util.Map.Entry;
*
* @since GemFire 7.0
*/
-public interface SnapshotIterator<K, V> {
+public interface SnapshotIterator<K, V> extends AutoCloseable {
/**
* Returns true if there are more elements in the iteration.
*
http://git-wip-us.apache.org/repos/asf/geode/blob/ca2f20b8/geode-core/src/main/java/org/apache/geode/internal/cache/snapshot/CacheSnapshotServiceImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/snapshot/CacheSnapshotServiceImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/snapshot/CacheSnapshotServiceImpl.java
index c35b413..b3c920c 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/snapshot/CacheSnapshotServiceImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/snapshot/CacheSnapshotServiceImpl.java
@@ -42,7 +42,7 @@ public class CacheSnapshotServiceImpl implements CacheSnapshotService {
@Override
public SnapshotOptions<Object, Object> createOptions() {
- return new SnapshotOptionsImpl<Object, Object>();
+ return new SnapshotOptionsImpl<>();
}
@Override
http://git-wip-us.apache.org/repos/asf/geode/blob/ca2f20b8/geode-core/src/main/java/org/apache/geode/internal/cache/snapshot/ParallelSnapshotFileMapper.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/snapshot/ParallelSnapshotFileMapper.java b/geode-core/src/main/java/org/apache/geode/internal/cache/snapshot/ParallelSnapshotFileMapper.java
index 8e7a4c2..e86dcc2 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/snapshot/ParallelSnapshotFileMapper.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/snapshot/ParallelSnapshotFileMapper.java
@@ -38,8 +38,7 @@ public class ParallelSnapshotFileMapper implements SnapshotFileMapper {
@Override
public File[] mapImportPath(DistributedMember member, File snapshot) {
- // parallel import is not yet supported
- throw new UnsupportedOperationException();
+ return new File[] {snapshot};
}
private String getBaseName(File snapshot) {
http://git-wip-us.apache.org/repos/asf/geode/blob/ca2f20b8/geode-core/src/main/java/org/apache/geode/internal/cache/snapshot/RegionSnapshotServiceImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/snapshot/RegionSnapshotServiceImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/snapshot/RegionSnapshotServiceImpl.java
index c5e67ac..fb21594 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/snapshot/RegionSnapshotServiceImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/snapshot/RegionSnapshotServiceImpl.java
@@ -46,7 +46,6 @@ import org.apache.geode.internal.cache.snapshot.SnapshotPacket.SnapshotRecord;
import org.apache.geode.internal.i18n.LocalizedStrings;
import java.io.File;
-import java.io.FileFilter;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.Serializable;
@@ -60,6 +59,8 @@ import java.util.concurrent.Future;
import static org.apache.geode.distributed.internal.InternalDistributedSystem.getLoggerI18n;
+import org.apache.logging.log4j.LogManager;
+
/**
* Provides an implementation for region snapshots.
*
@@ -90,12 +91,7 @@ public class RegionSnapshotServiceImpl<K, V> implements RegionSnapshotService<K,
return new File[] {snapshot};
}
- return snapshot.listFiles(new FileFilter() {
- @Override
- public boolean accept(File pathname) {
- return !pathname.isDirectory();
- }
- });
+ return snapshot.listFiles(pathname -> !pathname.isDirectory());
}
};
@@ -141,7 +137,7 @@ public class RegionSnapshotServiceImpl<K, V> implements RegionSnapshotService<K,
@Override
public SnapshotOptions<K, V> createOptions() {
- return new SnapshotOptionsImpl<K, V>();
+ return new SnapshotOptionsImpl<>();
}
@Override
@@ -158,7 +154,7 @@ public class RegionSnapshotServiceImpl<K, V> implements RegionSnapshotService<K,
}
if (shouldRunInParallel(options)) {
- snapshotInParallel(new ParallelArgs<K, V>(snapshot, format, options),
+ snapshotInParallel(new ParallelArgs<>(snapshot, format, options),
new ParallelExportFunction<K, V>());
} else {
exportOnMember(snapshot, format, options);
@@ -176,9 +172,8 @@ public class RegionSnapshotServiceImpl<K, V> implements RegionSnapshotService<K,
throws IOException, ClassNotFoundException {
if (shouldRunInParallel(options)) {
- snapshotInParallel(new ParallelArgs<K, V>(snapshot, format, options),
- new ParallelImportFunction<K, V>());
- return;
+ snapshotInParallel(new ParallelArgs<>(snapshot, format, options),
+ new ParallelImportFunction<>());
} else {
importOnMember(snapshot, format, options);
@@ -241,12 +236,12 @@ public class RegionSnapshotServiceImpl<K, V> implements RegionSnapshotService<K,
// Would be interesting to use a PriorityQueue ordered on isDone()
// but this is probably close enough in practice.
- LinkedList<Future<?>> puts = new LinkedList<Future<?>>();
+ LinkedList<Future<?>> puts = new LinkedList<>();
GFSnapshotImporter in = new GFSnapshotImporter(snapshot);
try {
int bufferSize = 0;
- Map<K, V> buffer = new HashMap<K, V>();
+ Map<K, V> buffer = new HashMap<>();
SnapshotRecord record;
while ((record = in.readSnapshotRecord()) != null) {
@@ -286,14 +281,10 @@ public class RegionSnapshotServiceImpl<K, V> implements RegionSnapshotService<K,
puts.removeFirst().get();
}
- final Map<K, V> copy = new HashMap<K, V>(buffer);
+ final Map<K, V> copy = new HashMap<>(buffer);
Future<?> f = GemFireCacheImpl.getExisting("Importing region from snapshot")
- .getDistributionManager().getWaitingThreadPool().submit(new Runnable() {
- @Override
- public void run() {
- local.basicImportPutAll(copy, !options.shouldInvokeCallbacks());
- }
- });
+ .getDistributionManager().getWaitingThreadPool().submit((Runnable) () -> local
+ .basicImportPutAll(copy, !options.shouldInvokeCallbacks()));
puts.addLast(f);
buffer.clear();
@@ -400,12 +391,12 @@ public class RegionSnapshotServiceImpl<K, V> implements RegionSnapshotService<K,
static <K, V> Exporter<K, V> createExporter(Region<?, ?> region, SnapshotOptions<K, V> options) {
String pool = region.getAttributes().getPoolName();
if (pool != null) {
- return new ClientExporter<K, V>(PoolManager.find(pool));
+ return new ClientExporter<>(PoolManager.find(pool));
} else if (InternalDistributedSystem.getAnyInstance().isLoner()
|| region.getAttributes().getDataPolicy().equals(DataPolicy.NORMAL)
|| region.getAttributes().getDataPolicy().equals(DataPolicy.PRELOADED)
- || region instanceof LocalDataSet || (((SnapshotOptionsImpl<K, V>) options).isParallelMode()
+ || region instanceof LocalDataSet || (options.isParallelMode()
&& region.getAttributes().getDataPolicy().withPartitioning())) {
// Avoid function execution:
@@ -413,10 +404,10 @@ public class RegionSnapshotServiceImpl<K, V> implements RegionSnapshotService<K,
// for NORMAL/PRELOAD since they don't support fn execution
// for LocalDataSet since we're already running a fn
// for parallel ops since we're already running a fn
- return new LocalExporter<K, V>();
+ return new LocalExporter<>();
}
- return new WindowedExporter<K, V>();
+ return new WindowedExporter<>();
}
static LocalRegion getLocalRegion(Region<?, ?> region) {
@@ -566,11 +557,12 @@ public class RegionSnapshotServiceImpl<K, V> implements RegionSnapshotService<K,
if (files != null) {
for (File f : files) {
- if (f.isDirectory() || !f.exists()) {
- throw new IOException(
- LocalizedStrings.Snapshot_INVALID_IMPORT_FILE.toLocalizedString(f));
+ if (f.exists()) {
+ local.getSnapshotService().load(f, args.getFormat(), args.getOptions());
+ } else {
+ LogManager.getLogger(RegionSnapshotServiceImpl.class)
+ .info("Nothing to import as location does not exist: " + f.getAbsolutePath());
}
- local.getSnapshotService().load(f, args.getFormat(), args.getOptions());
}
}
context.getResultSender().lastResult(Boolean.TRUE);
http://git-wip-us.apache.org/repos/asf/geode/blob/ca2f20b8/geode-core/src/test/java/org/apache/geode/cache/snapshot/CacheSnapshotJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/cache/snapshot/CacheSnapshotJUnitTest.java b/geode-core/src/test/java/org/apache/geode/cache/snapshot/CacheSnapshotJUnitTest.java
index e310042..8999cda 100644
--- a/geode-core/src/test/java/org/apache/geode/cache/snapshot/CacheSnapshotJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/cache/snapshot/CacheSnapshotJUnitTest.java
@@ -35,14 +35,14 @@ public class CacheSnapshotJUnitTest extends SnapshotTestCase {
public void testExportAndImport() throws Exception {
for (final RegionType rt : RegionType.values()) {
for (final SerializationType st : SerializationType.values()) {
- Region<Integer, MyObject> region =
- rgen.createRegion(cache, ds.getName(), rt, "test-" + rt.name() + "-" + st.name());
+ Region<Integer, MyObject> region = regionGenerator.createRegion(cache, diskStore.getName(),
+ rt, "test-" + rt.name() + "-" + st.name());
region.putAll(createExpected(st));
}
}
// save all regions
- cache.getSnapshotService().save(snaps, SnapshotFormat.GEMFIRE);
+ cache.getSnapshotService().save(getSnapshotDirectory(), SnapshotFormat.GEMFIRE);
for (final RegionType rt : RegionType.values()) {
for (final SerializationType st : SerializationType.values()) {
@@ -51,12 +51,12 @@ public class CacheSnapshotJUnitTest extends SnapshotTestCase {
Region<Integer, MyObject> region = cache.getRegion(name);
region.destroyRegion();
- rgen.createRegion(cache, ds.getName(), rt, name);
+ regionGenerator.createRegion(cache, diskStore.getName(), rt, name);
}
}
// load all regions
- cache.getSnapshotService().load(snaps, SnapshotFormat.GEMFIRE);
+ cache.getSnapshotService().load(getSnapshotDirectory(), SnapshotFormat.GEMFIRE);
for (final RegionType rt : RegionType.values()) {
for (final SerializationType st : SerializationType.values()) {
@@ -73,8 +73,8 @@ public class CacheSnapshotJUnitTest extends SnapshotTestCase {
public void testFilter() throws Exception {
for (final RegionType rt : RegionType.values()) {
for (final SerializationType st : SerializationType.values()) {
- Region<Integer, MyObject> region =
- rgen.createRegion(cache, ds.getName(), rt, "test-" + rt.name() + "-" + st.name());
+ Region<Integer, MyObject> region = regionGenerator.createRegion(cache, diskStore.getName(),
+ rt, "test-" + rt.name() + "-" + st.name());
region.putAll(createExpected(st));
}
}
@@ -88,18 +88,20 @@ public class CacheSnapshotJUnitTest extends SnapshotTestCase {
// save even entries
CacheSnapshotService css = cache.getSnapshotService();
SnapshotOptions<Object, Object> options = css.createOptions().setFilter(even);
- cache.getSnapshotService().save(snaps, SnapshotFormat.GEMFIRE, options);
+ cache.getSnapshotService().save(getSnapshotDirectory(), SnapshotFormat.GEMFIRE, options);
for (final RegionType rt : RegionType.values()) {
for (final SerializationType st : SerializationType.values()) {
Region region = cache.getRegion("test-" + rt.name() + "-" + st.name());
region.destroyRegion();
- rgen.createRegion(cache, ds.getName(), rt, "test-" + rt.name() + "-" + st.name());
+ regionGenerator.createRegion(cache, diskStore.getName(), rt,
+ "test-" + rt.name() + "-" + st.name());
}
}
// load odd entries
- File[] snapshots = snaps.listFiles(pathname -> pathname.getName().startsWith("snapshot-"));
+ File[] snapshots =
+ getSnapshotDirectory().listFiles(pathname -> pathname.getName().startsWith("snapshot-"));
options = css.createOptions().setFilter(odd);
css.load(snapshots, SnapshotFormat.GEMFIRE, options);
http://git-wip-us.apache.org/repos/asf/geode/blob/ca2f20b8/geode-core/src/test/java/org/apache/geode/cache/snapshot/ParallelSnapshotDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/cache/snapshot/ParallelSnapshotDUnitTest.java b/geode-core/src/test/java/org/apache/geode/cache/snapshot/ParallelSnapshotDUnitTest.java
index 541a603..5b59674 100644
--- a/geode-core/src/test/java/org/apache/geode/cache/snapshot/ParallelSnapshotDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/cache/snapshot/ParallelSnapshotDUnitTest.java
@@ -17,6 +17,8 @@ package org.apache.geode.cache.snapshot;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import org.junit.Before;
+import org.junit.Rule;
import org.junit.experimental.categories.Category;
import org.junit.Test;
@@ -28,6 +30,7 @@ import java.io.IOException;
import java.util.Arrays;
import com.examples.snapshot.MyPdxSerializer;
+
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.CacheFactory;
import org.apache.geode.cache.Region;
@@ -36,43 +39,79 @@ import org.apache.geode.cache.snapshot.SnapshotOptions.SnapshotFormat;
import org.apache.geode.internal.cache.snapshot.SnapshotOptionsImpl;
import org.apache.geode.test.dunit.Host;
import org.apache.geode.test.dunit.SerializableCallable;
+import org.apache.geode.test.junit.rules.serializable.SerializableTemporaryFolder;
@Category(DistributedTest.class)
public class ParallelSnapshotDUnitTest extends JUnit4CacheTestCase {
private static final byte[] ffff = new byte[] {0xf, 0xf, 0xf, 0xf};
private static final byte[] eeee = new byte[] {0xe, 0xe, 0xe, 0xe};
+ private static final int DATA_POINTS = 100;
+
+ private File directory;
+
+ @Rule
+ public SerializableTemporaryFolder temporaryFolder = new SerializableTemporaryFolder();
+
+ @Before
+ public void setup() throws IOException {
+ directory = temporaryFolder.newFolder();
+ }
@Test
public void testExportImport() throws Exception {
+ loadCache();
doExport(false);
doImport(false);
}
@Test
public void testExportWithSequentialImport() throws Exception {
+ loadCache();
doExport(false);
doSequentialImport();
}
@Test
public void testExportImportErrors() throws Exception {
+ loadCache();
try {
doExport(true);
- fail();
+ fail("Expected exception not thrown");
} catch (Exception e) {
+ // do nothing on expected exception from test
}
doExport(false);
try {
doImport(true);
- fail();
+ fail("Expected exception not thrown");
} catch (Exception e) {
+ // do nothing on expected exception from test
}
}
+ /**
+ * This test ensures that parallel import succeeds even when each node does not have a file to
+ * import (import cluster larger than export one)
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testImportOnLargerCluster() throws Exception {
+ loadCache(2);
+ doExport(false, 2);
+ getCache().getRegion("test").destroyRegion();
+ loadCache();
+ doImport(false);
+ }
+
private void doExport(boolean explode) throws Exception {
+ doExport(explode, Host.getHost(0).getVMCount());
+ }
+
+ private void doExport(boolean explode, int nodes) throws Exception {
Region region = getCache().getRegion("test");
- for (int i = 0; i < 1000; i++) {
+ for (int i = 0; i < DATA_POINTS; i++) {
region.put(i, ffff);
}
@@ -85,7 +124,7 @@ public class ParallelSnapshotDUnitTest extends JUnit4CacheTestCase {
opt.setParallelMode(true);
opt.setMapper(mapper);
- File f = new File("mysnap.gfd").getAbsoluteFile();
+ File f = new File(directory, "mysnap.gfd").getAbsoluteFile();
rss.save(f, SnapshotFormat.GEMFIRE, opt);
mapper.setShouldExplode(false);
@@ -100,7 +139,7 @@ public class ParallelSnapshotDUnitTest extends JUnit4CacheTestCase {
}
};
- forEachVm(check, true);
+ forEachVm(check, true, nodes);
}
private void doImport(boolean explode) throws ClassNotFoundException, IOException {
@@ -114,14 +153,12 @@ public class ParallelSnapshotDUnitTest extends JUnit4CacheTestCase {
opt.setParallelMode(true);
opt.setMapper(mapper);
- final File f = new File("mysnap.gfd").getAbsoluteFile();
-
- for (int i = 0; i < 1000; i++) {
+ for (int i = 0; i < DATA_POINTS; i++) {
region.put(i, eeee);
}
- rss.load(f, SnapshotFormat.GEMFIRE, opt);
- for (int i = 0; i < 1000; i++) {
+ rss.load(directory, SnapshotFormat.GEMFIRE, opt);
+ for (int i = 0; i < DATA_POINTS; i++) {
assertTrue(Arrays.equals(ffff, (byte[]) region.get(i)));
}
}
@@ -132,48 +169,43 @@ public class ParallelSnapshotDUnitTest extends JUnit4CacheTestCase {
SnapshotOptionsImpl opt = (SnapshotOptionsImpl) rss.createOptions();
- for (int i = 0; i < 1000; i++) {
+ for (int i = 0; i < DATA_POINTS; i++) {
region.put(i, eeee);
}
-
- final File file = new File("").getAbsoluteFile();
- rss.load(file, SnapshotFormat.GEMFIRE, opt);
- for (int i = 0; i < 1000; i++) {
+ int vmCount = Host.getHost(0).getVMCount();
+ for (int i = 0; i <= vmCount; i++) {
+ rss.load(new File(directory, Integer.toString(i)), SnapshotFormat.GEMFIRE, opt);
+ }
+ for (int i = 0; i < DATA_POINTS; i++) {
assertTrue(Arrays.equals(ffff, (byte[]) region.get(i)));
}
}
- public Object forEachVm(SerializableCallable call, boolean local) throws Exception {
+ private void forEachVm(SerializableCallable call, boolean local) throws Exception {
+ this.forEachVm(call, local, Integer.MAX_VALUE);
+ }
+
+ private void forEachVm(SerializableCallable call, boolean local, int maxNodes) throws Exception {
Host host = Host.getHost(0);
- int vms = host.getVMCount();
+ int vms = Math.min(host.getVMCount(), maxNodes);
for (int i = 0; i < vms; ++i) {
host.getVM(i).invoke(call);
}
if (local) {
- return call.call();
+ call.call();
}
- return null;
}
@Override
- public final void postSetUp() throws Exception {
- loadCache();
- }
-
- @Override
- public final void postTearDownCacheTestCase() throws Exception {
- File[] snaps = new File(".").listFiles((dir, name) -> name.startsWith("mysnap"));
+ public final void postSetUp() throws Exception {}
- if (snaps != null) {
- for (File f : snaps) {
- f.delete();
- }
- }
+ private void loadCache() throws Exception {
+ this.loadCache(Integer.MAX_VALUE);
}
- public void loadCache() throws Exception {
+ private void loadCache(int maxNodes) throws Exception {
SerializableCallable setup = new SerializableCallable() {
@Override
public Object call() throws Exception {
@@ -187,6 +219,6 @@ public class ParallelSnapshotDUnitTest extends JUnit4CacheTestCase {
}
};
- forEachVm(setup, true);
+ forEachVm(setup, true, maxNodes);
}
}
http://git-wip-us.apache.org/repos/asf/geode/blob/ca2f20b8/geode-core/src/test/java/org/apache/geode/cache/snapshot/RegionSnapshotJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/cache/snapshot/RegionSnapshotJUnitTest.java b/geode-core/src/test/java/org/apache/geode/cache/snapshot/RegionSnapshotJUnitTest.java
index 706067a..46c491c 100644
--- a/geode-core/src/test/java/org/apache/geode/cache/snapshot/RegionSnapshotJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/cache/snapshot/RegionSnapshotJUnitTest.java
@@ -40,29 +40,27 @@ import static org.junit.Assert.*;
@Category(IntegrationTest.class)
public class RegionSnapshotJUnitTest extends SnapshotTestCase {
- private File f;
+ private File snapshotFile;
@Test
public void testExportAndReadSnapshot() throws Exception {
- for (final RegionType rt : RegionType.values()) {
+ for (final RegionType type : RegionType.values()) {
for (final SerializationType st : SerializationType.values()) {
- String name = "test-" + rt.name() + "-" + st.name();
- Region<Integer, MyObject> region = rgen.createRegion(cache, ds.getName(), rt, name);
+ String name = "test-" + type.name() + "-" + st.name();
+ Region<Integer, MyObject> region =
+ regionGenerator.createRegion(cache, diskStore.getName(), type, name);
final Map<Integer, MyObject> expected = createExpected(st);
region.putAll(expected);
- region.getSnapshotService().save(f, SnapshotFormat.GEMFIRE);
+ region.getSnapshotService().save(snapshotFile, SnapshotFormat.GEMFIRE);
- final Map<Integer, Object> read = new HashMap<Integer, Object>();
- SnapshotIterator<Integer, Object> iter = SnapshotReader.read(f);
- try {
+ final Map<Integer, Object> read = new HashMap<>();
+ try (SnapshotIterator<Integer, Object> iter = SnapshotReader.read(snapshotFile)) {
while (iter.hasNext()) {
Entry<Integer, Object> entry = iter.next();
read.put(entry.getKey(), entry.getValue());
}
- assertEquals("Comparison failure for " + rt.name() + "/" + st.name(), expected, read);
- } finally {
- iter.close();
+ assertEquals("Comparison failure for " + type.name() + "/" + st.name(), expected, read);
}
}
}
@@ -73,14 +71,15 @@ public class RegionSnapshotJUnitTest extends SnapshotTestCase {
for (final RegionType rt : RegionType.values()) {
for (final SerializationType st : SerializationType.values()) {
String name = "test-" + rt.name() + "-" + st.name();
- Region<Integer, MyObject> region = rgen.createRegion(cache, ds.getName(), rt, name);
+ Region<Integer, MyObject> region =
+ regionGenerator.createRegion(cache, diskStore.getName(), rt, name);
final Map<Integer, MyObject> expected = createExpected(st);
region.putAll(expected);
- region.getSnapshotService().save(f, SnapshotFormat.GEMFIRE);
+ region.getSnapshotService().save(snapshotFile, SnapshotFormat.GEMFIRE);
region.destroyRegion();
- region = rgen.createRegion(cache, ds.getName(), rt, name);
+ region = regionGenerator.createRegion(cache, diskStore.getName(), rt, name);
region.getAttributesMutator().setCacheWriter(new CacheWriterAdapter<Integer, MyObject>() {
@Override
@@ -98,7 +97,7 @@ public class RegionSnapshotJUnitTest extends SnapshotTestCase {
}
});
- region.getSnapshotService().load(f, SnapshotFormat.GEMFIRE);
+ region.getSnapshotService().load(snapshotFile, SnapshotFormat.GEMFIRE);
assertEquals("Comparison failure for " + rt.name() + "/" + st.name(), expected.entrySet(),
region.entrySet());
@@ -109,37 +108,30 @@ public class RegionSnapshotJUnitTest extends SnapshotTestCase {
@Test
public void testFilter() throws Exception {
- SnapshotFilter<Integer, MyObject> even = new SnapshotFilter<Integer, MyObject>() {
- @Override
- public boolean accept(Entry<Integer, MyObject> entry) {
- return entry.getKey() % 2 == 0;
- }
- };
+ SnapshotFilter<Integer, MyObject> even =
+ (SnapshotFilter<Integer, MyObject>) entry -> entry.getKey() % 2 == 0;
- SnapshotFilter<Integer, MyObject> odd = new SnapshotFilter<Integer, MyObject>() {
- @Override
- public boolean accept(Entry<Integer, MyObject> entry) {
- return entry.getKey() % 2 == 1;
- }
- };
+ SnapshotFilter<Integer, MyObject> odd =
+ (SnapshotFilter<Integer, MyObject>) entry -> entry.getKey() % 2 == 1;
for (final RegionType rt : RegionType.values()) {
for (final SerializationType st : SerializationType.values()) {
String name = "test-" + rt.name() + "-" + st.name();
- Region<Integer, MyObject> region = rgen.createRegion(cache, ds.getName(), rt, name);
+ Region<Integer, MyObject> region =
+ regionGenerator.createRegion(cache, diskStore.getName(), rt, name);
final Map<Integer, MyObject> expected = createExpected(st);
region.putAll(expected);
RegionSnapshotService<Integer, MyObject> rss = region.getSnapshotService();
SnapshotOptions<Integer, MyObject> options = rss.createOptions().setFilter(even);
- rss.save(f, SnapshotFormat.GEMFIRE, options);
+ rss.save(snapshotFile, SnapshotFormat.GEMFIRE, options);
region.destroyRegion();
- region = rgen.createRegion(cache, ds.getName(), rt, name);
+ region = regionGenerator.createRegion(cache, diskStore.getName(), rt, name);
rss = region.getSnapshotService();
options = rss.createOptions().setFilter(odd);
- rss.load(f, SnapshotFormat.GEMFIRE, options);
+ rss.load(snapshotFile, SnapshotFormat.GEMFIRE, options);
assertEquals("Comparison failure for " + rt.name() + "/" + st.name(), 0, region.size());
}
@@ -148,17 +140,15 @@ public class RegionSnapshotJUnitTest extends SnapshotTestCase {
@Test
public void testFilterExportException() throws Exception {
- SnapshotFilter<Integer, MyObject> oops = new SnapshotFilter<Integer, MyObject>() {
- @Override
- public boolean accept(Entry<Integer, MyObject> entry) {
- throw new RuntimeException();
- }
+ SnapshotFilter<Integer, MyObject> oops = (SnapshotFilter<Integer, MyObject>) entry -> {
+ throw new RuntimeException();
};
for (final RegionType rt : RegionType.values()) {
for (final SerializationType st : SerializationType.values()) {
String name = "test-" + rt.name() + "-" + st.name();
- Region<Integer, MyObject> region = rgen.createRegion(cache, ds.getName(), rt, name);
+ Region<Integer, MyObject> region =
+ regionGenerator.createRegion(cache, diskStore.getName(), rt, name);
final Map<Integer, MyObject> expected = createExpected(st);
region.putAll(expected);
@@ -167,17 +157,17 @@ public class RegionSnapshotJUnitTest extends SnapshotTestCase {
boolean caughtException = false;
try {
- rss.save(f, SnapshotFormat.GEMFIRE, options);
+ rss.save(snapshotFile, SnapshotFormat.GEMFIRE, options);
} catch (RuntimeException e) {
caughtException = true;
}
assertTrue(caughtException);
region.destroyRegion();
- region = rgen.createRegion(cache, ds.getName(), rt, name);
+ region = regionGenerator.createRegion(cache, diskStore.getName(), rt, name);
rss = region.getSnapshotService();
- rss.load(f, SnapshotFormat.GEMFIRE, options);
+ rss.load(snapshotFile, SnapshotFormat.GEMFIRE, options);
assertEquals("Comparison failure for " + rt.name() + "/" + st.name(), 0, region.size());
}
@@ -186,32 +176,30 @@ public class RegionSnapshotJUnitTest extends SnapshotTestCase {
@Test
public void testFilterImportException() throws Exception {
- SnapshotFilter<Integer, MyObject> oops = new SnapshotFilter<Integer, MyObject>() {
- @Override
- public boolean accept(Entry<Integer, MyObject> entry) {
- throw new RuntimeException();
- }
+ SnapshotFilter<Integer, MyObject> oops = (SnapshotFilter<Integer, MyObject>) entry -> {
+ throw new RuntimeException();
};
for (final RegionType rt : RegionType.values()) {
for (final SerializationType st : SerializationType.values()) {
String name = "test-" + rt.name() + "-" + st.name();
- Region<Integer, MyObject> region = rgen.createRegion(cache, ds.getName(), rt, name);
+ Region<Integer, MyObject> region =
+ regionGenerator.createRegion(cache, diskStore.getName(), rt, name);
final Map<Integer, MyObject> expected = createExpected(st);
region.putAll(expected);
RegionSnapshotService<Integer, MyObject> rss = region.getSnapshotService();
- rss.save(f, SnapshotFormat.GEMFIRE);
+ rss.save(snapshotFile, SnapshotFormat.GEMFIRE);
region.destroyRegion();
- region = rgen.createRegion(cache, ds.getName(), rt, name);
+ region = regionGenerator.createRegion(cache, diskStore.getName(), rt, name);
rss = region.getSnapshotService();
SnapshotOptions<Integer, MyObject> options = rss.createOptions().setFilter(oops);
boolean caughtException = false;
try {
- rss.load(f, SnapshotFormat.GEMFIRE, options);
+ rss.load(snapshotFile, SnapshotFormat.GEMFIRE, options);
} catch (RuntimeException e) {
caughtException = true;
}
@@ -225,14 +213,15 @@ public class RegionSnapshotJUnitTest extends SnapshotTestCase {
@Test
public void testInvalidate() throws Exception {
Region<Integer, MyObject> region =
- rgen.createRegion(cache, ds.getName(), RegionType.REPLICATE, "test");
- MyObject obj = rgen.createData(SerializationType.SERIALIZABLE, 1, "invalidated value");
+ regionGenerator.createRegion(cache, diskStore.getName(), RegionType.REPLICATE, "test");
+ MyObject obj =
+ regionGenerator.createData(SerializationType.SERIALIZABLE, 1, "invalidated value");
region.put(1, obj);
region.invalidate(1);
- region.getSnapshotService().save(f, SnapshotFormat.GEMFIRE);
- region.getSnapshotService().load(f, SnapshotFormat.GEMFIRE);
+ region.getSnapshotService().save(snapshotFile, SnapshotFormat.GEMFIRE);
+ region.getSnapshotService().load(snapshotFile, SnapshotFormat.GEMFIRE);
assertTrue(region.containsKey(1));
assertFalse(region.containsValueForKey(1));
@@ -251,11 +240,12 @@ public class RegionSnapshotJUnitTest extends SnapshotTestCase {
SerializationType st = SerializationType.PDX_SERIALIZER;
String name = "test-" + rt.name() + "-" + st.name() + "-dsid";
- Region<Integer, MyObject> region = rgen.createRegion(cache, ds.getName(), rt, name);
+ Region<Integer, MyObject> region =
+ regionGenerator.createRegion(cache, diskStore.getName(), rt, name);
final Map<Integer, MyObject> expected = createExpected(st);
region.putAll(expected);
- region.getSnapshotService().save(f, SnapshotFormat.GEMFIRE);
+ region.getSnapshotService().save(snapshotFile, SnapshotFormat.GEMFIRE);
cache.close();
@@ -264,22 +254,19 @@ public class RegionSnapshotJUnitTest extends SnapshotTestCase {
.setPdxSerializer(new MyPdxSerializer()).set(DISTRIBUTED_SYSTEM_ID, "100");
cache = cf2.create();
- final Map<Integer, Object> read = new HashMap<Integer, Object>();
- SnapshotIterator<Integer, Object> iter = SnapshotReader.read(f);
- try {
+ final Map<Integer, Object> read = new HashMap<>();
+ try (SnapshotIterator<Integer, Object> iter = SnapshotReader.read(snapshotFile)) {
while (iter.hasNext()) {
Entry<Integer, Object> entry = iter.next();
read.put(entry.getKey(), entry.getValue());
}
assertEquals("Comparison failure for " + rt.name() + "/" + st.name(), expected, read);
- } finally {
- iter.close();
}
}
@Before
public void setUp() throws Exception {
super.setUp();
- f = new File(snaps, "test.snapshot.gfd");
+ snapshotFile = new File(getSnapshotDirectory(), "test.snapshot.gfd");
}
}
http://git-wip-us.apache.org/repos/asf/geode/blob/ca2f20b8/geode-core/src/test/java/org/apache/geode/cache/snapshot/SnapshotTestCase.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/cache/snapshot/SnapshotTestCase.java b/geode-core/src/test/java/org/apache/geode/cache/snapshot/SnapshotTestCase.java
index 3898925..d0040f7 100644
--- a/geode-core/src/test/java/org/apache/geode/cache/snapshot/SnapshotTestCase.java
+++ b/geode-core/src/test/java/org/apache/geode/cache/snapshot/SnapshotTestCase.java
@@ -21,69 +21,55 @@ import org.apache.geode.cache.Cache;
import org.apache.geode.cache.CacheFactory;
import org.apache.geode.cache.DiskStore;
import org.apache.geode.cache.snapshot.RegionGenerator.SerializationType;
+
import org.junit.After;
import org.junit.Before;
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
import java.io.File;
-import java.io.FilenameFilter;
import java.util.HashMap;
import java.util.Map;
-import java.util.Random;
import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
public class SnapshotTestCase {
- protected File store;
- protected File snaps;
+ private File snapshotDirectory;
protected Cache cache;
- protected RegionGenerator rgen;
- protected DiskStore ds;
+ RegionGenerator regionGenerator;
+ DiskStore diskStore;
+
+ @Rule
+ public TemporaryFolder baseDir = new TemporaryFolder();
@Before
public void setUp() throws Exception {
- store = new File("store-" + Math.abs(new Random().nextInt()));
- store.mkdir();
-
- snaps = new File("snapshots-" + Math.abs(new Random().nextInt()));
- snaps.mkdir();
+ File storeDirectory = baseDir.newFolder("store");
+ snapshotDirectory = baseDir.newFolder("snapshots");
- rgen = new RegionGenerator();
+ regionGenerator = new RegionGenerator();
CacheFactory cf = new CacheFactory().set(MCAST_PORT, "0").set(LOG_LEVEL, "error");
cache = cf.create();
- ds = cache.createDiskStoreFactory().setMaxOplogSize(1).setDiskDirs(new File[] {store})
- .create("snapshotTest");
+ diskStore = cache.createDiskStoreFactory().setMaxOplogSize(1)
+ .setDiskDirs(new File[] {storeDirectory}).create("snapshotTest");
}
@After
public void tearDown() throws Exception {
cache.close();
- deleteFiles(store);
- deleteFiles(snaps);
}
- public Map<Integer, MyObject> createExpected(SerializationType type) {
- Map<Integer, MyObject> expected = new HashMap<Integer, MyObject>();
- for (int i = 0; i < 1000; i++) {
- expected.put(i, rgen.createData(type, i, "The number is " + i));
- }
- return expected;
+ File getSnapshotDirectory() {
+ return snapshotDirectory;
}
- public static void deleteFiles(File dir) {
- File[] deletes = dir.listFiles(new FilenameFilter() {
- @Override
- public boolean accept(File dir, String name) {
- return true;
- }
- });
-
- if (deletes != null) {
- for (File f : deletes) {
- f.delete();
- }
+ Map<Integer, MyObject> createExpected(SerializationType type) {
+ Map<Integer, MyObject> expected = new HashMap<>();
+ for (int i = 0; i < 1000; i++) {
+ expected.put(i, regionGenerator.createData(type, i, "The number is " + i));
}
- dir.delete();
+ return expected;
}
}
http://git-wip-us.apache.org/repos/asf/geode/blob/ca2f20b8/geode-core/src/test/java/org/apache/geode/cache/snapshot/TestSnapshotFileMapper.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/cache/snapshot/TestSnapshotFileMapper.java b/geode-core/src/test/java/org/apache/geode/cache/snapshot/TestSnapshotFileMapper.java
index 953721f..ba89630 100644
--- a/geode-core/src/test/java/org/apache/geode/cache/snapshot/TestSnapshotFileMapper.java
+++ b/geode-core/src/test/java/org/apache/geode/cache/snapshot/TestSnapshotFileMapper.java
@@ -32,7 +32,9 @@ public class TestSnapshotFileMapper implements SnapshotFileMapper {
if (shouldExplode) {
throw new RuntimeException();
}
- return new File(snapshot.getParentFile(), mapFilename(snapshot));
+ File directory = new File(snapshot.getParent(), Integer.toString(1 + VM.getCurrentVMNum()));
+ directory.mkdirs();
+ return new File(directory, mapFilename(snapshot));
}
@Override
@@ -40,15 +42,11 @@ public class TestSnapshotFileMapper implements SnapshotFileMapper {
if (shouldExplode) {
throw new RuntimeException();
}
-
- File f = new File(snapshot.getParentFile(), mapFilename(snapshot));
- return new File[] {f};
+ File directory = new File(snapshot, Integer.toString(1 + VM.getCurrentVMNum()));
+ return new File[] {directory};
}
private String mapFilename(File snapshot) {
- String filename = snapshot.getName();
- int suffixLocation = filename.indexOf(RegionSnapshotService.SNAPSHOT_FILE_EXTENSION);
- return filename.substring(0, suffixLocation) + "-" + VM.getCurrentVMNum()
- + RegionSnapshotService.SNAPSHOT_FILE_EXTENSION;
+ return snapshot.getName();
}
}
http://git-wip-us.apache.org/repos/asf/geode/blob/ca2f20b8/geode-core/src/test/java/org/apache/geode/cache/snapshot/WanSnapshotJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/cache/snapshot/WanSnapshotJUnitTest.java b/geode-core/src/test/java/org/apache/geode/cache/snapshot/WanSnapshotJUnitTest.java
index 46245c3..b62b7bf 100644
--- a/geode-core/src/test/java/org/apache/geode/cache/snapshot/WanSnapshotJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/cache/snapshot/WanSnapshotJUnitTest.java
@@ -15,12 +15,13 @@
package org.apache.geode.cache.snapshot;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
import java.io.File;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import org.awaitility.Awaitility;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -37,7 +38,6 @@ import org.apache.geode.test.junit.categories.IntegrationTest;
public class WanSnapshotJUnitTest extends SnapshotTestCase {
private Region<Integer, MyObject> region;
private WanListener wan;
- private static final long MAX_WAIT = 5 * 60 * 1000; // 6 minutes
@Test
public void testWanCallback() throws Exception {
@@ -46,18 +46,12 @@ public class WanSnapshotJUnitTest extends SnapshotTestCase {
region.put(i, new MyObject(i, "clienttest " + i));
}
- File snapshot = new File("wan.snapshot.gfd");
+ File snapshot = new File(getSnapshotDirectory(), "wan.snapshot.gfd");
region.getSnapshotService().save(snapshot, SnapshotFormat.GEMFIRE);
region.clear();
- long start = System.currentTimeMillis();
- // wait for the events to drain out
- while (!wan.ticker.compareAndSet(count, 0)) {
- Thread.sleep(100);
- if (System.currentTimeMillis() - start > MAX_WAIT) {
- fail("Event did not drain in 5 minutes");
- }
- }
+ Awaitility.await().atMost(30, TimeUnit.SECONDS).pollInterval(100, TimeUnit.MILLISECONDS)
+ .until(() -> wan.ticker.compareAndSet(count, 0));
region.getSnapshotService().load(snapshot, SnapshotFormat.GEMFIRE);
@@ -92,4 +86,3 @@ public class WanSnapshotJUnitTest extends SnapshotTestCase {
}
}
-
http://git-wip-us.apache.org/repos/asf/geode/blob/ca2f20b8/geode-core/src/test/java/org/apache/geode/internal/cache/snapshot/ParallelSnapshotFileMapperTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/snapshot/ParallelSnapshotFileMapperTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/snapshot/ParallelSnapshotFileMapperTest.java
index 2f045fc..371fc6e 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/snapshot/ParallelSnapshotFileMapperTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/snapshot/ParallelSnapshotFileMapperTest.java
@@ -69,9 +69,10 @@ public class ParallelSnapshotFileMapperTest {
}
@Test
- public void mapImportPathIsUnsupported() throws Exception {
- thrown.expect(UnsupportedOperationException.class);
- mapper.mapImportPath(null, null);
+ public void mapImportReturnsUnchangedPath() {
+ File file = new File(BASE_LOCATION + FILE_TYPE);
+ File[] mappedFiles = mapper.mapImportPath(null, file);
+ assertEquals(file, mappedFiles[0]);
}
@Test