You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bs...@apache.org on 2018/06/29 21:52:38 UTC
[geode] branch develop updated: GEODE-5349 State-flush operation
may exit early allowing for cache inconsistency
This is an automated email from the ASF dual-hosted git repository.
bschuchardt pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new dfafad7 GEODE-5349 State-flush operation may exit early allowing for cache inconsistency
dfafad7 is described below
commit dfafad79c2700a0356faf2eed1fe9ff4248e4ed4
Author: Bruce Schuchardt <bs...@pivotal.io>
AuthorDate: Fri Jun 29 14:49:52 2018 -0700
GEODE-5349 State-flush operation may exit early allowing for cache inconsistency
Removed the ability for DistributionAdvisor.waitForCurrentOperations() to exit
without the operation count falling to zero. Instead, it now issues a
fatal-level log message, which translates into a severe-level alert for
operators. This can help tech support identify which server a customer should
terminate in order to break a distributed deadlock.
I also added an info-level message noting that the wait has completed. It is
issued only if a warning or fatal message was logged earlier during the wait.
This parallels what we do in ReplyProcessor21 if we've issued a warning that a
cache-op response hasn't been received within the ack-wait-threshold period.
This closes #2083
---
.../distributed/internal/DistributionAdvisor.java | 45 +++++-----
.../geode/internal/cache/DistributedRegion.java | 4 +-
.../org/apache/geode/internal/cache/EventID.java | 5 +-
.../geode/internal/i18n/LocalizedStrings.java | 3 -
.../internal/DistributionAdvisorDUnitTest.java | 100 +++++++++++++--------
.../protobuf/v1/utilities/ProtobufUtilities.java | 4 +-
6 files changed, 90 insertions(+), 71 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionAdvisor.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionAdvisor.java
index 5ce42ed..9ac4563 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionAdvisor.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionAdvisor.java
@@ -45,7 +45,6 @@ import org.apache.geode.internal.cache.persistence.PersistentMemberID;
import org.apache.geode.internal.cache.versions.VersionSource;
import org.apache.geode.internal.i18n.LocalizedStrings;
import org.apache.geode.internal.logging.LogService;
-import org.apache.geode.internal.logging.log4j.LocalizedMessage;
import org.apache.geode.internal.logging.log4j.LogMarker;
import org.apache.geode.internal.util.ArrayUtils;
@@ -789,43 +788,44 @@ public class DistributionAdvisor {
return membershipVersion;
}
- public void waitForCurrentOperations() {
- long timeout =
- 1000L * this.getDistributionManager().getSystem().getConfig().getAckWaitThreshold();
- waitForCurrentOperations(timeout);
- }
-
/**
* wait for the current operations being sent on views prior to the joining of the given member to
* be placed on communication channels before returning
*
* @since GemFire 5.1
*/
- public void waitForCurrentOperations(long timeout) {
- // CacheProfile profile = (CacheProfile)getProfile(member);
- // long targetVersion = profile.initialMembershipVersion - 1;
+ public void waitForCurrentOperations() {
+ long timeout =
+ 1000L * this.getDistributionManager().getSystem().getConfig().getAckWaitThreshold();
+ waitForCurrentOperations(logger, timeout, timeout * 2L);
+ }
+ public void waitForCurrentOperations(Logger alertLogger, long warnMS, long severeAlertMS) {
// this may wait longer than it should if the membership version changes, dumping
// more operations into the previousVersionOpCount
- long startTime = System.currentTimeMillis();
- long warnTime = startTime + timeout;
- long quitTime = warnTime + timeout - 1000L;
+ final long startTime = System.currentTimeMillis();
+ final long warnTime = startTime + warnMS;
+ final long severeAlertTime = startTime + severeAlertMS;
boolean warned = false;
+ boolean severeAlertIssued = false;
final boolean isDebugEnabled_STATE_FLUSH_OP =
- logger.isTraceEnabled(LogMarker.STATE_FLUSH_OP_VERBOSE);
+ DistributionAdvisor.logger.isTraceEnabled(LogMarker.STATE_FLUSH_OP_VERBOSE);
while (true) {
long opCount;
synchronized (this.opCountLock) {
opCount = this.previousVersionOpCount;
}
if (opCount <= 0) {
+ if (warned) {
+ alertLogger.info("Wait for current operations completed");
+ }
break;
}
// The advisor's close() method will set the pVOC to zero. This loop
// must not terminate due to cache closure until that happens.
// See bug 34361 comment 79
if (isDebugEnabled_STATE_FLUSH_OP) {
- logger.trace(LogMarker.STATE_FLUSH_OP_VERBOSE,
+ DistributionAdvisor.logger.trace(LogMarker.STATE_FLUSH_OP_VERBOSE,
"Waiting for current operations to finish({})", opCount);
}
try {
@@ -836,18 +836,19 @@ public class DistributionAdvisor {
long now = System.currentTimeMillis();
if ((!warned) && System.currentTimeMillis() >= warnTime) {
warned = true;
- logger.warn(LocalizedMessage.create(
- LocalizedStrings.DistributionAdvisor_0_SEC_HAVE_ELAPSED_WHILE_WAITING_FOR_CURRENT_OPERATIONS_TO_DISTRIBUTE,
- Long.toString((warnTime - startTime) / 1000L)));
- } else if (warned && (now >= quitTime)) {
+ alertLogger.warn("This operation has been stalled for {} milliseconds waiting for "
+ + "current operations to complete.", warnMS);
+ } else if (warned && !severeAlertIssued && (now >= severeAlertTime)) {
// OSProcess.printStacks(0);
- throw new GemFireIOException(
- "Current operations did not distribute within " + (now - startTime) + " milliseconds");
+ alertLogger.fatal("This operation has been stalled for {} milliseconds "
+ + "waiting for current operations to complete. Something may be blocking operations.",
+ severeAlertMS);
+ severeAlertIssued = true;
}
}
if (this.membershipClosed) {
if (isDebugEnabled_STATE_FLUSH_OP) {
- logger.trace(LogMarker.STATE_FLUSH_OP_VERBOSE,
+ DistributionAdvisor.logger.trace(LogMarker.STATE_FLUSH_OP_VERBOSE,
"State Flush stopped waiting for operations to distribute because advisor has been closed");
}
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
index b9f3d16..2fba672 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
@@ -2480,8 +2480,6 @@ public class DistributedRegion extends LocalRegion implements InternalDistribute
}
// Fix for #48066 - make sure that region operations are completely
// distributed to peers before destroying the region.
- long timeout =
- 1000L * getCache().getInternalDistributedSystem().getConfig().getAckWaitThreshold();
Boolean flushOnClose =
!Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX + "no-flush-on-close"); // test hook
if (!this.cache.forcedDisconnect() && flushOnClose
@@ -2489,7 +2487,7 @@ public class DistributedRegion extends LocalRegion implements InternalDistribute
&& this.getDistributionManager().getMembershipManager().isConnected()) {
getDistributionAdvisor().forceNewMembershipVersion();
try {
- getDistributionAdvisor().waitForCurrentOperations(timeout);
+ getDistributionAdvisor().waitForCurrentOperations();
} catch (Exception e) {
// log this but try to close the region so that listeners are invoked
logger.warn(LocalizedMessage.create(LocalizedStrings.GemFireCache_0_ERROR_CLOSING_REGION_1,
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/EventID.java b/geode-core/src/main/java/org/apache/geode/internal/cache/EventID.java
index 46130ed..748977f 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/EventID.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/EventID.java
@@ -45,7 +45,6 @@ import org.apache.geode.internal.cache.ha.ThreadIdentifier;
import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
import org.apache.geode.internal.i18n.LocalizedStrings;
import org.apache.geode.internal.logging.LogService;
-import org.apache.geode.internal.logging.log4j.LogMarker;
import org.apache.geode.internal.util.Breadcrumbs;
/**
@@ -504,7 +503,7 @@ public class EventID implements DataSerializableFixedID, Serializable, Externali
@Override
public String toString() {
- if (logger.isTraceEnabled(LogMarker.EVENT_ID_TO_STRING_VERBOSE)) {
+ if (logger.isDebugEnabled()) {
return expensiveToString();
} else {
return cheapToString();
@@ -525,7 +524,7 @@ public class EventID implements DataSerializableFixedID, Serializable, Externali
}
buf.append(";");
} else {
- buf.append("id=").append(membershipID.length).append("bytes;");
+ buf.append("[id=").append(membershipID.length).append(" bytes;");
}
// buf.append(this.membershipID.toString());
buf.append("threadID=");
diff --git a/geode-core/src/main/java/org/apache/geode/internal/i18n/LocalizedStrings.java b/geode-core/src/main/java/org/apache/geode/internal/i18n/LocalizedStrings.java
index cd86460..9cb45c4 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/i18n/LocalizedStrings.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/i18n/LocalizedStrings.java
@@ -5816,9 +5816,6 @@ public class LocalizedStrings {
"AutoConnectionSource UpdateLocatorListTask started with interval={0} ms.");
public static final StringId AutoConnectionSourceImpl_COULD_NOT_CREATE_A_NEW_CONNECTION_TO_SERVER_0 =
new StringId(4484, "Could not create a new connection to server: {0}");
- public static final StringId DistributionAdvisor_0_SEC_HAVE_ELAPSED_WHILE_WAITING_FOR_CURRENT_OPERATIONS_TO_DISTRIBUTE =
- new StringId(4485,
- "{0} seconds have elapsed while waiting for current operations to distribute");
public static final StringId DistributionManager_I_0_AM_THE_ELDER =
new StringId(4486, "I, {0}, am the elder.");
public static final StringId InternalLocator_STARTING_PEER_LOCATION_FOR_0 =
diff --git a/geode-core/src/test/java/org/apache/geode/distributed/internal/DistributionAdvisorDUnitTest.java b/geode-core/src/test/java/org/apache/geode/distributed/internal/DistributionAdvisorDUnitTest.java
index d773537..f17cbea 100644
--- a/geode-core/src/test/java/org/apache/geode/distributed/internal/DistributionAdvisorDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/distributed/internal/DistributionAdvisorDUnitTest.java
@@ -15,17 +15,24 @@
package org.apache.geode.distributed.internal;
import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.isA;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.apache.logging.log4j.Logger;
+import org.awaitility.Awaitility;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-import org.apache.geode.CancelCriterion;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.test.dunit.Invoke;
import org.apache.geode.test.dunit.SerializableRunnable;
@@ -49,46 +56,17 @@ public class DistributionAdvisorDUnitTest extends JUnit4DistributedTestCase {
}
});
- // reinitialize the advisor
- this.advisor = DistributionAdvisor.createDistributionAdvisor(new DistributionAdvisee() {
- public DistributionAdvisee getParentAdvisee() {
- return null;
- }
-
- public InternalDistributedSystem getSystem() {
- return DistributionAdvisorDUnitTest.this.getSystem();
- }
-
- public String getName() {
- return "DistributionAdvisorDUnitTest";
- }
-
- public String getFullPath() {
- return getName();
- }
-
- public DistributionManager getDistributionManager() {
- return getSystem().getDistributionManager();
- }
+ DistributionAdvisee advisee = mock(DistributionAdvisee.class);
+ when(advisee.getName()).thenReturn("DistributionAdvisorDUnitTest");
+ when(advisee.getSystem()).thenReturn(getSystem());
+ when(advisee.getFullPath()).thenReturn(getName());
+ when(advisee.getDistributionManager()).thenReturn(getSystem().getDistributionManager());
+ when(advisee.getCancelCriterion()).thenReturn(getSystem().getCancelCriterion());
- public DistributionAdvisor getDistributionAdvisor() {
- return DistributionAdvisorDUnitTest.this.advisor;
- }
-
- public DistributionAdvisor.Profile getProfile() {
- return null;
- }
+ advisor = DistributionAdvisor.createDistributionAdvisor(advisee);
- public void fillInProfile(DistributionAdvisor.Profile profile) {}
+ when(advisee.getDistributionAdvisor()).thenReturn(advisor);
- public int getSerialNumber() {
- return 0;
- }
-
- public CancelCriterion getCancelCriterion() {
- return DistributionAdvisorDUnitTest.this.getSystem().getCancelCriterion();
- }
- });
Set ids = getSystem().getDistributionManager().getOtherNormalDistributionManagerIds();
assertEquals(VM.getVMCount(), ids.size());
List profileList = new ArrayList();
@@ -120,4 +98,50 @@ public class DistributionAdvisorDUnitTest extends JUnit4DistributedTestCase {
}
assertEquals(expected, advisor.adviseGeneric());
}
+
+ @Test
+ public void advisorIssuesSevereAlertForStateFlush() throws Exception {
+ final long membershipVersion = advisor.startOperation();
+ advisor.forceNewMembershipVersion();
+
+ final Logger logger = mock(Logger.class);
+ final Exception exceptionHolder[] = new Exception[1];
+ Thread thread = new Thread(() -> {
+ try {
+ advisor.waitForCurrentOperations(logger, 2000, 4000);
+ } catch (RuntimeException e) {
+ synchronized (exceptionHolder) {
+ exceptionHolder[0] = e;
+ }
+ }
+ });
+ thread.setDaemon(true);
+ thread.start();
+
+ try {
+ Awaitility.await().atMost(15, TimeUnit.SECONDS).until(() -> {
+ verify(logger, atLeastOnce()).warn(isA(String.class), isA(Long.class));
+ });
+ Awaitility.await().atMost(15, TimeUnit.SECONDS).until(() -> {
+ verify(logger, atLeastOnce()).fatal(isA(String.class), isA(Long.class));
+ });
+ advisor.endOperation(membershipVersion);
+ Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> {
+ verify(logger, atLeastOnce()).info("Wait for current operations completed");
+ });
+ Awaitility.await().atMost(15, TimeUnit.SECONDS).until(() -> !thread.isAlive());
+ } finally {
+ if (thread.isAlive()) {
+ advisor.endOperation(membershipVersion);
+ thread.interrupt();
+ thread.join(10000);
+ } else {
+ synchronized (exceptionHolder) {
+ if (exceptionHolder[0] != null) {
+ throw exceptionHolder[0];
+ }
+ }
+ }
+ }
+ }
}
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/utilities/ProtobufUtilities.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/utilities/ProtobufUtilities.java
index 877aac0..d044bf2 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/utilities/ProtobufUtilities.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/utilities/ProtobufUtilities.java
@@ -26,8 +26,8 @@ import org.apache.geode.internal.protocol.protobuf.v1.serialization.exception.En
* mainly focused on helper functions which can be used in building BasicTypes for use in other
* messages or those used to create the top level Message objects.
* <p>
- * Helper functions specific to creating ClientProtocol.Messages can be found at
- * {@link ProtobufRequestUtilities}
+ * Helper functions specific to creating ClientProtocol.Messages can be found in
+ * ProtobufRequestUtilities in the test source set of this module.
*/
@Experimental
public abstract class ProtobufUtilities {