You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@geode.apache.org by ji...@apache.org on 2016/04/21 19:17:09 UTC

[34/50] [abbrv] incubator-geode git commit: GEODE-1207: Stopped WAN Locator Discovery Thread when locator is stopped

GEODE-1207: Stopped WAN Locator Discovery Thread when locator is stopped


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/7f306ffe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/7f306ffe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/7f306ffe

Branch: refs/heads/feature/GEODE-17-2
Commit: 7f306ffee84ef31d5a2a1326b8967581c5f6f895
Parents: ddee87f
Author: Barry Oglesby <bo...@pivotal.io>
Authored: Tue Apr 19 13:19:25 2016 -0700
Committer: Barry Oglesby <bo...@pivotal.io>
Committed: Wed Apr 20 11:06:46 2016 -0700

----------------------------------------------------------------------
 .../distributed/internal/InternalLocator.java   | 13 ++++--
 .../internal/WanLocatorDiscoverer.java          |  4 ++
 .../internal/locator/wan/LocatorDiscovery.java  | 16 +++++---
 .../locator/wan/WanLocatorDiscovererImpl.java   | 19 +++++++--
 .../wan/misc/WanAutoDiscoveryDUnitTest.java     | 42 +++++++++++++++++++-
 5 files changed, 81 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7f306ffe/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/InternalLocator.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/InternalLocator.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/InternalLocator.java
index 2aae84a..a5d269b 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/InternalLocator.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/InternalLocator.java
@@ -170,6 +170,8 @@ public class InternalLocator extends Locator implements ConnectListener {
   private DistributionConfigImpl config;
   
   private LocatorMembershipListener locatorListener;
+
+  private WanLocatorDiscoverer locatorDiscoverer;
   
   /** whether the locator was stopped during forced-disconnect processing but a reconnect will occur */
   private volatile boolean stoppedForReconnect;
@@ -844,9 +846,9 @@ public class InternalLocator extends Locator implements ConnectListener {
       InternalDistributedSystem.addConnectListener(this);
     }
     
-    WanLocatorDiscoverer s = WANServiceProvider.createLocatorDiscoverer();
-    if(s != null) {
-      s.discover(this.port, config, locatorListener);
+    this.locatorDiscoverer = WANServiceProvider.createLocatorDiscoverer();
+    if(this.locatorDiscoverer != null) {
+      this.locatorDiscoverer.discover(this.port, config, locatorListener);
     }
   }
   
@@ -951,6 +953,11 @@ public class InternalLocator extends Locator implements ConnectListener {
       return;
     }
 
+    if (this.locatorDiscoverer != null) {
+      this.locatorDiscoverer.stop();
+      this.locatorDiscoverer = null;
+    }
+
     if (this.server.isAlive()) {
       logger.info(LocalizedMessage.create(LocalizedStrings.InternalLocator_STOPPING__0, this));
       try {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7f306ffe/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/WanLocatorDiscoverer.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/WanLocatorDiscoverer.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/WanLocatorDiscoverer.java
index 4ad206d..450d3a1 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/WanLocatorDiscoverer.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/WanLocatorDiscoverer.java
@@ -28,4 +28,8 @@ public interface WanLocatorDiscoverer {
    */
   void discover(int port, DistributionConfigImpl config,
       LocatorMembershipListener locatorListener);
+
+  void stop();
+
+  boolean isStopped();
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7f306ffe/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/LocatorDiscovery.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/LocatorDiscovery.java b/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/LocatorDiscovery.java
index cae5030..168dc63 100644
--- a/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/LocatorDiscovery.java
+++ b/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/LocatorDiscovery.java
@@ -41,7 +41,9 @@ import com.gemstone.gemfire.cache.client.internal.locator.wan.LocatorMembershipL
 public class LocatorDiscovery{
 
   private static final Logger logger = LogService.getLogger();
-  
+
+  private WanLocatorDiscoverer discoverer;
+
   private DistributionLocatorId locatorId;
   
   private LocatorMembershipListener locatorListener;
@@ -57,8 +59,9 @@ public class LocatorDiscovery{
   public static final int WAN_LOCATOR_PING_INTERVAL = Integer.getInteger(
       "WANLocator.PING_INTERVAL", 10000).intValue();
 
-  public LocatorDiscovery(DistributionLocatorId locotor,RemoteLocatorJoinRequest request,
+  public LocatorDiscovery(WanLocatorDiscoverer discoverer, DistributionLocatorId locotor,RemoteLocatorJoinRequest request,
       LocatorMembershipListener locatorListener) {
+    this.discoverer = discoverer;
     this.locatorId = locotor;
     this.request = request; 
     this.locatorListener = locatorListener;
@@ -119,11 +122,14 @@ public class LocatorDiscovery{
       exchangeRemoteLocators();
     }
   }
-  
+
+  private WanLocatorDiscoverer getDiscoverer() {
+    return this.discoverer;
+  }
   
   private void exchangeLocalLocators() {
     int retryAttempt = 1;
-    while (true) {
+    while (!getDiscoverer().isStopped()) {
       try {
         RemoteLocatorJoinResponse response = (RemoteLocatorJoinResponse)TcpClient
             .requestToServer(locatorId.getHost(), locatorId.getPort(), request,
@@ -169,7 +175,7 @@ public class LocatorDiscovery{
   public void exchangeRemoteLocators() {
     int retryAttempt = 1;
     DistributionLocatorId remoteLocator = this.locatorId;
-    while (true) {
+    while (!getDiscoverer().isStopped()) {
       RemoteLocatorJoinResponse response;
       try {
         response = (RemoteLocatorJoinResponse)TcpClient

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7f306ffe/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/WanLocatorDiscovererImpl.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/WanLocatorDiscovererImpl.java b/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/WanLocatorDiscovererImpl.java
index 23d21aa..f031143 100644
--- a/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/WanLocatorDiscovererImpl.java
+++ b/geode-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/locator/wan/WanLocatorDiscovererImpl.java
@@ -39,6 +39,8 @@ import com.gemstone.gemfire.internal.logging.LoggingThreadGroup;
 public class WanLocatorDiscovererImpl implements WanLocatorDiscoverer{
 
   private static final Logger logger = LogService.getLogger();
+
+  private volatile boolean stopped = false;
   
   private ExecutorService _executor;
   
@@ -65,8 +67,17 @@ public class WanLocatorDiscovererImpl implements WanLocatorDiscoverer{
     exchangeRemoteLocators(port, config, locatorListener);
     this._executor.shutdown();
   }
-  
-  
+
+  @Override
+  public void stop() {
+    this.stopped = true;
+  }
+
+  @Override
+  public boolean isStopped() {
+    return this.stopped;
+  }
+
   /**
    * For WAN 70 Exchange the locator information within the distributed system
    * 
@@ -90,7 +101,7 @@ public class WanLocatorDiscovererImpl implements WanLocatorDiscoverer{
       DistributionLocatorId localLocatorId = new DistributionLocatorId(
           locatorsOnThisVM.nextToken());
       if (!locatorId.equals(localLocatorId)) {
-        LocatorDiscovery localDiscovery = new LocatorDiscovery(localLocatorId, request, locatorListener);
+        LocatorDiscovery localDiscovery = new LocatorDiscovery(this, localLocatorId, request, locatorListener);
         LocatorDiscovery.LocalLocatorDiscovery localLocatorDiscovery = localDiscovery.new LocalLocatorDiscovery();
         this._executor.execute(localLocatorDiscovery);
       }
@@ -112,7 +123,7 @@ public class WanLocatorDiscovererImpl implements WanLocatorDiscoverer{
       while (remoteLocators.hasMoreTokens()) {
         DistributionLocatorId remoteLocatorId = new DistributionLocatorId(
             remoteLocators.nextToken());
-        LocatorDiscovery localDiscovery = new LocatorDiscovery(remoteLocatorId,
+        LocatorDiscovery localDiscovery = new LocatorDiscovery(this, remoteLocatorId,
             request, locatorListener);
         LocatorDiscovery.RemoteLocatorDiscovery remoteLocatorDiscovery = localDiscovery.new RemoteLocatorDiscovery();
         this._executor.execute(remoteLocatorDiscovery);

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7f306ffe/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/WanAutoDiscoveryDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/WanAutoDiscoveryDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/WanAutoDiscoveryDUnitTest.java
index 73c7e1d..b808403 100644
--- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/WanAutoDiscoveryDUnitTest.java
+++ b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/WanAutoDiscoveryDUnitTest.java
@@ -26,6 +26,7 @@ import java.util.Set;
 import com.gemstone.gemfire.GemFireConfigException;
 import com.gemstone.gemfire.IncompatibleSystemException;
 import com.gemstone.gemfire.internal.AvailablePortHelper;
+import com.gemstone.gemfire.internal.OSProcess;
 import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
 import com.gemstone.gemfire.test.dunit.Assert;
 import com.gemstone.gemfire.test.dunit.AsyncInvocation;
@@ -464,5 +465,44 @@ public class WanAutoDiscoveryDUnitTest extends WANTestBase {
     vm6.invoke(() -> WANTestBase.checkAllSiteMetaData( dsVsPort ));
         
   }
-  
+
+  public void testNoThreadLeftBehind() {
+    // Get active thread count before test
+    int activeThreadCountBefore = Thread.activeCount();
+
+    // Start / stop locator
+    int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
+    WANTestBase.createFirstRemoteLocator( 2, port );
+    disconnectFromDS();
+
+    // Validate active thread count after test
+
+    // Wait up to 60 seconds for all threads started during the test
+    // (including the 'WAN Locator Discovery Thread') to stop
+    // Note: Awaitility is not being used since it adds threads
+    for (int i=0; i<60; i++) {
+      if (Thread.activeCount() > activeThreadCountBefore) {
+        try {
+          Thread.sleep(1000);
+        } catch (InterruptedException e) {
+          fail("Caught the following exception waiting for threads to stop: " + e);
+        }
+      } else {
+        break;
+      }
+    }
+
+    // Fail if the active thread count after the test is greater than the active thread count before the test
+    if (Thread.activeCount() > activeThreadCountBefore) {
+      OSProcess.printStacks(0);
+      StringBuilder builder = new StringBuilder();
+      builder
+          .append("Expected ")
+          .append(activeThreadCountBefore)
+          .append(" threads but found ")
+          .append(Thread.activeCount())
+          .append(". Check log file for a thread dump.");
+      fail(builder.toString());
+    }
+  }
 }