You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bs...@apache.org on 2019/05/31 21:09:47 UTC

[geode] 01/01: GEODE-6823 Hang in ElderInitProcessor.init()

This is an automated email from the ASF dual-hosted git repository.

bschuchardt pushed a commit to branch feature/GEODE-6823
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 49997f3ea91109a8f8e17404f2fac5e7af2c19f3
Author: Bruce Schuchardt <bs...@pivotal.io>
AuthorDate: Fri May 31 13:55:19 2019 -0700

    GEODE-6823 Hang in ElderInitProcessor.init()
    
    This corrects elder init processing to use the closeInProgress field to
    check for shutdown.  A coding error during refactoring caused it to call the
    isCloseInProgress() method instead, which did more than just return the value
    of the closeInProgress field (it also returned true while the distributed
    system was disconnecting) and was incorrectly reporting a close in progress
    during startup operations.
    
    I've renamed the old isCloseInProgress() method to
    shouldInhibitMembershipWarnings() to avoid similar coding errors in the
    future.  isCloseInProgress() now merely returns the value of the field, as
    you'd expect it to, and replaces the former isShutdownStarted() method.
    
    While writing tests I found that ClusterElderManagerTest was leaving
    blocked threads behind because the waitForElder() method in
    ClusterElderManager was not interruptible.  I've changed that method to
    be interruptible.  We don't interrupt message-processing threads, so this
    should be a safe change.
---
 .../internal/ClusterDistributionManager.java       | 27 +++++++-------
 .../distributed/internal/ClusterElderManager.java  | 27 ++++++--------
 .../distributed/internal/DistributionManager.java  |  2 +-
 .../internal/locks/ElderInitProcessor.java         | 38 +++++++++++---------
 .../internal/locks/GrantorRequestProcessor.java    | 17 +++++++--
 .../membership/gms/mgr/GMSMembershipManager.java   |  2 +-
 .../internal/ClusterElderManagerTest.java          | 41 +++++++++++++++++-----
 7 files changed, 94 insertions(+), 60 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterDistributionManager.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterDistributionManager.java
index a8d1c08..22d714b 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterDistributionManager.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterDistributionManager.java
@@ -968,7 +968,7 @@ public class ClusterDistributionManager implements DistributionManager {
       throw err;
     } catch (Throwable t) {
       SystemFailure.checkFailure();
-      if (isCloseInProgress()) {
+      if (shouldInhibitMembershipWarnings()) {
         logger.debug("Caught unusual exception during shutdown: {}", t.getMessage(), t);
       } else {
         logger.warn("Task failed with exception", t);
@@ -1975,18 +1975,18 @@ public class ClusterDistributionManager implements DistributionManager {
     }
   }
 
-  /**
-   * Returns true if this DM or the DistributedSystem owned by it is closing or is closed.
-   */
-  boolean isCloseInProgress() {
-    if (closeInProgress) {
+  private boolean shouldInhibitMembershipWarnings() {
+    if (isCloseInProgress()) {
       return true;
     }
     InternalDistributedSystem ds = getSystem();
     return ds != null && ds.isDisconnecting();
   }
 
-  public boolean isShutdownStarted() {
+  /**
+   * Returns true if this distribution manager has initiated shutdown
+   */
+  public boolean isCloseInProgress() {
     return closeInProgress;
   }
 
@@ -2051,7 +2051,7 @@ public class ClusterDistributionManager implements DistributionManager {
               membershipEventQueue.take();
           handleMemberEvent(ev);
         } catch (InterruptedException e) {
-          if (isCloseInProgress()) {
+          if (shouldInhibitMembershipWarnings()) {
             if (logger.isTraceEnabled()) {
               logger.trace("MemberEventInvoker: InterruptedException during shutdown");
             }
@@ -2062,7 +2062,7 @@ public class ClusterDistributionManager implements DistributionManager {
         } catch (DistributedSystemDisconnectedException e) {
           break;
         } catch (CancelException e) {
-          if (isCloseInProgress()) {
+          if (shouldInhibitMembershipWarnings()) {
             if (logger.isTraceEnabled()) {
               logger.trace("MemberEventInvoker: cancelled");
             }
@@ -2653,7 +2653,7 @@ public class ClusterDistributionManager implements DistributionManager {
         stats.incNodes(-1);
       }
       String msg;
-      if (p_crashed && !isCloseInProgress()) {
+      if (p_crashed && !shouldInhibitMembershipWarnings()) {
         msg =
             "Member at {} unexpectedly left the distributed cache: {}";
         addMemberEvent(new MemberCrashedEvent(theId, p_reason));
@@ -2881,7 +2881,7 @@ public class ClusterDistributionManager implements DistributionManager {
   }
 
   @Override
-  public ElderState getElderState(boolean waitToBecomeElder) {
+  public ElderState getElderState(boolean waitToBecomeElder) throws InterruptedException {
     return clusterElderManager.getElderState(waitToBecomeElder);
   }
 
@@ -2890,7 +2890,8 @@ public class ClusterDistributionManager implements DistributionManager {
    *
    * @return true if newElder is the elder; false if it is no longer a member or we are the elder.
    */
-  public boolean waitForElder(final InternalDistributedMember desiredElder) {
+  public boolean waitForElder(final InternalDistributedMember desiredElder)
+      throws InterruptedException {
 
     return clusterElderManager.waitForElder(desiredElder);
   }
@@ -3476,7 +3477,7 @@ public class ClusterDistributionManager implements DistributionManager {
         try {
           handleEvent(manager, listener);
         } catch (CancelException e) {
-          if (manager.isCloseInProgress()) {
+          if (manager.shouldInhibitMembershipWarnings()) {
             if (logger.isTraceEnabled()) {
               logger.trace("MemberEventInvoker: cancelled");
             }
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterElderManager.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterElderManager.java
index f18ea45..debe3e1 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterElderManager.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterElderManager.java
@@ -82,7 +82,7 @@ public class ClusterElderManager {
     return clusterDistributionManager.getId().equals(getElderCandidate());
   }
 
-  public ElderState getElderState(boolean waitToBecomeElder) {
+  public ElderState getElderState(boolean waitToBecomeElder) throws InterruptedException {
     if (waitToBecomeElder) {
       // This should always return true.
       waitForElder(clusterDistributionManager.getId());
@@ -121,22 +121,16 @@ public class ClusterElderManager {
    * @return true if desiredElder is the elder; false if it is no longer a member or the local
    *         member is the elder
    */
-  public boolean waitForElder(final InternalDistributedMember desiredElder) {
+  public boolean waitForElder(final InternalDistributedMember desiredElder)
+      throws InterruptedException {
     MembershipChangeListener changeListener =
         new MembershipChangeListener();
 
     clusterDistributionManager.addMembershipListener(changeListener);
 
-    boolean interrupted = false;
     InternalDistributedMember currentElder;
 
     try {
-      if (logger.isDebugEnabled()) {
-        currentElder = getElderCandidate();
-        logger.debug("Waiting for Elder to change. Expecting Elder to be {}, is {}.",
-            desiredElder, currentElder);
-      }
-
       while (true) {
         if (clusterDistributionManager.isCloseInProgress()) {
           return false;
@@ -145,6 +139,10 @@ public class ClusterElderManager {
         if (desiredElder.equals(currentElder)) {
           return true;
         }
+        if (logger.isDebugEnabled()) {
+          logger.debug("Expecting Elder to be {} but it is {}.",
+              desiredElder, currentElder);
+        }
         if (!clusterDistributionManager.isCurrentMember(desiredElder)) {
           return false; // no longer present
         }
@@ -155,18 +153,13 @@ public class ClusterElderManager {
           return false;
         }
 
-        try {
-          changeListener.waitForMembershipChange();
-        } catch (InterruptedException e) {
-          interrupted = true;
+        if (logger.isDebugEnabled()) {
+          logger.debug("Waiting for membership to change");
         }
+        changeListener.waitForMembershipChange();
       }
     } finally {
       clusterDistributionManager.removeMembershipListener(changeListener);
-
-      if (interrupted) {
-        Thread.currentThread().interrupt();
-      }
     }
   }
 
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionManager.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionManager.java
index 0dd6330..2e52900 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionManager.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionManager.java
@@ -166,7 +166,7 @@ public interface DistributionManager extends ReplySender {
    * @throws IllegalStateException if elder try lock fails
    * @since GemFire 4.0
    */
-  ElderState getElderState(boolean force);
+  ElderState getElderState(boolean force) throws InterruptedException;
 
   /**
    * Returns the membership port of the underlying distribution manager used for communication.
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/ElderInitProcessor.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/ElderInitProcessor.java
index a693f9f..8c56e2f 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/ElderInitProcessor.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/ElderInitProcessor.java
@@ -169,21 +169,29 @@ public class ElderInitProcessor extends ReplyProcessor21 {
       ArrayList grantorVersions = new ArrayList(); // grantor versions
       ArrayList grantorSerialNumbers = new ArrayList(); // serial numbers of grantor svcs
       ArrayList nonGrantors = new ArrayList(); // svc names non-grantor for
-      if (dm.waitForElder(this.getSender())) {
-        GrantorRequestProcessor.readyForElderRecovery(dm.getSystem(), this.getSender(), null);
-        DLockService.recoverRmtElder(grantors, grantorVersions, grantorSerialNumbers, nonGrantors);
-        reply(dm, grantors, grantorVersions, grantorSerialNumbers, nonGrantors);
-      } else if (dm.getOtherNormalDistributionManagerIds().isEmpty()) {
-        // Either we're alone (and received a message from an unknown member) or else we haven't
-        // yet processed a view. In either case, we clearly don't have any grantors,
-        // so we return empty lists.
-
-        logger.info(LogMarker.DLS_MARKER,
-            "{}: returning empty lists because I know of no other members.",
-            this);
-        reply(dm, grantors, grantorVersions, grantorSerialNumbers, nonGrantors);
-      } else {
-        logger.info(LogMarker.DLS_MARKER, "{}: disregarding request from departed member.", this);
+      try {
+        if (dm.waitForElder(this.getSender())) {
+          GrantorRequestProcessor.readyForElderRecovery(dm.getSystem(), this.getSender(), null);
+          DLockService
+              .recoverRmtElder(grantors, grantorVersions, grantorSerialNumbers, nonGrantors);
+          reply(dm, grantors, grantorVersions, grantorSerialNumbers, nonGrantors);
+        } else if (dm.getOtherNormalDistributionManagerIds().isEmpty()) {
+          // Either we're alone (and received a message from an unknown member) or else we haven't
+          // yet processed a view. In either case, we clearly don't have any grantors,
+          // so we return empty lists.
+
+          logger.info(LogMarker.DLS_MARKER,
+              "{}: returning empty lists because I know of no other members.",
+              this);
+          reply(dm, grantors, grantorVersions, grantorSerialNumbers, nonGrantors);
+        } else {
+          logger.info(LogMarker.DLS_MARKER, "{}: disregarding request from departed member.", this);
+        }
+      } catch (InterruptedException e) {
+        // Interrupted while waiting for the sender to become elder (presumably shutting down).
+        // Restore the interrupt status so code further up this thread can still observe it.
+        logger.info("Elder initialization interrupted - will not send a reply");
+        Thread.currentThread().interrupt();
       }
     }
 
@@ -207,7 +215,7 @@ public class ElderInitProcessor extends ReplyProcessor21 {
     @Override
     public String toString() {
       StringBuffer buff = new StringBuffer();
-      buff.append("ElderInitMessage (processorId='").append(this.processorId).append(")");
+      buff.append("ElderInitMessage (processorId=").append(this.processorId).append(")");
       return buff.toString();
     }
   }
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/GrantorRequestProcessor.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/GrantorRequestProcessor.java
index cc8dbee..85ce578 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/GrantorRequestProcessor.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/GrantorRequestProcessor.java
@@ -209,7 +209,8 @@ public class GrantorRequestProcessor extends ReplyProcessor21 {
    * Sets currentElder to the memberId of the current elder if elder is remote; null if elder is in
    * our vm.
    */
-  private static ElderState startElderCall(InternalDistributedSystem sys, DLockService dls) {
+  private static ElderState startElderCall(InternalDistributedSystem sys, DLockService dls)
+      throws InterruptedException {
     InternalDistributedMember elder;
     ElderState es = null;
 
@@ -328,7 +329,12 @@ public class GrantorRequestProcessor extends ReplyProcessor21 {
     try {
       do {
         tryNewElder = false;
-        final ElderState es = startElderCall(system, service);
+        ElderState es = null;
+        try {
+          es = startElderCall(system, service);
+        } catch (InterruptedException e) {
+          interrupted = true;
+        }
         dm.throwIfDistributionStopped();
         try {
           if (es != null) {
@@ -491,7 +497,17 @@ public class GrantorRequestProcessor extends ReplyProcessor21 {
 
     protected void basicProcess(final DistributionManager dm) {
       // we should be in the elder
-      ElderState es = dm.getElderState(true);
+      ElderState es; // no null initializer: the compiler must prove es is assigned before use
+      try {
+        es = dm.getElderState(true);
+      } catch (InterruptedException e) {
+        // Interrupted while waiting to become the elder (presumably shutting down). es is unset
+        // here and the switch below dereferences it, so return without replying instead of
+        // throwing an NPE, and restore the interrupt status for code further up this thread.
+        logger.info("Interrupted while processing {}", this);
+        Thread.currentThread().interrupt();
+        return;
+      }
       switch (this.opCode) {
         case GET_OP:
           replyGrantorInfo(dm, es.getGrantor(this.serviceName, getSender(), this.dlsSerialNumber));
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/mgr/GMSMembershipManager.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/mgr/GMSMembershipManager.java
index f9352ea..46110e2 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/mgr/GMSMembershipManager.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/mgr/GMSMembershipManager.java
@@ -2620,7 +2620,7 @@ public class GMSMembershipManager implements MembershipManager, Manager {
   @Override
   public boolean isShutdownStarted() {
     ClusterDistributionManager dm = listener.getDM();
-    return shutdownInProgress || (dm != null && dm.isShutdownStarted());
+    return shutdownInProgress || (dm != null && dm.isCloseInProgress());
   }
 
   @Override
diff --git a/geode-core/src/test/java/org/apache/geode/distributed/internal/ClusterElderManagerTest.java b/geode-core/src/test/java/org/apache/geode/distributed/internal/ClusterElderManagerTest.java
index 8f9cad9..48b0a1f 100644
--- a/geode-core/src/test/java/org/apache/geode/distributed/internal/ClusterElderManagerTest.java
+++ b/geode-core/src/test/java/org/apache/geode/distributed/internal/ClusterElderManagerTest.java
@@ -123,7 +123,7 @@ public class ClusterElderManagerTest {
   }
 
   @Test
-  public void waitForElderReturnsTrueIfAnotherMemberIsElder() {
+  public void waitForElderReturnsTrueIfAnotherMemberIsElder() throws InterruptedException {
     ClusterElderManager clusterElderManager = new ClusterElderManager(clusterDistributionManager);
     when(clusterDistributionManager.getId()).thenReturn(member0);
     when(clusterDistributionManager.getViewMembers()).thenReturn(Arrays.asList(member1, member0));
@@ -131,7 +131,7 @@ public class ClusterElderManagerTest {
   }
 
   @Test
-  public void waitForElderReturnsFalseIfWeAreElder() {
+  public void waitForElderReturnsFalseIfWeAreElder() throws InterruptedException {
     ClusterElderManager clusterElderManager = new ClusterElderManager(clusterDistributionManager);
     when(clusterDistributionManager.getId()).thenReturn(member0);
     when(clusterDistributionManager.isCurrentMember(eq(member1))).thenReturn(true);
@@ -140,7 +140,8 @@ public class ClusterElderManagerTest {
   }
 
   @Test
-  public void waitForElderReturnsFalseIfDesiredElderIsNotACurrentMember() {
+  public void waitForElderReturnsFalseIfDesiredElderIsNotACurrentMember()
+      throws InterruptedException {
     ClusterElderManager clusterElderManager = new ClusterElderManager(clusterDistributionManager);
     when(clusterDistributionManager.getId()).thenReturn(member0);
     when(clusterDistributionManager.getViewMembers())
@@ -154,8 +155,24 @@ public class ClusterElderManagerTest {
     when(clusterDistributionManager.getId()).thenReturn(member0);
     when(clusterDistributionManager.getViewMembers()).thenReturn(Arrays.asList(member1, member0));
     when(clusterDistributionManager.isCurrentMember(eq(member0))).thenReturn(true);
+    when(clusterDistributionManager.isCloseInProgress()).thenReturn(false);
+
+    assertThatInterruptableRunnableWaits(() -> {
+      try {
+        clusterElderManager.waitForElder(member0);
+      } catch (InterruptedException e) {
+      }
+    });
+  }
 
-    assertThatRunnableWaits(() -> clusterElderManager.waitForElder(member0));
+  @Test
+  public void waitForElderDoesNotWaitIfShuttingDown() throws InterruptedException {
+    ClusterElderManager clusterElderManager = new ClusterElderManager(clusterDistributionManager);
+    when(clusterDistributionManager.getId()).thenReturn(member0);
+    when(clusterDistributionManager.getViewMembers()).thenReturn(Arrays.asList(member1, member0));
+    when(clusterDistributionManager.isCurrentMember(eq(member0))).thenReturn(true);
+    when(clusterDistributionManager.isCloseInProgress()).thenReturn(true);
+    assertThat(clusterElderManager.waitForElder(member0)).isFalse();
   }
 
   @Test
@@ -193,7 +210,7 @@ public class ClusterElderManagerTest {
   }
 
   @Test
-  public void getElderStateAsElder() {
+  public void getElderStateAsElder() throws InterruptedException {
     Supplier<ElderState> elderStateSupplier = mock(Supplier.class);
     ElderState elderState = mock(ElderState.class);
     when(elderStateSupplier.get()).thenReturn(elderState);
@@ -207,7 +224,7 @@ public class ClusterElderManagerTest {
   }
 
   @Test
-  public void getElderStateGetsBuiltOnceAsElder() {
+  public void getElderStateGetsBuiltOnceAsElder() throws InterruptedException {
     Supplier<ElderState> elderStateSupplier = mock(Supplier.class);
     ElderState elderState = mock(ElderState.class);
     when(elderStateSupplier.get()).thenReturn(elderState);
@@ -244,7 +261,7 @@ public class ClusterElderManagerTest {
   }
 
   @Test
-  public void getElderStateNotAsElder() {
+  public void getElderStateNotAsElder() throws InterruptedException {
     Supplier<ElderState> elderStateSupplier = mock(Supplier.class);
     ClusterElderManager clusterElderManager =
         new ClusterElderManager(clusterDistributionManager, elderStateSupplier);
@@ -264,12 +281,17 @@ public class ClusterElderManagerTest {
     when(clusterDistributionManager.getViewMembers()).thenReturn(Arrays.asList(member1, member0));
     when(clusterDistributionManager.isCurrentMember(eq(member0))).thenReturn(true);
 
-    assertThatRunnableWaits(() -> clusterElderManager.getElderState(true));
+    assertThatInterruptableRunnableWaits(() -> {
+      try {
+        clusterElderManager.getElderState(true);
+      } catch (InterruptedException e) {
+      }
+    });
 
     verify(elderStateSupplier, times(0)).get();
   }
 
-  private void assertThatRunnableWaits(Runnable runnable) {
+  private void assertThatInterruptableRunnableWaits(Runnable runnable) {
     Thread waitThread = new Thread(runnable);
 
     waitThread.start();
@@ -281,6 +303,7 @@ public class ClusterElderManagerTest {
           .until(() -> waitingStates.contains(waitThread.getState()));
     } finally {
       waitThread.interrupt();
+      await().until(() -> !waitThread.isAlive());
     }
   }
 }