You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2016/02/29 21:50:12 UTC

[04/19] incubator-geode git commit: GEODE-1006: View Creator thread blocked by JGroups FlowControl when alerting

GEODE-1006: View Creator thread blocked by JGroups FlowControl when alerting

Added a test showing that e40962cf6 fixed the problem.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/bd3ac70d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/bd3ac70d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/bd3ac70d

Branch: refs/heads/feature/GEODE-953
Commit: bd3ac70da2d9167fc3f83bb704cfaedb487fafcc
Parents: 330ba74
Author: Bruce Schuchardt <bs...@pivotal.io>
Authored: Fri Feb 26 11:54:46 2016 -0800
Committer: Bruce Schuchardt <bs...@pivotal.io>
Committed: Fri Feb 26 11:55:00 2016 -0800

----------------------------------------------------------------------
 .../gms/messenger/JGroupsMessenger.java         | 38 ++++----
 .../internal/logging/log4j/AlertAppender.java   | 95 +++++++++++---------
 .../messenger/JGroupsMessengerJUnitTest.java    | 19 ++++
 3 files changed, 91 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/bd3ac70d/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
index 1db0c42..75db969 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
@@ -781,23 +781,7 @@ public class JGroupsMessenger implements Messenger {
     Message msg = new Message();
     msg.setDest(null);
     msg.setSrc(src);
-    // GemFire uses its own reply processors so there is no need
-    // to maintain message order
-    msg.setFlag(Flag.OOB);
-    // Bundling is mostly only useful if we're doing no-ack work,
-    // which is fairly rare
-    msg.setFlag(Flag.DONT_BUNDLE);
-
-    if (gfmsg.getProcessorType() == DistributionManager.HIGH_PRIORITY_EXECUTOR
-        || gfmsg instanceof HighPriorityDistributionMessage
-        || AlertAppender.isThreadAlerting()) {
-      msg.setFlag(Flag.NO_FC);
-      msg.setFlag(Flag.SKIP_BARRIER);
-    }
-    if (gfmsg instanceof DistributedCacheOperation.CacheOperationMessage) {
-      // we don't want to see our own cache operation messages
-      msg.setTransientFlag(Message.TransientFlag.DONT_LOOPBACK);
-    }
+    setMessageFlags(gfmsg, msg);
     try {
       long start = services.getStatistics().startMsgSerialization();
       HeapDataOutputStream out_stream =
@@ -822,6 +806,26 @@ public class JGroupsMessenger implements Messenger {
     return msg;
   }
 
+  void setMessageFlags(DistributionMessage gfmsg, Message msg) {
+    // GemFire uses its own reply processors so there is no need
+    // to maintain message order
+    msg.setFlag(Flag.OOB);
+    // Bundling is mostly only useful if we're doing no-ack work,
+    // which is fairly rare
+    msg.setFlag(Flag.DONT_BUNDLE);
+
+    if (gfmsg.getProcessorType() == DistributionManager.HIGH_PRIORITY_EXECUTOR
+        || gfmsg instanceof HighPriorityDistributionMessage
+        || AlertAppender.isThreadAlerting()) {
+      msg.setFlag(Flag.NO_FC);
+      msg.setFlag(Flag.SKIP_BARRIER);
+    }
+    if (gfmsg instanceof DistributedCacheOperation.CacheOperationMessage) {
+      // we don't want to see our own cache operation messages
+      msg.setTransientFlag(Message.TransientFlag.DONT_LOOPBACK);
+    }
+  }
+
 
   /**
    * deserialize a jgroups payload.  If it's a DistributionMessage find

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/bd3ac70d/geode-core/src/main/java/com/gemstone/gemfire/internal/logging/log4j/AlertAppender.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/logging/log4j/AlertAppender.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/logging/log4j/AlertAppender.java
index e071511..9de3b46 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/logging/log4j/AlertAppender.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/logging/log4j/AlertAppender.java
@@ -90,6 +90,10 @@ public final class AlertAppender extends AbstractAppender implements PropertyCha
     this.alertingDisabled = alertingDisabled;
   }
   
+  public static void setIsAlerting(boolean isAlerting) {
+    alerting.set(isAlerting? Boolean.TRUE : Boolean.FALSE);
+  }
+  
   /**
    * This method is optimized with the assumption that at least one listener
    * has set a level which requires that the event be sent. This is ensured
@@ -106,57 +110,60 @@ public final class AlertAppender extends AbstractAppender implements PropertyCha
     if ((alerting.get())) {
       return;
     }
-    alerting.set(Boolean.TRUE);
-    
-    final boolean isDebugEnabled = logger.isDebugEnabled();
-    if (isDebugEnabled) {
-      logger.debug("Delivering an alert event: {}", event);
-    }
-        
-    InternalDistributedSystem ds = InternalDistributedSystem.getConnectedInstance();
-    if (ds == null) {
-      // Use info level to avoid triggering another alert
-      logger.info("Did not append alert event because the distributed system is set to null.");
-      return;
-    }
-    DistributionManager distMgr = (DistributionManager) ds.getDistributionManager();
-    
-    final int intLevel = logLevelToAlertLevel(event.getLevel().intLevel());
-    final Date date = new Date(event.getTimeMillis());
-    final String threadName = event.getThreadName();
-    final String logMessage = event.getMessage().getFormattedMessage();
-    final String stackTrace = ThreadUtils.stackTraceToString(event.getThrown(), true);
-    final String connectionName = ds.getConfig().getName();
+    setIsAlerting(true);
     
-    for (Listener listener : this.listeners) {
-      if (event.getLevel().intLevel() > listener.getLevel().intLevel()) {
-        break;
+    try {
+      
+      final boolean isDebugEnabled = logger.isDebugEnabled();
+      if (isDebugEnabled) {
+        logger.debug("Delivering an alert event: {}", event);
       }
           
-      try {
-        AlertListenerMessage alertMessage = AlertListenerMessage.create(listener.getMember(), intLevel, date,
-            connectionName, threadName, Thread.currentThread().getId(), logMessage, stackTrace);
-        
-        if (listener.getMember().equals(distMgr.getDistributionManagerId())) {
-          if (isDebugEnabled) {
-            logger.debug("Delivering local alert message: {}, {}, {}, {}, {}, [{}], [{}].", listener.getMember(), intLevel, date,
-                connectionName, threadName, logMessage, stackTrace);
-          }
-          alertMessage.process(distMgr);
-        } else {
-          if (isDebugEnabled) {
-            logger.debug("Delivering remote alert message: {}, {}, {}, {}, {}, [{}], [{}].", listener.getMember(), intLevel, date,
-                connectionName, threadName, logMessage, stackTrace);
+      InternalDistributedSystem ds = InternalDistributedSystem.getConnectedInstance();
+      if (ds == null) {
+        // Use info level to avoid triggering another alert
+        logger.info("Did not append alert event because the distributed system is set to null.");
+        return;
+      }
+      DistributionManager distMgr = (DistributionManager) ds.getDistributionManager();
+      
+      final int intLevel = logLevelToAlertLevel(event.getLevel().intLevel());
+      final Date date = new Date(event.getTimeMillis());
+      final String threadName = event.getThreadName();
+      final String logMessage = event.getMessage().getFormattedMessage();
+      final String stackTrace = ThreadUtils.stackTraceToString(event.getThrown(), true);
+      final String connectionName = ds.getConfig().getName();
+      
+      for (Listener listener : this.listeners) {
+        if (event.getLevel().intLevel() > listener.getLevel().intLevel()) {
+          break;
+        }
+            
+        try {
+          AlertListenerMessage alertMessage = AlertListenerMessage.create(listener.getMember(), intLevel, date,
+              connectionName, threadName, Thread.currentThread().getId(), logMessage, stackTrace);
+          
+          if (listener.getMember().equals(distMgr.getDistributionManagerId())) {
+            if (isDebugEnabled) {
+              logger.debug("Delivering local alert message: {}, {}, {}, {}, {}, [{}], [{}].", listener.getMember(), intLevel, date,
+                  connectionName, threadName, logMessage, stackTrace);
+            }
+            alertMessage.process(distMgr);
+          } else {
+            if (isDebugEnabled) {
+              logger.debug("Delivering remote alert message: {}, {}, {}, {}, {}, [{}], [{}].", listener.getMember(), intLevel, date,
+                  connectionName, threadName, logMessage, stackTrace);
+            }
+            distMgr.putOutgoing(alertMessage);
           }
-          distMgr.putOutgoing(alertMessage);
+        } catch (ReenteredConnectException e) {
+          // OK. We can't send to this recipient because we're in the middle of
+          // trying to connect to it.
         }
-      } catch (ReenteredConnectException e) {
-        // OK. We can't send to this recipient because we're in the middle of
-        // trying to connect to it.
       }
+    } finally {
+      setIsAlerting(false);
     }
-    
-    alerting.set(Boolean.FALSE);
   }
 
   public synchronized void addAlertListener(final DistributedMember member, final int alertLevel) {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/bd3ac70d/geode-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java
index 3dcf035..2ad970b 100755
--- a/geode-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java
@@ -37,6 +37,7 @@ import org.jgroups.Address;
 import org.jgroups.Event;
 import org.jgroups.JChannel;
 import org.jgroups.Message;
+import org.jgroups.Message.Flag;
 import org.jgroups.conf.ClassConfigurator;
 import org.jgroups.protocols.UNICAST3;
 import org.jgroups.protocols.pbcast.NAKACK2;
@@ -79,6 +80,7 @@ import com.gemstone.gemfire.internal.HeapDataOutputStream;
 import com.gemstone.gemfire.internal.Version;
 import com.gemstone.gemfire.internal.admin.remote.RemoteTransportConfig;
 import com.gemstone.gemfire.internal.cache.DistributedCacheOperation;
+import com.gemstone.gemfire.internal.logging.log4j.AlertAppender;
 import com.gemstone.gemfire.test.junit.categories.IntegrationTest;
 
 import junit.framework.Assert;
@@ -197,6 +199,23 @@ public class JGroupsMessengerJUnitTest {
   }
   
   @Test
+  public void alertMessagesBypassFlowControl() throws Exception {
+    initMocks(false);
+    Message jgmsg = new Message();
+    DistributionMessage dmsg = mock(DistributionMessage.class);
+    when(dmsg.getProcessorType()).thenReturn(DistributionManager.SERIAL_EXECUTOR);
+    messenger.setMessageFlags(dmsg, jgmsg);
+    assertFalse("expected no_fc to not be set in " + jgmsg.getFlags(), jgmsg.isFlagSet(Message.Flag.NO_FC));
+    AlertAppender.setIsAlerting(true);
+    try {
+      messenger.setMessageFlags(dmsg, jgmsg);
+      assertTrue("expected no_fc to be set in " + jgmsg.getFlags(), jgmsg.isFlagSet(Message.Flag.NO_FC));
+    } finally {
+      AlertAppender.setIsAlerting(false);
+    }
+  }
+  
+  @Test
   public void testMemberWeightIsSerialized() throws Exception {
     HeapDataOutputStream out = new HeapDataOutputStream(500, Version.CURRENT);
     InternalDistributedMember mbr = createAddress(8888);