You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ji...@apache.org on 2016/04/21 19:17:09 UTC
[34/50] [abbrv] incubator-geode git commit: GEODE-1207: Stopped WAN
Locator Discovery Thread when locator is stopped
GEODE-1207: Stopped WAN Locator Discovery Thread when locator is stopped
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/7f306ffe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/7f306ffe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/7f306ffe
Branch: refs/heads/feature/GEODE-17-2
Commit: 7f306ffee84ef31d5a2a1326b8967581c5f6f895
Parents: ddee87f
Author: Barry Oglesby <bo...@pivotal.io>
Authored: Tue Apr 19 13:19:25 2016 -0700
Committer: Barry Oglesby <bo...@pivotal.io>
Committed: Wed Apr 20 11:06:46 2016 -0700
----------------------------------------------------------------------
.../distributed/internal/InternalLocator.java | 13 ++++--
.../internal/WanLocatorDiscoverer.java | 4 ++
.../internal/locator/wan/LocatorDiscovery.java | 16 +++++---
.../locator/wan/WanLocatorDiscovererImpl.java | 19 +++++++--
.../wan/misc/WanAutoDiscoveryDUnitTest.java | 42 +++++++++++++++++++-
5 files changed, 81 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7f306ffe/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/InternalLocator.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/InternalLocator.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/InternalLocator.java
index 2aae84a..a5d269b 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/InternalLocator.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/InternalLocator.java
@@ -170,6 +170,8 @@ public class InternalLocator extends Locator implements ConnectListener {
private DistributionConfigImpl config;
private LocatorMembershipListener locatorListener;
+
+ private WanLocatorDiscoverer locatorDiscoverer;
/** whether the locator was stopped during forced-disconnect processing but a reconnect will occur */
private volatile boolean stoppedForReconnect;
@@ -844,9 +846,9 @@ public class InternalLocator extends Locator implements ConnectListener {
InternalDistributedSystem.addConnectListener(this);
}
- WanLocatorDiscoverer s = WANServiceProvider.createLocatorDiscoverer();
- if(s != null) {
- s.discover(this.port, config, locatorListener);
+ this.locatorDiscoverer = WANServiceProvider.createLocatorDiscoverer();
+ if(this.locatorDiscoverer != null) {
+ this.locatorDiscoverer.discover(this.port, config, locatorListener);
}
}
@@ -951,6 +953,11 @@ public class InternalLocator extends Locator implements ConnectListener {
return;
}
+ if (this.locatorDiscoverer != null) {
+ this.locatorDiscoverer.stop();
+ this.locatorDiscoverer = null;
+ }
+
if (this.server.isAlive()) {
logger.info(LocalizedMessage.create(LocalizedStrings.InternalLocator_STOPPING__0, this));
try {
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7f306ffe/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/WanLocatorDiscoverer.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/WanLocatorDiscoverer.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/WanLocatorDiscoverer.java
index 4ad206d..450d3a1 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/WanLocatorDiscoverer.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/WanLocatorDiscoverer.java
@@ -28,4 +28,8 @@ public interface WanLocatorDiscoverer {
*/
void discover(int port, DistributionConfigImpl config,
LocatorMembershipListener locatorListener);
+
+ void stop();
+
+ boolean isStopped();
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7f306ffe/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/LocatorDiscovery.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/LocatorDiscovery.java b/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/LocatorDiscovery.java
index cae5030..168dc63 100644
--- a/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/LocatorDiscovery.java
+++ b/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/LocatorDiscovery.java
@@ -41,7 +41,9 @@ import com.gemstone.gemfire.cache.client.internal.locator.wan.LocatorMembershipL
public class LocatorDiscovery{
private static final Logger logger = LogService.getLogger();
-
+
+ private WanLocatorDiscoverer discoverer;
+
private DistributionLocatorId locatorId;
private LocatorMembershipListener locatorListener;
@@ -57,8 +59,9 @@ public class LocatorDiscovery{
public static final int WAN_LOCATOR_PING_INTERVAL = Integer.getInteger(
"WANLocator.PING_INTERVAL", 10000).intValue();
- public LocatorDiscovery(DistributionLocatorId locotor,RemoteLocatorJoinRequest request,
+ public LocatorDiscovery(WanLocatorDiscoverer discoverer, DistributionLocatorId locotor,RemoteLocatorJoinRequest request,
LocatorMembershipListener locatorListener) {
+ this.discoverer = discoverer;
this.locatorId = locotor;
this.request = request;
this.locatorListener = locatorListener;
@@ -119,11 +122,14 @@ public class LocatorDiscovery{
exchangeRemoteLocators();
}
}
-
+
+ private WanLocatorDiscoverer getDiscoverer() {
+ return this.discoverer;
+ }
private void exchangeLocalLocators() {
int retryAttempt = 1;
- while (true) {
+ while (!getDiscoverer().isStopped()) {
try {
RemoteLocatorJoinResponse response = (RemoteLocatorJoinResponse)TcpClient
.requestToServer(locatorId.getHost(), locatorId.getPort(), request,
@@ -169,7 +175,7 @@ public class LocatorDiscovery{
public void exchangeRemoteLocators() {
int retryAttempt = 1;
DistributionLocatorId remoteLocator = this.locatorId;
- while (true) {
+ while (!getDiscoverer().isStopped()) {
RemoteLocatorJoinResponse response;
try {
response = (RemoteLocatorJoinResponse)TcpClient
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7f306ffe/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/WanLocatorDiscovererImpl.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/WanLocatorDiscovererImpl.java b/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/WanLocatorDiscovererImpl.java
index 23d21aa..f031143 100644
--- a/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/WanLocatorDiscovererImpl.java
+++ b/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/WanLocatorDiscovererImpl.java
@@ -39,6 +39,8 @@ import com.gemstone.gemfire.internal.logging.LoggingThreadGroup;
public class WanLocatorDiscovererImpl implements WanLocatorDiscoverer{
private static final Logger logger = LogService.getLogger();
+
+ private volatile boolean stopped = false;
private ExecutorService _executor;
@@ -65,8 +67,17 @@ public class WanLocatorDiscovererImpl implements WanLocatorDiscoverer{
exchangeRemoteLocators(port, config, locatorListener);
this._executor.shutdown();
}
-
-
+
+ @Override
+ public void stop() {
+ this.stopped = true;
+ }
+
+ @Override
+ public boolean isStopped() {
+ return this.stopped;
+ }
+
/**
* For WAN 70 Exchange the locator information within the distributed system
*
@@ -90,7 +101,7 @@ public class WanLocatorDiscovererImpl implements WanLocatorDiscoverer{
DistributionLocatorId localLocatorId = new DistributionLocatorId(
locatorsOnThisVM.nextToken());
if (!locatorId.equals(localLocatorId)) {
- LocatorDiscovery localDiscovery = new LocatorDiscovery(localLocatorId, request, locatorListener);
+ LocatorDiscovery localDiscovery = new LocatorDiscovery(this, localLocatorId, request, locatorListener);
LocatorDiscovery.LocalLocatorDiscovery localLocatorDiscovery = localDiscovery.new LocalLocatorDiscovery();
this._executor.execute(localLocatorDiscovery);
}
@@ -112,7 +123,7 @@ public class WanLocatorDiscovererImpl implements WanLocatorDiscoverer{
while (remoteLocators.hasMoreTokens()) {
DistributionLocatorId remoteLocatorId = new DistributionLocatorId(
remoteLocators.nextToken());
- LocatorDiscovery localDiscovery = new LocatorDiscovery(remoteLocatorId,
+ LocatorDiscovery localDiscovery = new LocatorDiscovery(this, remoteLocatorId,
request, locatorListener);
LocatorDiscovery.RemoteLocatorDiscovery remoteLocatorDiscovery = localDiscovery.new RemoteLocatorDiscovery();
this._executor.execute(remoteLocatorDiscovery);
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7f306ffe/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/WanAutoDiscoveryDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/WanAutoDiscoveryDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/WanAutoDiscoveryDUnitTest.java
index 73c7e1d..b808403 100644
--- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/WanAutoDiscoveryDUnitTest.java
+++ b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/WanAutoDiscoveryDUnitTest.java
@@ -26,6 +26,7 @@ import java.util.Set;
import com.gemstone.gemfire.GemFireConfigException;
import com.gemstone.gemfire.IncompatibleSystemException;
import com.gemstone.gemfire.internal.AvailablePortHelper;
+import com.gemstone.gemfire.internal.OSProcess;
import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
import com.gemstone.gemfire.test.dunit.Assert;
import com.gemstone.gemfire.test.dunit.AsyncInvocation;
@@ -464,5 +465,44 @@ public class WanAutoDiscoveryDUnitTest extends WANTestBase {
vm6.invoke(() -> WANTestBase.checkAllSiteMetaData( dsVsPort ));
}
-
+
+ public void testNoThreadLeftBehind() {
+ // Get active thread count before test
+ int activeThreadCountBefore = Thread.activeCount();
+
+ // Start / stop locator
+ int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
+ WANTestBase.createFirstRemoteLocator( 2, port );
+ disconnectFromDS();
+
+ // Validate active thread count after test
+
+ // Wait up to 60 seconds for all threads started during the test
+ // (including the 'WAN Locator Discovery Thread') to stop
+ // Note: Awaitility is not being used since it adds threads
+ for (int i=0; i<60; i++) {
+ if (Thread.activeCount() > activeThreadCountBefore) {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ fail("Caught the following exception waiting for threads to stop: " + e);
+ }
+ } else {
+ break;
+ }
+ }
+
+ // Fail if the active thread count after the test is greater than the active thread count before the test
+ if (Thread.activeCount() > activeThreadCountBefore) {
+ OSProcess.printStacks(0);
+ StringBuilder builder = new StringBuilder();
+ builder
+ .append("Expected ")
+ .append(activeThreadCountBefore)
+ .append(" threads but found ")
+ .append(Thread.activeCount())
+ .append(". Check log file for a thread dump.");
+ fail(builder.toString());
+ }
+ }
}