You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2016/10/31 20:56:13 UTC
[33/50] [abbrv] incubator-geode git commit: GEODE-538: Add check for
persistent data recovery
GEODE-538: Add check for persistent data recovery
PartitionedRegion.getNodeForBucketReadOrLoad can return an invalid node
if persistent data recovery is in progress and a get() targets a bucket
that hasn't been recovered yet. This can result in returning an incorrect
value (null) or throwing ConflictingPersistentDataException from a get()
or put() on the region.
This change adds a check for persistent recovery to be completed
before creating the new bucket. If recovery isn't complete then the
operation on the region will fail with a PartitionOfflineException.
Queries on a region while persistent recovery is in progress can also
return incorrect results, so a similar check is added to
DefaultQuery.checkQueryOnPR.
This closes #264
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/11ef3ebb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/11ef3ebb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/11ef3ebb
Branch: refs/heads/feature/GEODE-1930
Commit: 11ef3ebbe30a8340f57776bf4063684b91ccd0a3
Parents: 7511ffa
Author: Ken Howe <kh...@pivotal.io>
Authored: Thu Oct 6 15:02:24 2016 -0700
Committer: Anil <ag...@pivotal.io>
Committed: Wed Oct 19 15:49:33 2016 -0700
----------------------------------------------------------------------
.../org/apache/geode/cache/query/Query.java | 12 +
.../cache/query/internal/DefaultQuery.java | 6 +-
.../internal/cache/PRHARedundancyProvider.java | 9 +-
.../geode/internal/cache/PartitionedRegion.java | 18 +-
.../geode/internal/i18n/LocalizedStrings.java | 1 +
.../partitioned/PRBasicQueryDUnitTest.java | 221 ++++++++++
.../query/partitioned/PRQueryDUnitHelper.java | 185 +++++++++
...tentColocatedPartitionedRegionDUnitTest.java | 411 ++++++++++++++++++-
8 files changed, 844 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/11ef3ebb/geode-core/src/main/java/org/apache/geode/cache/query/Query.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/Query.java b/geode-core/src/main/java/org/apache/geode/cache/query/Query.java
index e27687d..670b262 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/query/Query.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/query/Query.java
@@ -89,6 +89,9 @@ public interface Query {
* @throws QueryExecutionLowMemoryException
* If the query gets canceled due to low memory conditions and
* the resource manager critical heap percentage has been set
+ * @throws PartitionOfflineException
+ * If persistent data recovery is not complete for a partitioned
+ * region referred to in the query.
*/
public Object execute()
throws FunctionDomainException, TypeMismatchException, NameResolutionException,
@@ -150,6 +153,9 @@ public interface Query {
* @throws QueryExecutionLowMemoryException
* If the query gets canceled due to low memory conditions and
* the resource manager critical heap percentage has been set
+ * @throws PartitionOfflineException
+ * If persistent data recovery is not complete for a partitioned
+ * region referred to in the query.
*
*/
public Object execute(Object[] params)
@@ -220,6 +226,9 @@ public interface Query {
* @throws QueryExecutionLowMemoryException
* If the query gets canceled due to low memory conditions and
* the resource manager critical heap percentage has been set
+ * @throws PartitionOfflineException
+ * If persistent data recovery is not complete for a partitioned
+ * region referred to in the query.
*/
public Object execute(RegionFunctionContext context)
throws FunctionDomainException, TypeMismatchException, NameResolutionException,
@@ -291,6 +300,9 @@ public interface Query {
* @throws QueryExecutionLowMemoryException
* If the query gets canceled due to low memory conditions and
* the resource manager critical heap percentage has been set
+ * @throws PartitionOfflineException
+ * If persistent data recovery is not complete for a partitioned
+ * region referred to in the query.
*/
public Object execute(RegionFunctionContext context, Object[] params)
throws FunctionDomainException, TypeMismatchException, NameResolutionException,
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/11ef3ebb/geode-core/src/main/java/org/apache/geode/cache/query/internal/DefaultQuery.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/DefaultQuery.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/DefaultQuery.java
index 58df390..8175d82 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/DefaultQuery.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/DefaultQuery.java
@@ -27,11 +27,14 @@ import org.apache.geode.cache.client.internal.UserAttributes;
import org.apache.geode.cache.execute.Function;
import org.apache.geode.cache.execute.RegionFunctionContext;
import org.apache.geode.cache.partition.PartitionRegionHelper;
+import org.apache.geode.cache.persistence.PartitionOfflineException;
+import org.apache.geode.cache.persistence.PersistentID;
import org.apache.geode.cache.query.*;
import org.apache.geode.cache.query.internal.cq.InternalCqQuery;
import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.internal.NanoTimer;
import org.apache.geode.internal.cache.*;
+import org.apache.geode.internal.cache.partitioned.RegionAdvisor;
import org.apache.geode.internal.i18n.LocalizedStrings;
import java.util.*;
@@ -581,7 +584,7 @@ public class DefaultQuery implements Query {
}
- private QueryExecutor checkQueryOnPR(Object[] parameters) throws RegionNotFoundException {
+ private QueryExecutor checkQueryOnPR(Object[] parameters) throws RegionNotFoundException, PartitionOfflineException {
// check for PartititionedRegions. If a PartitionedRegion is referred to in the query,
// then the following restrictions apply:
@@ -601,6 +604,7 @@ public class DefaultQuery implements Query {
throw new RegionNotFoundException(LocalizedStrings.DefaultQuery_REGION_NOT_FOUND_0.toLocalizedString(regionPath));
}
if (rgn instanceof QueryExecutor) {
+ ((PartitionedRegion)rgn).checkPROffline();
prs.add((QueryExecutor)rgn);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/11ef3ebb/geode-core/src/main/java/org/apache/geode/internal/cache/PRHARedundancyProvider.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PRHARedundancyProvider.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PRHARedundancyProvider.java
index cfedb67..6245c37 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PRHARedundancyProvider.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PRHARedundancyProvider.java
@@ -24,6 +24,7 @@ import org.apache.geode.cache.PartitionedRegionStorageException;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionDestroyedException;
import org.apache.geode.cache.persistence.PartitionOfflineException;
+import org.apache.geode.cache.persistence.PersistentID;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.internal.DM;
import org.apache.geode.distributed.internal.DistributionConfig;
@@ -495,16 +496,20 @@ public class PRHARedundancyProvider
* redundancy.
* @throws PartitionedRegionException
* if d-lock can not be acquired to create bucket.
- *
+ * @throws PartitionOfflineException
+ * if persistent data recovery is not complete for this
+ * provider's partitioned region.
*/
public InternalDistributedMember
createBucketAtomically(final int bucketId,
final int newBucketSize,
final long startTime,
final boolean finishIncompleteCreation, String partitionName) throws PartitionedRegionStorageException,
- PartitionedRegionException
+ PartitionedRegionException, PartitionOfflineException
{
final boolean isDebugEnabled = logger.isDebugEnabled();
+
+ prRegion.checkPROffline();
// If there are insufficient stores throw *before* we try acquiring the
// (very expensive) bucket lock or the (somewhat expensive) monitor on this
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/11ef3ebb/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
index baab79f..f7ecdaf 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
@@ -28,6 +28,8 @@ import org.apache.geode.cache.client.internal.*;
import org.apache.geode.cache.execute.*;
import org.apache.geode.cache.partition.PartitionListener;
import org.apache.geode.cache.partition.PartitionNotAvailableException;
+import org.apache.geode.cache.persistence.PartitionOfflineException;
+import org.apache.geode.cache.persistence.PersistentID;
import org.apache.geode.cache.query.*;
import org.apache.geode.cache.query.internal.*;
import org.apache.geode.cache.query.internal.index.*;
@@ -1397,6 +1399,21 @@ public class PartitionedRegion extends LocalRegion implements
new UpdateAttributesProcessor(this).distribute(false);
}
+ /**
+ * Throw an exception if persistent data recovery from disk is not complete
+ * for this region.
+ *
+ * @throws PartitionOfflineException if this region is persistent and has not yet recovered from disk
+ */
+ public void checkPROffline() throws PartitionOfflineException {
+ if (getDataPolicy().withPersistence() && !recoveredFromDisk) {
+ Set<PersistentID> persistIds = new HashSet(getRegionAdvisor().advisePersistentMembers().values());
+ persistIds.removeAll(getRegionAdvisor().adviseInitializedPersistentMembers().values());
+ throw new PartitionOfflineException(persistIds, LocalizedStrings.PRHARedundancyProvider_PARTITIONED_REGION_0_OFFLINE_HAS_UNRECOVERED_PERSISTENT_DATA_1
+ .toLocalizedString(new Object[] { getFullPath(), persistIds}));
+ }
+ }
+
public final void updatePRConfig(PartitionRegionConfig prConfig,
boolean putOnlyIfUpdated) {
final Set<Node> nodes = prConfig.getNodes();
@@ -3057,7 +3074,6 @@ public class PartitionedRegion extends LocalRegion implements
final RetryTimeKeeper snoozer) {
final boolean isDebugEnabled = logger.isDebugEnabled();
-// InternalDistributedSystem ids = (InternalDistributedSystem)this.cache.getDistributedSystem();
RetryTimeKeeper localSnoozer = snoozer;
// Prevent early access to buckets that are not completely created/formed
// and
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/11ef3ebb/geode-core/src/main/java/org/apache/geode/internal/i18n/LocalizedStrings.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/i18n/LocalizedStrings.java b/geode-core/src/main/java/org/apache/geode/internal/i18n/LocalizedStrings.java
index 8bfdd68..7d762b8 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/i18n/LocalizedStrings.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/i18n/LocalizedStrings.java
@@ -702,6 +702,7 @@ public class LocalizedStrings {
public static final StringId AbstractDistributionConfig_CLIENT_CONFLATION_PROP_NAME = new StringId(1839, "Client override for server queue conflation setting");
public static final StringId PRHARRedundancyProvider_ALLOCATE_ENOUGH_MEMBERS_TO_HOST_BUCKET = new StringId(1840, "allocate enough members to host bucket.");
public static final StringId PRHARedundancyProvider_TIME_OUT_WAITING_0_MS_FOR_CREATION_OF_BUCKET_FOR_PARTITIONED_REGION_1_MEMBERS_REQUESTED_TO_CREATE_THE_BUCKET_ARE_2 = new StringId(1841, "Time out waiting {0} ms for creation of bucket for partitioned region {1}. Members requested to create the bucket are: {2}");
+ public static final StringId PRHARedundancyProvider_PARTITIONED_REGION_0_OFFLINE_HAS_UNRECOVERED_PERSISTENT_DATA_1 = new StringId(1842, "Partitioned Region {0} is offline due to unrecovered persistent data, {1}");
public static final StringId PUT_0_FAILED_TO_PUT_ENTRY_FOR_REGION_1_KEY_2_VALUE_3 = new StringId(1843, "{0}: Failed to put entry for region {1} key {2} value {3}");
public static final StringId PUT_0_UNEXPECTED_EXCEPTION = new StringId(1844, "{0}: Unexpected Exception");
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/11ef3ebb/geode-core/src/test/java/org/apache/geode/cache/query/partitioned/PRBasicQueryDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/cache/query/partitioned/PRBasicQueryDUnitTest.java b/geode-core/src/test/java/org/apache/geode/cache/query/partitioned/PRBasicQueryDUnitTest.java
index 8ef907a..224a7e0 100755
--- a/geode-core/src/test/java/org/apache/geode/cache/query/partitioned/PRBasicQueryDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/cache/query/partitioned/PRBasicQueryDUnitTest.java
@@ -29,6 +29,7 @@ import static org.apache.geode.cache.query.Utils.*;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.Region;
+import org.apache.geode.cache.persistence.PartitionOfflineException;
import org.apache.geode.cache.query.Index;
import org.apache.geode.cache.query.IndexType;
import org.apache.geode.cache.query.Query;
@@ -38,6 +39,7 @@ import org.apache.geode.cache.query.data.Portfolio;
import org.apache.geode.cache.query.data.PortfolioData;
import org.apache.geode.cache30.CacheSerializableRunnable;
import org.apache.geode.internal.cache.PartitionedRegionDUnitTestCase;
+import org.apache.geode.test.dunit.AsyncInvocation;
import org.apache.geode.test.dunit.Host;
import org.apache.geode.test.dunit.LogWriterUtils;
import org.apache.geode.test.dunit.VM;
@@ -67,6 +69,8 @@ public class PRBasicQueryDUnitTest extends PartitionedRegionDUnitTestCase
}
}
+ private final static int MAX_SYNC_WAIT = 30 * 1000;
+
PRQueryDUnitHelper PRQHelp = new PRQueryDUnitHelper();
final String name = "Portfolios";
@@ -153,6 +157,223 @@ public class PRBasicQueryDUnitTest extends PartitionedRegionDUnitTestCase
"PRQBasicQueryDUnitTest#testPRBasicQuerying: Querying PR's Test ENDED");
}
+ /**
+ * A basic dunit test that <br>
+ * 1. Creates a PR and colocated child region Accessor and Data Store with redundantCopies = 0.
+ * 2. Populates the region with test data.
+ * 3. Fires a query on accessor VM and verifies the result.
+ * 4. Shuts down the caches, then restarts them asynchronously
+ * 5. Attempt the query while the regions are being recovered
+ * @throws Exception
+ */
+ @Test
+ public void testColocatedPRQueryDuringRecovery() throws Exception
+ {
+ Host host = Host.getHost(0);
+ VM vm0 = host.getVM(0);
+ VM vm1 = host.getVM(1);
+ setCacheInVMs(vm0, vm1);
+ LogWriterUtils.getLogWriter()
+ .info("PRQBasicQueryDUnitTest#testColocatedPRBasicQuerying: Querying PR Test with DACK Started");
+
+ // Creating PR's on the participating VM's
+ // Creating Accessor node on the VM0.
+ LogWriterUtils.getLogWriter()
+ .info("PRQBasicQueryDUnitTest#testColocatedPRBasicQuerying: Creating the Accessor node in the PR");
+
+ vm0.invoke(PRQHelp.getCacheSerializableRunnableForColocatedPRCreate(name,
+ redundancy, PortfolioData.class, true));
+ // Creating local region on vm0 to compare the results of query.
+ vm0.invoke(PRQHelp.getCacheSerializableRunnableForLocalRegionCreation(localName, PortfolioData.class));
+ LogWriterUtils.getLogWriter()
+ .info("PRQBasicQueryDUnitTest#testColocatedPRBasicQuerying: Successfully created the Accessor node in the PR");
+
+ // Creating the Datastores Nodes in the VM1.
+ LogWriterUtils.getLogWriter()
+ .info("PRQBasicQueryDUnitTest:testColocatedPRBasicQuerying ----- Creating the Datastore node in the PR");
+ vm1.invoke(PRQHelp.getCacheSerializableRunnableForColocatedPRCreate(name,
+ redundancy, PortfolioData.class, true));
+
+ LogWriterUtils.getLogWriter()
+ .info("PRQBasicQueryDUnitTest#testColocatedPRBasicQuerying: Successfully Created the Datastore node in the PR");
+
+ LogWriterUtils.getLogWriter()
+ .info("PRQBasicQueryDUnitTest#testColocatedPRBasicQuerying: Successfully Created PR's across all VM's");
+
+ // Generating portfolio object array to be populated across the PR's & Local
+ // Regions
+
+ final PortfolioData[] portfolio = createPortfolioData(cnt, cntDest);
+ // Putting the data into the PR's created
+ vm0.invoke(PRQHelp.getCacheSerializableRunnableForPRPuts(name, portfolio,
+ cnt, cntDest));
+ vm0.invoke(PRQHelp.getCacheSerializableRunnableForPRDuplicatePuts(name, portfolio,
+ cnt, cntDest));
+
+ LogWriterUtils.getLogWriter()
+ .info("PRQBasicQueryDUnitTest#testColocatedPRBasicQuerying: Inserted Portfolio data across PR's");
+ vm0.invoke(PRQHelp.getCacheSerializableRunnableForPRPuts(localName,
+ portfolio, cnt, cntDest));
+ vm0.invoke(PRQHelp.getCacheSerializableRunnableForPRDuplicatePuts(localName,
+ portfolio, cnt, cntDest));
+
+ // querying the VM for data and comparing the result with query result of
+ // local region.
+ // querying the VM for data
+ vm0.invoke(PRQHelp.getCacheSerializableRunnableForPRQueryAndCompareResults(
+ name, localName));
+
+ LogWriterUtils.getLogWriter()
+ .info("PRQBasicQueryDUnitTest#testColocatedPRBasicQuerying: Querying PR's 1st pass ENDED");
+
+ // Shut everything down and then restart to test queries during recovery
+ vm0.invoke(PRQHelp.getCacheSerializableRunnableForCloseCache());
+ vm1.invoke(PRQHelp.getCacheSerializableRunnableForCloseCache());
+
+ // Re-create the regions - only create the parent regions on the datastores
+ setCacheInVMs(vm0, vm1);
+ LogWriterUtils.getLogWriter()
+ .info("PRQBasicQueryDUnitTest#testColocatedPRBasicQuerying: Creating the Accessor node in the PR");
+ vm0.invoke(PRQHelp.getCacheSerializableRunnableForColocatedParentCreate(name,
+ redundancy, PortfolioData.class, true));
+
+ // Creating local region on vm0 to compare the results of query.
+ vm0.invoke(PRQHelp.getCacheSerializableRunnableForLocalRegionCreation(localName, PortfolioData.class));
+ LogWriterUtils.getLogWriter()
+ .info("PRQBasicQueryDUnitTest#testColocatedPRBasicQuerying: Successfully created the Accessor node in the PR");
+ LogWriterUtils.getLogWriter()
+ .info("PRQBasicQueryDUnitTest:testColocatedPRBasicQuerying: re-creating the Datastore node in the PR");
+ vm1.invoke(PRQHelp.getCacheSerializableRunnableForColocatedParentCreate(name,
+ redundancy, PortfolioData.class, true));
+
+ // Now start the child regions asynchronously so queries will happen during persistent recovery
+ AsyncInvocation vm0PR = vm0.invokeAsync(PRQHelp.getCacheSerializableRunnableForColocatedChildCreate(name,
+ redundancy, PortfolioData.class, true));
+ AsyncInvocation vm1PR = vm1.invokeAsync(PRQHelp.getCacheSerializableRunnableForColocatedChildCreate(name,
+ redundancy, PortfolioData.class, true));
+
+ // delay the query to let the recovery get underway
+ Thread.sleep(100);
+
+ try {
+ // This is a repeat of the original query from before closing and restarting the datastores. This time
+ // it should fail due to persistent recovery that has not completed.
+ vm0.invoke(PRQHelp.getCacheSerializableRunnableForPRQueryAndCompareResults(name, localName, true));
+ fail("Expected PartitionOfflineException when queryiong a region with offline colocated child");
+ } catch (Exception e) {
+ if (!(e.getCause() instanceof PartitionOfflineException)) {
+ e.printStackTrace();
+ throw e;
+ }
+ }
+ LogWriterUtils.getLogWriter()
+ .info("PRQBasicQueryDUnitTest#testColocatedPRBasicQuerying: Querying PR's 2nd pass (after restarting regions) ENDED");
+ }
+
+ /**
+ * A basic dunit test that <br>
+ * 1. Creates a PR and colocated child region Accessor and Data Store with redundantCopies = 0.
+ * 2. Populates the region with test data.
+ * 3. Fires a query on accessor VM and verifies the result.
+ * 4. Shuts down the caches, then restarts them asynchronously, but don't restart the child region
+ * 5. Attempt the query while the region is offline because of the missing child region
+ * @throws Exception
+ */
+ @SuppressWarnings("rawtypes")
+ @Test
+ public void testColocatedPRQueryDuringRecoveryWithMissingColocatedChild() throws Exception
+ {
+ Host host = Host.getHost(0);
+ VM vm0 = host.getVM(0);
+ VM vm1 = host.getVM(1);
+ setCacheInVMs(vm0, vm1);
+ LogWriterUtils.getLogWriter()
+ .info("PRQBasicQueryDUnitTest#testColocatedPRBasicQuerying: Querying PR Test with DACK Started");
+
+ // Creating PR's on the participating VM's
+ // Creating Accessor node on the VM0.
+ LogWriterUtils.getLogWriter()
+ .info("PRQBasicQueryDUnitTest#testColocatedPRBasicQuerying: Creating the Accessor node in the PR");
+
+ vm0.invoke(PRQHelp.getCacheSerializableRunnableForColocatedPRCreate(name,
+ redundancy, PortfolioData.class, true));
+ // Creating local region on vm0 to compare the results of query.
+ vm0.invoke(PRQHelp.getCacheSerializableRunnableForLocalRegionCreation(localName, PortfolioData.class));
+ LogWriterUtils.getLogWriter()
+ .info("PRQBasicQueryDUnitTest#testColocatedPRBasicQuerying: Successfully created the Accessor node in the PR");
+
+ // Creating the Datastores Nodes in the VM1.
+ LogWriterUtils.getLogWriter()
+ .info("PRQBasicQueryDUnitTest:testColocatedPRBasicQuerying ----- Creating the Datastore node in the PR");
+ vm1.invoke(PRQHelp.getCacheSerializableRunnableForColocatedPRCreate(name,
+ redundancy, PortfolioData.class, true));
+
+ LogWriterUtils.getLogWriter()
+ .info("PRQBasicQueryDUnitTest#testColocatedPRBasicQuerying: Successfully Created the Datastore node in the PR");
+
+ LogWriterUtils.getLogWriter()
+ .info("PRQBasicQueryDUnitTest#testColocatedPRBasicQuerying: Successfully Created PR's across all VM's");
+
+ // Generating portfolio object array to be populated across the PR's & Local
+ // Regions
+
+ final PortfolioData[] portfolio = createPortfolioData(cnt, cntDest);
+ // Putting the data into the PR's created
+ vm0.invoke(PRQHelp.getCacheSerializableRunnableForPRPuts(name, portfolio,
+ cnt, cntDest));
+ vm0.invoke(PRQHelp.getCacheSerializableRunnableForPRDuplicatePuts(name, portfolio,
+ cnt, cntDest));
+
+ LogWriterUtils.getLogWriter()
+ .info("PRQBasicQueryDUnitTest#testColocatedPRBasicQuerying: Inserted Portfolio data across PR's");
+ vm0.invoke(PRQHelp.getCacheSerializableRunnableForPRPuts(localName,
+ portfolio, cnt, cntDest));
+ vm0.invoke(PRQHelp.getCacheSerializableRunnableForPRDuplicatePuts(localName,
+ portfolio, cnt, cntDest));
+
+ // querying the VM for data and comparing the result with query result of
+ // local region.
+ // querying the VM for data
+ vm0.invoke(PRQHelp.getCacheSerializableRunnableForPRQueryAndCompareResults(
+ name, localName));
+
+ LogWriterUtils.getLogWriter()
+ .info("PRQBasicQueryDUnitTest#testColocatedPRBasicQuerying: Querying PR's 1st pass ENDED");
+
+ // Shut everything down and then restart to test queries during recovery
+ vm0.invoke(PRQHelp.getCacheSerializableRunnableForCloseCache());
+ vm1.invoke(PRQHelp.getCacheSerializableRunnableForCloseCache());
+
+ // Re-create only the parent region
+ setCacheInVMs(vm0, vm1);
+ LogWriterUtils.getLogWriter()
+ .info("PRQBasicQueryDUnitTest#testColocatedPRBasicQuerying: Creating the Accessor node in the PR");
+ vm0.invoke(PRQHelp.getCacheSerializableRunnableForColocatedParentCreate(name,
+ redundancy, PortfolioData.class, true));
+
+ // Creating local region on vm0 to compare the results of query.
+ vm0.invoke(PRQHelp.getCacheSerializableRunnableForLocalRegionCreation(localName, PortfolioData.class));
+ LogWriterUtils.getLogWriter()
+ .info("PRQBasicQueryDUnitTest#testColocatedPRBasicQuerying: Successfully created the Accessor node in the PR");
+ LogWriterUtils.getLogWriter()
+ .info("PRQBasicQueryDUnitTest:testColocatedPRBasicQuerying ----- re-creating the Datastore node in the PR");
+ vm1.invoke(PRQHelp.getCacheSerializableRunnableForColocatedParentCreate(name,
+ redundancy, PortfolioData.class, true));
+
+ try {
+ // This is a repeat of the original query from before closing and restarting the datastores. This time
+ // it should fail due to persistent recovery that has not completed.
+ vm0.invoke(PRQHelp.getCacheSerializableRunnableForPRQueryAndCompareResults(name, localName, true));
+ fail("Expected PartitionOfflineException when queryiong a region with offline colocated child");
+ } catch (Exception e) {
+ if (!(e.getCause() instanceof PartitionOfflineException)) {
+ throw e;
+ }
+ }
+ LogWriterUtils.getLogWriter()
+ .info("PRQBasicQueryDUnitTest#testColocatedPRBasicQuerying: Querying PR's 2nd pass (after restarting regions) ENDED");
+ }
+
@Test
public void testPRCountStarQuery() throws Exception
{
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/11ef3ebb/geode-core/src/test/java/org/apache/geode/cache/query/partitioned/PRQueryDUnitHelper.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/cache/query/partitioned/PRQueryDUnitHelper.java b/geode-core/src/test/java/org/apache/geode/cache/query/partitioned/PRQueryDUnitHelper.java
index cfb4190..9dc90fd 100755
--- a/geode-core/src/test/java/org/apache/geode/cache/query/partitioned/PRQueryDUnitHelper.java
+++ b/geode-core/src/test/java/org/apache/geode/cache/query/partitioned/PRQueryDUnitHelper.java
@@ -39,6 +39,7 @@ import org.apache.geode.cache.CacheClosedException;
import org.apache.geode.cache.CacheException;
import org.apache.geode.cache.CacheFactory;
import org.apache.geode.cache.DataPolicy;
+import org.apache.geode.cache.DiskStore;
import org.apache.geode.cache.EntryExistsException;
import org.apache.geode.cache.EntryNotFoundException;
import org.apache.geode.cache.PartitionAttributes;
@@ -249,6 +250,190 @@ public class PRQueryDUnitHelper implements Serializable {
return (CacheSerializableRunnable)createPrRegion;
}
+ /**
+ * This function creates a colocated pair of PR's (a parent and its child region) given
+ * the redundancy and persistence parameters.
+ *
+ * @param regionName
+ * @param redundancy
+ * @param constraint
+ * @param makePersistent
+ * @return cacheSerializable object
+ */
+ public CacheSerializableRunnable getCacheSerializableRunnableForColocatedPRCreate(
+ final String regionName, final int redundancy, final Class constraint, boolean makePersistent) {
+
+ final String childRegionName = regionName + "Child";
+ final String diskName = "disk";
+ SerializableRunnable createPrRegion;
+ createPrRegion = new CacheSerializableRunnable(regionName) {
+ @Override
+ public void run2() throws CacheException
+ {
+
+ Cache cache = getCache();
+ Region partitionedregion = null;
+ Region childRegion = null;
+ AttributesFactory attr = new AttributesFactory();
+ attr.setValueConstraint(constraint);
+ if (makePersistent) {
+ DiskStore ds = cache.findDiskStore(diskName);
+ if (ds == null) {
+ ds = cache.createDiskStoreFactory().setDiskDirs(JUnit4CacheTestCase.getDiskDirs())
+ .create(diskName);
+ }
+ attr.setDataPolicy(DataPolicy.PERSISTENT_PARTITION);
+ attr.setDiskStoreName(diskName);
+ } else {
+ attr.setDataPolicy(DataPolicy.PARTITION);
+ attr.setDiskStoreName(null);
+ }
+
+ PartitionAttributesFactory paf = new PartitionAttributesFactory();
+ paf.setRedundantCopies(redundancy);
+ attr.setPartitionAttributes(paf.create());
+
+ // parent region
+ partitionedregion = cache.createRegion(regionName, attr.create());
+ assertNotNull(
+ "PRQueryDUnitHelper#getCacheSerializableRunnableForPRCreateWithRedundancy: Partitioned Region "
+ + regionName + " not in cache", cache.getRegion(regionName));
+ assertNotNull(
+ "PRQueryDUnitHelper#getCacheSerializableRunnableForPRCreateWithRedundancy: Partitioned Region ref null",
+ partitionedregion);
+ assertTrue(
+ "PRQueryDUnitHelper#getCacheSerializableRunnableForPRCreateWithRedundancy: Partitioned Region ref claims to be destroyed",
+ !partitionedregion.isDestroyed());
+
+ // child region
+ attr.setValueConstraint(constraint);
+ paf.setColocatedWith(regionName);
+ attr.setPartitionAttributes(paf.create());
+ childRegion = cache.createRegion(childRegionName, attr.create());
+ }
+ };
+
+ return (CacheSerializableRunnable)createPrRegion;
+ }
+
+ /**
+ * This function creates only the parent region of a colocated pair of PR's, given the
+ * redundancy and persistence parameters for the parent.
+ *
+ * @param regionName
+ * @param redundancy
+ * @param constraint
+ * @param makePersistent
+ * @return cacheSerializable object
+ */
+ public CacheSerializableRunnable getCacheSerializableRunnableForColocatedParentCreate(
+ final String regionName, final int redundancy, final Class constraint, boolean makePersistent) {
+
+ final String childRegionName = regionName + "Child";
+ final String diskName = "disk";
+ SerializableRunnable createPrRegion;
+ createPrRegion = new CacheSerializableRunnable(regionName + "-NoChildRegion") {
+ @Override
+ public void run2() throws CacheException
+ {
+
+ Cache cache = getCache();
+ Region partitionedregion = null;
+ Region childRegion = null;
+ AttributesFactory attr = new AttributesFactory();
+ attr.setValueConstraint(constraint);
+ if (makePersistent) {
+ DiskStore ds = cache.findDiskStore(diskName);
+ if (ds == null) {
+ ds = cache.createDiskStoreFactory().setDiskDirs(JUnit4CacheTestCase.getDiskDirs())
+ .create(diskName);
+ }
+ attr.setDataPolicy(DataPolicy.PERSISTENT_PARTITION);
+ attr.setDiskStoreName(diskName);
+ } else {
+ attr.setDataPolicy(DataPolicy.PARTITION);
+ attr.setDiskStoreName(null);
+ }
+
+ PartitionAttributesFactory paf = new PartitionAttributesFactory();
+ paf.setRedundantCopies(redundancy);
+ attr.setPartitionAttributes(paf.create());
+
+ // parent region
+ partitionedregion = cache.createRegion(regionName, attr.create());
+ assertNotNull(
+ "PRQueryDUnitHelper#getCacheSerializableRunnableForPRCreateWithRedundancy: Partitioned Region "
+ + regionName + " not in cache", cache.getRegion(regionName));
+ assertNotNull(
+ "PRQueryDUnitHelper#getCacheSerializableRunnableForPRCreateWithRedundancy: Partitioned Region ref null",
+ partitionedregion);
+ assertTrue(
+ "PRQueryDUnitHelper#getCacheSerializableRunnableForPRCreateWithRedundancy: Partitioned Region ref claims to be destroyed",
+ !partitionedregion.isDestroyed());
+ }
+ };
+
+ return (CacheSerializableRunnable)createPrRegion;
+ }
+
+ /**
+ * This function creates only the child region of a colocated pair of PR's, colocated with
+ * the parent region named by regionName (which this function does not create).
+ *
+ * @param regionName
+ * @param redundancy
+ * @param constraint
+ * @param isPersistent
+ * @return cacheSerializable object
+ */
+ public CacheSerializableRunnable getCacheSerializableRunnableForColocatedChildCreate(
+ final String regionName, final int redundancy, final Class constraint, boolean isPersistent) {
+
+ final String childRegionName = regionName + "Child";
+ final String diskName = "disk";
+ SerializableRunnable createPrRegion;
+ createPrRegion = new CacheSerializableRunnable(regionName + "-ChildRegion") {
+ @Override
+ public void run2() throws CacheException
+ {
+
+ Cache cache = getCache();
+ Region partitionedregion = null;
+ Region childRegion = null;
+ AttributesFactory attr = new AttributesFactory();
+ attr.setValueConstraint(constraint);
+ if (isPersistent) {
+ DiskStore ds = cache.findDiskStore(diskName);
+ if (ds == null) {
+// ds = cache.createDiskStoreFactory().setDiskDirs(getDiskDirs())
+ ds = cache.createDiskStoreFactory().setDiskDirs(org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase.getDiskDirs())
+ .create(diskName);
+ }
+ attr.setDataPolicy(DataPolicy.PERSISTENT_PARTITION);
+ attr.setDiskStoreName(diskName);
+ } else {
+ attr.setDataPolicy(DataPolicy.PARTITION);
+ attr.setDiskStoreName(null);
+ }
+
+ PartitionAttributesFactory paf = new PartitionAttributesFactory();
+ paf.setRedundantCopies(redundancy);
+ attr.setPartitionAttributes(paf.create());
+
+ // skip parent region creation
+ // partitionedregion = cache.createRegion(regionName, attr.create());
+
+ // child region
+ attr.setValueConstraint(constraint);
+ paf.setColocatedWith(regionName);
+ attr.setPartitionAttributes(paf.create());
+ childRegion = cache.createRegion(childRegionName, attr.create());
+ }
+ };
+
+ return (CacheSerializableRunnable)createPrRegion;
+ }
+
public CacheSerializableRunnable getCacheSerializableRunnableForPRCreateLimitedBuckets(
final String regionName, final int redundancy, final int buckets) {
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/11ef3ebb/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/PersistentColocatedPartitionedRegionDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/PersistentColocatedPartitionedRegionDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/PersistentColocatedPartitionedRegionDUnitTest.java
index 0a25228..c15d545 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/PersistentColocatedPartitionedRegionDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/PersistentColocatedPartitionedRegionDUnitTest.java
@@ -50,7 +50,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import com.jayway.awaitility.core.ConditionTimeoutException;
-import org.junit.experimental.categories.Category;
import org.apache.geode.admin.internal.AdminDistributedSystemImpl;
import org.apache.geode.cache.AttributesFactory;
@@ -64,6 +63,7 @@ import org.apache.geode.cache.Region;
import org.apache.geode.cache.control.RebalanceOperation;
import org.apache.geode.cache.control.RebalanceResults;
import org.apache.geode.cache.persistence.PartitionOfflineException;
+import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.distributed.internal.DistributionMessage;
import org.apache.geode.distributed.internal.DistributionMessageObserver;
@@ -72,11 +72,14 @@ import org.apache.geode.internal.FileUtil;
import org.apache.geode.internal.cache.ColocationLogger;
import org.apache.geode.internal.cache.InitialImageOperation.RequestImageMessage;
import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.PartitionedRegionHelper;
import org.apache.geode.internal.cache.control.InternalResourceManager;
import org.apache.geode.internal.cache.control.InternalResourceManager.ResourceObserver;
import org.apache.geode.test.dunit.Assert;
import org.apache.geode.test.dunit.AsyncInvocation;
import org.apache.geode.test.dunit.IgnoredException;
+import org.apache.geode.test.dunit.LogWriterUtils;
+import org.apache.geode.test.dunit.RMIException;
import org.apache.geode.test.dunit.Host;
import org.apache.geode.test.dunit.SerializableCallable;
import org.apache.geode.test.dunit.SerializableRunnable;
@@ -2088,7 +2091,7 @@ public class PersistentColocatedPartitionedRegionDUnitTest extends PersistentPar
};
//runnable to create PRs
- SerializableRunnable createPRs = new SerializableRunnable("region1") {
+ SerializableRunnable createPRs = new SerializableRunnable("createPRs") {
public void run() {
Cache cache = getCache();
@@ -2112,7 +2115,7 @@ public class PersistentColocatedPartitionedRegionDUnitTest extends PersistentPar
};
//runnable to close the cache.
- SerializableRunnable closeCache = new SerializableRunnable("region1") {
+ SerializableRunnable closeCache = new SerializableRunnable("closeCache") {
public void run() {
closeCache();
}
@@ -2120,7 +2123,7 @@ public class PersistentColocatedPartitionedRegionDUnitTest extends PersistentPar
//Runnable to do a bunch of puts handle exceptions
//due to the fact that member is offline.
- SerializableRunnable doABunchOfPuts = new SerializableRunnable("region1") {
+ SerializableRunnable doABunchOfPuts = new SerializableRunnable("doABunchOfPuts") {
public void run() {
Cache cache = getCache();
Region region = cache.getRegion(PR_REGION_NAME);
@@ -2200,7 +2203,7 @@ public class PersistentColocatedPartitionedRegionDUnitTest extends PersistentPar
@Category(FlakyTest.class) // GEODE-506: time sensitive, async actions with 30 sec max
@Test
public void testRebalanceWithOfflineChildRegion() throws Throwable {
- SerializableRunnable createParentPR = new SerializableRunnable() {
+ SerializableRunnable createParentPR = new SerializableRunnable("createParentPR") {
public void run() {
Cache cache = getCache();
@@ -2220,7 +2223,7 @@ public class PersistentColocatedPartitionedRegionDUnitTest extends PersistentPar
}
};
- SerializableRunnable createChildPR = new SerializableRunnable() {
+ SerializableRunnable createChildPR = new SerializableRunnable("createChildPR") {
public void run() {
Cache cache = getCache();
@@ -2325,7 +2328,6 @@ public class PersistentColocatedPartitionedRegionDUnitTest extends PersistentPar
};
vm1.invoke(addHook);
-// vm1.invoke(addHook);
AsyncInvocation async0;
AsyncInvocation async1;
AsyncInvocation async2;
@@ -2335,7 +2337,6 @@ public class PersistentColocatedPartitionedRegionDUnitTest extends PersistentPar
async1 = vm1.invokeAsync(createPRs);
vm1.invoke(waitForHook);
-// vm1.invoke(waitForHook);
//Now create the parent region on vm-2. vm-2 did not
//previous host the child region.
@@ -2347,7 +2348,6 @@ public class PersistentColocatedPartitionedRegionDUnitTest extends PersistentPar
} finally {
vm1.invoke(removeHook);
-// vm1.invoke(removeHook);
}
async0.getResult(MAX_WAIT);
@@ -2473,6 +2473,188 @@ public class PersistentColocatedPartitionedRegionDUnitTest extends PersistentPar
closeCache();
}
+ /**
+  * Verifies that a {@code get()} on the parent region fails while its colocated child region has
+  * not been recreated after a restart: the remote invocation is expected to fail with an
+  * {@link RMIException} whose cause is a {@link PartitionOfflineException}.
+  */
+ @Test
+ public void testParentRegionGetWithOfflineChildRegion() throws Throwable {
+   final String retryTimeoutProperty =
+       DistributionConfig.GEMFIRE_PREFIX + "partitionedRegionRetryTimeout";
+
+   SerializableRunnable createParentPR = new SerializableRunnable("createParentPR") {
+     public void run() {
+       // Override the retry timeout only while the region is being created.
+       String oldRetryTimeout = System.setProperty(retryTimeoutProperty, "10000");
+       try {
+         Cache cache = getCache();
+         if (cache.findDiskStore("disk") == null) {
+           cache.createDiskStoreFactory().setDiskDirs(getDiskDirs()).create("disk");
+         }
+         AttributesFactory af = new AttributesFactory();
+         PartitionAttributesFactory paf = new PartitionAttributesFactory();
+         paf.setRedundantCopies(0);
+         paf.setRecoveryDelay(0);
+         af.setPartitionAttributes(paf.create());
+         af.setDataPolicy(DataPolicy.PERSISTENT_PARTITION);
+         af.setDiskStoreName("disk");
+         cache.createRegion(PR_REGION_NAME, af.create());
+       } finally {
+         // Restore the previous setting instead of forcing the default value.
+         if (oldRetryTimeout == null) {
+           System.clearProperty(retryTimeoutProperty);
+         } else {
+           System.setProperty(retryTimeoutProperty, oldRetryTimeout);
+         }
+       }
+     }
+   };
+
+   SerializableRunnable createChildPR = new SerializableRunnable("createChildPR") {
+     public void run() throws InterruptedException {
+       String oldRetryTimeout = System.setProperty(retryTimeoutProperty, "10000");
+       try {
+         Cache cache = getCache();
+         AttributesFactory af = new AttributesFactory();
+         PartitionAttributesFactory paf = new PartitionAttributesFactory();
+         paf.setRedundantCopies(0);
+         paf.setRecoveryDelay(0);
+         paf.setColocatedWith(PR_REGION_NAME);
+         af.setDataPolicy(DataPolicy.PERSISTENT_PARTITION);
+         af.setDiskStoreName("disk");
+         af.setPartitionAttributes(paf.create());
+         // delay child region creations to cause a delay in persistent recovery
+         Thread.sleep(100);
+         cache.createRegion("region2", af.create());
+       } finally {
+         if (oldRetryTimeout == null) {
+           System.clearProperty(retryTimeoutProperty);
+         } else {
+           System.setProperty(retryTimeoutProperty, oldRetryTimeout);
+         }
+       }
+     }
+   };
+
+   try {
+     // Expect a get() on the un-recovered (due to offline child) parent region to fail
+     regionGetWithOfflineChild(createParentPR, createChildPR, false);
+     fail("Expected RMIException caused by PartitionOfflineException from remote");
+   } catch (RMIException e) {
+     assertTrue(e.getCause() instanceof PartitionOfflineException);
+   }
+ }
+
+ /**
+  * Verifies that a {@code get()} on the parent region fails after a restart in which the
+  * colocated child region has not been recreated: the remote invocation is expected to fail
+  * with an {@link RMIException} whose cause is a {@link PartitionOfflineException}.
+  */
+ @Test
+ public void testParentRegionGetWithRecoveryInProgress() throws Throwable {
+   final String retryTimeoutProperty =
+       DistributionConfig.GEMFIRE_PREFIX + "partitionedRegionRetryTimeout";
+
+   SerializableRunnable createParentPR = new SerializableRunnable("createParentPR") {
+     public void run() {
+       // Override the retry timeout only while the region is being created.
+       String oldRetryTimeout = System.setProperty(retryTimeoutProperty, "10000");
+       try {
+         Cache cache = getCache();
+         if (cache.findDiskStore("disk") == null) {
+           cache.createDiskStoreFactory().setDiskDirs(getDiskDirs()).create("disk");
+         }
+         AttributesFactory af = new AttributesFactory();
+         PartitionAttributesFactory paf = new PartitionAttributesFactory();
+         paf.setRedundantCopies(0);
+         paf.setRecoveryDelay(0);
+         af.setPartitionAttributes(paf.create());
+         af.setDataPolicy(DataPolicy.PERSISTENT_PARTITION);
+         af.setDiskStoreName("disk");
+         cache.createRegion(PR_REGION_NAME, af.create());
+       } finally {
+         // Restore the previous setting instead of forcing the default value.
+         if (oldRetryTimeout == null) {
+           System.clearProperty(retryTimeoutProperty);
+         } else {
+           System.setProperty(retryTimeoutProperty, oldRetryTimeout);
+         }
+       }
+     }
+   };
+
+   SerializableRunnable createChildPR = new SerializableRunnable("createChildPR") {
+     public void run() throws InterruptedException {
+       String oldRetryTimeout = System.setProperty(retryTimeoutProperty, "10000");
+       try {
+         Cache cache = getCache();
+         AttributesFactory af = new AttributesFactory();
+         PartitionAttributesFactory paf = new PartitionAttributesFactory();
+         paf.setRedundantCopies(0);
+         paf.setRecoveryDelay(0);
+         paf.setColocatedWith(PR_REGION_NAME);
+         af.setDataPolicy(DataPolicy.PERSISTENT_PARTITION);
+         af.setDiskStoreName("disk");
+         af.setPartitionAttributes(paf.create());
+         cache.createRegion("region2", af.create());
+       } finally {
+         if (oldRetryTimeout == null) {
+           System.clearProperty(retryTimeoutProperty);
+         } else {
+           System.setProperty(retryTimeoutProperty, oldRetryTimeout);
+         }
+       }
+     }
+   };
+
+   try {
+     // Expect a get() on the un-recovered (due to offline child) parent region to fail
+     regionGetWithOfflineChild(createParentPR, createChildPR, false);
+     fail("Expected RMIException caused by PartitionOfflineException from remote");
+   } catch (RMIException e) {
+     assertTrue(e.getCause() instanceof PartitionOfflineException);
+   }
+ }
+
+ /**
+  * Verifies that an operation on the parent region fails after a restart in which the colocated
+  * child region has not been recreated: the remote invocation is expected to fail with an
+  * {@link RMIException} whose cause is a {@link PartitionOfflineException}.
+  */
+ @Test
+ public void testParentRegionPutWithRecoveryInProgress() throws Throwable {
+   final String retryTimeoutProperty =
+       DistributionConfig.GEMFIRE_PREFIX + "partitionedRegionRetryTimeout";
+
+   SerializableRunnable createParentPR = new SerializableRunnable("createParentPR") {
+     public void run() {
+       // Override the retry timeout only while the region is being created.
+       String oldRetryTimeout = System.setProperty(retryTimeoutProperty, "10000");
+       try {
+         Cache cache = getCache();
+         if (cache.findDiskStore("disk") == null) {
+           cache.createDiskStoreFactory().setDiskDirs(getDiskDirs()).create("disk");
+         }
+         AttributesFactory af = new AttributesFactory();
+         PartitionAttributesFactory paf = new PartitionAttributesFactory();
+         paf.setRedundantCopies(0);
+         paf.setRecoveryDelay(0);
+         af.setPartitionAttributes(paf.create());
+         af.setDataPolicy(DataPolicy.PERSISTENT_PARTITION);
+         af.setDiskStoreName("disk");
+         cache.createRegion(PR_REGION_NAME, af.create());
+       } finally {
+         // Restore the previous setting instead of forcing the default value.
+         if (oldRetryTimeout == null) {
+           System.clearProperty(retryTimeoutProperty);
+         } else {
+           System.setProperty(retryTimeoutProperty, oldRetryTimeout);
+         }
+       }
+     }
+   };
+
+   SerializableRunnable createChildPR = new SerializableRunnable("createChildPR") {
+     public void run() throws InterruptedException {
+       String oldRetryTimeout = System.setProperty(retryTimeoutProperty, "10000");
+       try {
+         Cache cache = getCache();
+         AttributesFactory af = new AttributesFactory();
+         PartitionAttributesFactory paf = new PartitionAttributesFactory();
+         paf.setRedundantCopies(0);
+         paf.setRecoveryDelay(0);
+         paf.setColocatedWith(PR_REGION_NAME);
+         af.setDataPolicy(DataPolicy.PERSISTENT_PARTITION);
+         af.setDiskStoreName("disk");
+         af.setPartitionAttributes(paf.create());
+         Thread.sleep(1000);
+         cache.createRegion("region2", af.create());
+       } finally {
+         if (oldRetryTimeout == null) {
+           System.clearProperty(retryTimeoutProperty);
+         } else {
+           System.setProperty(retryTimeoutProperty, oldRetryTimeout);
+         }
+       }
+     }
+   };
+
+   try {
+     // NOTE(review): despite the test name this drives regionGetWithOfflineChild, i.e. a get();
+     // regionPutWithOfflineChild looks like the intended helper -- confirm before switching.
+     regionGetWithOfflineChild(createParentPR, createChildPR, false);
+     fail("Expected RMIException caused by PartitionOfflineException from remote");
+   } catch (RMIException e) {
+     assertTrue(e.getCause() instanceof PartitionOfflineException);
+   }
+ }
+
/**
* Create three PRs on a VM, named region1, region2, and region3.
* The colocated with attribute describes which region region3
@@ -2523,15 +2705,15 @@ public class PersistentColocatedPartitionedRegionDUnitTest extends PersistentPar
vm1.invoke(createParentPR);
vm0.invoke(createChildPR);
vm1.invoke(createChildPR);
-
+
//Create some buckets.
createData(vm0, 0, NUM_BUCKETS, "a");
createData(vm0, 0, NUM_BUCKETS, "a", "region2");
-
+
//Close the members
closeCache(vm1);
closeCache(vm0);
-
+
//Recreate the parent region. Try to make sure that
//the member with the latest copy of the buckets
//is the one that decides to throw away it's copy
@@ -2540,18 +2722,17 @@ public class PersistentColocatedPartitionedRegionDUnitTest extends PersistentPar
AsyncInvocation async1 = vm1.invokeAsync(createParentPR);
async0.getResult(MAX_WAIT);
async1.getResult(MAX_WAIT);
-
//Now create the parent region on vm-2. vm-2 did not
//previous host the child region.
vm2.invoke(createParentPR);
-
+
//Rebalance the parent region.
//This should not move any buckets, because
//we haven't recovered the child region
RebalanceResults rebalanceResults = rebalance(vm2);
assertEquals(0, rebalanceResults.getTotalBucketTransfersCompleted());
-
+
//Recreate the child region.
async1 = vm1.invokeAsync(createChildPR);
async0 = vm0.invokeAsync(createChildPR);
@@ -2568,6 +2749,206 @@ public class PersistentColocatedPartitionedRegionDUnitTest extends PersistentPar
createData(vm0, 0, NUM_BUCKETS, "c", "region2");
}
+ /**
+  * Create a colocated pair of persistent regions and populate them with data. Shut down the
+  * servers and then restart them and check the data.
+  * <p>
+  * On the restart, try region operations ({@code get()}) on the parent region before or during
+  * persistent recovery. The {@code concurrentCheckData} argument determines whether the operation
+  * from the parent region occurs before or concurrent with the child region creation and recovery.
+  *
+  * @param createParentPR {@link SerializableRunnable} for creating the parent region on one member
+  * @param createChildPR {@link SerializableRunnable} for creating the child region on one member
+  * @param concurrentCheckData if true, the child region is recreated asynchronously on all three
+  *        members while the parent data is read; if false, the parent data is read without the
+  *        child region being recreated at all
+  * @throws Throwable whatever the remote invocations or the data checks throw
+  */
+ public void regionGetWithOfflineChild(
+     SerializableRunnable createParentPR,
+     SerializableRunnable createChildPR,
+     boolean concurrentCheckData) throws Throwable {
+   Host host = Host.getHost(0);
+   final VM vm0 = host.getVM(0);
+   VM vm1 = host.getVM(1);
+   VM vm2 = host.getVM(2);
+
+   //Create the PRs on two members
+   vm0.invoke(createParentPR);
+   vm1.invoke(createParentPR);
+   vm0.invoke(createChildPR);
+   vm1.invoke(createChildPR);
+
+   //Create some buckets.
+   createData(vm0, 0, NUM_BUCKETS, "a");
+   createData(vm0, 0, NUM_BUCKETS, "a", "region2");
+
+   //Close the members
+   closeCache(vm1);
+   closeCache(vm0);
+
+   SerializableRunnable checkDataOnParent = new SerializableRunnable("checkDataOnParent") {
+     @Override
+     public void run() {
+       Cache cache = getCache();
+       Region region = cache.getRegion(PR_REGION_NAME);
+
+       for (int i = 0; i < NUM_BUCKETS; i++) {
+         assertEquals("For key " + i, "a", region.get(i));
+       }
+     }
+   };
+
+   try {
+     //Recreate the parent region. Try to make sure that
+     //the member with the latest copy of the buckets
+     //is the one that decides to throw away its copy
+     //by starting it last.
+     AsyncInvocation async0 = vm0.invokeAsync(createParentPR);
+     AsyncInvocation async1 = vm1.invokeAsync(createParentPR);
+     async0.getResult(MAX_WAIT);
+     async1.getResult(MAX_WAIT);
+     //Now create the parent region on vm-2. vm-2 did not
+     //previously host the child region.
+     vm2.invoke(createParentPR);
+
+     if (concurrentCheckData) {
+       //Recreate the child region.
+       async1 = vm1.invokeAsync(createChildPR);
+       async0 = vm0.invokeAsync(createChildPR);
+       // NOTE(review): this runnable executes in vm2 and then invokes back into vm2 -- confirm
+       // that this is intended rather than running the child creation directly.
+       AsyncInvocation async2 = vm2.invokeAsync(new SerializableRunnable("delay") {
+         @Override
+         public void run() throws InterruptedException {
+           Thread.sleep(100);
+           vm2.invoke(createChildPR);
+         }
+       });
+
+       AsyncInvocation asyncCheck = vm0.invokeAsync(checkDataOnParent);
+
+       // These invocations only exist on the concurrent path; waiting on them unconditionally
+       // dereferenced null when concurrentCheckData was false.
+       async0.getResult(MAX_WAIT);
+       async1.getResult(MAX_WAIT);
+       async2.getResult(MAX_WAIT);
+       asyncCheck.getResult(MAX_WAIT);
+     } else {
+       vm0.invoke(checkDataOnParent);
+     }
+     //Validate the data
+     checkData(vm0, 0, NUM_BUCKETS, "a");
+     checkData(vm0, 0, NUM_BUCKETS, "a", "region2");
+     //Make sure we can actually use the buckets in the child region.
+     createData(vm0, 0, NUM_BUCKETS, "c", "region2");
+   } finally {
+     //Close the members
+     closeCache(vm1);
+     closeCache(vm0);
+     closeCache(vm2);
+   }
+ }
+ /**
+  * Create a colocated pair of persistent regions and populate them with data. Shut down the
+  * servers and then restart them.
+  * <p>
+  * On the restart, try region operations ({@code put()}) on the parent region before or during
+  * persistent recovery. The {@code concurrentCreateData} argument determines whether the operation
+  * from the parent region occurs before or concurrent with the child region creation and recovery.
+  *
+  * @param createParentPR {@link SerializableRunnable} for creating the parent region on one member
+  * @param createChildPR {@link SerializableRunnable} for creating the child region on one member
+  * @param concurrentCreateData if true, the child region is recreated asynchronously on all three
+  *        members while data is put into the parent; if false, the puts happen without the child
+  *        region being recreated at all
+  * @throws Throwable whatever the remote invocations or the data checks throw
+  */
+ public void regionPutWithOfflineChild(
+     SerializableRunnable createParentPR,
+     SerializableRunnable createChildPR,
+     boolean concurrentCreateData) throws Throwable {
+   Host host = Host.getHost(0);
+   final VM vm0 = host.getVM(0);
+   VM vm1 = host.getVM(1);
+   VM vm2 = host.getVM(2);
+
+   SerializableRunnable createDataOnParent = new SerializableRunnable("createDataOnParent") {
+
+     public void run() {
+       Cache cache = getCache();
+       LogWriterUtils.getLogWriter().info("creating data in " + PR_REGION_NAME);
+       Region region = cache.getRegion(PR_REGION_NAME);
+
+       for (int i = 0; i < NUM_BUCKETS; i++) {
+         region.put(i, "c");
+         assertEquals("For key " + i, "c", region.get(i));
+       }
+     }
+   };
+
+   //Create the PRs on two members
+   vm0.invoke(createParentPR);
+   vm1.invoke(createParentPR);
+   vm0.invoke(createChildPR);
+   vm1.invoke(createChildPR);
+
+   //Create some buckets.
+   createData(vm0, 0, NUM_BUCKETS, "a");
+   createData(vm0, 0, NUM_BUCKETS, "a", "region2");
+
+   //Close the members
+   closeCache(vm1);
+   closeCache(vm0);
+
+   try {
+     //Recreate the parent region. Try to make sure that
+     //the member with the latest copy of the buckets
+     //is the one that decides to throw away its copy
+     //by starting it last.
+     AsyncInvocation async0 = vm0.invokeAsync(createParentPR);
+     AsyncInvocation async1 = vm1.invokeAsync(createParentPR);
+     async0.getResult(MAX_WAIT);
+     async1.getResult(MAX_WAIT);
+     //Now create the parent region on vm-2. vm-2 did not
+     //previously host the child region.
+     vm2.invoke(createParentPR);
+
+     if (concurrentCreateData) {
+       //Recreate the child region.
+       async1 = vm1.invokeAsync(createChildPR);
+       async0 = vm0.invokeAsync(createChildPR);
+       AsyncInvocation async2 = vm2.invokeAsync(createChildPR);
+
+       Thread.sleep(100);
+       AsyncInvocation asyncPut = vm0.invokeAsync(createDataOnParent);
+
+       // These invocations only exist on the concurrent path; waiting on them unconditionally
+       // dereferenced null when concurrentCreateData was false.
+       async0.getResult(MAX_WAIT);
+       async1.getResult(MAX_WAIT);
+       async2.getResult(MAX_WAIT);
+       asyncPut.getResult(MAX_WAIT);
+     } else {
+       vm0.invoke(createDataOnParent);
+     }
+     //Validate the data
+     checkData(vm0, 0, NUM_BUCKETS, "c");
+     checkData(vm0, 0, NUM_BUCKETS, "a", "region2");
+     //Make sure we can actually use the buckets in the child region.
+     createData(vm0, 0, NUM_BUCKETS, "c", "region2");
+   } finally {
+     //Close the members
+     closeCache(vm1);
+     closeCache(vm0);
+     closeCache(vm2);
+   }
+ }
+
private RebalanceResults rebalance(VM vm) {
return (RebalanceResults) vm.invoke(new SerializableCallable() {