You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2017/08/09 17:12:07 UTC
geode git commit: GEODE-3407: fix deadlock between JMX and Membership
Repository: geode
Updated Branches:
refs/heads/develop 11a0b34cd -> 0fde215a1
GEODE-3407: fix deadlock between JMX and Membership
Change InternalClientMembership to avoid synchronizing on CacheFactory
by accepting a Cache parameter from CacheServerBridge.
A new regression test confirms the bug and this fix.
This closes #697
Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/0fde215a
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/0fde215a
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/0fde215a
Branch: refs/heads/develop
Commit: 0fde215a11f61c12c24da519fd195b50e39f1ee5
Parents: 11a0b34
Author: Kirk Lund <kl...@apache.org>
Authored: Mon Aug 7 16:39:04 2017 -0700
Committer: Kirk Lund <kl...@apache.org>
Committed: Wed Aug 9 10:00:36 2017 -0700
----------------------------------------------------------------------
.../cache/tier/InternalClientMembership.java | 30 +++--
.../internal/beans/CacheServerBridge.java | 18 ++-
.../internal/beans/ManagementAdapter.java | 2 +-
.../management/internal/beans/ServerBridge.java | 16 ++-
...verBridgeClientMembershipRegressionTest.java | 129 +++++++++++++++++++
5 files changed, 173 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/geode/blob/0fde215a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/InternalClientMembership.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/InternalClientMembership.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/InternalClientMembership.java
index 504081d..6fac66f 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/InternalClientMembership.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/InternalClientMembership.java
@@ -27,9 +27,6 @@ import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
-import org.apache.geode.internal.cache.InternalCache;
-import org.apache.geode.internal.net.SocketCreator;
import org.apache.logging.log4j.Logger;
import org.apache.geode.CancelException;
@@ -37,11 +34,14 @@ import org.apache.geode.SystemFailure;
import org.apache.geode.cache.CacheFactory;
import org.apache.geode.cache.client.PoolManager;
import org.apache.geode.cache.client.internal.PoolImpl;
+import org.apache.geode.cache.server.CacheServer;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.DistributedSystemDisconnectedException;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.ServerLocation;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.cache.CacheServerImpl;
+import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.tier.sockets.AcceptorImpl;
import org.apache.geode.internal.cache.tier.sockets.ClientHealthMonitor;
import org.apache.geode.internal.i18n.LocalizedStrings;
@@ -290,18 +290,22 @@ public class InternalClientMembership {
}
public static Map getClientQueueSizes() {
- Map clientQueueSizes = new HashMap();
- InternalCache c = (InternalCache) CacheFactory.getAnyInstance();
- if (c == null) // Add a NULL Check
- return clientQueueSizes;
+ return getClientQueueSizes((InternalCache) CacheFactory.getAnyInstance());
+ }
- for (Iterator bsii = c.getCacheServers().iterator(); bsii.hasNext();) {
- CacheServerImpl bsi = (CacheServerImpl) bsii.next();
- AcceptorImpl ai = bsi.getAcceptor();
- if (ai != null && ai.getCacheClientNotifier() != null) {
- clientQueueSizes.putAll(ai.getCacheClientNotifier().getClientQueueSizes());
+ public static Map getClientQueueSizes(final InternalCache cache) {
+ if (cache == null) {
+ return Collections.emptyMap();
+ }
+
+ Map clientQueueSizes = new HashMap();
+ for (CacheServer cacheServer : cache.getCacheServers()) {
+ CacheServerImpl cacheServerImpl = (CacheServerImpl) cacheServer;
+ AcceptorImpl acceptor = cacheServerImpl.getAcceptor();
+ if (acceptor != null && acceptor.getCacheClientNotifier() != null) {
+ clientQueueSizes.putAll(acceptor.getCacheClientNotifier().getClientQueueSizes());
}
- } // for
+ }
return clientQueueSizes;
}
http://git-wip-us.apache.org/repos/asf/geode/blob/0fde215a/geode-core/src/main/java/org/apache/geode/management/internal/beans/CacheServerBridge.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/beans/CacheServerBridge.java b/geode-core/src/main/java/org/apache/geode/management/internal/beans/CacheServerBridge.java
index 728402c..3b49f65 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/beans/CacheServerBridge.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/beans/CacheServerBridge.java
@@ -57,6 +57,7 @@ import org.apache.geode.management.ClientHealthStatus;
import org.apache.geode.management.ClientQueueDetail;
import org.apache.geode.management.ServerLoadData;
import org.apache.geode.management.internal.ManagementConstants;
+import org.apache.geode.management.internal.beans.stats.MBeanStatsMonitor;
import org.apache.geode.management.internal.beans.stats.StatType;
import org.apache.geode.management.internal.beans.stats.StatsAverageLatency;
import org.apache.geode.management.internal.beans.stats.StatsKey;
@@ -101,7 +102,7 @@ public class CacheServerBridge extends ServerBridge {
}
}
- public CacheServerBridge(CacheServer cacheServer, InternalCache cache) {
+ public CacheServerBridge(final InternalCache cache, final CacheServer cacheServer) {
super(cacheServer);
this.cacheServer = cacheServer;
this.cache = cache;
@@ -110,7 +111,18 @@ public class CacheServerBridge extends ServerBridge {
initializeCacheServerStats();
}
- // Dummy constructor for testing purpose only TODO why is this public then?
+ // For testing only
+ public CacheServerBridge(final InternalCache cache, final CacheServer cacheServer,
+ final AcceptorImpl acceptor, final MBeanStatsMonitor monitor) {
+ super(acceptor, monitor);
+ this.cacheServer = cacheServer;
+ this.cache = cache;
+ this.qs = cache.getQueryService();
+
+ initializeCacheServerStats();
+ }
+
+ // For testing only
public CacheServerBridge() {
super();
initializeCacheServerStats();
@@ -648,7 +660,7 @@ public class CacheServerBridge extends ServerBridge {
}
public int getNumSubscriptions() {
- Map clientProxyMembershipIDMap = InternalClientMembership.getClientQueueSizes();
+ Map clientProxyMembershipIDMap = InternalClientMembership.getClientQueueSizes(cache);
return clientProxyMembershipIDMap.keySet().size();
}
http://git-wip-us.apache.org/repos/asf/geode/blob/0fde215a/geode-core/src/main/java/org/apache/geode/management/internal/beans/ManagementAdapter.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/beans/ManagementAdapter.java b/geode-core/src/main/java/org/apache/geode/management/internal/beans/ManagementAdapter.java
index 003a8f3..8ea84f5 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/beans/ManagementAdapter.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/beans/ManagementAdapter.java
@@ -633,7 +633,7 @@ public class ManagementAdapter {
return;
}
- CacheServerBridge cacheServerBridge = new CacheServerBridge(cacheServer, internalCache);
+ CacheServerBridge cacheServerBridge = new CacheServerBridge(internalCache, cacheServer);
cacheServerBridge.setMemberMBeanBridge(memberMBeanBridge);
CacheServerMBean cacheServerMBean = new CacheServerMBean(cacheServerBridge);
http://git-wip-us.apache.org/repos/asf/geode/blob/0fde215a/geode-core/src/main/java/org/apache/geode/management/internal/beans/ServerBridge.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/beans/ServerBridge.java b/geode-core/src/main/java/org/apache/geode/management/internal/beans/ServerBridge.java
index 6834998..6008cdc 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/beans/ServerBridge.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/beans/ServerBridge.java
@@ -29,7 +29,6 @@ public class ServerBridge {
protected MBeanStatsMonitor monitor;
-
protected StatsRate getRequestRate;
protected StatsRate putRequestRate;
@@ -38,13 +37,20 @@ public class ServerBridge {
protected StatsAverageLatency putRequestAvgLatency;
-
protected AcceptorImpl acceptor;
+ public ServerBridge(final CacheServer cacheServer) {
+ this((CacheServerImpl) cacheServer,
+ new MBeanStatsMonitor(ManagementStrings.SERVER_MONITOR.toLocalizedString()));
+ }
- public ServerBridge(CacheServer cacheServer) {
- this.monitor = new MBeanStatsMonitor(ManagementStrings.SERVER_MONITOR.toLocalizedString());
- this.acceptor = ((CacheServerImpl) cacheServer).getAcceptor();
+ public ServerBridge(final CacheServerImpl cacheServer, final MBeanStatsMonitor monitor) {
+ this(cacheServer.getAcceptor(), monitor);
+ }
+
+ public ServerBridge(final AcceptorImpl acceptor, final MBeanStatsMonitor monitor) {
+ this.monitor = monitor;
+ this.acceptor = acceptor;
initializeStats();
startMonitor();
}
http://git-wip-us.apache.org/repos/asf/geode/blob/0fde215a/geode-core/src/test/java/org/apache/geode/management/internal/beans/CacheServerBridgeClientMembershipRegressionTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/management/internal/beans/CacheServerBridgeClientMembershipRegressionTest.java b/geode-core/src/test/java/org/apache/geode/management/internal/beans/CacheServerBridgeClientMembershipRegressionTest.java
new file mode 100644
index 0000000..232df0a
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/management/internal/beans/CacheServerBridgeClientMembershipRegressionTest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.management.internal.beans;
+
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.CacheClosedException;
+import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.cache.query.QueryService;
+import org.apache.geode.internal.cache.CacheServerImpl;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.tier.sockets.AcceptorImpl;
+import org.apache.geode.internal.cache.tier.sockets.CacheServerStats;
+import org.apache.geode.management.internal.beans.stats.MBeanStatsMonitor;
+import org.apache.geode.test.junit.categories.UnitTest;
+
+/**
+ * Regression test that confirms bug GEODE-3407.
+ *
+ * <p>
+ * GEODE-3407: JMX and membership may deadlock on CacheFactory.getAnyInstance
+ */
+@Category(UnitTest.class)
+public class CacheServerBridgeClientMembershipRegressionTest {
+
+ private final AtomicBoolean after = new AtomicBoolean();
+ private final AtomicBoolean before = new AtomicBoolean();
+
+ private CacheServerBridge cacheServerBridge;
+
+ private ExecutorService synchronizing;
+ private ExecutorService blocking;
+ private CountDownLatch latch;
+
+ private InternalCache cache;
+ private CacheServerImpl cacheServer;
+ private AcceptorImpl acceptor;
+ private MBeanStatsMonitor monitor;
+
+ @Before
+ public void setUp() throws Exception {
+ this.synchronizing = Executors.newSingleThreadExecutor();
+ this.blocking = Executors.newSingleThreadExecutor();
+ this.latch = new CountDownLatch(1);
+
+ this.cache = mock(InternalCache.class);
+ this.cacheServer = mock(CacheServerImpl.class);
+ this.acceptor = mock(AcceptorImpl.class);
+ this.monitor = mock(MBeanStatsMonitor.class);
+
+ when(cache.getQueryService()).thenReturn(mock(QueryService.class));
+ when(acceptor.getStats()).thenReturn(mock(CacheServerStats.class));
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ if (latch.getCount() > 0) {
+ latch.countDown();
+ }
+ }
+
+ @Test
+ public void getNumSubscriptionsDeadlocksOnCacheFactory() throws Exception {
+ givenCacheFactoryIsSynchronized();
+ givenCacheServerBridge();
+
+ blocking.execute(() -> {
+ try {
+ before.set(true);
+
+ // getNumSubscriptions -> getClientQueueSizes -> synchronizes on CacheFactory
+ cacheServerBridge.getNumSubscriptions();
+
+ } catch (CacheClosedException ignored) {
+ } finally {
+ after.set(true);
+ }
+ });
+
+ await().atMost(10, SECONDS).until(() -> before.get());
+
+ // if deadlocked, then this line will throw ConditionTimeoutException
+ await().atMost(10, SECONDS).until(() -> assertThat(after.get()).isTrue());
+ }
+
+ private void givenCacheFactoryIsSynchronized() {
+ synchronizing.execute(() -> {
+ synchronized (CacheFactory.class) {
+ try {
+ latch.await(2, MINUTES);
+ } catch (InterruptedException e) {
+ throw new AssertionError(e);
+ }
+ }
+ });
+ }
+
+ private void givenCacheServerBridge() {
+ cacheServerBridge = new CacheServerBridge(cache, cacheServer, acceptor, monitor);
+ }
+
+}