You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ud...@apache.org on 2020/11/17 23:20:07 UTC

[geode] branch revert-5752-feature/GEODE-8714 created (now d729892)

This is an automated email from the ASF dual-hosted git repository.

udo pushed a change to branch revert-5752-feature/GEODE-8714
in repository https://gitbox.apache.org/repos/asf/geode.git.


      at d729892  Revert "GEODE-8714: return event to queue at stoping of gw sender (#5752)"

This branch includes the following new commits:

     new d729892  Revert "GEODE-8714: return event to queue at stoping of gw sender (#5752)"

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[geode] 01/01: Revert "GEODE-8714: return event to queue at stoping of gw sender (#5752)"

Posted by ud...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

udo pushed a commit to branch revert-5752-feature/GEODE-8714
in repository https://gitbox.apache.org/repos/asf/geode.git

commit d7298924e184b729b34759c25c8714bb3f4f69a8
Author: Udo Kohlmeyer <ko...@users.noreply.github.com>
AuthorDate: Wed Nov 18 10:17:38 2020 +1100

    Revert "GEODE-8714: return event to queue at stoping of gw sender (#5752)"
    
    This reverts commit 1eb9f344b3f83871e32a521c57068176230fb04e.
---
 .../wan/parallel/ParallelGatewaySenderQueue.java   |   9 +-
 ...ANPersistenceEnabledGatewaySenderDUnitTest.java | 100 ---------------------
 2 files changed, 6 insertions(+), 103 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
index 2788bb2..9366d5f 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
@@ -1731,9 +1731,12 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
     for (int i = helpArray.length - 1; i >= 0; i--) {
       GatewaySenderEventImpl event = (GatewaySenderEventImpl) helpArray[i];
       final int bucketId = event.getBucketId();
-      BucketRegionQueue brq = getBucketRegionQueueByBucketId(getRandomShadowPR(), bucketId);
-      if (brq != null) {
-        brq.pushKeyIntoQueue(event.getShadowKey());
+      final PartitionedRegion region = (PartitionedRegion) event.getRegion();
+      if (region.getRegionAdvisor().isPrimaryForBucket(bucketId)) {
+        BucketRegionQueue brq = getBucketRegionQueueByBucketId(getRandomShadowPR(), bucketId);
+        if (brq != null) {
+          brq.pushKeyIntoQueue(event.getShadowKey());
+        }
       }
     }
   }
diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPersistenceEnabledGatewaySenderDUnitTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPersistenceEnabledGatewaySenderDUnitTest.java
index 9364187..7070ae0 100644
--- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPersistenceEnabledGatewaySenderDUnitTest.java
+++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPersistenceEnabledGatewaySenderDUnitTest.java
@@ -1897,106 +1897,6 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends WANTest
     vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 3000));
   }
 
-  /**
-   * Enable persistence for PR and GatewaySender. Do some puts in local region. Restart 1 server,
-   * then stop gateway sender, and stop server. After that create receiver on remote site.
-   * Check if the remote site receives all the events.
-   */
-  @Test
-  public void testPersistentPRWithGatewaySenderPersistenceEnabled_RestartAndStopServer() {
-    // create locator on local site
-    Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
-    // create locator on remote site
-    Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
-
-    // create receiver on remote site
-    createCacheInVMs(nyPort, vm2, vm3);
-    // createReceiverInVMs(vm2, vm3);
-
-    // create cache in local site
-    createCacheInVMs(lnPort, vm4, vm5);
-    vm4.invoke(() -> setNumDispatcherThreadsForTheRun(2));
-    vm5.invoke(() -> setNumDispatcherThreadsForTheRun(2));
-
-    // create senders with disk store
-    String diskStore1 = (String) vm4.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2,
-        true, 100, 10, false, true, null, null, true));
-    String diskStore2 = (String) vm5.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2,
-        true, 100, 10, false, true, null, null, true));
-
-    LogWriterUtils.getLogWriter()
-        .info("The DS are: " + diskStore1 + "," + diskStore2);
-
-    // create PR on remote site
-    vm2.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), null, 1,
-        13, isOffHeap()));
-    vm3.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), null, 1,
-        13, isOffHeap()));
-
-    // create PR on local site
-    vm4.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1,
-        13, isOffHeap()));
-    vm5.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1,
-        13, isOffHeap()));
-
-    // start the senders on local site
-    startSenderInVMs("ln", vm4, vm5);
-
-    // wait for senders to become running
-    vm4.invoke(waitForSenderRunnable());
-    vm5.invoke(waitForSenderRunnable());
-
-    // start puts in region on local site
-    vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName(), 10));
-    LogWriterUtils.getLogWriter().info("Completed puts in the region");
-
-    // --------------------close and rebuild local site
-    // -------------------------------------------------
-    // kill the sender in vm5
-    vm5.invoke(killSenderRunnable());
-
-    LogWriterUtils.getLogWriter().info("Killed vm5 sender.");
-
-    // restart the vm
-    createCacheInVMs(lnPort, vm5);
-    vm5.invoke(() -> setNumDispatcherThreadsForTheRun(2));
-
-    LogWriterUtils.getLogWriter().info("Created back the cache");
-
-    // create senders with disk store
-    vm5.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2, true, 100, 10, false, true,
-        null, diskStore2, false));
-
-    LogWriterUtils.getLogWriter().info("Created the senders back from the disk store.");
-    // create PR on local site
-    vm5.invoke(() -> WANTestBase.createPersistentPartitionedRegion(getTestMethodName(), "ln", 1,
-        13, isOffHeap()));
-
-
-    LogWriterUtils.getLogWriter().info("Created back the partitioned regions");
-
-    LogWriterUtils.getLogWriter().info("Waiting for senders running.");
-    // wait for senders running
-    vm5.invoke(waitForSenderRunnable());
-
-    LogWriterUtils.getLogWriter().info("All the senders are now running...");
-
-    // ----------------------------------------------------------------------------------------------------
-
-    vm4.invoke(() -> WANTestBase.stopSender("ln"));
-    vm5.invoke(() -> WANTestBase.stopSender("ln"));
-
-    vm5.invoke(killSenderRunnable());
-
-    vm4.invoke(() -> WANTestBase.startSender("ln"));
-
-
-    createReceiverInVMs(vm2, vm3);
-
-    vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 10));
-    vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 10));
-  }
-
 
   /**
    * setIgnoreQueue has lots of callers by reflection