You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by la...@apache.org on 2018/05/02 21:49:26 UTC

[geode] branch feature/GEODE-4987 updated (a8ae434 -> 7729d71)

This is an automated email from the ASF dual-hosted git repository.

ladyvader pushed a change to branch feature/GEODE-4987
in repository https://gitbox.apache.org/repos/asf/geode.git.


 discard a8ae434  GEODE-4987: Add rebalancing tests with colocated PRs and AEQ
     new 7729d71  GEODE-4987: Add rebalancing tests with colocated PRs and AEQ

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (a8ae434)
            \
             N -- N -- N   refs/heads/feature/GEODE-4987 (7729d71)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../geode/internal/cache/control/RebalanceOperationDUnitTest.java | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

-- 
To stop receiving notification emails like this one, please contact
ladyvader@apache.org.

[geode] 01/01: GEODE-4987: Add rebalancing tests with colocated PRs and AEQ

Posted by la...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ladyvader pushed a commit to branch feature/GEODE-4987
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 7729d71c98a7749b72d3e751caa2b2fb29949237
Author: Lynn Hughes-Godfrey <lh...@pivotal.io>
AuthorDate: Mon Apr 2 16:00:18 2018 -0700

    GEODE-4987: Add rebalancing tests with colocated PRs and AEQ
    
    * Added new tests for colocated PRs and AEQ
---
 .../cache/control/RebalanceOperationDUnitTest.java | 234 ++++++++++++++++++++-
 1 file changed, 233 insertions(+), 1 deletion(-)

diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/control/RebalanceOperationDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/control/RebalanceOperationDUnitTest.java
index 0cdc4e6..e2591fc 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/control/RebalanceOperationDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/control/RebalanceOperationDUnitTest.java
@@ -18,6 +18,7 @@ import static org.apache.geode.distributed.ConfigurationProperties.ENFORCE_UNIQU
 import static org.apache.geode.distributed.ConfigurationProperties.REDUNDANCY_ZONE;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
@@ -42,6 +43,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.stream.IntStream;
 
 import org.junit.Ignore;
 import org.junit.Test;
@@ -62,6 +64,7 @@ import org.apache.geode.cache.LoaderHelper;
 import org.apache.geode.cache.PartitionAttributes;
 import org.apache.geode.cache.PartitionAttributesFactory;
 import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionAttributes;
 import org.apache.geode.cache.asyncqueue.AsyncEvent;
 import org.apache.geode.cache.asyncqueue.AsyncEventListener;
 import org.apache.geode.cache.asyncqueue.AsyncEventQueue;
@@ -104,7 +107,10 @@ import org.apache.geode.test.junit.categories.DistributedTest;
 @Category(DistributedTest.class)
 public class RebalanceOperationDUnitTest extends JUnit4CacheTestCase {
 
-  private static final long MAX_WAIT = 60;
+  private static final long MAX_WAIT = 600;
+  private static final String parentRegion = "parent-region";
+  private static final String keyBase = "Object_";
+  private static final String colocatedRegionBase = "colocated-region-";
 
   @Override
   public final void postTearDownCacheTestCase() throws Exception {
@@ -118,6 +124,232 @@ public class RebalanceOperationDUnitTest extends JUnit4CacheTestCase {
     System.clearProperty(DistributionConfig.GEMFIRE_PREFIX + "resource.manager.threads");
   }
 
+  /**
+   * Puts {@code numBuckets} entries into the named region of the cache returned by
+   * {@code getCache()}. Entry {@code i} uses key {@code keyBase + i} and value
+   * {@code "value_" + i}, for {@code i} in {@code [0, numBuckets)}.
+   *
+   * @param regionName name of the region to populate; the test fails if it cannot be looked up
+   * @param numBuckets number of entries to put
+   */
+  protected void putEntryInEachBucket(String regionName, int numBuckets) {
+    final Cache cache = getCache();
+    final Region<Object, Object> target = cache.getRegion(regionName);
+    assertNotNull(cache.getRegion(regionName));
+    System.out.println("executing " + numBuckets + " ops on " + regionName);
+    for (int index = 0; index < numBuckets; index++) {
+      target.put(keyBase + index, "value_" + index);
+    }
+    System.out.println("executed " + numBuckets + " ops on " + regionName);
+  }
+
+  /**
+   * Builds a runnable that creates the partitioned region {@code regionName} through
+   * {@code createPR} and then verifies that the region can be looked up in the cache.
+   *
+   * @param regionName name of the partitioned region to create
+   * @return the runnable, intended to be passed to {@code VM.invoke}
+   */
+  protected SerializableRunnable getCreatePRRunnable(String regionName) {
+    return new SerializableRunnable("createPR") {
+      @Override
+      public void run() {
+        // Honor the requested name instead of always using the parentRegion constant.
+        createPR(regionName);
+        // Assert on the region itself: asserting on "getRegion(...) != null" boxes a boolean,
+        // which is never null, so that form of the check could not fail.
+        assertNotNull(getCache().getRegion(regionName));
+      }
+    };
+  }
+
+  /**
+   * Builds a runnable that puts {@code numBuckets} entries into the named region, using key
+   * {@code keyBase + i} and value {@code "value_" + i} for {@code i} in
+   * {@code [0, numBuckets)}, and prints a message before and after the puts.
+   *
+   * @param regionName name of the region to write to
+   * @param numBuckets number of entries to put
+   * @return the runnable, intended to be passed to {@code VM.invoke} or {@code VM.invokeAsync}
+   */
+  protected SerializableRunnable getConcOpsRunnable(String regionName, int numBuckets) {
+    return new SerializableRunnable("doConcOps") {
+      public void run() {
+        final Cache cache = getCache();
+        final Region<Object, Object> target = cache.getRegion(regionName);
+        System.out.println("In concOps on " + regionName);
+        IntStream.range(0, numBuckets)
+            .forEach(index -> target.put(keyBase + index, "value_" + index));
+        System.out.println("Done with " + numBuckets + " ops on " + regionName);
+      }
+    };
+  }
+
+  /**
+   * Runnable that runs a rebalance through {@code doRebalance(false, manager)} and prints the
+   * completed bucket transfers, primary transfers and bucket creates, first for each
+   * partitioned region and then as overall totals.
+   */
+  protected SerializableRunnable rebalance = new SerializableRunnable("rebalance") {
+    @Override
+    public void run() {
+      Cache cache = getCache();
+      ResourceManager manager = cache.getResourceManager();
+      System.out.println("starting rebalance");
+      // NOTE(review): the first argument is presumably the "simulate" flag - confirm against
+      // doRebalance.
+      RebalanceResults results = doRebalance(false, manager);
+      Set<PartitionRebalanceInfo> prInfoSet = results.getPartitionRebalanceDetails();
+      for (PartitionRebalanceInfo prInfo : prInfoSet) {
+        // StringBuilder: the text is built and consumed on a single thread.
+        StringBuilder perRegion = new StringBuilder();
+        perRegion.append(prInfo.getRegionPath());
+        perRegion.append(" bucketTransfers = ").append(prInfo.getBucketTransfersCompleted());
+        perRegion.append(" primaryTransfers = ").append(prInfo.getPrimaryTransfersCompleted());
+        perRegion.append(" bucketCreates = ").append(prInfo.getBucketCreatesCompleted());
+        System.out.println("rebalance info for " + perRegion);
+      }
+      StringBuilder overall = new StringBuilder();
+      overall.append(" bucketTransfers = ").append(results.getTotalBucketTransfersCompleted());
+      overall.append(" primaryTransfers = ").append(results.getTotalPrimaryTransfersCompleted());
+      overall.append(" bucketCreates = ").append(results.getTotalBucketCreatesCompleted());
+      System.out.println("finished rebalance : overall rebalance results =  " + overall);
+    }
+  };
+
+  /**
+   * Builds a runnable that creates a parallel async event queue with id
+   * {@code "parallelQueue"} (maximum queue memory 1) and adds that queue id to the given
+   * region through its attributes mutator. The queue's listener sleeps 100 ms per call and
+   * always returns {@code false}.
+   *
+   * @param parentRegion name of the region the queue is attached to
+   * @return the runnable, intended to be passed to {@code VM.invoke} or {@code VM.invokeAsync}
+   */
+  protected SerializableRunnable getCreateAEQRunnable(final String parentRegion) {
+    return new SerializableRunnable("createAEQ") {
+      @Override
+      public void run() throws Exception {
+        // NOTE(review): "cache" is not declared in this hunk; presumably it is a field
+        // inherited from the test base class - confirm it is initialized in the target VM.
+        // Create an async event listener that doesn't dispatch anything
+        cache.createAsyncEventQueueFactory().setMaximumQueueMemory(1).setParallel(true)
+            .create("parallelQueue", new AsyncEventListener() {
+              @Override
+              public void close() {}
+
+              @Override
+              public boolean processEvents(List<AsyncEvent> events) {
+                try {
+                  Thread.sleep(100);
+                } catch (InterruptedException e) {
+                  // Restore the interrupt status so the calling thread can still observe it.
+                  Thread.currentThread().interrupt();
+                }
+                // Never report the batch as processed, whether or not the sleep completed.
+                return false;
+              }
+            });
+        Region region = cache.getRegion(parentRegion);
+        region.getAttributesMutator().addAsyncEventQueueId("parallelQueue");
+      }
+    };
+  }
+
+  /**
+   * Builds a runnable that creates a partitioned region named {@code regionName}, colocated
+   * with {@code parentRegion}, with one redundant copy and both recovery delays set to -1.
+   * The region attributes are built inside the runnable.
+   *
+   * @param parentRegion name of the region the new region is colocated with
+   * @param regionName name of the colocated region to create
+   * @param ratts currently ignored; kept so existing callers keep compiling
+   * @return the runnable, intended to be passed to {@code VM.invoke} or {@code VM.invokeAsync}
+   */
+  protected SerializableRunnable getCreateColocatedRegionRunnable(final String parentRegion,
+      final String regionName, final RegionAttributes ratts) {
+    return new SerializableRunnable("createdColocatedRegion") {
+      @Override
+      public void run() throws Exception {
+        Cache cache = getCache();
+        AttributesFactory attr = new AttributesFactory();
+        PartitionAttributesFactory paf = new PartitionAttributesFactory();
+        paf.setRedundantCopies(1);
+        paf.setRecoveryDelay(-1);
+        paf.setStartupRecoveryDelay(-1);
+        paf.setColocatedWith(parentRegion);
+        attr.setPartitionAttributes(paf.create());
+        // Named differently from the "ratts" parameter so that it no longer silently shadows
+        // it; the attributes used are always the ones built here.
+        RegionAttributes colocatedAttributes = attr.create();
+
+        System.out.println("creating colocatedRegion " + regionName);
+        cache.createRegion(regionName, colocatedAttributes);
+        System.out.println("created colocatedRegion " + regionName);
+      }
+    };
+  }
+
+  @Test
+  public void testRebalanceDuringColocatedPRCreation() throws Exception {
+    // Rebalances on vm0 while puts and colocated-region creation are invoked asynchronously.
+    Host host = Host.getHost(0);
+    VM vm0 = host.getVM(0);
+    VM vm1 = host.getVM(1);
+    VM vm2 = host.getVM(2);
+    VM vm3 = host.getVM(3);
+
+    // Create the parent region in 3 of 4 VMs (vm3 joins later)
+    vm0.invoke(getCreatePRRunnable(parentRegion));
+    vm1.invoke(getCreatePRRunnable(parentRegion));
+    vm2.invoke(getCreatePRRunnable(parentRegion));
+
+    // add data (but don't define all buckets): only numBuckets / 2 entries are put
+    int numBuckets = vm0.invoke(() -> getCache().getRegion(parentRegion).getAttributes()
+        .getPartitionAttributes().getTotalNumBuckets());
+    vm0.invoke(() -> putEntryInEachBucket(parentRegion, numBuckets / 2));
+
+    // give rebalance some work to do by adding another vm
+    vm3.invoke(getCreatePRRunnable(parentRegion));
+
+    // create colocated regions while rebalance is in progress
+    AttributesFactory attr = new AttributesFactory();
+    PartitionAttributesFactory paf = new PartitionAttributesFactory();
+    paf.setRedundantCopies(1);
+    paf.setRecoveryDelay(-1);
+    paf.setStartupRecoveryDelay(-1);
+    paf.setColocatedWith(parentRegion);
+    PartitionAttributes prAttr = paf.create();
+    attr.setPartitionAttributes(prAttr);
+    RegionAttributes ratts = attr.create();
+    // NOTE(review): the runnable builds its own attributes and ignores ratts - confirm intent.
+    for (int i = 0; i < 1; i++) {
+      // The loop bound is 1, so this body runs a single time.
+      AsyncInvocation aiRebalancer = vm0.invokeAsync(rebalance);
+
+      String colocatedRegionName = colocatedRegionBase + i;
+      SerializableRunnable createColocatedRegion =
+          getCreateColocatedRegionRunnable(parentRegion, colocatedRegionName, ratts);
+
+      AsyncInvocation aiConcOps = vm1.invokeAsync(getConcOpsRunnable(parentRegion, numBuckets));
+      // Create the colocated region on all four VMs without waiting for the rebalance.
+      AsyncInvocation ai1 = vm0.invokeAsync(createColocatedRegion);
+      AsyncInvocation ai2 = vm1.invokeAsync(createColocatedRegion);
+      AsyncInvocation ai3 = vm2.invokeAsync(createColocatedRegion);
+      AsyncInvocation ai4 = vm3.invokeAsync(createColocatedRegion);
+      // Wait for each async invocation, then check it for an exception.
+      aiConcOps.join();
+      aiConcOps.checkException();
+
+      aiRebalancer.join();
+      aiRebalancer.checkException();
+
+      ai1.join();
+      ai2.join();
+      ai3.join();
+      ai4.join();
+
+      ai1.checkException();
+      ai2.checkException();
+      ai3.checkException();
+      ai4.checkException();
+
+    }
+    // Run one more rebalance after all of the invocations above have been joined.
+    AsyncInvocation aiRebalancer = vm0.invokeAsync(rebalance);
+    aiRebalancer.join();
+    aiRebalancer.checkException();
+
+  }
+
+  @Test
+  public void testRebalanceDuringAEQCreation() throws Exception {
+    // Rebalances on vm0 while an async event queue is created on all four VMs.
+    Host host = Host.getHost(0);
+    VM vm0 = host.getVM(0);
+    VM vm1 = host.getVM(1);
+    VM vm2 = host.getVM(2);
+    VM vm3 = host.getVM(3);
+
+    // Create the parent region in 3 of 4 VMs (vm3 joins later)
+    vm0.invoke(getCreatePRRunnable(parentRegion));
+    vm1.invoke(getCreatePRRunnable(parentRegion));
+    vm2.invoke(getCreatePRRunnable(parentRegion));
+
+    // add data (but don't define all buckets): only numBuckets / 2 entries are put
+    int numBuckets = vm0.invoke(() -> getCache().getRegion(parentRegion).getAttributes()
+        .getPartitionAttributes().getTotalNumBuckets());
+    vm0.invoke(() -> putEntryInEachBucket(parentRegion, numBuckets / 2));
+
+    // give rebalance some work to do by adding another vm
+    vm3.invoke(getCreatePRRunnable(parentRegion));
+    // Start the rebalance asynchronously so the AEQ creation below does not wait for it.
+    AsyncInvocation aiRebalancer = vm0.invokeAsync(rebalance);
+
+    AsyncInvocation ai1 = vm0.invokeAsync(getCreateAEQRunnable(parentRegion));
+    AsyncInvocation ai2 = vm1.invokeAsync(getCreateAEQRunnable(parentRegion));
+    AsyncInvocation ai3 = vm2.invokeAsync(getCreateAEQRunnable(parentRegion));
+    AsyncInvocation ai4 = vm3.invokeAsync(getCreateAEQRunnable(parentRegion));
+    // Wait for every AEQ creation, then check each invocation for an exception.
+    ai1.join();
+    ai2.join();
+    ai3.join();
+    ai4.join();
+
+    ai1.checkException();
+    ai2.checkException();
+    ai3.checkException();
+    ai4.checkException();
+
+    aiRebalancer.join();
+    aiRebalancer.checkException();
+    // Run one more rebalance after the AEQ creation invocations have all been joined.
+    aiRebalancer = vm0.invokeAsync(rebalance);
+    aiRebalancer.join();
+    aiRebalancer.checkException();
+  }
+
   @Test
   public void testRecoverRedundancySimulation() {
     recoverRedundancy(true);

-- 
To stop receiving notification emails like this one, please contact
ladyvader@apache.org.