You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2016/10/31 20:56:13 UTC

[33/50] [abbrv] incubator-geode git commit: GEODE-538: Add check for persistent data recovery

GEODE-538: Add check for persistent data recovery

PartitionedRegion.getNodeForBucketReadOrLoad can return an invalid node
if persistent data recovery is in progress and a get() targets a bucket
that hasn't been recovered yet. This can result in returning an incorrect
value (null) or throwing ConflictingPersistentDataException from a get()
or put() on the region.

This change adds a check for persistent recovery to be completed
before creating the new bucket. If recovery isn't complete then the
operation on the region will fail with a PartitionOfflineException.

Queries on a region while persistent recovery is in progress can also
return incorrect results, so a similar check is added to
DefaultQuery.checkQueryOnPR.

This closes #264


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/11ef3ebb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/11ef3ebb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/11ef3ebb

Branch: refs/heads/feature/GEODE-1930
Commit: 11ef3ebbe30a8340f57776bf4063684b91ccd0a3
Parents: 7511ffa
Author: Ken Howe <kh...@pivotal.io>
Authored: Thu Oct 6 15:02:24 2016 -0700
Committer: Anil <ag...@pivotal.io>
Committed: Wed Oct 19 15:49:33 2016 -0700

----------------------------------------------------------------------
 .../org/apache/geode/cache/query/Query.java     |  12 +
 .../cache/query/internal/DefaultQuery.java      |   6 +-
 .../internal/cache/PRHARedundancyProvider.java  |   9 +-
 .../geode/internal/cache/PartitionedRegion.java |  18 +-
 .../geode/internal/i18n/LocalizedStrings.java   |   1 +
 .../partitioned/PRBasicQueryDUnitTest.java      | 221 ++++++++++
 .../query/partitioned/PRQueryDUnitHelper.java   | 185 +++++++++
 ...tentColocatedPartitionedRegionDUnitTest.java | 411 ++++++++++++++++++-
 8 files changed, 844 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/11ef3ebb/geode-core/src/main/java/org/apache/geode/cache/query/Query.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/Query.java b/geode-core/src/main/java/org/apache/geode/cache/query/Query.java
index e27687d..670b262 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/query/Query.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/query/Query.java
@@ -89,6 +89,9 @@ public interface Query {
    * @throws QueryExecutionLowMemoryException
    *         If the query gets canceled due to low memory conditions and
    *         the resource manager critical heap percentage has been set
+   * @throws PartitionOfflineException
+   *         If persistent data recovery is not complete for a partitioned
+   *         region referred to in the query.
    */
   public Object execute()
     throws FunctionDomainException, TypeMismatchException, NameResolutionException,
@@ -150,6 +153,9 @@ public interface Query {
    * @throws QueryExecutionLowMemoryException
    *         If the query gets canceled due to low memory conditions and
    *         the resource manager critical heap percentage has been set
+   * @throws PartitionOfflineException
+   *         If persistent data recovery is not complete for a partitioned
+   *         region referred to in the query.
    *         
    */
   public Object execute(Object[] params)
@@ -220,6 +226,9 @@ public interface Query {
    * @throws QueryExecutionLowMemoryException
    *         If the query gets canceled due to low memory conditions and
    *         the resource manager critical heap percentage has been set
+   * @throws PartitionOfflineException
+   *         If persistent data recovery is not complete for a partitioned
+   *         region referred to in the query.
    */
   public Object execute(RegionFunctionContext context)
     throws FunctionDomainException, TypeMismatchException, NameResolutionException,
@@ -291,6 +300,9 @@ public interface Query {
    * @throws QueryExecutionLowMemoryException
    *         If the query gets canceled due to low memory conditions and
    *         the resource manager critical heap percentage has been set
+   * @throws PartitionOfflineException
+   *         If persistent data recovery is not complete for a partitioned
+   *         region referred to in the query.
    */
   public Object execute(RegionFunctionContext context, Object[] params)
     throws FunctionDomainException, TypeMismatchException, NameResolutionException,

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/11ef3ebb/geode-core/src/main/java/org/apache/geode/cache/query/internal/DefaultQuery.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/DefaultQuery.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/DefaultQuery.java
index 58df390..8175d82 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/DefaultQuery.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/DefaultQuery.java
@@ -27,11 +27,14 @@ import org.apache.geode.cache.client.internal.UserAttributes;
 import org.apache.geode.cache.execute.Function;
 import org.apache.geode.cache.execute.RegionFunctionContext;
 import org.apache.geode.cache.partition.PartitionRegionHelper;
+import org.apache.geode.cache.persistence.PartitionOfflineException;
+import org.apache.geode.cache.persistence.PersistentID;
 import org.apache.geode.cache.query.*;
 import org.apache.geode.cache.query.internal.cq.InternalCqQuery;
 import org.apache.geode.distributed.internal.DistributionConfig;
 import org.apache.geode.internal.NanoTimer;
 import org.apache.geode.internal.cache.*;
+import org.apache.geode.internal.cache.partitioned.RegionAdvisor;
 import org.apache.geode.internal.i18n.LocalizedStrings;
 
 import java.util.*;
@@ -581,7 +584,7 @@ public class DefaultQuery implements Query {
   }
 
 
-  private QueryExecutor checkQueryOnPR(Object[] parameters) throws RegionNotFoundException {
+  private QueryExecutor checkQueryOnPR(Object[] parameters) throws RegionNotFoundException, PartitionOfflineException {
 
     // check for PartititionedRegions. If a PartitionedRegion is referred to in the query,
     // then the following restrictions apply:
@@ -601,6 +604,7 @@ public class DefaultQuery implements Query {
         throw new RegionNotFoundException(LocalizedStrings.DefaultQuery_REGION_NOT_FOUND_0.toLocalizedString(regionPath));
       }
       if (rgn instanceof QueryExecutor) {
+        ((PartitionedRegion)rgn).checkPROffline();
         prs.add((QueryExecutor)rgn);
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/11ef3ebb/geode-core/src/main/java/org/apache/geode/internal/cache/PRHARedundancyProvider.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PRHARedundancyProvider.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PRHARedundancyProvider.java
index cfedb67..6245c37 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PRHARedundancyProvider.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PRHARedundancyProvider.java
@@ -24,6 +24,7 @@ import org.apache.geode.cache.PartitionedRegionStorageException;
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.RegionDestroyedException;
 import org.apache.geode.cache.persistence.PartitionOfflineException;
+import org.apache.geode.cache.persistence.PersistentID;
 import org.apache.geode.distributed.DistributedMember;
 import org.apache.geode.distributed.internal.DM;
 import org.apache.geode.distributed.internal.DistributionConfig;
@@ -495,16 +496,20 @@ public class PRHARedundancyProvider
    *           redundancy.
    * @throws PartitionedRegionException
    *           if d-lock can not be acquired to create bucket.
-   * 
+   * @throws PartitionOfflineException
+   *           if persistent data recovery is not complete for a partitioned
+   *           region referred to in the query.
    */
   public InternalDistributedMember
     createBucketAtomically(final int bucketId,
                            final int newBucketSize,
                            final long startTime,
                            final boolean finishIncompleteCreation, String partitionName) throws PartitionedRegionStorageException,
-                                    PartitionedRegionException
+                                    PartitionedRegionException, PartitionOfflineException
   {
     final boolean isDebugEnabled = logger.isDebugEnabled();
+
+    prRegion.checkPROffline();
     
     // If there are insufficient stores throw *before* we try acquiring the
     // (very expensive) bucket lock or the (somewhat expensive) monitor on this

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/11ef3ebb/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
index baab79f..f7ecdaf 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
@@ -28,6 +28,8 @@ import org.apache.geode.cache.client.internal.*;
 import org.apache.geode.cache.execute.*;
 import org.apache.geode.cache.partition.PartitionListener;
 import org.apache.geode.cache.partition.PartitionNotAvailableException;
+import org.apache.geode.cache.persistence.PartitionOfflineException;
+import org.apache.geode.cache.persistence.PersistentID;
 import org.apache.geode.cache.query.*;
 import org.apache.geode.cache.query.internal.*;
 import org.apache.geode.cache.query.internal.index.*;
@@ -1397,6 +1399,21 @@ public class PartitionedRegion extends LocalRegion implements
     new UpdateAttributesProcessor(this).distribute(false);
   }
 
+  /**
+   * Throw an exception if persistent data recovery from disk is not complete
+   * for this region.
+   *
+   * @throws PartitionOfflineException
+   */
+  public void checkPROffline() throws PartitionOfflineException {
+    if (getDataPolicy().withPersistence() && !recoveredFromDisk) {
+      Set<PersistentID> persistIds = new HashSet(getRegionAdvisor().advisePersistentMembers().values());
+      persistIds.removeAll(getRegionAdvisor().adviseInitializedPersistentMembers().values());
+      throw new PartitionOfflineException(persistIds, LocalizedStrings.PRHARedundancyProvider_PARTITIONED_REGION_0_OFFLINE_HAS_UNRECOVERED_PERSISTENT_DATA_1
+          .toLocalizedString(new Object[] { getFullPath(), persistIds}));
+    }
+  }
+
   public final void updatePRConfig(PartitionRegionConfig prConfig,
       boolean putOnlyIfUpdated) {
     final Set<Node> nodes = prConfig.getNodes();
@@ -3057,7 +3074,6 @@ public class PartitionedRegion extends LocalRegion implements
       final RetryTimeKeeper snoozer) {
     final boolean isDebugEnabled = logger.isDebugEnabled();
     
-//    InternalDistributedSystem ids = (InternalDistributedSystem)this.cache.getDistributedSystem();
     RetryTimeKeeper localSnoozer = snoozer;
     // Prevent early access to buckets that are not completely created/formed
     // and

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/11ef3ebb/geode-core/src/main/java/org/apache/geode/internal/i18n/LocalizedStrings.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/i18n/LocalizedStrings.java b/geode-core/src/main/java/org/apache/geode/internal/i18n/LocalizedStrings.java
index 8bfdd68..7d762b8 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/i18n/LocalizedStrings.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/i18n/LocalizedStrings.java
@@ -702,6 +702,7 @@ public class LocalizedStrings {
   public static final StringId AbstractDistributionConfig_CLIENT_CONFLATION_PROP_NAME = new StringId(1839, "Client override for server queue conflation setting");
   public static final StringId PRHARRedundancyProvider_ALLOCATE_ENOUGH_MEMBERS_TO_HOST_BUCKET = new StringId(1840, "allocate enough members to host bucket.");
   public static final StringId PRHARedundancyProvider_TIME_OUT_WAITING_0_MS_FOR_CREATION_OF_BUCKET_FOR_PARTITIONED_REGION_1_MEMBERS_REQUESTED_TO_CREATE_THE_BUCKET_ARE_2 = new StringId(1841, "Time out waiting {0} ms for creation of bucket for partitioned region {1}. Members requested to create the bucket are: {2}");
+  public static final StringId PRHARedundancyProvider_PARTITIONED_REGION_0_OFFLINE_HAS_UNRECOVERED_PERSISTENT_DATA_1 = new StringId(1842, "Partitioned Region {0} is offline due to unrecovered persistent data, {1}");
 
   public static final StringId PUT_0_FAILED_TO_PUT_ENTRY_FOR_REGION_1_KEY_2_VALUE_3 = new StringId(1843, "{0}: Failed to put entry for region {1} key {2} value {3}");
   public static final StringId PUT_0_UNEXPECTED_EXCEPTION = new StringId(1844, "{0}: Unexpected Exception");

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/11ef3ebb/geode-core/src/test/java/org/apache/geode/cache/query/partitioned/PRBasicQueryDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/cache/query/partitioned/PRBasicQueryDUnitTest.java b/geode-core/src/test/java/org/apache/geode/cache/query/partitioned/PRBasicQueryDUnitTest.java
index 8ef907a..224a7e0 100755
--- a/geode-core/src/test/java/org/apache/geode/cache/query/partitioned/PRBasicQueryDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/cache/query/partitioned/PRBasicQueryDUnitTest.java
@@ -29,6 +29,7 @@ import static org.apache.geode.cache.query.Utils.*;
 
 import org.apache.geode.cache.Cache;
 import org.apache.geode.cache.Region;
+import org.apache.geode.cache.persistence.PartitionOfflineException;
 import org.apache.geode.cache.query.Index;
 import org.apache.geode.cache.query.IndexType;
 import org.apache.geode.cache.query.Query;
@@ -38,6 +39,7 @@ import org.apache.geode.cache.query.data.Portfolio;
 import org.apache.geode.cache.query.data.PortfolioData;
 import org.apache.geode.cache30.CacheSerializableRunnable;
 import org.apache.geode.internal.cache.PartitionedRegionDUnitTestCase;
+import org.apache.geode.test.dunit.AsyncInvocation;
 import org.apache.geode.test.dunit.Host;
 import org.apache.geode.test.dunit.LogWriterUtils;
 import org.apache.geode.test.dunit.VM;
@@ -67,6 +69,8 @@ public class PRBasicQueryDUnitTest extends PartitionedRegionDUnitTestCase
     }
   }
 
+  private final static int MAX_SYNC_WAIT = 30 * 1000;
+  
   PRQueryDUnitHelper PRQHelp = new PRQueryDUnitHelper();
 
   final String name = "Portfolios";
@@ -153,6 +157,223 @@ public class PRBasicQueryDUnitTest extends PartitionedRegionDUnitTestCase
             "PRQBasicQueryDUnitTest#testPRBasicQuerying: Querying PR's Test ENDED");
   }
   
+  /**
+   * A basic dunit test that <br>
+   * 1. Creates a PR and colocated child region Accessor and Data Store with redundantCopies = 0.
+   * 2. Populates the region with test data.
+   * 3. Fires a query on accessor VM and verifies the result. 
+   * 4. Shuts down the caches, then restarts them asynchronously
+   * 5. Attempt the query while the regions are being recovered
+   * @throws Exception
+   */
+  @Test
+  public void testColocatedPRQueryDuringRecovery() throws Exception
+  {
+    Host host = Host.getHost(0);
+    VM vm0 = host.getVM(0); 
+    VM vm1 = host.getVM(1);
+    setCacheInVMs(vm0, vm1);
+    LogWriterUtils.getLogWriter()
+        .info("PRQBasicQueryDUnitTest#testColocatedPRBasicQuerying: Querying PR Test with DACK Started");
+
+    // Creting PR's on the participating VM's
+    // Creating Accessor node on the VM0.
+    LogWriterUtils.getLogWriter()
+        .info("PRQBasicQueryDUnitTest#testColocatedPRBasicQuerying: Creating the Accessor node in the PR");
+
+    vm0.invoke(PRQHelp.getCacheSerializableRunnableForColocatedPRCreate(name,
+        redundancy, PortfolioData.class, true));
+    // Creating local region on vm0 to compare the results of query.
+    vm0.invoke(PRQHelp.getCacheSerializableRunnableForLocalRegionCreation(localName, PortfolioData.class));
+    LogWriterUtils.getLogWriter()
+        .info("PRQBasicQueryDUnitTest#testColocatedPRBasicQuerying: Successfully created the Accessor node in the PR");
+
+    // Creating the Datastores Nodes in the VM1.
+    LogWriterUtils.getLogWriter()
+        .info("PRQBasicQueryDUnitTest:testColocatedPRBasicQuerying ----- Creating the Datastore node in the PR");
+    vm1.invoke(PRQHelp.getCacheSerializableRunnableForColocatedPRCreate(name,
+        redundancy, PortfolioData.class, true));
+
+    LogWriterUtils.getLogWriter()
+        .info("PRQBasicQueryDUnitTest#testColocatedPRBasicQuerying: Successfully Created the Datastore node in the PR");
+
+    LogWriterUtils.getLogWriter()
+        .info("PRQBasicQueryDUnitTest#testColocatedPRBasicQuerying: Successfully Created PR's across all VM's");
+
+    // Generating portfolio object array to be populated across the PR's & Local
+    // Regions
+
+    final PortfolioData[] portfolio = createPortfolioData(cnt, cntDest);
+    // Putting the data into the PR's created
+    vm0.invoke(PRQHelp.getCacheSerializableRunnableForPRPuts(name, portfolio,
+        cnt, cntDest));
+    vm0.invoke(PRQHelp.getCacheSerializableRunnableForPRDuplicatePuts(name, portfolio,
+        cnt, cntDest));
+    
+    LogWriterUtils.getLogWriter()
+        .info("PRQBasicQueryDUnitTest#testColocatedPRBasicQuerying: Inserted Portfolio data across PR's");
+    vm0.invoke(PRQHelp.getCacheSerializableRunnableForPRPuts(localName,
+        portfolio, cnt, cntDest));
+    vm0.invoke(PRQHelp.getCacheSerializableRunnableForPRDuplicatePuts(localName,
+        portfolio, cnt, cntDest));
+
+    // querying the VM for data and comparing the result with query result of
+    // local region.
+    // querying the VM for data
+    vm0.invoke(PRQHelp.getCacheSerializableRunnableForPRQueryAndCompareResults(
+        name, localName));
+
+    LogWriterUtils.getLogWriter()
+        .info("PRQBasicQueryDUnitTest#testColocatedPRBasicQuerying: Querying PR's 1st pass ENDED");
+
+    // Shut everything down and then restart to test queries during recovery
+    vm0.invoke(PRQHelp.getCacheSerializableRunnableForCloseCache());
+    vm1.invoke(PRQHelp.getCacheSerializableRunnableForCloseCache());
+    
+    // Re-create the regions - only create the parent regions on the datastores
+    setCacheInVMs(vm0, vm1);
+    LogWriterUtils.getLogWriter()
+        .info("PRQBasicQueryDUnitTest#testColocatedPRBasicQuerying: Creating the Accessor node in the PR");
+    vm0.invoke(PRQHelp.getCacheSerializableRunnableForColocatedParentCreate(name,
+        redundancy, PortfolioData.class, true));
+
+    // Creating local region on vm0 to compare the results of query.
+    vm0.invoke(PRQHelp.getCacheSerializableRunnableForLocalRegionCreation(localName, PortfolioData.class));
+    LogWriterUtils.getLogWriter()
+        .info("PRQBasicQueryDUnitTest#testColocatedPRBasicQuerying: Successfully created the Accessor node in the PR");
+    LogWriterUtils.getLogWriter()
+        .info("PRQBasicQueryDUnitTest:testColocatedPRBasicQuerying: re-creating the Datastore node in the PR");
+    vm1.invoke(PRQHelp.getCacheSerializableRunnableForColocatedParentCreate(name,
+        redundancy, PortfolioData.class, true));
+    
+    // Now start the child regions asynchronously so queries will happen during persistent recovery
+    AsyncInvocation vm0PR = vm0.invokeAsync(PRQHelp.getCacheSerializableRunnableForColocatedChildCreate(name,
+        redundancy, PortfolioData.class, true));
+    AsyncInvocation vm1PR = vm1.invokeAsync(PRQHelp.getCacheSerializableRunnableForColocatedChildCreate(name,
+        redundancy, PortfolioData.class, true));
+
+    // delay the query to let the recovery get underway
+    Thread.sleep(100);
+    
+    try {
+      // This is a repeat of the original query from before closing and restarting the datastores. This time
+      // it should fail due to persistent recovery that has not completed.
+      vm0.invoke(PRQHelp.getCacheSerializableRunnableForPRQueryAndCompareResults(name, localName, true));
+      fail("Expected PartitionOfflineException when queryiong a region with offline colocated child");
+    } catch (Exception e) {
+      if (!(e.getCause() instanceof PartitionOfflineException)) {
+        e.printStackTrace();
+        throw e;
+      }
+    }
+    LogWriterUtils.getLogWriter()
+        .info("PRQBasicQueryDUnitTest#testColocatedPRBasicQuerying: Querying PR's 2nd pass (after restarting regions) ENDED");
+  }
+
+  /**
+   * A basic dunit test that <br>
+   * 1. Creates a PR and colocated child region Accessor and Data Store with redundantCopies = 0.
+   * 2. Populates the region with test data.
+   * 3. Fires a query on accessor VM and verifies the result. 
+   * 4. Shuts down the caches, then restarts them asynchronously, but don't restart the child region
+   * 5. Attempt the query while the region offline because of the missing child region
+   * @throws Exception
+   */
+  @SuppressWarnings("rawtypes")
+  @Test
+  public void testColocatedPRQueryDuringRecoveryWithMissingColocatedChild() throws Exception
+  {
+    Host host = Host.getHost(0);
+    VM vm0 = host.getVM(0); 
+    VM vm1 = host.getVM(1);
+    setCacheInVMs(vm0, vm1);
+    LogWriterUtils.getLogWriter()
+        .info("PRQBasicQueryDUnitTest#testColocatedPRBasicQuerying: Querying PR Test with DACK Started");
+
+    // Creting PR's on the participating VM's
+    // Creating Accessor node on the VM0.
+    LogWriterUtils.getLogWriter()
+        .info("PRQBasicQueryDUnitTest#testColocatedPRBasicQuerying: Creating the Accessor node in the PR");
+
+    vm0.invoke(PRQHelp.getCacheSerializableRunnableForColocatedPRCreate(name,
+        redundancy, PortfolioData.class, true));
+    // Creating local region on vm0 to compare the results of query.
+    vm0.invoke(PRQHelp.getCacheSerializableRunnableForLocalRegionCreation(localName, PortfolioData.class));
+    LogWriterUtils.getLogWriter()
+        .info("PRQBasicQueryDUnitTest#testColocatedPRBasicQuerying: Successfully created the Accessor node in the PR");
+
+    // Creating the Datastores Nodes in the VM1.
+    LogWriterUtils.getLogWriter()
+        .info("PRQBasicQueryDUnitTest:testColocatedPRBasicQuerying ----- Creating the Datastore node in the PR");
+    vm1.invoke(PRQHelp.getCacheSerializableRunnableForColocatedPRCreate(name,
+        redundancy, PortfolioData.class, true));
+
+    LogWriterUtils.getLogWriter()
+        .info("PRQBasicQueryDUnitTest#testColocatedPRBasicQuerying: Successfully Created the Datastore node in the PR");
+
+    LogWriterUtils.getLogWriter()
+        .info("PRQBasicQueryDUnitTest#testColocatedPRBasicQuerying: Successfully Created PR's across all VM's");
+
+    // Generating portfolio object array to be populated across the PR's & Local
+    // Regions
+
+    final PortfolioData[] portfolio = createPortfolioData(cnt, cntDest);
+    // Putting the data into the PR's created
+    vm0.invoke(PRQHelp.getCacheSerializableRunnableForPRPuts(name, portfolio,
+        cnt, cntDest));
+    vm0.invoke(PRQHelp.getCacheSerializableRunnableForPRDuplicatePuts(name, portfolio,
+        cnt, cntDest));
+    
+    LogWriterUtils.getLogWriter()
+        .info("PRQBasicQueryDUnitTest#testColocatedPRBasicQuerying: Inserted Portfolio data across PR's");
+    vm0.invoke(PRQHelp.getCacheSerializableRunnableForPRPuts(localName,
+        portfolio, cnt, cntDest));
+    vm0.invoke(PRQHelp.getCacheSerializableRunnableForPRDuplicatePuts(localName,
+        portfolio, cnt, cntDest));
+
+    // querying the VM for data and comparing the result with query result of
+    // local region.
+    // querying the VM for data
+    vm0.invoke(PRQHelp.getCacheSerializableRunnableForPRQueryAndCompareResults(
+        name, localName));
+
+    LogWriterUtils.getLogWriter()
+        .info("PRQBasicQueryDUnitTest#testColocatedPRBasicQuerying: Querying PR's 1st pass ENDED");
+
+    // Shut everything down and then restart to test queries during recovery
+    vm0.invoke(PRQHelp.getCacheSerializableRunnableForCloseCache());
+    vm1.invoke(PRQHelp.getCacheSerializableRunnableForCloseCache());
+    
+    // Re-create the only the parent region
+    setCacheInVMs(vm0, vm1);
+    LogWriterUtils.getLogWriter()
+        .info("PRQBasicQueryDUnitTest#testColocatedPRBasicQuerying: Creating the Accessor node in the PR");
+    vm0.invoke(PRQHelp.getCacheSerializableRunnableForColocatedParentCreate(name,
+        redundancy, PortfolioData.class, true));
+
+    // Creating local region on vm0 to compare the results of query.
+    vm0.invoke(PRQHelp.getCacheSerializableRunnableForLocalRegionCreation(localName, PortfolioData.class));
+    LogWriterUtils.getLogWriter()
+        .info("PRQBasicQueryDUnitTest#testColocatedPRBasicQuerying: Successfully created the Accessor node in the PR");
+    LogWriterUtils.getLogWriter()
+        .info("PRQBasicQueryDUnitTest:testColocatedPRBasicQuerying ----- re-creating the Datastore node in the PR");
+    vm1.invoke(PRQHelp.getCacheSerializableRunnableForColocatedParentCreate(name,
+        redundancy, PortfolioData.class, true));
+
+    try {
+      // This is a repeat of the original query from before closing and restarting the datastores. This time
+      // it should fail due to persistent recovery that has not completed.
+      vm0.invoke(PRQHelp.getCacheSerializableRunnableForPRQueryAndCompareResults(name, localName, true));
+      fail("Expected PartitionOfflineException when queryiong a region with offline colocated child");
+    } catch (Exception e) {
+      if (!(e.getCause() instanceof PartitionOfflineException)) {
+        throw e;
+      }
+    }
+    LogWriterUtils.getLogWriter()
+        .info("PRQBasicQueryDUnitTest#testColocatedPRBasicQuerying: Querying PR's 2nd pass (after restarting regions) ENDED");
+  }
+ 
   @Test
   public void testPRCountStarQuery() throws Exception
   {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/11ef3ebb/geode-core/src/test/java/org/apache/geode/cache/query/partitioned/PRQueryDUnitHelper.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/cache/query/partitioned/PRQueryDUnitHelper.java b/geode-core/src/test/java/org/apache/geode/cache/query/partitioned/PRQueryDUnitHelper.java
index cfb4190..9dc90fd 100755
--- a/geode-core/src/test/java/org/apache/geode/cache/query/partitioned/PRQueryDUnitHelper.java
+++ b/geode-core/src/test/java/org/apache/geode/cache/query/partitioned/PRQueryDUnitHelper.java
@@ -39,6 +39,7 @@ import org.apache.geode.cache.CacheClosedException;
 import org.apache.geode.cache.CacheException;
 import org.apache.geode.cache.CacheFactory;
 import org.apache.geode.cache.DataPolicy;
+import org.apache.geode.cache.DiskStore;
 import org.apache.geode.cache.EntryExistsException;
 import org.apache.geode.cache.EntryNotFoundException;
 import org.apache.geode.cache.PartitionAttributes;
@@ -249,6 +250,190 @@ public class PRQueryDUnitHelper implements Serializable {
     return (CacheSerializableRunnable)createPrRegion;
   }
 
+  /**
+   * This function creates a colocated pair of PR's given the scope & the
+   * redundancy parameters for the parent *
+   *
+   * @param regionName
+   * @param redundancy
+   * @param constraint
+   * @param makePersistent
+   * @return cacheSerializable object
+   */
+  public CacheSerializableRunnable getCacheSerializableRunnableForColocatedPRCreate(
+    final String regionName, final int redundancy, final Class constraint, boolean makePersistent) {
+
+    final String childRegionName = regionName + "Child";
+    final String diskName = "disk";
+    SerializableRunnable createPrRegion;
+    createPrRegion = new CacheSerializableRunnable(regionName) {
+      @Override
+      public void run2() throws CacheException
+      {
+
+        Cache cache = getCache();
+        Region partitionedregion = null;
+        Region childRegion = null;
+        AttributesFactory attr = new AttributesFactory();
+        attr.setValueConstraint(constraint);
+        if (makePersistent) {
+          DiskStore ds = cache.findDiskStore(diskName);
+          if (ds == null) {
+            ds = cache.createDiskStoreFactory().setDiskDirs(JUnit4CacheTestCase.getDiskDirs())
+                .create(diskName);
+          }
+          attr.setDataPolicy(DataPolicy.PERSISTENT_PARTITION);
+          attr.setDiskStoreName(diskName);
+        } else {
+          attr.setDataPolicy(DataPolicy.PARTITION);
+          attr.setDiskStoreName(null);
+        }
+
+        PartitionAttributesFactory paf = new PartitionAttributesFactory();
+        paf.setRedundantCopies(redundancy);
+        attr.setPartitionAttributes(paf.create());
+
+        // parent region
+        partitionedregion = cache.createRegion(regionName, attr.create());
+        assertNotNull(
+            "PRQueryDUnitHelper#getCacheSerializableRunnableForPRCreateWithRedundancy: Partitioned Region "
+                + regionName + " not in cache", cache.getRegion(regionName));
+        assertNotNull(
+            "PRQueryDUnitHelper#getCacheSerializableRunnableForPRCreateWithRedundancy: Partitioned Region ref null",
+            partitionedregion);
+        assertTrue(
+            "PRQueryDUnitHelper#getCacheSerializableRunnableForPRCreateWithRedundancy: Partitioned Region ref claims to be destroyed",
+            !partitionedregion.isDestroyed());
+
+        // child region
+        attr.setValueConstraint(constraint);
+        paf.setColocatedWith(regionName);
+        attr.setPartitionAttributes(paf.create());
+        childRegion = cache.createRegion(childRegionName, attr.create());
+      }
+    };
+
+    return (CacheSerializableRunnable)createPrRegion;
+  }
+
+  /**
+   * Builds a runnable that creates only the parent region of a colocated pair of partitioned
+   * regions. The child region is deliberately not created by this runnable.
+   *
+   * @param regionName name of the parent partitioned region to create
+   * @param redundancy number of redundant copies configured on the region
+   * @param constraint value constraint applied to the region
+   * @param makePersistent if true the region uses {@code PERSISTENT_PARTITION} and the disk store
+   *        named "disk" (created if it does not exist yet); otherwise it uses {@code PARTITION}
+   * @return a runnable that creates the parent region and asserts that it is present in the cache
+   *         and not destroyed
+   */
+  public CacheSerializableRunnable getCacheSerializableRunnableForColocatedParentCreate(
+    final String regionName, final int redundancy, final Class constraint, final boolean makePersistent) {
+
+    final String diskName = "disk";
+    return new CacheSerializableRunnable(regionName + "-NoChildRegion") {
+      @Override
+      public void run2() throws CacheException
+      {
+        Cache cache = getCache();
+        AttributesFactory attr = new AttributesFactory();
+        attr.setValueConstraint(constraint);
+        if (makePersistent) {
+          // Reuse the disk store if an earlier runnable in this VM already created it.
+          if (cache.findDiskStore(diskName) == null) {
+            cache.createDiskStoreFactory().setDiskDirs(JUnit4CacheTestCase.getDiskDirs())
+                .create(diskName);
+          }
+          attr.setDataPolicy(DataPolicy.PERSISTENT_PARTITION);
+          attr.setDiskStoreName(diskName);
+        } else {
+          attr.setDataPolicy(DataPolicy.PARTITION);
+          attr.setDiskStoreName(null);
+        }
+
+        PartitionAttributesFactory paf = new PartitionAttributesFactory();
+        paf.setRedundantCopies(redundancy);
+        attr.setPartitionAttributes(paf.create());
+
+        // parent region only; the colocated child is created by a separate runnable
+        Region partitionedregion = cache.createRegion(regionName, attr.create());
+        assertNotNull(
+            "PRQueryDUnitHelper#getCacheSerializableRunnableForColocatedParentCreate: Partitioned Region "
+                + regionName + " not in cache", cache.getRegion(regionName));
+        assertNotNull(
+            "PRQueryDUnitHelper#getCacheSerializableRunnableForColocatedParentCreate: Partitioned Region ref null",
+            partitionedregion);
+        assertTrue(
+            "PRQueryDUnitHelper#getCacheSerializableRunnableForColocatedParentCreate: Partitioned Region ref claims to be destroyed",
+            !partitionedregion.isDestroyed());
+      }
+    };
+  }
+
+  /**
+   * Builds a runnable that creates only the child region of a colocated pair of partitioned
+   * regions. The child is named {@code regionName + "Child"} and is colocated with
+   * {@code regionName}; the parent region itself is not created by this runnable.
+   *
+   * @param regionName name of the parent partitioned region the child is colocated with
+   * @param redundancy number of redundant copies configured on the child region
+   * @param constraint value constraint applied to the child region
+   * @param isPersistent if true the region uses {@code PERSISTENT_PARTITION} and the disk store
+   *        named "disk" (created if it does not exist yet); otherwise it uses {@code PARTITION}
+   * @return a runnable that creates the child region
+   */
+  public CacheSerializableRunnable getCacheSerializableRunnableForColocatedChildCreate(
+    final String regionName, final int redundancy, final Class constraint, final boolean isPersistent) {
+
+    final String childRegionName = regionName + "Child";
+    final String diskName = "disk";
+    return new CacheSerializableRunnable(regionName + "-ChildRegion") {
+      @Override
+      public void run2() throws CacheException
+      {
+        Cache cache = getCache();
+        AttributesFactory attr = new AttributesFactory();
+        attr.setValueConstraint(constraint);
+        if (isPersistent) {
+          // Reuse the disk store if an earlier runnable in this VM already created it.
+          if (cache.findDiskStore(diskName) == null) {
+            cache.createDiskStoreFactory().setDiskDirs(JUnit4CacheTestCase.getDiskDirs())
+                .create(diskName);
+          }
+          attr.setDataPolicy(DataPolicy.PERSISTENT_PARTITION);
+          attr.setDiskStoreName(diskName);
+        } else {
+          attr.setDataPolicy(DataPolicy.PARTITION);
+          attr.setDiskStoreName(null);
+        }
+
+        PartitionAttributesFactory paf = new PartitionAttributesFactory();
+        paf.setRedundantCopies(redundancy);
+        attr.setPartitionAttributes(paf.create());
+
+        // The parent region is intentionally not created here.
+
+        // child region
+        attr.setValueConstraint(constraint);
+        paf.setColocatedWith(regionName);
+        attr.setPartitionAttributes(paf.create());
+        cache.createRegion(childRegionName, attr.create());
+      }
+    };
+  }
+
   public CacheSerializableRunnable getCacheSerializableRunnableForPRCreateLimitedBuckets(
       final String regionName, final int redundancy, final int buckets) {
         

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/11ef3ebb/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/PersistentColocatedPartitionedRegionDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/PersistentColocatedPartitionedRegionDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/PersistentColocatedPartitionedRegionDUnitTest.java
index 0a25228..c15d545 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/PersistentColocatedPartitionedRegionDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/PersistentColocatedPartitionedRegionDUnitTest.java
@@ -50,7 +50,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import com.jayway.awaitility.core.ConditionTimeoutException;
-import org.junit.experimental.categories.Category;
 
 import org.apache.geode.admin.internal.AdminDistributedSystemImpl;
 import org.apache.geode.cache.AttributesFactory;
@@ -64,6 +63,7 @@ import org.apache.geode.cache.Region;
 import org.apache.geode.cache.control.RebalanceOperation;
 import org.apache.geode.cache.control.RebalanceResults;
 import org.apache.geode.cache.persistence.PartitionOfflineException;
+import org.apache.geode.distributed.internal.DistributionConfig;
 import org.apache.geode.distributed.internal.DistributionManager;
 import org.apache.geode.distributed.internal.DistributionMessage;
 import org.apache.geode.distributed.internal.DistributionMessageObserver;
@@ -72,11 +72,14 @@ import org.apache.geode.internal.FileUtil;
 import org.apache.geode.internal.cache.ColocationLogger;
 import org.apache.geode.internal.cache.InitialImageOperation.RequestImageMessage;
 import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.PartitionedRegionHelper;
 import org.apache.geode.internal.cache.control.InternalResourceManager;
 import org.apache.geode.internal.cache.control.InternalResourceManager.ResourceObserver;
 import org.apache.geode.test.dunit.Assert;
 import org.apache.geode.test.dunit.AsyncInvocation;
 import org.apache.geode.test.dunit.IgnoredException;
+import org.apache.geode.test.dunit.LogWriterUtils;
+import org.apache.geode.test.dunit.RMIException;
 import org.apache.geode.test.dunit.Host;
 import org.apache.geode.test.dunit.SerializableCallable;
 import org.apache.geode.test.dunit.SerializableRunnable;
@@ -2088,7 +2091,7 @@ public class PersistentColocatedPartitionedRegionDUnitTest extends PersistentPar
     };
     
     //runnable to create PRs
-    SerializableRunnable createPRs = new SerializableRunnable("region1") {
+    SerializableRunnable createPRs = new SerializableRunnable("createPRs") {
       public void run() {
         Cache cache = getCache();
         
@@ -2112,7 +2115,7 @@ public class PersistentColocatedPartitionedRegionDUnitTest extends PersistentPar
     };
     
     //runnable to close the cache.
-    SerializableRunnable closeCache = new SerializableRunnable("region1") {
+    SerializableRunnable closeCache = new SerializableRunnable("closeCache") {
       public void run() {
         closeCache();
       }
@@ -2120,7 +2123,7 @@ public class PersistentColocatedPartitionedRegionDUnitTest extends PersistentPar
     
     //Runnable to do a bunch of puts handle exceptions
     //due to the fact that member is offline.
-    SerializableRunnable doABunchOfPuts = new SerializableRunnable("region1") {
+    SerializableRunnable doABunchOfPuts = new SerializableRunnable("doABunchOfPuts") {
       public void run() {
         Cache cache = getCache();
         Region region = cache.getRegion(PR_REGION_NAME);
@@ -2200,7 +2203,7 @@ public class PersistentColocatedPartitionedRegionDUnitTest extends PersistentPar
   @Category(FlakyTest.class) // GEODE-506: time sensitive, async actions with 30 sec max
   @Test
   public void testRebalanceWithOfflineChildRegion() throws Throwable {
-    SerializableRunnable createParentPR = new SerializableRunnable() {
+    SerializableRunnable createParentPR = new SerializableRunnable("createParentPR") {
       public void run() {
         Cache cache = getCache();
         
@@ -2220,7 +2223,7 @@ public class PersistentColocatedPartitionedRegionDUnitTest extends PersistentPar
       }
     };
     
-    SerializableRunnable createChildPR = new SerializableRunnable() {
+    SerializableRunnable createChildPR = new SerializableRunnable("createChildPR") {
       public void run() {
         Cache cache = getCache();
         
@@ -2325,7 +2328,6 @@ public class PersistentColocatedPartitionedRegionDUnitTest extends PersistentPar
     };
     
     vm1.invoke(addHook);
-//    vm1.invoke(addHook);
     AsyncInvocation async0;
     AsyncInvocation async1;
     AsyncInvocation async2;
@@ -2335,7 +2337,6 @@ public class PersistentColocatedPartitionedRegionDUnitTest extends PersistentPar
       async1 = vm1.invokeAsync(createPRs);
 
       vm1.invoke(waitForHook);
-//      vm1.invoke(waitForHook);
       
       //Now create the parent region on vm-2. vm-2 did not
       //previous host the child region.
@@ -2347,7 +2348,6 @@ public class PersistentColocatedPartitionedRegionDUnitTest extends PersistentPar
     
     } finally {
       vm1.invoke(removeHook);
-//      vm1.invoke(removeHook);
     }
     
     async0.getResult(MAX_WAIT);
@@ -2473,6 +2473,188 @@ public class PersistentColocatedPartitionedRegionDUnitTest extends PersistentPar
     closeCache();
   }
 
+  /**
+   * Verifies that a get() on a persistent parent region fails with a
+   * {@link PartitionOfflineException} when the members are restarted and the colocated child
+   * region has not been re-created yet.
+   */
+  @Test
+  public void testParentRegionGetWithOfflineChildRegion() throws Throwable {
+
+    SerializableRunnable createParentPR = new SerializableRunnable("createParentPR") {
+      public void run() {
+        final String retryTimeoutProperty = DistributionConfig.GEMFIRE_PREFIX + "partitionedRegionRetryTimeout";
+        // Shorten the retry timeout while the region is created so a failing operation gives up quickly.
+        String oldRetryTimeout = System.setProperty(retryTimeoutProperty, "10000");
+        try {
+          Cache cache = getCache();
+          if (cache.findDiskStore("disk") == null) {
+            cache.createDiskStoreFactory().setDiskDirs(getDiskDirs()).create("disk");
+          }
+          AttributesFactory af = new AttributesFactory();
+          PartitionAttributesFactory paf = new PartitionAttributesFactory();
+          paf.setRedundantCopies(0);
+          paf.setRecoveryDelay(0);
+          af.setPartitionAttributes(paf.create());
+          af.setDataPolicy(DataPolicy.PERSISTENT_PARTITION);
+          af.setDiskStoreName("disk");
+          cache.createRegion(PR_REGION_NAME, af.create());
+        } finally {
+          // Restore the previous setting instead of forcing the default, so a value configured
+          // for this VM before the test is not overwritten.
+          if (oldRetryTimeout == null) {
+            System.clearProperty(retryTimeoutProperty);
+          } else {
+            System.setProperty(retryTimeoutProperty, oldRetryTimeout);
+          }
+        }
+      }
+    };
+
+    SerializableRunnable createChildPR = new SerializableRunnable("createChildPR") {
+      public void run() throws InterruptedException {
+        final String retryTimeoutProperty = DistributionConfig.GEMFIRE_PREFIX + "partitionedRegionRetryTimeout";
+        String oldRetryTimeout = System.setProperty(retryTimeoutProperty, "10000");
+        try {
+          Cache cache = getCache();
+          AttributesFactory af = new AttributesFactory();
+          PartitionAttributesFactory paf = new PartitionAttributesFactory();
+          paf.setRedundantCopies(0);
+          paf.setRecoveryDelay(0);
+          paf.setColocatedWith(PR_REGION_NAME);
+          af.setDataPolicy(DataPolicy.PERSISTENT_PARTITION);
+          af.setDiskStoreName("disk");
+          af.setPartitionAttributes(paf.create());
+          // delay child region creations to cause a delay in persistent recovery
+          Thread.sleep(100);
+          cache.createRegion("region2", af.create());
+        } finally {
+          // Restore the previous setting instead of forcing the default.
+          if (oldRetryTimeout == null) {
+            System.clearProperty(retryTimeoutProperty);
+          } else {
+            System.setProperty(retryTimeoutProperty, oldRetryTimeout);
+          }
+        }
+      }
+    };
+
+    boolean caughtException = false;
+    try {
+      // Expect a get() on the un-recovered (due to offline child) parent region to fail
+      regionGetWithOfflineChild(createParentPR, createChildPR, false);
+    } catch (Exception e) {
+      caughtException = true;
+      assertTrue("Unexpected exception type: " + e, e instanceof RMIException);
+      assertTrue("Unexpected cause: " + e.getCause(), e.getCause() instanceof PartitionOfflineException);
+    }
+    if (!caughtException) {
+      fail("Expected PartitionOfflineException from remote");
+    }
+  }
+
+  /**
+   * Verifies that a get() on a persistent parent region fails with a
+   * {@link PartitionOfflineException} after a restart while the colocated child region has not
+   * been recovered. Unlike the offline-child variant, the child-region runnable used here does
+   * not sleep before creating the region.
+   */
+  @Test
+  public void testParentRegionGetWithRecoveryInProgress() throws Throwable {
+    SerializableRunnable createParentPR = new SerializableRunnable("createParentPR") {
+      public void run() {
+        final String retryTimeoutProperty = DistributionConfig.GEMFIRE_PREFIX + "partitionedRegionRetryTimeout";
+        // Shorten the retry timeout while the region is created so a failing operation gives up quickly.
+        String oldRetryTimeout = System.setProperty(retryTimeoutProperty, "10000");
+        try {
+          Cache cache = getCache();
+          if (cache.findDiskStore("disk") == null) {
+            cache.createDiskStoreFactory().setDiskDirs(getDiskDirs()).create("disk");
+          }
+          AttributesFactory af = new AttributesFactory();
+          PartitionAttributesFactory paf = new PartitionAttributesFactory();
+          paf.setRedundantCopies(0);
+          paf.setRecoveryDelay(0);
+          af.setPartitionAttributes(paf.create());
+          af.setDataPolicy(DataPolicy.PERSISTENT_PARTITION);
+          af.setDiskStoreName("disk");
+          cache.createRegion(PR_REGION_NAME, af.create());
+        } finally {
+          // Restore the previous setting instead of forcing the default, so a value configured
+          // for this VM before the test is not overwritten.
+          if (oldRetryTimeout == null) {
+            System.clearProperty(retryTimeoutProperty);
+          } else {
+            System.setProperty(retryTimeoutProperty, oldRetryTimeout);
+          }
+        }
+      }
+    };
+
+    SerializableRunnable createChildPR = new SerializableRunnable("createChildPR") {
+      public void run() throws InterruptedException {
+        final String retryTimeoutProperty = DistributionConfig.GEMFIRE_PREFIX + "partitionedRegionRetryTimeout";
+        String oldRetryTimeout = System.setProperty(retryTimeoutProperty, "10000");
+        try {
+          Cache cache = getCache();
+          AttributesFactory af = new AttributesFactory();
+          PartitionAttributesFactory paf = new PartitionAttributesFactory();
+          paf.setRedundantCopies(0);
+          paf.setRecoveryDelay(0);
+          paf.setColocatedWith(PR_REGION_NAME);
+          af.setDataPolicy(DataPolicy.PERSISTENT_PARTITION);
+          af.setDiskStoreName("disk");
+          af.setPartitionAttributes(paf.create());
+          cache.createRegion("region2", af.create());
+        } finally {
+          // Restore the previous setting instead of forcing the default.
+          if (oldRetryTimeout == null) {
+            System.clearProperty(retryTimeoutProperty);
+          } else {
+            System.setProperty(retryTimeoutProperty, oldRetryTimeout);
+          }
+        }
+      }
+    };
+
+    boolean caughtException = false;
+    try {
+      // Expect a get() on the un-recovered (due to offline child) parent region to fail
+      regionGetWithOfflineChild(createParentPR, createChildPR, false);
+    } catch (Exception e) {
+      caughtException = true;
+      assertTrue("Unexpected exception type: " + e, e instanceof RMIException);
+      assertTrue("Unexpected cause: " + e.getCause(), e.getCause() instanceof PartitionOfflineException);
+    }
+    if (!caughtException) {
+      fail("Expected PartitionOfflineException from remote");
+    }
+  }
+
+  @Test
+  public void testParentRegionPutWithRecoveryInProgress() throws Throwable {
+    SerializableRunnable createParentPR = new SerializableRunnable("createParentPR") {
+      public void run() {
+        String oldRetryTimeout = System.setProperty(DistributionConfig.GEMFIRE_PREFIX + "partitionedRegionRetryTimeout", "10000");
+        System.out.println("oldRetryTimeout = " + oldRetryTimeout);
+        try {
+          Cache cache = getCache();
+          DiskStore ds = cache.findDiskStore("disk");
+          if (ds == null) {
+            ds = cache.createDiskStoreFactory().setDiskDirs(getDiskDirs()).create("disk");
+          }
+          AttributesFactory af = new AttributesFactory();
+          PartitionAttributesFactory paf = new PartitionAttributesFactory();
+          paf.setRedundantCopies(0);
+          paf.setRecoveryDelay(0);
+          af.setPartitionAttributes(paf.create());
+          af.setDataPolicy(DataPolicy.PERSISTENT_PARTITION);
+          af.setDiskStoreName("disk");
+          cache.createRegion(PR_REGION_NAME, af.create());
+        } finally {
+          System.setProperty(DistributionConfig.GEMFIRE_PREFIX + "partitionedRegionRetryTimeout", String.valueOf(PartitionedRegionHelper.DEFAULT_TOTAL_WAIT_RETRY_ITERATION));
+        }
+      }
+    };
+
+    SerializableRunnable createChildPR = new SerializableRunnable("createChildPR") {
+      public void run() throws InterruptedException {
+        String oldRetryTimeout = System.setProperty(DistributionConfig.GEMFIRE_PREFIX + "partitionedRegionRetryTimeout", "10000");
+        try {
+          Cache cache = getCache();
+          AttributesFactory af = new AttributesFactory();
+          PartitionAttributesFactory paf = new PartitionAttributesFactory();
+          paf.setRedundantCopies(0);
+          paf.setRecoveryDelay(0);
+          paf.setColocatedWith(PR_REGION_NAME);
+          af.setDataPolicy(DataPolicy.PERSISTENT_PARTITION);
+          af.setDiskStoreName("disk");
+          af.setPartitionAttributes(paf.create());
+          Thread.sleep(1000);
+          cache.createRegion("region2", af.create());
+        } finally {
+          System.setProperty(DistributionConfig.GEMFIRE_PREFIX + "partitionedRegionRetryTimeout", String.valueOf(PartitionedRegionHelper.DEFAULT_TOTAL_WAIT_RETRY_ITERATION));
+        }
+      }
+    };
+
+    boolean caughtException = false;
+    try {
+      // Expect a get() on the un-recovered (due to offline child) parent region to fail
+      regionGetWithOfflineChild(createParentPR, createChildPR, false);
+    } catch (Exception e) {
+      caughtException = true;
+      assertTrue(e instanceof RMIException);
+      assertTrue(e.getCause() instanceof PartitionOfflineException);
+    }
+    if (!caughtException) {
+      fail("Expected TimeoutException from remote");
+    }
+  }
+
   /**
    * Create three PRs on a VM, named region1, region2, and region3.
    * The colocated with attribute describes which region region3 
@@ -2523,15 +2705,15 @@ public class PersistentColocatedPartitionedRegionDUnitTest extends PersistentPar
     vm1.invoke(createParentPR);
     vm0.invoke(createChildPR);
     vm1.invoke(createChildPR);
-    
+
     //Create some buckets.
     createData(vm0, 0, NUM_BUCKETS, "a");
     createData(vm0, 0, NUM_BUCKETS, "a", "region2");
-    
+
     //Close the members
     closeCache(vm1);
     closeCache(vm0);
-    
+
     //Recreate the parent region. Try to make sure that
     //the member with the latest copy of the buckets
     //is the one that decides to throw away it's copy
@@ -2540,18 +2722,17 @@ public class PersistentColocatedPartitionedRegionDUnitTest extends PersistentPar
     AsyncInvocation async1 = vm1.invokeAsync(createParentPR);
     async0.getResult(MAX_WAIT);
     async1.getResult(MAX_WAIT);
-    
 
     //Now create the parent region on vm-2. vm-2 did not
     //previous host the child region.
     vm2.invoke(createParentPR);
-    
+
     //Rebalance the parent region.
     //This should not move any buckets, because
     //we haven't recovered the child region
     RebalanceResults rebalanceResults = rebalance(vm2);
     assertEquals(0, rebalanceResults.getTotalBucketTransfersCompleted());
-    
+
     //Recreate the child region. 
     async1 = vm1.invokeAsync(createChildPR);
     async0 = vm0.invokeAsync(createChildPR);
@@ -2568,6 +2749,206 @@ public class PersistentColocatedPartitionedRegionDUnitTest extends PersistentPar
     createData(vm0, 0, NUM_BUCKETS, "c", "region2");
   }
 
+  /**
+   * Create a colocated pair of persistent regions and populate them with data. Shut down the servers and then
+   * restart them and check the data.
+   * <p>
+   * On the restart, try region operations ({@code get()}) on the parent region before or during persistent recovery.
+   * The {@code concurrentCheckData} argument determines whether the operation from the parent region occurs before
+   * or concurrent with the child region creation and recovery.
+   *
+   * @param createParentPR {@link SerializableRunnable} for creating the parent region on one member
+   * @param createChildPR {@link SerializableRunnable} for creating the child region on one member
+   * @param concurrentCheckData if true, the child region is re-created asynchronously on all three members and the
+   *        parent {@code get()} calls run asynchronously alongside; if false, the parent {@code get()} calls run
+   *        synchronously and this method does not re-create the child region
+   * @throws Throwable whatever a remote invocation or an asynchronous result reports
+   */
+  public void regionGetWithOfflineChild(
+      SerializableRunnable createParentPR,
+      SerializableRunnable createChildPR,
+      boolean concurrentCheckData) throws Throwable {
+    Host host = Host.getHost(0);
+    final VM vm0 = host.getVM(0);
+    VM vm1 = host.getVM(1);
+    VM vm2 = host.getVM(2);
+
+    //Create the PRs on two members
+    vm0.invoke(createParentPR);
+    vm1.invoke(createParentPR);
+    vm0.invoke(createChildPR);
+    vm1.invoke(createChildPR);
+
+    //Create some buckets.
+    createData(vm0, 0, NUM_BUCKETS, "a");
+    createData(vm0, 0, NUM_BUCKETS, "a", "region2");
+
+    //Close the members
+    closeCache(vm1);
+    closeCache(vm0);
+
+    //Reads every key from the parent region and expects the value written above.
+    SerializableRunnable checkDataOnParent = (new SerializableRunnable("checkDataOnParent") {
+      @Override
+      public void run() {
+        Cache cache = getCache();
+        Region region = cache.getRegion(PR_REGION_NAME);
+
+        for (int i = 0; i < NUM_BUCKETS; i++) {
+          assertEquals("For key " + i, "a", region.get(i));
+        }
+      }
+    });
+
+    try {
+      //Recreate the parent region. Try to make sure that
+      //the member with the latest copy of the buckets
+      //is the one that decides to throw away its copy
+      //by starting it last.
+      AsyncInvocation async0 = vm0.invokeAsync(createParentPR);
+      AsyncInvocation async1 = vm1.invokeAsync(createParentPR);
+      async0.getResult(MAX_WAIT);
+      async1.getResult(MAX_WAIT);
+      //Now create the parent region on vm-2. vm-2 did not
+      //previously host the child region.
+      vm2.invoke(createParentPR);
+
+      AsyncInvocation async2 = null;
+      AsyncInvocation asyncCheck = null;
+      if (concurrentCheckData) {
+        //Recreate the child region.
+        async1 = vm1.invokeAsync(createChildPR);
+        async0 = vm0.invokeAsync(createChildPR);
+        // NOTE(review): this runnable executes inside vm2 and calls vm2.invoke from there;
+        // confirm that a nested invoke from a remote VM is intended.
+        async2 = vm2.invokeAsync(new SerializableRunnable("delay") {
+          @Override
+          public void run() throws InterruptedException {
+            Thread.sleep(100);
+            vm2.invoke(createChildPR);
+          }
+        });
+
+        asyncCheck = vm0.invokeAsync(checkDataOnParent);
+      } else {
+        vm0.invoke(checkDataOnParent);
+      }
+      async0.getResult(MAX_WAIT);
+      async1.getResult(MAX_WAIT);
+      //async2 and asyncCheck are only started in the concurrent case; guard against
+      //dereferencing null when the check ran synchronously.
+      if (async2 != null) {
+        async2.getResult(MAX_WAIT);
+      }
+      if (asyncCheck != null) {
+        asyncCheck.getResult(MAX_WAIT);
+      }
+      //Validate the data
+      checkData(vm0, 0, NUM_BUCKETS, "a");
+      checkData(vm0, 0, NUM_BUCKETS, "a", "region2");
+      //Make sure we can actually use the buckets in the child region.
+      createData(vm0, 0, NUM_BUCKETS, "c", "region2");
+    } finally {
+      //Close the members
+      closeCache(vm1);
+      closeCache(vm0);
+      closeCache(vm2);
+    }
+  }
+
+  /**
+   * Create a colocated pair of persistent regions and populate them with data. Shut down the servers and then
+   * restart them.
+   * <p>
+   * On the restart, try region operations ({@code put()}) on the parent region before or during persistent recovery.
+   * The {@code concurrentCreateData} argument determines whether the operation from the parent region occurs before
+   * or concurrent with the child region creation and recovery.
+   *
+   * @param createParentPR {@link SerializableRunnable} for creating the parent region on one member
+   * @param createChildPR {@link SerializableRunnable} for creating the child region on one member
+   * @param concurrentCreateData if true, the child region is re-created asynchronously on all three members and the
+   *        parent {@code put()} calls run asynchronously alongside; if false, the parent {@code put()} calls run
+   *        synchronously and this method does not re-create the child region
+   * @throws Throwable whatever a remote invocation or an asynchronous result reports
+   */
+  public void regionPutWithOfflineChild(
+      SerializableRunnable createParentPR,
+      SerializableRunnable createChildPR,
+      boolean concurrentCreateData) throws Throwable {
+    Host host = Host.getHost(0);
+    final VM vm0 = host.getVM(0);
+    VM vm1 = host.getVM(1);
+    VM vm2 = host.getVM(2);
+
+    //Writes "c" for every key of the parent region and reads each value back.
+    SerializableRunnable createDataOnParent = new SerializableRunnable("createDataOnParent") {
+
+      public void run() {
+        Cache cache = getCache();
+        LogWriterUtils.getLogWriter().info("creating data in " + PR_REGION_NAME);
+        Region region = cache.getRegion(PR_REGION_NAME);
+
+        for (int i = 0; i < NUM_BUCKETS; i++) {
+          region.put(i, "c");
+          assertEquals("For key " + i, "c", region.get(i));
+        }
+      }
+    };
+
+    //Create the PRs on two members
+    vm0.invoke(createParentPR);
+    vm1.invoke(createParentPR);
+    vm0.invoke(createChildPR);
+    vm1.invoke(createChildPR);
+
+    //Create some buckets.
+    createData(vm0, 0, NUM_BUCKETS, "a");
+    createData(vm0, 0, NUM_BUCKETS, "a", "region2");
+
+    //Close the members
+    closeCache(vm1);
+    closeCache(vm0);
+
+    try {
+      //Recreate the parent region. Try to make sure that
+      //the member with the latest copy of the buckets
+      //is the one that decides to throw away its copy
+      //by starting it last.
+      AsyncInvocation async0 = vm0.invokeAsync(createParentPR);
+      AsyncInvocation async1 = vm1.invokeAsync(createParentPR);
+      async0.getResult(MAX_WAIT);
+      async1.getResult(MAX_WAIT);
+      //Now create the parent region on vm-2. vm-2 did not
+      //previously host the child region.
+      vm2.invoke(createParentPR);
+
+      AsyncInvocation async2 = null;
+      AsyncInvocation asyncPut = null;
+      if (concurrentCreateData) {
+        //Recreate the child region.
+        async1 = vm1.invokeAsync(createChildPR);
+        async0 = vm0.invokeAsync(createChildPR);
+        async2 = vm2.invokeAsync(createChildPR);
+
+        Thread.sleep(100);
+        asyncPut = vm0.invokeAsync(createDataOnParent);
+      } else {
+        vm0.invoke(createDataOnParent);
+      }
+      async0.getResult(MAX_WAIT);
+      async1.getResult(MAX_WAIT);
+      //async2 and asyncPut are only started in the concurrent case; guard against
+      //dereferencing null when the puts ran synchronously.
+      if (async2 != null) {
+        async2.getResult(MAX_WAIT);
+      }
+      if (asyncPut != null) {
+        asyncPut.getResult(MAX_WAIT);
+      }
+      //Validate the data
+      checkData(vm0, 0, NUM_BUCKETS, "c");
+      checkData(vm0, 0, NUM_BUCKETS, "a", "region2");
+      //Make sure we can actually use the buckets in the child region.
+      createData(vm0, 0, NUM_BUCKETS, "c", "region2");
+    } finally {
+      //Close the members
+      closeCache(vm1);
+      closeCache(vm0);
+      closeCache(vm2);
+    }
+  }
+
   private RebalanceResults rebalance(VM vm) {
     return (RebalanceResults) vm.invoke(new SerializableCallable() {