You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by es...@apache.org on 2019/06/11 00:31:56 UTC

[geode] branch feature/GEODE-6853 created (now 96e6146)

This is an automated email from the ASF dual-hosted git repository.

eshu11 pushed a change to branch feature/GEODE-6853
in repository https://gitbox.apache.org/repos/asf/geode.git.


      at 96e6146  GEODE-6853: Do not request region sync during message processing.

This branch includes the following new commits:

     new 96e6146  GEODE-6853: Do not request region sync during message processing.

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[geode] 01/01: GEODE-6853: Do not request region sync during message processing.

Posted by es...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

eshu11 pushed a commit to branch feature/GEODE-6853
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 96e614641606959df40b0e48d975660608fbed0e
Author: eshu <es...@pivotal.io>
AuthorDate: Mon Jun 10 17:27:38 2019 -0700

    GEODE-6853: Do not request region sync during message processing.
    
     * Move scheduling of the region synchronization task to DistributedRegion.
     * Use the timed task to request region sync with no delay when processing a region sync message.
---
 .../distributed/internal/DistributionAdvisor.java  | 45 ++++--------------
 .../geode/internal/cache/DistributedRegion.java    | 50 +++++++++++++++++++-
 .../internal/cache/InitialImageOperation.java      |  4 +-
 .../internal/DistributionAdvisorTest.java          | 55 ++++++++++++++++++++++
 .../internal/cache/DistributedRegionTest.java      | 36 ++++++++++++++
 .../internal/cache/InitialImageOperationTest.java  |  7 +--
 6 files changed, 154 insertions(+), 43 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionAdvisor.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionAdvisor.java
index 78d33a4..7093360 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionAdvisor.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionAdvisor.java
@@ -41,7 +41,6 @@ import org.apache.geode.distributed.internal.membership.InternalDistributedMembe
 import org.apache.geode.internal.Assert;
 import org.apache.geode.internal.DataSerializableFixedID;
 import org.apache.geode.internal.InternalDataSerializer;
-import org.apache.geode.internal.SystemTimer;
 import org.apache.geode.internal.Version;
 import org.apache.geode.internal.cache.CacheDistributionAdvisor.CacheProfile;
 import org.apache.geode.internal.cache.DistributedRegion;
@@ -277,50 +276,22 @@ public class DistributionAdvisor {
     // interval. This allows client caches to retry an operation that might otherwise be recovered
     // through the sync operation. Without associated event information this could cause the
     // retried operation to be mishandled. See GEODE-5505
-    final long delay = dr.getGemFireCache().getCacheServers().stream()
-        .mapToLong(CacheServer::getMaximumTimeBetweenPings).max().orElse(0L);
-    dr.getGemFireCache().getCCPTimer().schedule(new SystemTimer.SystemTimerTask() {
-      @Override
-      public void run2() {
-        while (!dr.isInitialized()) {
-          if (dr.isDestroyed()) {
-            return;
-          } else {
-            try {
-              if (isDebugEnabled) {
-                logger.debug(
-                    "da.syncForCrashedMember waiting for region to finish initializing: {}", dr);
-              }
-              Thread.sleep(100);
-            } catch (InterruptedException e) {
-              return;
-            }
-          }
-        }
-        if (dr.getDataPolicy().withPersistence() && persistentId == null) {
-          // Fix for 46704. The lost member may be a replicate
-          // or an empty accessor. We don't need to do a synchronization
-          // in that case, because those members send their writes to
-          // a persistent member.
-          if (isDebugEnabled) {
-            logger.debug(
-                "da.syncForCrashedMember skipping sync because crashed member is not persistent: {}",
-                id);
-          }
-          return;
-        }
-        dr.synchronizeForLostMember(id, lostVersionID);
-      }
-    }, delay);
+    final long delay = getDelay(dr);
+    dr.scheduleSynchronizeForLostMember(id, lostVersionID, delay);
     if (dr.getConcurrencyChecksEnabled()) {
       dr.setRegionSynchronizeScheduled(lostVersionID);
     }
   }
 
-  private PersistentMemberID getPersistentID(CacheProfile cp) {
+  PersistentMemberID getPersistentID(CacheProfile cp) {
     return cp.persistentID;
   }
 
+  long getDelay(DistributedRegion dr) {
+    return dr.getGemFireCache().getCacheServers().stream()
+        .mapToLong(CacheServer::getMaximumTimeBetweenPings).max().orElse(0L);
+  }
+
   /** find the region for a delta-gii operation (synch) */
   public DistributedRegion getRegionForDeltaGII() {
     if (advisee instanceof DistributedRegion) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
index 2898438..a8022b6 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
@@ -87,6 +87,7 @@ import org.apache.geode.distributed.internal.locks.DLockRemoteToken;
 import org.apache.geode.distributed.internal.locks.DLockService;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.Assert;
+import org.apache.geode.internal.SystemTimer;
 import org.apache.geode.internal.cache.CacheDistributionAdvisor.CacheProfile;
 import org.apache.geode.internal.cache.InitialImageOperation.GIIStatus;
 import org.apache.geode.internal.cache.RegionMap.ARMLockTestHook;
@@ -1273,11 +1274,40 @@ public class DistributedRegion extends LocalRegion implements InternalDistribute
     }
   }
 
+  public void scheduleSynchronizeForLostMember(InternalDistributedMember member,
+      VersionSource lostVersionID, long delay) {
+    getGemFireCache().getCCPTimer().schedule(new SystemTimer.SystemTimerTask() {
+      @Override
+      public void run2() {
+        performSynchronizeForLostMemberTask(member, lostVersionID);
+      }
+    }, delay);
+  }
+
+  void performSynchronizeForLostMemberTask(InternalDistributedMember member,
+      VersionSource lostVersionID) {
+    waitUntilInitialized();
+
+    if (getDataPolicy().withPersistence() && getPersistentID() == null) {
+      // Fix for 46704. The lost member may be a replicate
+      // or an empty accessor. We don't need to do a synchronization
+      // in that case, because those members send their writes to
+      // a persistent member.
+      if (logger.isDebugEnabled()) {
+        logger.debug(
+            "da.syncForCrashedMember skipping sync because crashed member is not persistent: {}",
+            member);
+      }
+      return;
+    }
+    synchronizeForLostMember(member, lostVersionID);
+  }
+
   /**
    * If this region has concurrency controls enabled this will pull any missing changes from other
    * replicates using InitialImageOperation and a filtered chunking protocol.
    */
-  public void synchronizeForLostMember(InternalDistributedMember lostMember,
+  void synchronizeForLostMember(InternalDistributedMember lostMember,
       VersionSource lostVersionID) {
     if (!getConcurrencyChecksEnabled()) {
       return;
@@ -1328,6 +1358,24 @@ public class DistributedRegion extends LocalRegion implements InternalDistribute
     return false;
   }
 
+  void waitUntilInitialized() {
+    while (!isInitialized()) {
+      if (isDestroyed()) {
+        return;
+      } else {
+        try {
+          if (logger.isDebugEnabled()) {
+            logger.debug(
+                "da.syncForCrashedMember waiting for region to finish initializing: {}", this);
+          }
+          Thread.sleep(100);
+        } catch (InterruptedException e) {
+          return;
+        }
+      }
+    }
+  }
+
   /** remove any partial entries received in a failed GII */
   void cleanUpAfterFailedGII(boolean recoverFromDisk) {
     DiskRegion dskRgn = getDiskRegion();
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/InitialImageOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/InitialImageOperation.java
index 072f97c..d0811c3 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/InitialImageOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/InitialImageOperation.java
@@ -1918,9 +1918,9 @@ public class InitialImageOperation {
         InternalDistributedMember lostMember, VersionSource lostVersionSource) {
       if (region.setRegionSynchronizedWithIfNotScheduled(lostVersionSource)) {
         // if region synchronization has not been scheduled or performed,
-        // we do synchronization with others right away as we received the synchronization request
+        // we do synchronization with no delay as we received the synchronization request
         // indicating timed task has been triggered on other nodes
-        region.synchronizeForLostMember(lostMember, lostVersionSource);
+        region.scheduleSynchronizeForLostMember(lostMember, lostVersionSource, 0);
       }
     }
 
diff --git a/geode-core/src/test/java/org/apache/geode/distributed/internal/DistributionAdvisorTest.java b/geode-core/src/test/java/org/apache/geode/distributed/internal/DistributionAdvisorTest.java
index e92459c..a541338 100644
--- a/geode-core/src/test/java/org/apache/geode/distributed/internal/DistributionAdvisorTest.java
+++ b/geode-core/src/test/java/org/apache/geode/distributed/internal/DistributionAdvisorTest.java
@@ -14,14 +14,40 @@
  */
 package org.apache.geode.distributed.internal;
 
+import static org.mockito.Mockito.doCallRealMethod;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
+import org.junit.Before;
 import org.junit.Test;
 
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.CacheDistributionAdvisor;
+import org.apache.geode.internal.cache.DistributedRegion;
+import org.apache.geode.internal.cache.persistence.PersistentMemberID;
+import org.apache.geode.internal.cache.versions.VersionSource;
+
 
 public class DistributionAdvisorTest {
+  private DistributionAdvisor distributionAdvisor;
+  private InternalDistributedMember member;
+  private DistributedRegion distributedRegion;
+  private DistributionAdvisor.Profile profile;
+  private VersionSource lostVersionID;
+  private PersistentMemberID persistentMemberID;
+  private long delay = 100;
+
+  @Before
+  public void setup() {
+    distributionAdvisor = mock(DistributionAdvisor.class);
+    member = mock(InternalDistributedMember.class);
+    distributedRegion = mock(DistributedRegion.class);
+    profile = mock(CacheDistributionAdvisor.CacheProfile.class);
+    lostVersionID = mock(VersionSource.class);
+    persistentMemberID = mock(PersistentMemberID.class);
+  }
 
   @Test
   public void shouldBeMockable() throws Exception {
@@ -29,4 +55,33 @@ public class DistributionAdvisorTest {
     mockDistributionAdvisor.initialize();
     verify(mockDistributionAdvisor, times(1)).initialize();
   }
+
+  @Test
+  public void regionSyncScheduledForLostMember() {
+    when(distributionAdvisor.getRegionForDeltaGII()).thenReturn(distributedRegion);
+    when(distributionAdvisor.getDelay(distributedRegion)).thenReturn(delay);
+    when(distributedRegion.getConcurrencyChecksEnabled()).thenReturn(true);
+    doCallRealMethod().when(distributionAdvisor).syncForCrashedMember(member, profile);
+
+    distributionAdvisor.syncForCrashedMember(member, profile);
+
+    verify(distributedRegion).scheduleSynchronizeForLostMember(member, member, delay);
+    verify(distributedRegion).setRegionSynchronizeScheduled(member);
+  }
+
+  @Test
+  public void regionSyncScheduledForLostPersistentMember() {
+    when(distributionAdvisor.getRegionForDeltaGII()).thenReturn(distributedRegion);
+    when(distributionAdvisor.getPersistentID((CacheDistributionAdvisor.CacheProfile) profile))
+        .thenReturn(persistentMemberID);
+    when(persistentMemberID.getVersionMember()).thenReturn(lostVersionID);
+    when(distributionAdvisor.getDelay(distributedRegion)).thenReturn(delay);
+    when(distributedRegion.getConcurrencyChecksEnabled()).thenReturn(true);
+    doCallRealMethod().when(distributionAdvisor).syncForCrashedMember(member, profile);
+
+    distributionAdvisor.syncForCrashedMember(member, profile);
+
+    verify(distributedRegion).scheduleSynchronizeForLostMember(member, lostVersionID, delay);
+    verify(distributedRegion).setRegionSynchronizeScheduled(lostVersionID);
+  }
 }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionTest.java
index 143072f..23640a4 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionTest.java
@@ -19,6 +19,7 @@ import static org.mockito.Matchers.anyObject;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.doCallRealMethod;
+import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
@@ -26,7 +27,10 @@ import static org.mockito.Mockito.when;
 
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.InOrder;
 
+import org.apache.geode.cache.DataPolicy;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.cache.versions.RegionVersionHolder;
 import org.apache.geode.internal.cache.versions.RegionVersionVector;
 import org.apache.geode.internal.cache.versions.VersionSource;
@@ -36,12 +40,14 @@ public class DistributedRegionTest {
   private RegionVersionVector vector;
   private RegionVersionHolder holder;
   private VersionSource lostMemberVersionID;
+  private InternalDistributedMember member;
 
   @Before
   public void setup() {
     vector = mock(RegionVersionVector.class);
     holder = mock(RegionVersionHolder.class);
     lostMemberVersionID = mock(VersionSource.class);
+    member = mock(InternalDistributedMember.class);
   }
 
   @Test
@@ -146,4 +152,34 @@ public class DistributedRegionTest {
 
     verify(holder, never()).setRegionSynchronizeScheduledOrDoneIfNot();
   }
+
+  @Test
+  public void regionSyncInvokedInPerformSynchronizeForLostMemberTaskAfterRegionInitialized() {
+    DistributedRegion distributedRegion = mock(DistributedRegion.class);
+    when(distributedRegion.getDataPolicy()).thenReturn(mock(DataPolicy.class));
+    doCallRealMethod().when(distributedRegion).performSynchronizeForLostMemberTask(member,
+        lostMemberVersionID);
+    InOrder inOrder = inOrder(distributedRegion);
+
+    distributedRegion.performSynchronizeForLostMemberTask(member, lostMemberVersionID);
+
+    inOrder.verify(distributedRegion).waitUntilInitialized();
+    inOrder.verify(distributedRegion).synchronizeForLostMember(member, lostMemberVersionID);
+  }
+
+  @Test
+  public void emptyAccessorOfPersistentRegionDoesNotSynchronizeForLostMember() {
+    DataPolicy dataPolicy = mock(DataPolicy.class);
+    DistributedRegion distributedRegion = mock(DistributedRegion.class);
+    when(distributedRegion.getDataPolicy()).thenReturn(dataPolicy);
+    when(dataPolicy.withPersistence()).thenReturn(true);
+    when(distributedRegion.getPersistentID()).thenReturn(null);
+    doCallRealMethod().when(distributedRegion).performSynchronizeForLostMemberTask(member,
+        lostMemberVersionID);
+
+    distributedRegion.performSynchronizeForLostMemberTask(member, lostMemberVersionID);
+
+    verify(distributedRegion).waitUntilInitialized();
+    verify(distributedRegion, never()).synchronizeForLostMember(member, lostMemberVersionID);
+  }
 }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/InitialImageOperationTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/InitialImageOperationTest.java
index 26663a5..61bed57 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/InitialImageOperationTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/InitialImageOperationTest.java
@@ -77,12 +77,12 @@ public class InitialImageOperationTest {
   }
 
   @Test
-  public void synchronizeForLostMemberIsInvokedIfRegionHasNotScheduledOrDoneSynchronization() {
+  public void scheduleSynchronizeForLostMemberIsInvokedIfRegionHasNotScheduledOrDoneSynchronization() {
     when(distributedRegion.setRegionSynchronizedWithIfNotScheduled(versionSource)).thenReturn(true);
 
     message.synchronizeIfNotScheduled(distributedRegion, lostMember, versionSource);
 
-    verify(distributedRegion).synchronizeForLostMember(lostMember, versionSource);
+    verify(distributedRegion).scheduleSynchronizeForLostMember(lostMember, versionSource, 0);
   }
 
   @Test
@@ -92,6 +92,7 @@ public class InitialImageOperationTest {
 
     message.synchronizeIfNotScheduled(distributedRegion, lostMember, versionSource);
 
-    verify(distributedRegion, never()).synchronizeForLostMember(lostMember, versionSource);
+    verify(distributedRegion, never()).scheduleSynchronizeForLostMember(lostMember, versionSource,
+        0);
   }
 }