You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by je...@apache.org on 2016/10/04 17:03:13 UTC

[07/38] incubator-geode git commit: GEODE-1801: Updated the ClientTypeRegistration.java to handle Snapshot reading on the client side.

GEODE-1801: Updated the ClientTypeRegistration.java to handle Snapshot reading on the client side.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/cf0b3784
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/cf0b3784
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/cf0b3784

Branch: refs/heads/feature/e2e-testing
Commit: cf0b378429b643513366ce90226e4f6472056c7b
Parents: a1938b3
Author: Udo Kohlmeyer <uk...@pivotal.io>
Authored: Mon Sep 26 09:43:53 2016 +1000
Committer: Udo Kohlmeyer <uk...@pivotal.io>
Committed: Mon Sep 26 09:43:53 2016 +1000

----------------------------------------------------------------------
 .../pdx/internal/ClientTypeRegistration.java    | 277 +++++++------------
 .../cache/snapshot/GFSnapshotDUnitTest.java     | 216 +++++++++++++++
 .../internal/cache/snapshot/TestObject.java     |  78 ++++++
 3 files changed, 393 insertions(+), 178 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/cf0b3784/geode-core/src/main/java/org/apache/geode/pdx/internal/ClientTypeRegistration.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/pdx/internal/ClientTypeRegistration.java b/geode-core/src/main/java/org/apache/geode/pdx/internal/ClientTypeRegistration.java
index 9476ef4..b7f27f3 100644
--- a/geode-core/src/main/java/org/apache/geode/pdx/internal/ClientTypeRegistration.java
+++ b/geode-core/src/main/java/org/apache/geode/pdx/internal/ClientTypeRegistration.java
@@ -21,8 +21,6 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 
-import org.apache.logging.log4j.Logger;
-
 import org.apache.geode.InternalGemFireError;
 import org.apache.geode.cache.CacheClosedException;
 import org.apache.geode.cache.client.Pool;
@@ -41,110 +39,90 @@ import org.apache.geode.cache.wan.GatewaySender;
 import org.apache.geode.internal.cache.GemFireCacheImpl;
 import org.apache.geode.internal.cache.PoolManagerImpl;
 import org.apache.geode.internal.logging.LogService;
-import org.apache.geode.pdx.PdxInitializationException;
+import org.apache.logging.log4j.Logger;
 
 /**
  *
  */
 public class ClientTypeRegistration implements TypeRegistration {
+
   private static final Logger logger = LogService.getLogger();
-  
+
   private final GemFireCacheImpl cache;
-  
-  private volatile boolean typeRegistryInUse = false;
 
   public ClientTypeRegistration(GemFireCacheImpl cache) {
     this.cache = cache;
   }
-  
+
   public int defineType(PdxType newType) {
-    verifyConfiguration(); 
     Collection<Pool> pools = getAllPools();
-    
+
     ServerConnectivityException lastException = null;
-    for(Pool pool: pools) {
+    int newTypeId = -1;
+    for (Pool pool : pools) {
       try {
-        int result = GetPDXIdForTypeOp.execute((ExecutablePool) pool, newType);
-        newType.setTypeId(result);
-        sendTypeToAllPools(newType, result, pools, pool);
-        return result;
-      } catch(ServerConnectivityException e) {
+        newTypeId = GetPDXIdForTypeOp.execute((ExecutablePool) pool, newType);
+        newType.setTypeId(newTypeId);
+        sendTypeToAllPools(newType, newTypeId, pool);
+        return newTypeId;
+      } catch (ServerConnectivityException e) {
         //ignore, try the next pool.
         lastException = e;
       }
     }
-    if(lastException != null) {
-      throw lastException;
-    } else {
-      if (this.cache.isClosed()) {
-        throw new CacheClosedException("PDX detected cache was closed");
-      }
-      throw new CacheClosedException("Client pools have been closed so the PDX type registry can not define a type.");
-    }
+    throw returnCorrectExceptionForFailure(pools, newTypeId, lastException);
   }
-  
-  private void sendTypeToAllPools(PdxType type, int id,
-      Collection<Pool> pools, Pool definingPool) {
-    
-    for(Pool pool: pools) {
-      if(pool.equals(definingPool)) {
-        continue;
-      }
-      
-      try {
-        AddPDXTypeOp.execute((ExecutablePool) pool, id, type);
-      } catch(ServerConnectivityException ignore) {
-        logger.debug("Received an exception sending pdx type to pool {}, {}", pool, ignore.getMessage(), ignore);
-        //TODO DAN - is it really safe to ignore this? What if this is the pool
-        //we're about to do a put on? I think maybe we really should pass the context
-        //down to this point, if it is available. Maybe just an optional thread local?
-        //Then we could go straight to that pool to register the type and bail otherwise.
-      }
+
+  private void sendTypeToAllPools(PdxType type, int id, Pool pool) {
+
+    try {
+      AddPDXTypeOp.execute((ExecutablePool) pool, id, type);
+    } catch (ServerConnectivityException ignore) {
+      logger.debug("Received an exception sending pdx type to pool {}, {}", pool, ignore.getMessage(), ignore);
+      //TODO DAN - is it really safe to ignore this? What if this is the pool
+      //we're about to do a put on? I think maybe we really should pass the context
+      //down to this point, if it is available. Maybe just an optional thread local?
+      //Then we could go straight to that pool to register the type and bail otherwise.
     }
-    
   }
 
   public PdxType getType(int typeId) {
-    verifyConfiguration();
     Collection<Pool> pools = getAllPools();
-    
+
     ServerConnectivityException lastException = null;
-    for(Pool pool: pools) {
+    for (Pool pool : pools) {
       try {
         PdxType type = GetPDXTypeByIdOp.execute((ExecutablePool) pool, typeId);
-        if(type != null) {
+        if (type != null) {
           return type;
         }
-      } catch(ServerConnectivityException e) {
+      } catch (ServerConnectivityException e) {
         logger.debug("Received an exception getting pdx type from pool {}, {}", pool, e.getMessage(), e);
         //ignore, try the next pool.
         lastException = e;
       }
     }
-    
-    if(lastException != null) {
+
+    if (lastException != null) {
       throw lastException;
     } else {
-      if(pools.isEmpty()) {
+      if (pools.isEmpty()) {
         if (this.cache.isClosed()) {
           throw this.cache.getCacheClosedException("PDX detected cache was closed", null);
+        } else {
+          throw new CacheClosedException("Client pools have been closed so the PDX type registry can not lookup a type.");
         }
-        throw new CacheClosedException("Client pools have been closed so the PDX type registry can not lookup a type.");
       } else {
         throw new InternalGemFireError("getType: Unable to determine PDXType for id " + typeId + " from existing client to server pools " + pools);
       }
     }
   }
-  
-  private Collection<Pool> getAllPools() {
-    return getAllPools(cache);
-  }
-  
-  private static Collection<Pool> getAllPools(GemFireCacheImpl cache) {
+
+  private static Collection<Pool> getAllPools() {
     Collection<Pool> pools = PoolManagerImpl.getPMI().getMap().values();
-    for(Iterator<Pool> itr= pools.iterator(); itr.hasNext(); ) {
+    for (Iterator<Pool> itr = pools.iterator(); itr.hasNext(); ) {
       PoolImpl pool = (PoolImpl) itr.next();
-      if(pool.isUsedByGateway()) {
+      if (pool.isUsedByGateway()) {
         itr.remove();
       }
     }
@@ -164,9 +142,9 @@ public class ClientTypeRegistration implements TypeRegistration {
   }
 
   public void gatewaySenderStarted(GatewaySender gatewaySender) {
-    checkAllowed();
+    //do nothing
   }
-  
+
   public void creatingPersistentRegion() {
     //do nothing
   }
@@ -176,47 +154,35 @@ public class ClientTypeRegistration implements TypeRegistration {
   }
 
   public int getEnumId(Enum<?> v) {
-    EnumInfo ei = new EnumInfo(v);
-    Collection<Pool> pools = getAllPools();
+    EnumInfo enumInfo = new EnumInfo(v);
+    return processEnumInfoForEnumId(enumInfo);
+  }
 
+  private int processEnumInfoForEnumId(EnumInfo enumInfo) {
+    Collection<Pool> pools = getAllPools();
     ServerConnectivityException lastException = null;
-    for(Pool pool: pools) {
+    for (Pool pool : pools) {
       try {
-        int result = GetPDXIdForEnumOp.execute((ExecutablePool) pool, ei);
-        sendEnumIdToAllPools(ei, result, pools, pool);
+        int result = GetPDXIdForEnumOp.execute((ExecutablePool) pool, enumInfo);
+        sendEnumIdToAllPools(enumInfo, result, pool);
         return result;
-      } catch(ServerConnectivityException e) {
+      } catch (ServerConnectivityException e) {
         //ignore, try the next pool.
         lastException = e;
       }
     }
-    if (lastException != null) {
-      throw lastException;
-    } else {
-      if (this.cache.isClosed()) {
-        throw new CacheClosedException("PDX detected cache was closed");
-      }
-      throw new CacheClosedException("Client pools have been closed so the PDX type registry can not define a type.");
-    }
+    throw returnCorrectExceptionForFailure(pools, -1, lastException);
   }
-  
-  private void sendEnumIdToAllPools(EnumInfo enumInfo, int id,
-      Collection<Pool> pools, Pool definingPool) {
 
-    for (Pool pool: pools) {
-      if (pool.equals(definingPool)) {
-        continue;
-      }
-
-      try {
-        AddPDXEnumOp.execute((ExecutablePool) pool, id, enumInfo);
-      } catch(ServerConnectivityException ignore) {
-        logger.debug("Received an exception sending pdx type to pool {}, {}", pool, ignore.getMessage(), ignore);
-        //TODO DAN - is it really safe to ignore this? What if this is the pool
-        //we're about to do a put on? I think maybe we really should pass the context
-        //down to this point, if it is available. Maybe just an optional thread local?
-        //Then we could go straight to that pool to register the type and bail otherwise.
-      }
+  private void sendEnumIdToAllPools(EnumInfo enumInfo, int id, Pool pool) {
+    try {
+      AddPDXEnumOp.execute((ExecutablePool) pool, id, enumInfo);
+    } catch (ServerConnectivityException ignore) {
+      logger.debug("Received an exception sending pdx type to pool {}, {}", pool, ignore.getMessage(), ignore);
+      //TODO DAN - is it really safe to ignore this? What if this is the pool
+      //we're about to do a put on? I think maybe we really should pass the context
+      //down to this point, if it is available. Maybe just an optional thread local?
+      //Then we could go straight to that pool to register the type and bail otherwise.
     }
   }
 
@@ -225,76 +191,27 @@ public class ClientTypeRegistration implements TypeRegistration {
   }
 
   public int defineEnum(EnumInfo newInfo) {
-    Collection<Pool> pools = getAllPools();
-    
-    ServerConnectivityException lastException = null;
-    for(Pool pool: pools) {
-      try {
-        int result = GetPDXIdForEnumOp.execute((ExecutablePool) pool, newInfo);
-        sendEnumIdToAllPools(newInfo, result, pools, pool);
-        return result;
-      } catch(ServerConnectivityException e) {
-        //ignore, try the next pool.
-        lastException = e;
-      }
-    }
-    
-    
-    if(lastException != null) {
-      throw lastException;
-    } else {
-      if (this.cache.isClosed()) {
-        throw new CacheClosedException("PDX detected cache was closed");
-      }
-      throw new CacheClosedException("Client pools have been closed so the PDX type registry can not define a type.");
-    }
-   }
+    return processEnumInfoForEnumId(newInfo);
+  }
 
   public EnumInfo getEnumById(int enumId) {
     Collection<Pool> pools = getAllPools();
-    
+
     ServerConnectivityException lastException = null;
-    for(Pool pool: pools) {
+    for (Pool pool : pools) {
       try {
         EnumInfo result = GetPDXEnumByIdOp.execute((ExecutablePool) pool, enumId);
-        if(result != null) {
+        if (result != null) {
           return result;
         }
-      } catch(ServerConnectivityException e) {
+      } catch (ServerConnectivityException e) {
         logger.debug("Received an exception getting pdx type from pool {}, {}", pool, e.getMessage(), e);
         //ignore, try the next pool.
         lastException = e;
       }
     }
-    
-    if(lastException != null) {
-      throw lastException;
-    } else {
-      if(pools.isEmpty()) {
-        if (this.cache.isClosed()) {
-          throw this.cache.getCacheClosedException("PDX detected cache was closed", null);
-        }
-        throw new CacheClosedException("Client pools have been closed so the PDX type registry can not lookup an enum.");
-      } else {
-        throw new InternalGemFireError("getEnum: Unable to determine pdx enum for id " + enumId + " from existing client to server pools " + pools);
-      }
-    }
-  }
-  
-  private void verifyConfiguration() {
-    if(typeRegistryInUse) {
-      return;
-    } else {
-      typeRegistryInUse = true;
-      checkAllowed();
-    }
-  }
-  
-  private void checkAllowed() {
-    //Anything is allowed until the registry is in use.
-    if(!typeRegistryInUse) {
-      return;
-    }
+
+    throw returnCorrectExceptionForFailure(pools, enumId, lastException);
   }
 
   @SuppressWarnings({ "unchecked", "serial" })
@@ -307,8 +224,8 @@ public class ClientTypeRegistration implements TypeRegistration {
       }
       throw new CacheClosedException("Client pools have been closed so the PDX type registry is not available.");
     }
-    
-    Map<Integer, PdxType> types = new HashMap<Integer, PdxType>();
+
+    Map<Integer, PdxType> types = new HashMap<>();
     for (Pool p : pools) {
       try {
         types.putAll(GetPDXTypesOp.execute((ExecutablePool) p));
@@ -329,27 +246,28 @@ public class ClientTypeRegistration implements TypeRegistration {
       }
       throw new CacheClosedException("Client pools have been closed so the PDX type registry is not available.");
     }
-    
-    Map<Integer, EnumInfo> enums = new HashMap<Integer, EnumInfo>();
+
+    Map<Integer, EnumInfo> enums = new HashMap<>();
     for (Pool p : pools) {
       enums.putAll(GetPDXEnumsOp.execute((ExecutablePool) p));
     }
     return enums;
   }
-  
+
 
   @Override
   public PdxType getPdxTypeForField(String fieldName, String className) {
     for (Object value : types().values()) {
-      if (value instanceof PdxType){
+      if (value instanceof PdxType) {
         PdxType pdxType = (PdxType) value;
-        if(pdxType.getClassName().equals(className) && pdxType.getPdxField(fieldName) != null){
+        if (pdxType.getClassName().equals(className) && pdxType.getPdxField(fieldName) != null) {
           return pdxType;
         }
       }
     }
     return null;
   }
+
   @Override
   public void testClearRegistry() {
   }
@@ -362,50 +280,53 @@ public class ClientTypeRegistration implements TypeRegistration {
   @Override
   public void addImportedType(int typeId, PdxType importedType) {
     Collection<Pool> pools = getAllPools();
-    
+
     ServerConnectivityException lastException = null;
-    for(Pool pool: pools) {
+    for (Pool pool : pools) {
       try {
-        sendTypeToAllPools(importedType, typeId, pools, pool);
-      } catch(ServerConnectivityException e) {
+        sendTypeToAllPools(importedType, typeId, pool);
+        return;
+      } catch (ServerConnectivityException e) {
         //ignore, try the next pool.
         lastException = e;
       }
     }
-    if(lastException != null) {
-      throw lastException;
-    } else {
-      if (this.cache.isClosed()) {
-        throw new CacheClosedException("PDX detected cache was closed");
-      }
-      throw new CacheClosedException("Client pools have been closed so the PDX type registry can not define a type.");
-    }
+    throw returnCorrectExceptionForFailure(pools, typeId, lastException);
   }
 
   @Override
   public void addImportedEnum(int enumId, EnumInfo importedInfo) {
     Collection<Pool> pools = getAllPools();
-    
+
     ServerConnectivityException lastException = null;
-    for(Pool pool: pools) {
+    for (Pool pool : pools) {
       try {
-        sendEnumIdToAllPools(importedInfo, enumId, pools, pool);
-      } catch(ServerConnectivityException e) {
+        sendEnumIdToAllPools(importedInfo, enumId, pool);
+      } catch (ServerConnectivityException e) {
         //ignore, try the next pool.
         lastException = e;
       }
     }
-    
-    if(lastException != null) {
+
+    throw returnCorrectExceptionForFailure(pools, enumId, lastException);
+  }
+
+  private RuntimeException returnCorrectExceptionForFailure(final Collection<Pool> pools, final int typeId, final ServerConnectivityException lastException) {
+    if (lastException != null) {
       throw lastException;
     } else {
-      if (this.cache.isClosed()) {
-        throw new CacheClosedException("PDX detected cache was closed");
+      if (pools.isEmpty()) {
+        if (this.cache.isClosed()) {
+          throw this.cache.getCacheClosedException("PDX detected cache was closed", null);
+        } else {
+          throw new CacheClosedException("Client pools have been closed so the PDX type registry can not lookup a type.");
+        }
+      } else {
+        throw new InternalGemFireError("Unable to determine PDXType for id " + typeId);
       }
-      throw new CacheClosedException("Client pools have been closed so the PDX type registry can not define a type.");
     }
   }
-  
+
   @Override
   public int getLocalSize() {
     return 0;

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/cf0b3784/geode-core/src/test/java/org/apache/geode/internal/cache/snapshot/GFSnapshotDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/snapshot/GFSnapshotDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/snapshot/GFSnapshotDUnitTest.java
new file mode 100644
index 0000000..2ca3aec
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/snapshot/GFSnapshotDUnitTest.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geode.internal.cache.snapshot;
+
+import static org.apache.geode.distributed.ConfigurationProperties.*;
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.TreeMap;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.cache.DataPolicy;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionFactory;
+import org.apache.geode.cache.Scope;
+import org.apache.geode.cache.client.ClientCache;
+import org.apache.geode.cache.client.ClientCacheFactory;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.snapshot.SnapshotIterator;
+import org.apache.geode.cache.snapshot.SnapshotOptions.SnapshotFormat;
+import org.apache.geode.cache.snapshot.SnapshotReader;
+import org.apache.geode.distributed.LocatorLauncher;
+import org.apache.geode.distributed.ServerLauncher;
+import org.apache.geode.internal.AvailablePortHelper;
+import org.apache.geode.pdx.ReflectionBasedAutoSerializer;
+import org.apache.geode.test.dunit.DistributedTestUtils;
+import org.apache.geode.test.dunit.Host;
+import org.apache.geode.test.dunit.NetworkUtils;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase;
+import org.apache.geode.test.junit.categories.DistributedTest;
+import org.apache.geode.test.junit.rules.serializable.SerializableTemporaryFolder;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(DistributedTest.class)
+public class GFSnapshotDUnitTest extends JUnit4DistributedTestCase {
+
+  private VM locator;
+  private VM server;
+  private VM client;
+  private Host host;
+  private LocatorLauncher locatorLauncher;
+  private static ServerLauncher serverLauncher;
+
+  @Rule
+  public SerializableTemporaryFolder temporaryFolder = new SerializableTemporaryFolder();
+
+  @Before
+  public void before() {
+    host = Host.getHost(0);
+    locator = host.getVM(0);
+    server = host.getVM(1);
+    client = host.getVM(2);
+  }
+
+  @Test
+  public void testDataExportAndIterate() throws IOException, ClassNotFoundException {
+    int locatorPort = AvailablePortHelper.getRandomAvailableTCPPort();
+    String serverHostName = NetworkUtils.getServerHostName(host);
+
+    Properties properties = configureCommonProperties(new Properties());
+
+    locator.invoke("Start Locator", () -> configureAndStartLocator(locatorPort, serverHostName, properties));
+    server.invoke("Start Server", () -> configureAndStartServer(locatorPort, serverHostName, properties));
+    client.invoke("Start client", () -> {
+      createAndStartClient(locatorPort, serverHostName);
+      return null;
+    });
+    client.invoke("Populate data", () -> populateDataOnClient());
+    String snapshotFilePath = server.invoke("Export data snapshot", () -> createSnapshot());
+    client.invoke("Iterate over snapshot", () -> {
+      ClientCache clientCache = ClientCacheFactory.getAnyInstance();
+      clientCache.close();
+      createAndStartClient(locatorPort, serverHostName);
+      iterateOverSnapshot(snapshotFilePath);
+    });
+  }
+
+  private void createAndStartClient(final int locatorPort, final String serverHostName) {
+    ClientCacheFactory clientCacheFactory = new ClientCacheFactory();
+    clientCacheFactory.set("log-level", "config")
+                      .addPoolLocator(serverHostName, locatorPort);
+    ClientCache clientCache = clientCacheFactory.create();
+    clientCache.createClientRegionFactory(ClientRegionShortcut.PROXY).create("TestRegion");
+  }
+
+  private Object populateDataOnClient() {
+    ClientCache clientCache = ClientCacheFactory.getAnyInstance();
+    Region testRegion = clientCache.getRegion("TestRegion");
+    for (int i = 0; i < 100; i++) {
+      testRegion.put(i, new TestObject(i, "owner_" + i));
+    }
+    return null;
+  }
+
+  private String createSnapshot() throws IOException {
+    final String memberName = getUniqueName() + "-server";
+    File file = temporaryFolder.newFolder(memberName + "-snapshot");
+    Cache cache = CacheFactory.getAnyInstance();
+    cache.getSnapshotService().save(file, SnapshotFormat.GEMFIRE);
+    return file.getAbsolutePath();
+  }
+
+  private void iterateOverSnapshot(final String snapshotFilePath) throws IOException, ClassNotFoundException {
+
+    File mySnapshot = new File(snapshotFilePath + "/snapshot-TestRegion");
+    SnapshotIterator<Integer, TestObject> snapshotIterator = SnapshotReader.read(mySnapshot);
+
+    Map<Integer, TestObject> result = new TreeMap<>();
+
+    try {
+      while (snapshotIterator.hasNext()) {
+        Entry<Integer, TestObject> entry = snapshotIterator.next();
+        int key = entry.getKey();
+        TestObject value = entry.getValue();
+        result.put(key, value);
+      }
+    } finally {
+      snapshotIterator.close();
+    }
+    assertEquals(100, result.size());
+    int count = 0;
+    for (Entry<Integer, TestObject> entry : result.entrySet()) {
+      assertEquals(count, (int) entry.getKey());
+      assertEquals(new TestObject(count, "owner_" + count), entry.getValue());
+      count++;
+    }
+  }
+
+  private Properties configureCommonProperties(final Properties properties) {
+    properties.setProperty(MCAST_PORT, "0");
+    properties.setProperty(ENABLE_CLUSTER_CONFIGURATION, "false");
+    properties.setProperty(USE_CLUSTER_CONFIGURATION, "false");
+    return properties;
+  }
+
+  private void configureAndStartLocator(final int locatorPort, final String serverHostName, final Properties properties) throws IOException {
+    DistributedTestUtils.deleteLocatorStateFile();
+
+    final String memberName = getUniqueName() + "-locator";
+    final File workingDirectory = temporaryFolder.newFolder(memberName);
+
+    LocatorLauncher.Builder builder = new LocatorLauncher.Builder();
+
+    for (String propertyName : properties.stringPropertyNames()) {
+      builder.set(propertyName, properties.getProperty(propertyName));
+    }
+    locatorLauncher = builder.setBindAddress(serverHostName)
+                             .setHostnameForClients(serverHostName)
+                             .setMemberName(memberName)
+                             .setPort(locatorPort)
+                             .setWorkingDirectory(workingDirectory.getCanonicalPath())
+                             .build();
+    locatorLauncher.start();
+
+  }
+
+  private void configureAndStartServer(final int locatorPort, final String serverHostName, final Properties properties) throws IOException {
+    final String memberName = getUniqueName() + "-server";
+    final File workingDirectory = temporaryFolder.newFolder(memberName);
+    final File pdxDirectory = temporaryFolder.newFolder(memberName + "-pdx");
+    final File diskStoreDirectory = temporaryFolder.newFolder(memberName + "-disk");
+
+
+    ServerLauncher.Builder builder = new ServerLauncher.Builder();
+
+    for (String propertyName : properties.stringPropertyNames()) {
+      builder.set(propertyName, properties.getProperty(propertyName));
+    }
+
+    serverLauncher = builder.set("locators", serverHostName + "[" + locatorPort + "]")
+                            .setMemberName(memberName)
+                            .set("log-level", "config")
+                            .setHostNameForClients(serverHostName)
+                            .setServerBindAddress(serverHostName)
+                            .setServerPort(0)
+                            .setWorkingDirectory(workingDirectory.getCanonicalPath())
+                            .setPdxDiskStore("pdxDS")
+                            .setPdxPersistent(true)
+                            .build();
+    serverLauncher.start();
+
+    Cache cache = CacheFactory.getAnyInstance();
+
+    cache.createDiskStoreFactory().setDiskDirsAndSizes(new File[] { pdxDirectory }, new int[] { 16000 }).create("pdxDS");
+
+    cache.createDiskStoreFactory().setDiskDirsAndSizes(new File[] { diskStoreDirectory }, new int[] { 16000 }).create("diskStore");
+
+    RegionFactory<Object, Object> regionFactory = cache.createRegionFactory();
+    regionFactory.setDataPolicy(DataPolicy.PERSISTENT_REPLICATE).setScope(Scope.DISTRIBUTED_ACK).setDiskStoreName("diskStore").create("TestRegion");
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/cf0b3784/geode-core/src/test/java/org/apache/geode/internal/cache/snapshot/TestObject.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/snapshot/TestObject.java b/geode-core/src/test/java/org/apache/geode/internal/cache/snapshot/TestObject.java
new file mode 100644
index 0000000..d10efe3
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/snapshot/TestObject.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geode.internal.cache.snapshot;
+
+import org.apache.geode.pdx.PdxReader;
+import org.apache.geode.pdx.PdxSerializable;
+import org.apache.geode.pdx.PdxWriter;
+
+public class TestObject implements PdxSerializable {
+
+  public int id;
+  public String owner;
+
+  public TestObject() {
+  }
+
+  public TestObject(final int id, final String owner) {
+    this.id = id;
+    this.owner = owner;
+  }
+
+  @Override
+  public boolean equals(final Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    final TestObject that = (TestObject) o;
+
+    if (id != that.id) {
+      return false;
+    }
+    return owner != null ? owner.equals(that.owner) : that.owner == null;
+
+  }
+
+  @Override
+  public int hashCode() {
+    int result = id;
+    result = 31 * result + (owner != null ? owner.hashCode() : 0);
+    return result;
+  }
+
+  @Override
+  public String toString() {
+    return "TestObject [id=" + id + ", owner=" + owner + "]";
+  }
+
+  @Override
+  public void toData(final PdxWriter writer) {
+    writer.markIdentityField("id");
+    writer.writeInt("id", id);
+    writer.writeString("owner", owner);
+  }
+
+  @Override
+  public void fromData(final PdxReader reader) {
+    id = reader.readInt("id");
+    owner = reader.readString("owner");
+  }
+}
\ No newline at end of file