You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by la...@apache.org on 2018/04/30 22:27:11 UTC
[geode] 01/01: GEODE-4987: Add rebalancing tests with colocated PRs
and AEQ
This is an automated email from the ASF dual-hosted git repository.
ladyvader pushed a commit to branch feature/GEODE-4987
in repository https://gitbox.apache.org/repos/asf/geode.git
commit bee5ee0b8fe30d603a73e963cd112c62d701d1e5
Author: Lynn Hughes-Godfrey <lh...@pivotal.io>
AuthorDate: Mon Apr 2 16:00:18 2018 -0700
GEODE-4987: Add rebalancing tests with colocated PRs and AEQ
* Added new tests for colocated PRs and AEQ
---
.../cache/control/RebalanceOperationDUnitTest.java | 240 ++++++++++++++++++++-
1 file changed, 239 insertions(+), 1 deletion(-)
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/control/RebalanceOperationDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/control/RebalanceOperationDUnitTest.java
index 0cdc4e6..5efe322 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/control/RebalanceOperationDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/control/RebalanceOperationDUnitTest.java
@@ -31,6 +31,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -42,6 +43,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.stream.IntStream;
import org.junit.Ignore;
import org.junit.Test;
@@ -62,6 +64,7 @@ import org.apache.geode.cache.LoaderHelper;
import org.apache.geode.cache.PartitionAttributes;
import org.apache.geode.cache.PartitionAttributesFactory;
import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionAttributes;
import org.apache.geode.cache.asyncqueue.AsyncEvent;
import org.apache.geode.cache.asyncqueue.AsyncEventListener;
import org.apache.geode.cache.asyncqueue.AsyncEventQueue;
@@ -104,7 +107,10 @@ import org.apache.geode.test.junit.categories.DistributedTest;
@Category(DistributedTest.class)
public class RebalanceOperationDUnitTest extends JUnit4CacheTestCase {
- private static final long MAX_WAIT = 60;
+ private static final long MAX_WAIT = 60000;
+ private static final String parentRegion = "parent-region";
+ private static final String keyBase = "Object_";
+ private static final String colocatedRegionBase = "colocated-region-";
@Override
public final void postTearDownCacheTestCase() throws Exception {
@@ -118,6 +124,238 @@ public class RebalanceOperationDUnitTest extends JUnit4CacheTestCase {
System.clearProperty(DistributionConfig.GEMFIRE_PREFIX + "resource.manager.threads");
}
+ protected void putEntryInEachBucket(String regionName, int numBuckets) {
+ final Cache cache = getCache();
+ Region<Object, Object> region = cache.getRegion(regionName);
+ assertNotNull(cache.getRegion(regionName));
+ System.out.println("VADER: executing " + numBuckets + " ops on " + regionName);
+ IntStream.range(0, numBuckets).forEach(i -> region.put(keyBase + i, "value_" + i));
+ System.out.println("VADER: executed " + numBuckets + " ops on " + regionName);
+ }
+
+ protected SerializableRunnable getCreatePRRunnable(String regionName) {
+ SerializableRunnable runnable = new SerializableRunnable("createPR") {
+ public void run() {
+ createPR(parentRegion);
+ assertNotNull(getCache().getRegion(parentRegion) != null);
+ }
+ };
+ return runnable;
+ }
+
+ protected SerializableRunnable getConcOpsRunnable(String regionName, int numBuckets) {
+ SerializableRunnable runnable = new SerializableRunnable("doConcOps") {
+ public void run() {
+ final Cache cache = getCache();
+ Region<Object, Object> region = cache.getRegion(regionName);
+ System.out.println("VADER: In concOps on " + regionName);
+ IntStream.range(0, numBuckets).forEach(i -> region.put(keyBase + i, "value_" + i));
+ System.out.println("VADER: Done with " + numBuckets + " ops on " + regionName);
+ }
+ };
+ return runnable;
+ }
+
+ protected SerializableRunnable rebalance = new SerializableRunnable("rebalance") {
+ public void run() {
+ Cache cache = getCache();
+ ResourceManager manager = cache.getResourceManager();
+ System.out.println("VADER: starting rebalance");
+ RebalanceResults results = doRebalance(false, manager);
+ Set<PartitionRebalanceInfo> prInfoSet = results.getPartitionRebalanceDetails();
+ StringBuffer aStr;
+ for (PartitionRebalanceInfo prInfo : prInfoSet) {
+ aStr = new StringBuffer();
+ aStr.append(prInfo.getRegionPath());
+ aStr.append(" bucketTransfers = " + prInfo.getBucketTransfersCompleted());
+ aStr.append(" primaryTransfers = " + prInfo.getPrimaryTransfersCompleted());
+ aStr.append(" bucketCreates = " + prInfo.getBucketCreatesCompleted());
+ System.out.println("VADER: rebalance info for " + aStr.toString());
+ }
+ aStr = new StringBuffer();
+ aStr.append(" bucketTransfers = " + results.getTotalBucketTransfersCompleted());
+ aStr.append(" primaryTransfers = " + results.getTotalPrimaryTransfersCompleted());
+ aStr.append(" bucketCreates = " + results.getTotalBucketCreatesCompleted());
+ System.out
+ .println("VADER: finished rebalance : overall rebalance results = " + aStr.toString());
+ }
+ };
+
+ protected SerializableRunnable getCreateAEQRunnable(final String parentRegion) {
+ SerializableRunnable runnable = new SerializableRunnable("createAEQ") {
+ @Override
+ public void run() throws Exception {
+ // Create an async event listener that doesn't dispatch anything
+ cache.createAsyncEventQueueFactory().setMaximumQueueMemory(1).setParallel(true)
+ .create("parallelQueue", new AsyncEventListener() {
+ @Override
+ public void close() {}
+
+ @Override
+ public boolean processEvents(List<AsyncEvent> events) {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ return false;
+ }
+ return false;
+ }
+ });
+ Region region = cache.getRegion(parentRegion);
+ region.getAttributesMutator().addAsyncEventQueueId("parallelQueue");
+ }
+ };
+ return runnable;
+ }
+
+ protected SerializableRunnable getCreateColocatedRegionRunnable(final String parentRegion,
+ final String regionName, final RegionAttributes ratts) {
+ SerializableRunnable runnable = new SerializableRunnable("createdColocatedRegion") {
+ @Override
+ public void run() throws Exception {
+ Cache cache = getCache();
+ AttributesFactory attr = new AttributesFactory();
+ PartitionAttributesFactory paf = new PartitionAttributesFactory();
+ paf.setRedundantCopies(1);
+ paf.setRecoveryDelay(-1);
+ paf.setStartupRecoveryDelay(-1);
+ paf.setColocatedWith(parentRegion);
+ PartitionAttributes prAttr = paf.create();
+ attr.setPartitionAttributes(prAttr);
+ RegionAttributes ratts = attr.create();
+
+ System.out.println("VADER: creating colocatedRegion " + regionName);
+ Region region = cache.createRegion(regionName, ratts);
+ System.out.println("VADER: created colocatedRegion " + regionName);
+ }
+ };
+ return runnable;
+ }
+
+ @Test
+ public void testRebalanceDuringColocatedPRCreation() throws Exception {
+
+ Host host = Host.getHost(0);
+ VM vm0 = host.getVM(0);
+ VM vm1 = host.getVM(1);
+ VM vm2 = host.getVM(2);
+ VM vm3 = host.getVM(3);
+
+ // Create the region in 3 of 4 VMs
+ vm0.invoke(getCreatePRRunnable(parentRegion));
+ vm1.invoke(getCreatePRRunnable(parentRegion));
+ vm2.invoke(getCreatePRRunnable(parentRegion));
+
+ // add data (but don't define all buckets)
+ int numBuckets = vm0.invoke(() -> getCache().getRegion(parentRegion).getAttributes()
+ .getPartitionAttributes().getTotalNumBuckets());
+ vm0.invoke(() -> putEntryInEachBucket(parentRegion, numBuckets / 2));
+
+ // give rebalance some work to do by adding another vm
+ vm3.invoke(getCreatePRRunnable(parentRegion));
+
+ // create colocated regions while rebalance is in progress
+ AttributesFactory attr = new AttributesFactory();
+ PartitionAttributesFactory paf = new PartitionAttributesFactory();
+ paf.setRedundantCopies(1);
+ paf.setRecoveryDelay(-1);
+ paf.setStartupRecoveryDelay(-1);
+ paf.setColocatedWith(parentRegion);
+ PartitionAttributes prAttr = paf.create();
+ attr.setPartitionAttributes(prAttr);
+ RegionAttributes ratts = attr.create();
+
+ for (int i = 0; i < 1; i++) {
+
+ AsyncInvocation aiRebalancer = vm0.invokeAsync(rebalance);
+
+ String colocatedRegionName = colocatedRegionBase + i;
+ SerializableRunnable createColocatedRegion =
+ getCreateColocatedRegionRunnable(parentRegion, colocatedRegionName, ratts);
+
+ AsyncInvocation aiConcOps = vm1.invokeAsync(getConcOpsRunnable(parentRegion, numBuckets));
+
+ AsyncInvocation ai1 = vm0.invokeAsync(createColocatedRegion);
+ AsyncInvocation ai2 = vm1.invokeAsync(createColocatedRegion);
+ AsyncInvocation ai3 = vm2.invokeAsync(createColocatedRegion);
+ AsyncInvocation ai4 = vm3.invokeAsync(createColocatedRegion);
+
+ aiConcOps.join();
+ aiConcOps.checkException();
+
+ aiRebalancer.join();
+ aiRebalancer.checkException();
+
+ ai1.join();
+ ai2.join();
+ ai3.join();
+ ai4.join();
+
+ ai1.checkException();
+ ai2.checkException();
+ ai3.checkException();
+ ai4.checkException();
+
+ }
+
+ AsyncInvocation aiRebalancer = vm0.invokeAsync(rebalance);
+ aiRebalancer.join();
+ aiRebalancer.checkException();
+
+ }
+
+ @Test
+ public void testRebalanceDuringAEQCreation() throws Exception {
+
+ Host host = Host.getHost(0);
+ VM vm0 = host.getVM(0);
+ VM vm1 = host.getVM(1);
+ VM vm2 = host.getVM(2);
+ VM vm3 = host.getVM(3);
+
+ // Create the region in 3 of 4 VMs
+ vm0.invoke(getCreatePRRunnable(parentRegion));
+ vm1.invoke(getCreatePRRunnable(parentRegion));
+ vm2.invoke(getCreatePRRunnable(parentRegion));
+
+ // add data (but don't define all buckets)
+ int numBuckets = vm0.invoke(() -> getCache().getRegion(parentRegion).getAttributes()
+ .getPartitionAttributes().getTotalNumBuckets());
+ vm0.invoke(() -> putEntryInEachBucket(parentRegion, numBuckets / 2));
+
+ // give rebalance some work to do by adding another vm
+ vm3.invoke(getCreatePRRunnable(parentRegion));
+
+ AsyncInvocation aiRebalancer = vm0.invokeAsync(rebalance);
+
+ // AsyncInvocation aiConcOps = vm1.invokeAsync(getConcOpsRunnable(parentRegion, numBuckets));
+
+ AsyncInvocation ai1 = vm0.invokeAsync(getCreateAEQRunnable(parentRegion));
+ AsyncInvocation ai2 = vm1.invokeAsync(getCreateAEQRunnable(parentRegion));
+ AsyncInvocation ai3 = vm2.invokeAsync(getCreateAEQRunnable(parentRegion));
+ AsyncInvocation ai4 = vm3.invokeAsync(getCreateAEQRunnable(parentRegion));
+
+ // aiConcOps.join();
+ // aiConcOps.checkException();
+
+ aiRebalancer.join();
+ aiRebalancer.checkException();
+
+ ai1.join();
+ ai2.join();
+ ai3.join();
+ ai4.join();
+
+ ai1.checkException();
+ ai2.checkException();
+ ai3.checkException();
+ ai4.checkException();
+
+ aiRebalancer = vm0.invokeAsync(rebalance);
+ aiRebalancer.join();
+ aiRebalancer.checkException();
+ }
+
@Test
public void testRecoverRedundancySimulation() {
recoverRedundancy(true);
--
To stop receiving notification emails like this one, please contact
ladyvader@apache.org.