You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2017/08/09 17:12:07 UTC

geode git commit: GEODE-3407: fix deadlock between JMX and Membership

Repository: geode
Updated Branches:
  refs/heads/develop 11a0b34cd -> 0fde215a1


GEODE-3407: fix deadlock between JMX and Membership

Change InternalClientMembership.getClientQueueSizes to accept the Cache as a
parameter from CacheServerBridge, so that the JMX path no longer synchronizes
on CacheFactory through CacheFactory.getAnyInstance.

The new regression test reproduces the bug and verifies this fix.

This closes #697


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/0fde215a
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/0fde215a
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/0fde215a

Branch: refs/heads/develop
Commit: 0fde215a11f61c12c24da519fd195b50e39f1ee5
Parents: 11a0b34
Author: Kirk Lund <kl...@apache.org>
Authored: Mon Aug 7 16:39:04 2017 -0700
Committer: Kirk Lund <kl...@apache.org>
Committed: Wed Aug 9 10:00:36 2017 -0700

----------------------------------------------------------------------
 .../cache/tier/InternalClientMembership.java    |  30 +++--
 .../internal/beans/CacheServerBridge.java       |  18 ++-
 .../internal/beans/ManagementAdapter.java       |   2 +-
 .../management/internal/beans/ServerBridge.java |  16 ++-
 ...verBridgeClientMembershipRegressionTest.java | 129 +++++++++++++++++++
 5 files changed, 173 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/0fde215a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/InternalClientMembership.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/InternalClientMembership.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/InternalClientMembership.java
index 504081d..6fac66f 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/InternalClientMembership.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/InternalClientMembership.java
@@ -27,9 +27,6 @@ import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
-import org.apache.geode.internal.cache.InternalCache;
-import org.apache.geode.internal.net.SocketCreator;
 import org.apache.logging.log4j.Logger;
 
 import org.apache.geode.CancelException;
@@ -37,11 +34,14 @@ import org.apache.geode.SystemFailure;
 import org.apache.geode.cache.CacheFactory;
 import org.apache.geode.cache.client.PoolManager;
 import org.apache.geode.cache.client.internal.PoolImpl;
+import org.apache.geode.cache.server.CacheServer;
 import org.apache.geode.distributed.DistributedMember;
 import org.apache.geode.distributed.DistributedSystemDisconnectedException;
 import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import org.apache.geode.distributed.internal.ServerLocation;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.cache.CacheServerImpl;
+import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.tier.sockets.AcceptorImpl;
 import org.apache.geode.internal.cache.tier.sockets.ClientHealthMonitor;
 import org.apache.geode.internal.i18n.LocalizedStrings;
@@ -290,18 +290,22 @@ public class InternalClientMembership {
   }
 
   public static Map getClientQueueSizes() {
-    Map clientQueueSizes = new HashMap();
-    InternalCache c = (InternalCache) CacheFactory.getAnyInstance();
-    if (c == null) // Add a NULL Check
-      return clientQueueSizes;
+    return getClientQueueSizes((InternalCache) CacheFactory.getAnyInstance());
+  }
 
-    for (Iterator bsii = c.getCacheServers().iterator(); bsii.hasNext();) {
-      CacheServerImpl bsi = (CacheServerImpl) bsii.next();
-      AcceptorImpl ai = bsi.getAcceptor();
-      if (ai != null && ai.getCacheClientNotifier() != null) {
-        clientQueueSizes.putAll(ai.getCacheClientNotifier().getClientQueueSizes());
+  public static Map getClientQueueSizes(final InternalCache cache) {
+    if (cache == null) {
+      return Collections.emptyMap();
+    }
+
+    Map clientQueueSizes = new HashMap();
+    for (CacheServer cacheServer : cache.getCacheServers()) {
+      CacheServerImpl cacheServerImpl = (CacheServerImpl) cacheServer;
+      AcceptorImpl acceptor = cacheServerImpl.getAcceptor();
+      if (acceptor != null && acceptor.getCacheClientNotifier() != null) {
+        clientQueueSizes.putAll(acceptor.getCacheClientNotifier().getClientQueueSizes());
       }
-    } // for
+    }
     return clientQueueSizes;
   }
 

http://git-wip-us.apache.org/repos/asf/geode/blob/0fde215a/geode-core/src/main/java/org/apache/geode/management/internal/beans/CacheServerBridge.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/beans/CacheServerBridge.java b/geode-core/src/main/java/org/apache/geode/management/internal/beans/CacheServerBridge.java
index 728402c..3b49f65 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/beans/CacheServerBridge.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/beans/CacheServerBridge.java
@@ -57,6 +57,7 @@ import org.apache.geode.management.ClientHealthStatus;
 import org.apache.geode.management.ClientQueueDetail;
 import org.apache.geode.management.ServerLoadData;
 import org.apache.geode.management.internal.ManagementConstants;
+import org.apache.geode.management.internal.beans.stats.MBeanStatsMonitor;
 import org.apache.geode.management.internal.beans.stats.StatType;
 import org.apache.geode.management.internal.beans.stats.StatsAverageLatency;
 import org.apache.geode.management.internal.beans.stats.StatsKey;
@@ -101,7 +102,7 @@ public class CacheServerBridge extends ServerBridge {
     }
   }
 
-  public CacheServerBridge(CacheServer cacheServer, InternalCache cache) {
+  public CacheServerBridge(final InternalCache cache, final CacheServer cacheServer) {
     super(cacheServer);
     this.cacheServer = cacheServer;
     this.cache = cache;
@@ -110,7 +111,18 @@ public class CacheServerBridge extends ServerBridge {
     initializeCacheServerStats();
   }
 
-  // Dummy constructor for testing purpose only TODO why is this public then?
+  // For testing only
+  public CacheServerBridge(final InternalCache cache, final CacheServer cacheServer,
+      final AcceptorImpl acceptor, final MBeanStatsMonitor monitor) {
+    super(acceptor, monitor);
+    this.cacheServer = cacheServer;
+    this.cache = cache;
+    this.qs = cache.getQueryService();
+
+    initializeCacheServerStats();
+  }
+
+  // For testing only
   public CacheServerBridge() {
     super();
     initializeCacheServerStats();
@@ -648,7 +660,7 @@ public class CacheServerBridge extends ServerBridge {
   }
 
   public int getNumSubscriptions() {
-    Map clientProxyMembershipIDMap = InternalClientMembership.getClientQueueSizes();
+    Map clientProxyMembershipIDMap = InternalClientMembership.getClientQueueSizes(cache);
     return clientProxyMembershipIDMap.keySet().size();
   }
 

http://git-wip-us.apache.org/repos/asf/geode/blob/0fde215a/geode-core/src/main/java/org/apache/geode/management/internal/beans/ManagementAdapter.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/beans/ManagementAdapter.java b/geode-core/src/main/java/org/apache/geode/management/internal/beans/ManagementAdapter.java
index 003a8f3..8ea84f5 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/beans/ManagementAdapter.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/beans/ManagementAdapter.java
@@ -633,7 +633,7 @@ public class ManagementAdapter {
       return;
     }
 
-    CacheServerBridge cacheServerBridge = new CacheServerBridge(cacheServer, internalCache);
+    CacheServerBridge cacheServerBridge = new CacheServerBridge(internalCache, cacheServer);
     cacheServerBridge.setMemberMBeanBridge(memberMBeanBridge);
 
     CacheServerMBean cacheServerMBean = new CacheServerMBean(cacheServerBridge);

http://git-wip-us.apache.org/repos/asf/geode/blob/0fde215a/geode-core/src/main/java/org/apache/geode/management/internal/beans/ServerBridge.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/beans/ServerBridge.java b/geode-core/src/main/java/org/apache/geode/management/internal/beans/ServerBridge.java
index 6834998..6008cdc 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/beans/ServerBridge.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/beans/ServerBridge.java
@@ -29,7 +29,6 @@ public class ServerBridge {
 
   protected MBeanStatsMonitor monitor;
 
-
   protected StatsRate getRequestRate;
 
   protected StatsRate putRequestRate;
@@ -38,13 +37,20 @@ public class ServerBridge {
 
   protected StatsAverageLatency putRequestAvgLatency;
 
-
   protected AcceptorImpl acceptor;
 
+  public ServerBridge(final CacheServer cacheServer) {
+    this((CacheServerImpl) cacheServer,
+        new MBeanStatsMonitor(ManagementStrings.SERVER_MONITOR.toLocalizedString()));
+  }
 
-  public ServerBridge(CacheServer cacheServer) {
-    this.monitor = new MBeanStatsMonitor(ManagementStrings.SERVER_MONITOR.toLocalizedString());
-    this.acceptor = ((CacheServerImpl) cacheServer).getAcceptor();
+  public ServerBridge(final CacheServerImpl cacheServer, final MBeanStatsMonitor monitor) {
+    this(cacheServer.getAcceptor(), monitor);
+  }
+
+  public ServerBridge(final AcceptorImpl acceptor, final MBeanStatsMonitor monitor) {
+    this.monitor = monitor;
+    this.acceptor = acceptor;
     initializeStats();
     startMonitor();
   }

http://git-wip-us.apache.org/repos/asf/geode/blob/0fde215a/geode-core/src/test/java/org/apache/geode/management/internal/beans/CacheServerBridgeClientMembershipRegressionTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/management/internal/beans/CacheServerBridgeClientMembershipRegressionTest.java b/geode-core/src/test/java/org/apache/geode/management/internal/beans/CacheServerBridgeClientMembershipRegressionTest.java
new file mode 100644
index 0000000..232df0a
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/management/internal/beans/CacheServerBridgeClientMembershipRegressionTest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.management.internal.beans;
+
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.CacheClosedException;
+import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.cache.query.QueryService;
+import org.apache.geode.internal.cache.CacheServerImpl;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.tier.sockets.AcceptorImpl;
+import org.apache.geode.internal.cache.tier.sockets.CacheServerStats;
+import org.apache.geode.management.internal.beans.stats.MBeanStatsMonitor;
+import org.apache.geode.test.junit.categories.UnitTest;
+
+/**
+ * Regression test that confirms bug GEODE-3407.
+ *
+ * <p>
+ * GEODE-3407: JMX and membership may deadlock on CacheFactory.getAnyInstance
+ */
+@Category(UnitTest.class)
+public class CacheServerBridgeClientMembershipRegressionTest {
+
+  private final AtomicBoolean after = new AtomicBoolean();
+  private final AtomicBoolean before = new AtomicBoolean();
+
+  private CacheServerBridge cacheServerBridge;
+
+  private ExecutorService synchronizing;
+  private ExecutorService blocking;
+  private CountDownLatch latch;
+
+  private InternalCache cache;
+  private CacheServerImpl cacheServer;
+  private AcceptorImpl acceptor;
+  private MBeanStatsMonitor monitor;
+
+  @Before
+  public void setUp() throws Exception {
+    this.synchronizing = Executors.newSingleThreadExecutor();
+    this.blocking = Executors.newSingleThreadExecutor();
+    this.latch = new CountDownLatch(1);
+
+    this.cache = mock(InternalCache.class);
+    this.cacheServer = mock(CacheServerImpl.class);
+    this.acceptor = mock(AcceptorImpl.class);
+    this.monitor = mock(MBeanStatsMonitor.class);
+
+    when(cache.getQueryService()).thenReturn(mock(QueryService.class));
+    when(acceptor.getStats()).thenReturn(mock(CacheServerStats.class));
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if (latch.getCount() > 0) {
+      latch.countDown();
+    }
+  }
+
+  @Test
+  public void getNumSubscriptionsDeadlocksOnCacheFactory() throws Exception {
+    givenCacheFactoryIsSynchronized();
+    givenCacheServerBridge();
+
+    blocking.execute(() -> {
+      try {
+        before.set(true);
+
+        // getNumSubscriptions -> getClientQueueSizes -> synchronizes on CacheFactory
+        cacheServerBridge.getNumSubscriptions();
+
+      } catch (CacheClosedException ignored) {
+      } finally {
+        after.set(true);
+      }
+    });
+
+    await().atMost(10, SECONDS).until(() -> before.get());
+
+    // if deadlocked, then this line will throw ConditionTimeoutException
+    await().atMost(10, SECONDS).until(() -> assertThat(after.get()).isTrue());
+  }
+
+  private void givenCacheFactoryIsSynchronized() {
+    synchronizing.execute(() -> {
+      synchronized (CacheFactory.class) {
+        try {
+          latch.await(2, MINUTES);
+        } catch (InterruptedException e) {
+          throw new AssertionError(e);
+        }
+      }
+    });
+  }
+
+  private void givenCacheServerBridge() {
+    cacheServerBridge = new CacheServerBridge(cache, cacheServer, acceptor, monitor);
+  }
+
+}