You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by la...@apache.org on 2018/05/01 20:55:13 UTC

[geode] branch feature/GEODE-4987 updated (bbe9e3c -> 985c9be)

This is an automated email from the ASF dual-hosted git repository.

ladyvader pushed a change to branch feature/GEODE-4987
in repository https://gitbox.apache.org/repos/asf/geode.git.


 discard bbe9e3c  GEODE-4987: Add rebalancing tests with colocated PRs and AEQ
     new 985c9be  GEODE-4987: Add rebalancing tests with colocated PRs and AEQ

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (bbe9e3c)
            \
             N -- N -- N   refs/heads/feature/GEODE-4987 (985c9be)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../geode/internal/cache/control/RebalanceOperationDUnitTest.java    | 5 -----
 1 file changed, 5 deletions(-)

-- 
To stop receiving notification emails like this one, please contact
ladyvader@apache.org.

[geode] 01/01: GEODE-4987: Add rebalancing tests with colocated PRs and AEQ

Posted by la...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ladyvader pushed a commit to branch feature/GEODE-4987
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 985c9be68a6baf813a02244cad0eb54589e5bab4
Author: Lynn Hughes-Godfrey <lh...@pivotal.io>
AuthorDate: Mon Apr 2 16:00:18 2018 -0700

    GEODE-4987: Add rebalancing tests with colocated PRs and AEQ
    
    * Added new tests for colocated PRs and AEQ
---
 .../cache/control/RebalanceOperationDUnitTest.java | 234 ++++++++++++++++++++-
 1 file changed, 233 insertions(+), 1 deletion(-)

diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/control/RebalanceOperationDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/control/RebalanceOperationDUnitTest.java
index 0cdc4e6..12f2c1c 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/control/RebalanceOperationDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/control/RebalanceOperationDUnitTest.java
@@ -18,6 +18,7 @@ import static org.apache.geode.distributed.ConfigurationProperties.ENFORCE_UNIQU
 import static org.apache.geode.distributed.ConfigurationProperties.REDUNDANCY_ZONE;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
@@ -42,6 +43,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.stream.IntStream;
 
 import org.junit.Ignore;
 import org.junit.Test;
@@ -62,6 +64,7 @@ import org.apache.geode.cache.LoaderHelper;
 import org.apache.geode.cache.PartitionAttributes;
 import org.apache.geode.cache.PartitionAttributesFactory;
 import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionAttributes;
 import org.apache.geode.cache.asyncqueue.AsyncEvent;
 import org.apache.geode.cache.asyncqueue.AsyncEventListener;
 import org.apache.geode.cache.asyncqueue.AsyncEventQueue;
@@ -104,7 +107,10 @@ import org.apache.geode.test.junit.categories.DistributedTest;
 @Category(DistributedTest.class)
 public class RebalanceOperationDUnitTest extends JUnit4CacheTestCase {
 
-  private static final long MAX_WAIT = 60;
+  private static final long MAX_WAIT = 180;
+  private static final String parentRegion = "parent-region";
+  private static final String keyBase = "Object_";
+  private static final String colocatedRegionBase = "colocated-region-";
 
   @Override
   public final void postTearDownCacheTestCase() throws Exception {
@@ -118,6 +124,232 @@ public class RebalanceOperationDUnitTest extends JUnit4CacheTestCase {
     System.clearProperty(DistributionConfig.GEMFIRE_PREFIX + "resource.manager.threads");
   }
 
+  protected void putEntryInEachBucket(String regionName, int numBuckets) {
+    final Cache cache = getCache();
+    Region<Object, Object> region = cache.getRegion(regionName);
+    assertNotNull(cache.getRegion(regionName));
+    System.out.println("executing " + numBuckets + " ops on " + regionName);
+    IntStream.range(0, numBuckets).forEach(i -> region.put(keyBase + i, "value_" + i));
+    System.out.println("executed " + numBuckets + " ops on " + regionName);
+  }
+
+  protected SerializableRunnable getCreatePRRunnable(String regionName) {
+    SerializableRunnable runnable = new SerializableRunnable("createPR") {
+      public void run() {
+        createPR(parentRegion);
+        assertNotNull(getCache().getRegion(parentRegion) != null);
+      }
+    };
+    return runnable;
+  }
+
+  protected SerializableRunnable getConcOpsRunnable(String regionName, int numBuckets) {
+    SerializableRunnable runnable = new SerializableRunnable("doConcOps") {
+      public void run() {
+        final Cache cache = getCache();
+        Region<Object, Object> region = cache.getRegion(regionName);
+        System.out.println("In concOps on " + regionName);
+        IntStream.range(0, numBuckets).forEach(i -> region.put(keyBase + i, "value_" + i));
+        System.out.println("Done with " + numBuckets + " ops on " + regionName);
+      }
+    };
+    return runnable;
+  }
+
+  protected SerializableRunnable rebalance = new SerializableRunnable("rebalance") {
+    public void run() {
+      Cache cache = getCache();
+      ResourceManager manager = cache.getResourceManager();
+      System.out.println("starting rebalance");
+      RebalanceResults results = doRebalance(false, manager);
+      Set<PartitionRebalanceInfo> prInfoSet = results.getPartitionRebalanceDetails();
+      StringBuffer aStr;
+      for (PartitionRebalanceInfo prInfo : prInfoSet) {
+        aStr = new StringBuffer();
+        aStr.append(prInfo.getRegionPath());
+        aStr.append(" bucketTransfers = " + prInfo.getBucketTransfersCompleted());
+        aStr.append(" primaryTransfers = " + prInfo.getPrimaryTransfersCompleted());
+        aStr.append(" bucketCreates = " + prInfo.getBucketCreatesCompleted());
+        System.out.println("rebalance info for " + aStr.toString());
+      }
+      aStr = new StringBuffer();
+      aStr.append(" bucketTransfers = " + results.getTotalBucketTransfersCompleted());
+      aStr.append(" primaryTransfers = " + results.getTotalPrimaryTransfersCompleted());
+      aStr.append(" bucketCreates = " + results.getTotalBucketCreatesCompleted());
+      System.out.println("finished rebalance : overall rebalance results =  " + aStr.toString());
+    }
+  };
+
+  protected SerializableRunnable getCreateAEQRunnable(final String parentRegion) {
+    SerializableRunnable runnable = new SerializableRunnable("createAEQ") {
+      @Override
+      public void run() throws Exception {
+        // Create an async event listener that doesn't dispatch anything
+        cache.createAsyncEventQueueFactory().setMaximumQueueMemory(1).setParallel(true)
+            .create("parallelQueue", new AsyncEventListener() {
+              @Override
+              public void close() {}
+
+              @Override
+              public boolean processEvents(List<AsyncEvent> events) {
+                try {
+                  Thread.sleep(100);
+                } catch (InterruptedException e) {
+                  return false;
+                }
+                return false;
+              }
+            });
+        Region region = cache.getRegion(parentRegion);
+        region.getAttributesMutator().addAsyncEventQueueId("parallelQueue");
+      }
+    };
+    return runnable;
+  }
+
+  protected SerializableRunnable getCreateColocatedRegionRunnable(final String parentRegion,
+      final String regionName, final RegionAttributes ratts) {
+    SerializableRunnable runnable = new SerializableRunnable("createdColocatedRegion") {
+      @Override
+      public void run() throws Exception {
+        Cache cache = getCache();
+        AttributesFactory attr = new AttributesFactory();
+        PartitionAttributesFactory paf = new PartitionAttributesFactory();
+        paf.setRedundantCopies(1);
+        paf.setRecoveryDelay(-1);
+        paf.setStartupRecoveryDelay(-1);
+        paf.setColocatedWith(parentRegion);
+        PartitionAttributes prAttr = paf.create();
+        attr.setPartitionAttributes(prAttr);
+        RegionAttributes ratts = attr.create();
+
+        System.out.println("creating colocatedRegion " + regionName);
+        Region region = cache.createRegion(regionName, ratts);
+        System.out.println("created colocatedRegion " + regionName);
+      }
+    };
+    return runnable;
+  }
+
+  @Test
+  public void testRebalanceDuringColocatedPRCreation() throws Exception {
+
+    Host host = Host.getHost(0);
+    VM vm0 = host.getVM(0);
+    VM vm1 = host.getVM(1);
+    VM vm2 = host.getVM(2);
+    VM vm3 = host.getVM(3);
+
+    // Create the region in 3 of 4 VMs
+    vm0.invoke(getCreatePRRunnable(parentRegion));
+    vm1.invoke(getCreatePRRunnable(parentRegion));
+    vm2.invoke(getCreatePRRunnable(parentRegion));
+
+    // add data (but don't define all buckets)
+    int numBuckets = vm0.invoke(() -> getCache().getRegion(parentRegion).getAttributes()
+        .getPartitionAttributes().getTotalNumBuckets());
+    vm0.invoke(() -> putEntryInEachBucket(parentRegion, numBuckets / 2));
+
+    // give rebalance some work to do by adding another vm
+    vm3.invoke(getCreatePRRunnable(parentRegion));
+
+    // create colocated regions while rebalance is in progress
+    AttributesFactory attr = new AttributesFactory();
+    PartitionAttributesFactory paf = new PartitionAttributesFactory();
+    paf.setRedundantCopies(1);
+    paf.setRecoveryDelay(-1);
+    paf.setStartupRecoveryDelay(-1);
+    paf.setColocatedWith(parentRegion);
+    PartitionAttributes prAttr = paf.create();
+    attr.setPartitionAttributes(prAttr);
+    RegionAttributes ratts = attr.create();
+
+    for (int i = 0; i < 1; i++) {
+
+      AsyncInvocation aiRebalancer = vm0.invokeAsync(rebalance);
+
+      String colocatedRegionName = colocatedRegionBase + i;
+      SerializableRunnable createColocatedRegion =
+          getCreateColocatedRegionRunnable(parentRegion, colocatedRegionName, ratts);
+
+      AsyncInvocation aiConcOps = vm1.invokeAsync(getConcOpsRunnable(parentRegion, numBuckets));
+
+      AsyncInvocation ai1 = vm0.invokeAsync(createColocatedRegion);
+      AsyncInvocation ai2 = vm1.invokeAsync(createColocatedRegion);
+      AsyncInvocation ai3 = vm2.invokeAsync(createColocatedRegion);
+      AsyncInvocation ai4 = vm3.invokeAsync(createColocatedRegion);
+
+      aiConcOps.join();
+      aiConcOps.checkException();
+
+      aiRebalancer.join();
+      aiRebalancer.checkException();
+
+      ai1.join();
+      ai2.join();
+      ai3.join();
+      ai4.join();
+
+      ai1.checkException();
+      ai2.checkException();
+      ai3.checkException();
+      ai4.checkException();
+
+    }
+
+    AsyncInvocation aiRebalancer = vm0.invokeAsync(rebalance);
+    aiRebalancer.join();
+    aiRebalancer.checkException();
+
+  }
+
+  @Test
+  public void testRebalanceDuringAEQCreation() throws Exception {
+
+    Host host = Host.getHost(0);
+    VM vm0 = host.getVM(0);
+    VM vm1 = host.getVM(1);
+    VM vm2 = host.getVM(2);
+    VM vm3 = host.getVM(3);
+
+    // Create the region in 3 of 4 VMs
+    vm0.invoke(getCreatePRRunnable(parentRegion));
+    vm1.invoke(getCreatePRRunnable(parentRegion));
+    vm2.invoke(getCreatePRRunnable(parentRegion));
+
+    // add data (but don't define all buckets)
+    int numBuckets = vm0.invoke(() -> getCache().getRegion(parentRegion).getAttributes()
+        .getPartitionAttributes().getTotalNumBuckets());
+    vm0.invoke(() -> putEntryInEachBucket(parentRegion, numBuckets / 2));
+
+    // give rebalance some work to do by adding another vm
+    vm3.invoke(getCreatePRRunnable(parentRegion));
+
+    AsyncInvocation aiRebalancer = vm0.invokeAsync(rebalance);
+
+    AsyncInvocation ai1 = vm0.invokeAsync(getCreateAEQRunnable(parentRegion));
+    AsyncInvocation ai2 = vm1.invokeAsync(getCreateAEQRunnable(parentRegion));
+    AsyncInvocation ai3 = vm2.invokeAsync(getCreateAEQRunnable(parentRegion));
+    AsyncInvocation ai4 = vm3.invokeAsync(getCreateAEQRunnable(parentRegion));
+
+    aiRebalancer.join();
+    aiRebalancer.checkException();
+
+    ai1.join();
+    ai2.join();
+    ai3.join();
+    ai4.join();
+
+    ai1.checkException();
+    ai2.checkException();
+    ai3.checkException();
+    ai4.checkException();
+
+    aiRebalancer = vm0.invokeAsync(rebalance);
+    aiRebalancer.join();
+    aiRebalancer.checkException();
+  }
+
   @Test
   public void testRecoverRedundancySimulation() {
     recoverRedundancy(true);

-- 
To stop receiving notification emails like this one, please contact
ladyvader@apache.org.