You are viewing a plain-text version of this content. (The canonical link to it was a hyperlink in the original page and is not preserved in this plain-text version.)
Posted to commits@geode.apache.org by bs...@apache.org on 2018/06/29 21:52:38 UTC

[geode] branch develop updated: GEODE-5349 State-flush operation may exit early allowing for cache inconsistency

This is an automated email from the ASF dual-hosted git repository.

bschuchardt pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new dfafad7  GEODE-5349 State-flush operation may exit early allowing for cache inconsistency
dfafad7 is described below

commit dfafad79c2700a0356faf2eed1fe9ff4248e4ed4
Author: Bruce Schuchardt <bs...@pivotal.io>
AuthorDate: Fri Jun 29 14:49:52 2018 -0700

    GEODE-5349 State-flush operation may exit early allowing for cache inconsistency
    
    Removed the ability of DistributionAdvisor.waitForCurrentOperations() to
    exit (by throwing an exception after a timeout) before the operation count
    has fallen to zero.  Instead, it now issues a fatal-level log message, which
    translates into a severe-level alert for operators.  This can help tech
    support determine which server a customer should terminate in order to break
    a distributed deadlock.
    
    I also added an info-level message, noting that the wait has completed,
    which is issued only if a warning or fatal-level message was issued during
    the wait.  This parallels what we do in ReplyProcessor21 after we've issued
    a warning that a cache-op response hasn't been received within the
    ack-wait-threshold period.
    
    This closes #2083
---
 .../distributed/internal/DistributionAdvisor.java  |  45 +++++-----
 .../geode/internal/cache/DistributedRegion.java    |   4 +-
 .../org/apache/geode/internal/cache/EventID.java   |   5 +-
 .../geode/internal/i18n/LocalizedStrings.java      |   3 -
 .../internal/DistributionAdvisorDUnitTest.java     | 100 +++++++++++++--------
 .../protobuf/v1/utilities/ProtobufUtilities.java   |   4 +-
 6 files changed, 90 insertions(+), 71 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionAdvisor.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionAdvisor.java
index 5ce42ed..9ac4563 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionAdvisor.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionAdvisor.java
@@ -45,7 +45,6 @@ import org.apache.geode.internal.cache.persistence.PersistentMemberID;
 import org.apache.geode.internal.cache.versions.VersionSource;
 import org.apache.geode.internal.i18n.LocalizedStrings;
 import org.apache.geode.internal.logging.LogService;
-import org.apache.geode.internal.logging.log4j.LocalizedMessage;
 import org.apache.geode.internal.logging.log4j.LogMarker;
 import org.apache.geode.internal.util.ArrayUtils;
 
@@ -789,43 +788,44 @@ public class DistributionAdvisor {
     return membershipVersion;
   }
 
-  public void waitForCurrentOperations() {
-    long timeout =
-        1000L * this.getDistributionManager().getSystem().getConfig().getAckWaitThreshold();
-    waitForCurrentOperations(timeout);
-  }
-
   /**
    * wait for the current operations being sent on views prior to the joining of the given member to
    * be placed on communication channels before returning
    *
    * @since GemFire 5.1
    */
-  public void waitForCurrentOperations(long timeout) {
-    // CacheProfile profile = (CacheProfile)getProfile(member);
-    // long targetVersion = profile.initialMembershipVersion - 1;
+  public void waitForCurrentOperations() {
+    long timeout =
+        1000L * this.getDistributionManager().getSystem().getConfig().getAckWaitThreshold();
+    waitForCurrentOperations(logger, timeout, timeout * 2L);
+  }
 
+  public void waitForCurrentOperations(Logger alertLogger, long warnMS, long severeAlertMS) {
     // this may wait longer than it should if the membership version changes, dumping
     // more operations into the previousVersionOpCount
-    long startTime = System.currentTimeMillis();
-    long warnTime = startTime + timeout;
-    long quitTime = warnTime + timeout - 1000L;
+    final long startTime = System.currentTimeMillis();
+    final long warnTime = startTime + warnMS;
+    final long severeAlertTime = startTime + severeAlertMS;
     boolean warned = false;
+    boolean severeAlertIssued = false;
     final boolean isDebugEnabled_STATE_FLUSH_OP =
-        logger.isTraceEnabled(LogMarker.STATE_FLUSH_OP_VERBOSE);
+        DistributionAdvisor.logger.isTraceEnabled(LogMarker.STATE_FLUSH_OP_VERBOSE);
     while (true) {
       long opCount;
       synchronized (this.opCountLock) {
         opCount = this.previousVersionOpCount;
       }
       if (opCount <= 0) {
+        if (warned) {
+          alertLogger.info("Wait for current operations completed");
+        }
         break;
       }
       // The advisor's close() method will set the pVOC to zero. This loop
       // must not terminate due to cache closure until that happens.
       // See bug 34361 comment 79
       if (isDebugEnabled_STATE_FLUSH_OP) {
-        logger.trace(LogMarker.STATE_FLUSH_OP_VERBOSE,
+        DistributionAdvisor.logger.trace(LogMarker.STATE_FLUSH_OP_VERBOSE,
             "Waiting for current operations to finish({})", opCount);
       }
       try {
@@ -836,18 +836,19 @@ public class DistributionAdvisor {
       long now = System.currentTimeMillis();
       if ((!warned) && System.currentTimeMillis() >= warnTime) {
         warned = true;
-        logger.warn(LocalizedMessage.create(
-            LocalizedStrings.DistributionAdvisor_0_SEC_HAVE_ELAPSED_WHILE_WAITING_FOR_CURRENT_OPERATIONS_TO_DISTRIBUTE,
-            Long.toString((warnTime - startTime) / 1000L)));
-      } else if (warned && (now >= quitTime)) {
+        alertLogger.warn("This operation has been stalled for {} milliseconds waiting for "
+            + "current operations to complete.", warnMS);
+      } else if (warned && !severeAlertIssued && (now >= severeAlertTime)) {
         // OSProcess.printStacks(0);
-        throw new GemFireIOException(
-            "Current operations did not distribute within " + (now - startTime) + " milliseconds");
+        alertLogger.fatal("This operation has been stalled for {} milliseconds "
+            + "waiting for current operations to complete.  Something may be blocking operations.",
+            severeAlertMS);
+        severeAlertIssued = true;
       }
     }
     if (this.membershipClosed) {
       if (isDebugEnabled_STATE_FLUSH_OP) {
-        logger.trace(LogMarker.STATE_FLUSH_OP_VERBOSE,
+        DistributionAdvisor.logger.trace(LogMarker.STATE_FLUSH_OP_VERBOSE,
             "State Flush stopped waiting for operations to distribute because advisor has been closed");
       }
     }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
index b9f3d16..2fba672 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
@@ -2480,8 +2480,6 @@ public class DistributedRegion extends LocalRegion implements InternalDistribute
     }
     // Fix for #48066 - make sure that region operations are completely
     // distributed to peers before destroying the region.
-    long timeout =
-        1000L * getCache().getInternalDistributedSystem().getConfig().getAckWaitThreshold();
     Boolean flushOnClose =
         !Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX + "no-flush-on-close"); // test hook
     if (!this.cache.forcedDisconnect() && flushOnClose
@@ -2489,7 +2487,7 @@ public class DistributedRegion extends LocalRegion implements InternalDistribute
         && this.getDistributionManager().getMembershipManager().isConnected()) {
       getDistributionAdvisor().forceNewMembershipVersion();
       try {
-        getDistributionAdvisor().waitForCurrentOperations(timeout);
+        getDistributionAdvisor().waitForCurrentOperations();
       } catch (Exception e) {
         // log this but try to close the region so that listeners are invoked
         logger.warn(LocalizedMessage.create(LocalizedStrings.GemFireCache_0_ERROR_CLOSING_REGION_1,
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/EventID.java b/geode-core/src/main/java/org/apache/geode/internal/cache/EventID.java
index 46130ed..748977f 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/EventID.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/EventID.java
@@ -45,7 +45,6 @@ import org.apache.geode.internal.cache.ha.ThreadIdentifier;
 import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
 import org.apache.geode.internal.i18n.LocalizedStrings;
 import org.apache.geode.internal.logging.LogService;
-import org.apache.geode.internal.logging.log4j.LogMarker;
 import org.apache.geode.internal.util.Breadcrumbs;
 
 /**
@@ -504,7 +503,7 @@ public class EventID implements DataSerializableFixedID, Serializable, Externali
 
   @Override
   public String toString() {
-    if (logger.isTraceEnabled(LogMarker.EVENT_ID_TO_STRING_VERBOSE)) {
+    if (logger.isDebugEnabled()) {
       return expensiveToString();
     } else {
       return cheapToString();
@@ -525,7 +524,7 @@ public class EventID implements DataSerializableFixedID, Serializable, Externali
       }
       buf.append(";");
     } else {
-      buf.append("id=").append(membershipID.length).append("bytes;");
+      buf.append("[id=").append(membershipID.length).append(" bytes;");
     }
     // buf.append(this.membershipID.toString());
     buf.append("threadID=");
diff --git a/geode-core/src/main/java/org/apache/geode/internal/i18n/LocalizedStrings.java b/geode-core/src/main/java/org/apache/geode/internal/i18n/LocalizedStrings.java
index cd86460..9cb45c4 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/i18n/LocalizedStrings.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/i18n/LocalizedStrings.java
@@ -5816,9 +5816,6 @@ public class LocalizedStrings {
           "AutoConnectionSource UpdateLocatorListTask started with interval={0} ms.");
   public static final StringId AutoConnectionSourceImpl_COULD_NOT_CREATE_A_NEW_CONNECTION_TO_SERVER_0 =
       new StringId(4484, "Could not create a new connection to server: {0}");
-  public static final StringId DistributionAdvisor_0_SEC_HAVE_ELAPSED_WHILE_WAITING_FOR_CURRENT_OPERATIONS_TO_DISTRIBUTE =
-      new StringId(4485,
-          "{0} seconds have elapsed while waiting for current operations to distribute");
   public static final StringId DistributionManager_I_0_AM_THE_ELDER =
       new StringId(4486, "I, {0}, am the elder.");
   public static final StringId InternalLocator_STARTING_PEER_LOCATION_FOR_0 =
diff --git a/geode-core/src/test/java/org/apache/geode/distributed/internal/DistributionAdvisorDUnitTest.java b/geode-core/src/test/java/org/apache/geode/distributed/internal/DistributionAdvisorDUnitTest.java
index d773537..f17cbea 100644
--- a/geode-core/src/test/java/org/apache/geode/distributed/internal/DistributionAdvisorDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/distributed/internal/DistributionAdvisorDUnitTest.java
@@ -15,17 +15,24 @@
 package org.apache.geode.distributed.internal;
 
 import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.isA;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
+import org.apache.logging.log4j.Logger;
+import org.awaitility.Awaitility;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-import org.apache.geode.CancelCriterion;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.test.dunit.Invoke;
 import org.apache.geode.test.dunit.SerializableRunnable;
@@ -49,46 +56,17 @@ public class DistributionAdvisorDUnitTest extends JUnit4DistributedTestCase {
       }
     });
 
-    // reinitialize the advisor
-    this.advisor = DistributionAdvisor.createDistributionAdvisor(new DistributionAdvisee() {
-      public DistributionAdvisee getParentAdvisee() {
-        return null;
-      }
-
-      public InternalDistributedSystem getSystem() {
-        return DistributionAdvisorDUnitTest.this.getSystem();
-      }
-
-      public String getName() {
-        return "DistributionAdvisorDUnitTest";
-      }
-
-      public String getFullPath() {
-        return getName();
-      }
-
-      public DistributionManager getDistributionManager() {
-        return getSystem().getDistributionManager();
-      }
+    DistributionAdvisee advisee = mock(DistributionAdvisee.class);
+    when(advisee.getName()).thenReturn("DistributionAdvisorDUnitTest");
+    when(advisee.getSystem()).thenReturn(getSystem());
+    when(advisee.getFullPath()).thenReturn(getName());
+    when(advisee.getDistributionManager()).thenReturn(getSystem().getDistributionManager());
+    when(advisee.getCancelCriterion()).thenReturn(getSystem().getCancelCriterion());
 
-      public DistributionAdvisor getDistributionAdvisor() {
-        return DistributionAdvisorDUnitTest.this.advisor;
-      }
-
-      public DistributionAdvisor.Profile getProfile() {
-        return null;
-      }
+    advisor = DistributionAdvisor.createDistributionAdvisor(advisee);
 
-      public void fillInProfile(DistributionAdvisor.Profile profile) {}
+    when(advisee.getDistributionAdvisor()).thenReturn(advisor);
 
-      public int getSerialNumber() {
-        return 0;
-      }
-
-      public CancelCriterion getCancelCriterion() {
-        return DistributionAdvisorDUnitTest.this.getSystem().getCancelCriterion();
-      }
-    });
     Set ids = getSystem().getDistributionManager().getOtherNormalDistributionManagerIds();
     assertEquals(VM.getVMCount(), ids.size());
     List profileList = new ArrayList();
@@ -120,4 +98,50 @@ public class DistributionAdvisorDUnitTest extends JUnit4DistributedTestCase {
     }
     assertEquals(expected, advisor.adviseGeneric());
   }
+
+  @Test
+  public void advisorIssuesSevereAlertForStateFlush() throws Exception {
+    final long membershipVersion = advisor.startOperation();
+    advisor.forceNewMembershipVersion();
+
+    final Logger logger = mock(Logger.class);
+    final Exception exceptionHolder[] = new Exception[1];
+    Thread thread = new Thread(() -> {
+      try {
+        advisor.waitForCurrentOperations(logger, 2000, 4000);
+      } catch (RuntimeException e) {
+        synchronized (exceptionHolder) {
+          exceptionHolder[0] = e;
+        }
+      }
+    });
+    thread.setDaemon(true);
+    thread.start();
+
+    try {
+      Awaitility.await().atMost(15, TimeUnit.SECONDS).until(() -> {
+        verify(logger, atLeastOnce()).warn(isA(String.class), isA(Long.class));
+      });
+      Awaitility.await().atMost(15, TimeUnit.SECONDS).until(() -> {
+        verify(logger, atLeastOnce()).fatal(isA(String.class), isA(Long.class));
+      });
+      advisor.endOperation(membershipVersion);
+      Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> {
+        verify(logger, atLeastOnce()).info("Wait for current operations completed");
+      });
+      Awaitility.await().atMost(15, TimeUnit.SECONDS).until(() -> !thread.isAlive());
+    } finally {
+      if (thread.isAlive()) {
+        advisor.endOperation(membershipVersion);
+        thread.interrupt();
+        thread.join(10000);
+      } else {
+        synchronized (exceptionHolder) {
+          if (exceptionHolder[0] != null) {
+            throw exceptionHolder[0];
+          }
+        }
+      }
+    }
+  }
 }
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/utilities/ProtobufUtilities.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/utilities/ProtobufUtilities.java
index 877aac0..d044bf2 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/utilities/ProtobufUtilities.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/utilities/ProtobufUtilities.java
@@ -26,8 +26,8 @@ import org.apache.geode.internal.protocol.protobuf.v1.serialization.exception.En
  * mainly focused on helper functions which can be used in building BasicTypes for use in other
  * messages or those used to create the top level Message objects.
  * <p>
- * Helper functions specific to creating ClientProtocol.Messages can be found at
- * {@link ProtobufRequestUtilities}
+ * Helper functions specific to creating ClientProtocol.Messages can be found in
+ * ProtobufRequestUtilities in the test source set of this module.
  */
 @Experimental
 public abstract class ProtobufUtilities {