You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2016/02/29 21:50:12 UTC
[04/19] incubator-geode git commit: GEODE-1006: View Creator thread
blocked by JGroups FlowControl when alerting
GEODE-1006: View Creator thread blocked by JGroups FlowControl when alerting
Added a test showing that e40962cf6 fixed the problem.
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/bd3ac70d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/bd3ac70d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/bd3ac70d
Branch: refs/heads/feature/GEODE-953
Commit: bd3ac70da2d9167fc3f83bb704cfaedb487fafcc
Parents: 330ba74
Author: Bruce Schuchardt <bs...@pivotal.io>
Authored: Fri Feb 26 11:54:46 2016 -0800
Committer: Bruce Schuchardt <bs...@pivotal.io>
Committed: Fri Feb 26 11:55:00 2016 -0800
----------------------------------------------------------------------
.../gms/messenger/JGroupsMessenger.java | 38 ++++----
.../internal/logging/log4j/AlertAppender.java | 95 +++++++++++---------
.../messenger/JGroupsMessengerJUnitTest.java | 19 ++++
3 files changed, 91 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/bd3ac70d/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
index 1db0c42..75db969 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
@@ -781,23 +781,7 @@ public class JGroupsMessenger implements Messenger {
Message msg = new Message();
msg.setDest(null);
msg.setSrc(src);
- // GemFire uses its own reply processors so there is no need
- // to maintain message order
- msg.setFlag(Flag.OOB);
- // Bundling is mostly only useful if we're doing no-ack work,
- // which is fairly rare
- msg.setFlag(Flag.DONT_BUNDLE);
-
- if (gfmsg.getProcessorType() == DistributionManager.HIGH_PRIORITY_EXECUTOR
- || gfmsg instanceof HighPriorityDistributionMessage
- || AlertAppender.isThreadAlerting()) {
- msg.setFlag(Flag.NO_FC);
- msg.setFlag(Flag.SKIP_BARRIER);
- }
- if (gfmsg instanceof DistributedCacheOperation.CacheOperationMessage) {
- // we don't want to see our own cache operation messages
- msg.setTransientFlag(Message.TransientFlag.DONT_LOOPBACK);
- }
+ setMessageFlags(gfmsg, msg);
try {
long start = services.getStatistics().startMsgSerialization();
HeapDataOutputStream out_stream =
@@ -822,6 +806,26 @@ public class JGroupsMessenger implements Messenger {
return msg;
}
+ void setMessageFlags(DistributionMessage gfmsg, Message msg) {
+ // GemFire uses its own reply processors so there is no need
+ // to maintain message order
+ msg.setFlag(Flag.OOB);
+ // Bundling is mostly only useful if we're doing no-ack work,
+ // which is fairly rare
+ msg.setFlag(Flag.DONT_BUNDLE);
+
+ if (gfmsg.getProcessorType() == DistributionManager.HIGH_PRIORITY_EXECUTOR
+ || gfmsg instanceof HighPriorityDistributionMessage
+ || AlertAppender.isThreadAlerting()) {
+ msg.setFlag(Flag.NO_FC);
+ msg.setFlag(Flag.SKIP_BARRIER);
+ }
+ if (gfmsg instanceof DistributedCacheOperation.CacheOperationMessage) {
+ // we don't want to see our own cache operation messages
+ msg.setTransientFlag(Message.TransientFlag.DONT_LOOPBACK);
+ }
+ }
+
/**
* deserialize a jgroups payload. If it's a DistributionMessage find
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/bd3ac70d/geode-core/src/main/java/com/gemstone/gemfire/internal/logging/log4j/AlertAppender.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/logging/log4j/AlertAppender.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/logging/log4j/AlertAppender.java
index e071511..9de3b46 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/logging/log4j/AlertAppender.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/logging/log4j/AlertAppender.java
@@ -90,6 +90,10 @@ public final class AlertAppender extends AbstractAppender implements PropertyCha
this.alertingDisabled = alertingDisabled;
}
+ public static void setIsAlerting(boolean isAlerting) {
+ alerting.set(isAlerting? Boolean.TRUE : Boolean.FALSE);
+ }
+
/**
* This method is optimized with the assumption that at least one listener
* has set a level which requires that the event be sent. This is ensured
@@ -106,57 +110,60 @@ public final class AlertAppender extends AbstractAppender implements PropertyCha
if ((alerting.get())) {
return;
}
- alerting.set(Boolean.TRUE);
-
- final boolean isDebugEnabled = logger.isDebugEnabled();
- if (isDebugEnabled) {
- logger.debug("Delivering an alert event: {}", event);
- }
-
- InternalDistributedSystem ds = InternalDistributedSystem.getConnectedInstance();
- if (ds == null) {
- // Use info level to avoid triggering another alert
- logger.info("Did not append alert event because the distributed system is set to null.");
- return;
- }
- DistributionManager distMgr = (DistributionManager) ds.getDistributionManager();
-
- final int intLevel = logLevelToAlertLevel(event.getLevel().intLevel());
- final Date date = new Date(event.getTimeMillis());
- final String threadName = event.getThreadName();
- final String logMessage = event.getMessage().getFormattedMessage();
- final String stackTrace = ThreadUtils.stackTraceToString(event.getThrown(), true);
- final String connectionName = ds.getConfig().getName();
+ setIsAlerting(true);
- for (Listener listener : this.listeners) {
- if (event.getLevel().intLevel() > listener.getLevel().intLevel()) {
- break;
+ try {
+
+ final boolean isDebugEnabled = logger.isDebugEnabled();
+ if (isDebugEnabled) {
+ logger.debug("Delivering an alert event: {}", event);
}
- try {
- AlertListenerMessage alertMessage = AlertListenerMessage.create(listener.getMember(), intLevel, date,
- connectionName, threadName, Thread.currentThread().getId(), logMessage, stackTrace);
-
- if (listener.getMember().equals(distMgr.getDistributionManagerId())) {
- if (isDebugEnabled) {
- logger.debug("Delivering local alert message: {}, {}, {}, {}, {}, [{}], [{}].", listener.getMember(), intLevel, date,
- connectionName, threadName, logMessage, stackTrace);
- }
- alertMessage.process(distMgr);
- } else {
- if (isDebugEnabled) {
- logger.debug("Delivering remote alert message: {}, {}, {}, {}, {}, [{}], [{}].", listener.getMember(), intLevel, date,
- connectionName, threadName, logMessage, stackTrace);
+ InternalDistributedSystem ds = InternalDistributedSystem.getConnectedInstance();
+ if (ds == null) {
+ // Use info level to avoid triggering another alert
+ logger.info("Did not append alert event because the distributed system is set to null.");
+ return;
+ }
+ DistributionManager distMgr = (DistributionManager) ds.getDistributionManager();
+
+ final int intLevel = logLevelToAlertLevel(event.getLevel().intLevel());
+ final Date date = new Date(event.getTimeMillis());
+ final String threadName = event.getThreadName();
+ final String logMessage = event.getMessage().getFormattedMessage();
+ final String stackTrace = ThreadUtils.stackTraceToString(event.getThrown(), true);
+ final String connectionName = ds.getConfig().getName();
+
+ for (Listener listener : this.listeners) {
+ if (event.getLevel().intLevel() > listener.getLevel().intLevel()) {
+ break;
+ }
+
+ try {
+ AlertListenerMessage alertMessage = AlertListenerMessage.create(listener.getMember(), intLevel, date,
+ connectionName, threadName, Thread.currentThread().getId(), logMessage, stackTrace);
+
+ if (listener.getMember().equals(distMgr.getDistributionManagerId())) {
+ if (isDebugEnabled) {
+ logger.debug("Delivering local alert message: {}, {}, {}, {}, {}, [{}], [{}].", listener.getMember(), intLevel, date,
+ connectionName, threadName, logMessage, stackTrace);
+ }
+ alertMessage.process(distMgr);
+ } else {
+ if (isDebugEnabled) {
+ logger.debug("Delivering remote alert message: {}, {}, {}, {}, {}, [{}], [{}].", listener.getMember(), intLevel, date,
+ connectionName, threadName, logMessage, stackTrace);
+ }
+ distMgr.putOutgoing(alertMessage);
}
- distMgr.putOutgoing(alertMessage);
+ } catch (ReenteredConnectException e) {
+ // OK. We can't send to this recipient because we're in the middle of
+ // trying to connect to it.
}
- } catch (ReenteredConnectException e) {
- // OK. We can't send to this recipient because we're in the middle of
- // trying to connect to it.
}
+ } finally {
+ setIsAlerting(false);
}
-
- alerting.set(Boolean.FALSE);
}
public synchronized void addAlertListener(final DistributedMember member, final int alertLevel) {
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/bd3ac70d/geode-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java
index 3dcf035..2ad970b 100755
--- a/geode-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java
@@ -37,6 +37,7 @@ import org.jgroups.Address;
import org.jgroups.Event;
import org.jgroups.JChannel;
import org.jgroups.Message;
+import org.jgroups.Message.Flag;
import org.jgroups.conf.ClassConfigurator;
import org.jgroups.protocols.UNICAST3;
import org.jgroups.protocols.pbcast.NAKACK2;
@@ -79,6 +80,7 @@ import com.gemstone.gemfire.internal.HeapDataOutputStream;
import com.gemstone.gemfire.internal.Version;
import com.gemstone.gemfire.internal.admin.remote.RemoteTransportConfig;
import com.gemstone.gemfire.internal.cache.DistributedCacheOperation;
+import com.gemstone.gemfire.internal.logging.log4j.AlertAppender;
import com.gemstone.gemfire.test.junit.categories.IntegrationTest;
import junit.framework.Assert;
@@ -197,6 +199,23 @@ public class JGroupsMessengerJUnitTest {
}
@Test
+ public void alertMessagesBypassFlowControl() throws Exception {
+ initMocks(false);
+ Message jgmsg = new Message();
+ DistributionMessage dmsg = mock(DistributionMessage.class);
+ when(dmsg.getProcessorType()).thenReturn(DistributionManager.SERIAL_EXECUTOR);
+ messenger.setMessageFlags(dmsg, jgmsg);
+ assertFalse("expected no_fc to not be set in " + jgmsg.getFlags(), jgmsg.isFlagSet(Message.Flag.NO_FC));
+ AlertAppender.setIsAlerting(true);
+ try {
+ messenger.setMessageFlags(dmsg, jgmsg);
+ assertTrue("expected no_fc to be set in " + jgmsg.getFlags(), jgmsg.isFlagSet(Message.Flag.NO_FC));
+ } finally {
+ AlertAppender.setIsAlerting(false);
+ }
+ }
+
+ @Test
public void testMemberWeightIsSerialized() throws Exception {
HeapDataOutputStream out = new HeapDataOutputStream(500, Version.CURRENT);
InternalDistributedMember mbr = createAddress(8888);