You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bs...@apache.org on 2015/11/17 21:33:23 UTC

incubator-geode git commit: GEODE-77: initiation of member verification on loss of shared/unordered connection

Repository: incubator-geode
Updated Branches:
  refs/heads/feature/GEODE-77 53c199bf5 -> fd98f62f8


GEODE-77: initiation of member verification on loss of shared/unordered connection

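The initiation of member verification wasn't working correctly.  It wasn't
being triggered most of the time, and when it was, the check passed because
it was done so soon after the other member crashed that the timestamp
checks in GMSJoinLeave didn't consider enough time to have passed to declare
the member dead.  This commit changes the check to request a heartbeat first,
and moves the suspicion trigger out of the individual read-error paths into
the reader's shutdown path so that it fires whenever the reader of a shared,
unordered connection stops.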


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/fd98f62f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/fd98f62f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/fd98f62f

Branch: refs/heads/feature/GEODE-77
Commit: fd98f62f8a1d2eaa41c001f04cc95751f1bea636
Parents: 53c199b
Author: Bruce Schuchardt <bs...@pivotal.io>
Authored: Tue Nov 17 12:32:50 2015 -0800
Committer: Bruce Schuchardt <bs...@pivotal.io>
Committed: Tue Nov 17 12:32:50 2015 -0800

----------------------------------------------------------------------
 .../internal/DistributionMessage.java           |  2 +-
 .../membership/gms/messenger/Transport.java     |  4 +-
 .../gms/mgr/GMSMembershipManager.java           | 10 +--
 .../gemfire/internal/cache/LocalRegion.java     |  2 +-
 .../gemfire/internal/tcp/Connection.java        | 22 +++---
 .../gemfire/internal/tcp/ConnectionTable.java   |  2 +-
 .../gms/membership/GMSJoinLeaveJUnitTest.java   | 23 +++++++
 .../internal/tcp/ConnectionJUnitTest.java       | 71 ++++++++++++++++++++
 8 files changed, 118 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/fd98f62f/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionMessage.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionMessage.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionMessage.java
index 551e47d..23f9dee 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionMessage.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionMessage.java
@@ -230,7 +230,7 @@ public abstract class DistributionMessage
     return this.multicast;
   }
   /**
-   * Return true of this message should be sent through JGroups instead of the
+   * Return true of this message should be sent via UDP instead of the
    * direct-channel.  This is typically only done for messages that are
    * broadcast to the full membership set.
    */

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/fd98f62f/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/Transport.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/Transport.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/Transport.java
index 8fcac7d..b25b363 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/Transport.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/Transport.java
@@ -72,7 +72,9 @@ public class Transport extends UDP {
         Thread.currentThread().interrupt(); // let someone else handle the interrupt
     }
     catch(SocketException e) {
-      log.error("Exception caught while sending message", e);
+      if (!stack.getChannel().isClosed()) {
+        log.error("Exception caught while sending message", e);
+      }
 //        log.trace(Util.getMessage("SendFailure"),
 //                  local_addr, (dest == null? "cluster" : dest), msg.size(), e.toString(), msg.printHeaders());
     }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/fd98f62f/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/GMSMembershipManager.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/GMSMembershipManager.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/GMSMembershipManager.java
index 79af74e..977d090 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/GMSMembershipManager.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/GMSMembershipManager.java
@@ -1891,20 +1891,20 @@ public class GMSMembershipManager implements MembershipManager, Manager
   
   public void suspectMembers(Set members, String reason) {
     for (Iterator it=members.iterator(); it.hasNext(); ) {
-      suspectMember((DistributedMember)it.next(), reason);
+      verifyMember((DistributedMember)it.next(), reason);
     }
   }
   
   public void suspectMember(DistributedMember mbr, String reason) {
     if (!this.shutdownInProgress && !this.shutdownMembers.containsKey(mbr)) {
-      this.services.getHealthMonitor().suspect((InternalDistributedMember)mbr, reason);
+      verifyMember(mbr, reason);
     }
   }
 
   /* like memberExists() this checks to see if the given ID is in the current
-   * membership view.  If it is in the view though we try to connect to it
-   * port to see if it's still around.  If we can't then
-   * suspect processing is initiated on the member with the given reason string.
+   * membership view.  If it is in the view though we try to contact it
+   * to see if it's still around.  If we can't contact it then
+   * suspect messages are sent to initiate final checks
    * @param mbr the member to verify
    * @param reason why the check is being done (must not be blank/null)
    * @return true if the member checks out

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/fd98f62f/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
index 782be28..5232dc9 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
@@ -4664,7 +4664,7 @@ public class LocalRegion extends AbstractRegion
               continue;
             } else {
               if(logger.isDebugEnabled()) {
-                logger.debug("refreshEntries key={} value={}", currentKey, entry);
+                logger.debug("refreshEntries key={} value={} version={}", currentKey, entry, tag);
               }
               if (tag == null) { // no version checks
                 localDestroyNoCallbacks(currentKey);

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/fd98f62f/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java
index b6b1f8b..c592133 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java
@@ -557,7 +557,7 @@ public class Connection implements Runnable {
   /** creates a connection that we accepted (it was initiated by
    * an explicit connect being done on the other side).
    */
-  private Connection(ConnectionTable t, Socket s)
+  protected Connection(ConnectionTable t, Socket s)
     throws IOException, ConnectionException
   {
     if (t == null) {
@@ -1745,6 +1745,7 @@ public class Connection implements Runnable {
       if (logger.isDebugEnabled()) {
         logger.debug("Stopping {} for {}", p2pReaderName(), remoteId);
       }
+      initiateSuspicionIfSharedUnordered();
       if (this.isReceiver) {
         if (!this.sharedResource) {
           this.owner.owner.stats.incThreadOwnedReceivers(-1L, dominoCount.get());
@@ -1856,7 +1857,6 @@ public class Connection implements Runnable {
             continue;
           }
           if (amt < 0) {
-            initiateSuspicionIfSharedUnordered();
             this.readerShuttingDown = true;
             try {
               requestClose("SocketChannel.read returned EOF");
@@ -1894,14 +1894,12 @@ public class Connection implements Runnable {
         }
         catch (ClosedChannelException e) {
           this.readerShuttingDown = true;
-          initiateSuspicionIfSharedUnordered();
           try { 
             requestClose(LocalizedStrings.Connection_CLOSEDCHANNELEXCEPTION_IN_CHANNEL_READ_0.toLocalizedString(e));
           } catch (Exception ex) {}
           return;
         }
         catch (IOException e) {
-          initiateSuspicionIfSharedUnordered();
           if (! isSocketClosed()
                 && !"Socket closed".equalsIgnoreCase(e.getMessage()) // needed for Solaris jdk 1.4.2_08
                 ) {
@@ -1925,7 +1923,6 @@ public class Connection implements Runnable {
           if (!stopped && ! isSocketClosed() ) {
             logger.fatal(LocalizedMessage.create(LocalizedStrings.Connection_0_EXCEPTION_IN_CHANNEL_READ, p2pReaderName()), e);
           }
-          initiateSuspicionIfSharedUnordered();
           this.readerShuttingDown = true;
           try { 
             requestClose(LocalizedStrings.Connection_0_EXCEPTION_IN_CHANNEL_READ.toLocalizedString(e)); 
@@ -1950,7 +1947,7 @@ public class Connection implements Runnable {
   private void initiateSuspicionIfSharedUnordered() {
     if (this.isReceiver && this.handshakeRead && !this.preserveOrder && this.sharedResource) {
       if (this.owner.getConduit().getCancelCriterion().cancelInProgress() == null) {
-        String reason = "member shut down shared unordered connection";
+        String reason = "member unexpectedly shut down shared, unordered connection";
         this.owner.getDM().getMembershipManager().suspectMember(this.getRemoteAddress(),
             reason);
       }
@@ -2091,7 +2088,6 @@ public class Connection implements Runnable {
         }
         int len = 0;
         if (readFully(input, lenbytes, lenbytes.length) < 0) {
-          initiateSuspicionIfSharedUnordered();
           stopped = true;
           continue;
         }
@@ -2448,7 +2444,6 @@ public class Connection implements Runnable {
         this.stopped = true;
       }
       catch (IOException io) {
-        initiateSuspicionIfSharedUnordered();
         boolean closed = isSocketClosed()
                 || "Socket closed".equalsIgnoreCase(io.getMessage()); // needed for Solaris jdk 1.4.2_08
         if (!closed) {
@@ -2482,7 +2477,6 @@ public class Connection implements Runnable {
         if (!stopped && !(e instanceof InterruptedException) ) {
           logger.fatal(LocalizedMessage.create(LocalizedStrings.Connection_0_EXCEPTION_RECEIVED, p2pReaderName()), e);
         }
-        initiateSuspicionIfSharedUnordered();
         if (isSocketClosed()) {
           stopped = true;
         }
@@ -2634,6 +2628,16 @@ public class Connection implements Runnable {
     return origSocketInUse;
   }
   
+  /**
+   * For testing we want to configure the connection without having
+   * to read a handshake
+   */
+  protected void setSharedUnorderedForTest() {
+    this.preserveOrder = false;
+    this.sharedResource = true;
+    this.handshakeRead = true;
+  }
+  
 
   /** ensure that a task is running to monitor transmission and reading of acks */
   public synchronized void scheduleAckTimeouts() {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/fd98f62f/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ConnectionTable.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ConnectionTable.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ConnectionTable.java
index fc14e01..c866797 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ConnectionTable.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ConnectionTable.java
@@ -690,7 +690,7 @@ public class ConnectionTable  {
     }
   }
 
-  protected final TCPConduit getConduit() {
+  protected TCPConduit getConduit() {
     return owner;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/fd98f62f/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java
index 562fd70..f846916 100644
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java
@@ -3,6 +3,7 @@ package com.gemstone.gemfire.distributed.internal.membership.gms.membership;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.isA;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
@@ -20,6 +21,7 @@ import java.util.List;
 import java.util.Set;
 import java.util.Timer;
 
+import org.junit.After;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -116,6 +118,14 @@ public class GMSJoinLeaveJUnitTest {
     gmsJoinLeave.started();
   }
   
+  @After
+  public void tearDown() throws Exception {
+    if (gmsJoinLeave != null) {
+      gmsJoinLeave.stop();
+      gmsJoinLeave.stopped();
+    }
+  }
+  
   @Test
   public void testFindCoordinatorInView() throws Exception {
     initMocks();
@@ -754,5 +764,18 @@ public class GMSJoinLeaveJUnitTest {
     Assert.assertFalse(nextView.getMembers().contains(mockMembers[2]));
     assertTrue(nextView.getMembers().contains(mockMembers[3]));
   }
+  
+  @Test
+  public void testViewBroadcaster() throws Exception {
+    initMocks();
+    List<InternalDistributedMember> members = new ArrayList<>(Arrays.asList(mockMembers));
+    gmsJoinLeaveMemberId.setVmViewId(1);
+    members.add(gmsJoinLeaveMemberId);
+    prepareAndInstallView(gmsJoinLeaveMemberId, members);
+    gmsJoinLeave.becomeCoordinatorForTest();
+    GMSJoinLeave.ViewBroadcaster b = gmsJoinLeave.new ViewBroadcaster();
+    b.run();
+    verify(messenger).sendUnreliably(isA(InstallViewMessage.class));
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/fd98f62f/gemfire-core/src/test/java/com/gemstone/gemfire/internal/tcp/ConnectionJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/tcp/ConnectionJUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/tcp/ConnectionJUnitTest.java
new file mode 100755
index 0000000..f19a9aa
--- /dev/null
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/tcp/ConnectionJUnitTest.java
@@ -0,0 +1,71 @@
+package com.gemstone.gemfire.internal.tcp;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.verify;
+
+import java.io.InputStream;
+import java.net.Socket;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.CancelCriterion;
+import com.gemstone.gemfire.distributed.internal.DM;
+import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+import com.gemstone.gemfire.distributed.internal.membership.MembershipManager;
+import com.gemstone.gemfire.internal.SocketCloser;
+import com.gemstone.gemfire.internal.SocketCreator;
+import com.gemstone.gemfire.test.junit.categories.UnitTest;
+
+@Category(UnitTest.class)
+public class ConnectionJUnitTest {
+
+  /**
+   * Test whether suspicion is raised about a member that
+   * closes its shared/unordered TCPConduit connection
+   */
+  @Test
+  public void testSuspicionRaised() throws Exception {
+    // this test has to create a lot of mocks because Connection
+    // uses a lot of objects
+    
+    // mock the socket
+    ConnectionTable table = mock(ConnectionTable.class);
+    DM distMgr = mock(DM.class);
+    MembershipManager membership = mock(MembershipManager.class);
+    TCPConduit conduit = mock(TCPConduit.class);
+
+    // mock the connection table and conduit
+    
+    when(table.getConduit()).thenReturn(conduit);
+
+    CancelCriterion stopper = mock(CancelCriterion.class);
+    when(stopper.cancelInProgress()).thenReturn(null);
+    when(conduit.getCancelCriterion()).thenReturn(stopper);
+
+    when(conduit.getId()).thenReturn(new Stub(SocketCreator.getLocalHost(), 10337, 1));
+    
+    // NIO can't be mocked because SocketChannel has a final method that
+    // is used by Connection - configureBlocking
+    when(conduit.useNIO()).thenReturn(false);
+    
+    // mock the distribution manager and membership manager
+    when(distMgr.getMembershipManager()).thenReturn(membership);
+    when(conduit.getDM()).thenReturn(distMgr);
+    when(table.getDM()).thenReturn(distMgr);
+    SocketCloser closer = mock(SocketCloser.class);
+    when(table.getSocketCloser()).thenReturn(closer);
+
+    InputStream instream = mock(InputStream.class);
+    when(instream.read()).thenReturn(-1);
+    Socket socket = mock(Socket.class);
+    when(socket.getInputStream()).thenReturn(instream);
+    
+    Connection conn = new Connection(table, socket);
+    conn.setSharedUnorderedForTest();
+    conn.run();
+    verify(membership).suspectMember(any(InternalDistributedMember.class), any(String.class));
+  }
+}