You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by hi...@apache.org on 2015/12/04 18:06:40 UTC

[1/2] incubator-geode git commit: With disable tcp(udp) now we don't throttle serial executor queue

Repository: incubator-geode
Updated Branches:
  refs/heads/develop 74e136401 -> e0bf6858a


With disable tcp(udp) now we don't throttle serial executor queue


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/fba68678
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/fba68678
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/fba68678

Branch: refs/heads/develop
Commit: fba68678d268cf6c142d2f9dc275a54c0d6974af
Parents: 74e1364
Author: Hitesh Khamesra <hi...@yahoo.com>
Authored: Fri Dec 4 09:02:17 2015 -0800
Committer: Hitesh Khamesra <hi...@yahoo.com>
Committed: Fri Dec 4 09:05:40 2015 -0800

----------------------------------------------------------------------
 .../distributed/internal/DistributionManager.java       | 12 +++++++++---
 1 file changed, 9 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/fba68678/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionManager.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionManager.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionManager.java
index 5d3bdce..7a9f7c0 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionManager.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionManager.java
@@ -842,7 +842,10 @@ public class DistributionManager
             " SERIAL_QUEUE_SIZE_THROTTLE :" + SERIAL_QUEUE_SIZE_THROTTLE
         ); 
       }
-      this.serialQueuedExecutorPool = new SerialQueuedExecutorPool(this.threadGroup, this.stats);
+      //  when TCP/IP is disabled we can't throttle the serial queue or we run the risk of 
+      // distributed deadlock when we block the UDP reader thread
+      boolean throttlingDisabled = system.getConfig().getDisableTcp();
+      this.serialQueuedExecutorPool = new SerialQueuedExecutorPool(this.threadGroup, this.stats, throttlingDisabled);
     }
       
     {
@@ -4119,14 +4122,17 @@ public class DistributionManager
     DistributionStats stats;
     ThreadGroup threadGroup;
     
+    final boolean throttlingDisabled;
+    
     /**
      * Constructor.
      * @param group thread group to which the threads will belog to.
      * @param stats 
      */
-    SerialQueuedExecutorPool(ThreadGroup group, DistributionStats stats) {
+    SerialQueuedExecutorPool(ThreadGroup group, DistributionStats stats, boolean throttlingDisabled) {
       this.threadGroup = group;
       this.stats = stats;
+      this.throttlingDisabled = throttlingDisabled;
     }
 
     /*
@@ -4250,7 +4256,7 @@ public class DistributionManager
       
       BlockingQueue poolQueue;
       
-      if (SERIAL_QUEUE_BYTE_LIMIT == 0) {
+      if (SERIAL_QUEUE_BYTE_LIMIT == 0 || this.throttlingDisabled) {
         poolQueue = new OverflowQueueWithDMStats(stats.getSerialQueueHelper());
       } else {
         poolQueue = new ThrottlingMemLinkedQueueWithDMStats(SERIAL_QUEUE_BYTE_LIMIT, SERIAL_QUEUE_THROTTLE, SERIAL_QUEUE_SIZE_LIMIT, SERIAL_QUEUE_SIZE_THROTTLE, this.stats.getSerialQueueHelper());


[2/2] incubator-geode git commit: Added GMSJoinLeave tests

Posted by hi...@apache.org.
Added GMSJoinLeave tests


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/e0bf6858
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/e0bf6858
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/e0bf6858

Branch: refs/heads/develop
Commit: e0bf6858ab3db9c043853cef32cba0cffc1f1b68
Parents: fba6867
Author: Hitesh Khamesra <hi...@yahoo.com>
Authored: Fri Dec 4 09:04:18 2015 -0800
Committer: Hitesh Khamesra <hi...@yahoo.com>
Committed: Fri Dec 4 09:05:41 2015 -0800

----------------------------------------------------------------------
 .../membership/gms/membership/GMSJoinLeave.java |  17 ++-
 .../gms/membership/GMSJoinLeaveJUnitTest.java   | 149 +++++++++++++++++++
 2 files changed, 165 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/e0bf6858/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
index 2f9c514..2986238 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
@@ -1476,12 +1476,24 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
     return result;
   }
 
+  /**
+   * Test hook: exposes the {@code prepareProcessor} field so unit tests can drive it directly.
+   * @return the ViewReplyProcessor held in {@code prepareProcessor}
+   */
+  protected ViewReplyProcessor getPrepareViewReplyProcessor() {
+    return prepareProcessor;
+  }
+  
+  protected boolean testPrepareProcessorWaiting(){ // test hook: reports whether prepareProcessor is still waiting
+    return prepareProcessor.isWaiting(); // delegates to ViewReplyProcessor.isWaiting()
+  }
+  
   class ViewReplyProcessor {
     volatile int viewId = -1;
     final Set<InternalDistributedMember> notRepliedYet = new HashSet<>();
     NetView conflictingView;
     InternalDistributedMember conflictingViewSender;
-    boolean waiting;
+    volatile boolean waiting;
     final boolean isPrepareViewProcessor;
     final Set<InternalDistributedMember> pendingRemovals = new HashSet<>();
 
@@ -1498,6 +1510,9 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
       pendingRemovals.clear();
     }
 
+    boolean isWaiting(){ // accessor for the 'waiting' flag; not synchronized, unlike processPendingRequests below
+      return waiting; // plain read of the field declared 'volatile boolean waiting'
+    }
     synchronized void processPendingRequests(Set<InternalDistributedMember> pendingLeaves, Set<InternalDistributedMember> pendingRemovals) {
       // there's no point in waiting for members who have already
       // requested to leave or who have been declared crashed.

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/e0bf6858/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java
index e49e4ae..abc7a2f 100644
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java
@@ -41,8 +41,14 @@ import org.junit.After;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.mockito.ArgumentCaptor;
+import org.mockito.internal.verification.Times;
+import org.mockito.internal.verification.api.VerificationData;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
+import org.mockito.verification.Timeout;
+import org.mockito.verification.VerificationMode;
+import org.mockito.verification.VerificationWithTimeout;
 
 import com.gemstone.gemfire.distributed.internal.DistributionConfig;
 import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
@@ -57,13 +63,17 @@ import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.Manag
 import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.Messenger;
 import com.gemstone.gemfire.distributed.internal.membership.gms.locator.FindCoordinatorResponse;
 import com.gemstone.gemfire.distributed.internal.membership.gms.membership.GMSJoinLeave.SearchState;
+import com.gemstone.gemfire.distributed.internal.membership.gms.membership.GMSJoinLeave.ViewReplyProcessor;
 import com.gemstone.gemfire.distributed.internal.membership.gms.messages.InstallViewMessage;
 import com.gemstone.gemfire.distributed.internal.membership.gms.messages.JoinRequestMessage;
 import com.gemstone.gemfire.distributed.internal.membership.gms.messages.JoinResponseMessage;
 import com.gemstone.gemfire.distributed.internal.membership.gms.messages.LeaveRequestMessage;
+import com.gemstone.gemfire.distributed.internal.membership.gms.messages.NetworkPartitionMessage;
 import com.gemstone.gemfire.distributed.internal.membership.gms.messages.RemoveMemberMessage;
 import com.gemstone.gemfire.distributed.internal.membership.gms.messages.ViewAckMessage;
 import com.gemstone.gemfire.internal.Version;
+import com.gemstone.gemfire.internal.admin.remote.AddStatListenerResponse;
+import com.gemstone.gemfire.internal.admin.remote.StatListenerMessage;
 import com.gemstone.gemfire.security.AuthenticationFailedException;
 import com.gemstone.gemfire.test.junit.categories.UnitTest;
 
@@ -793,5 +803,144 @@ public class GMSJoinLeaveJUnitTest {
     b.run();
     verify(messenger).sendUnreliably(isA(InstallViewMessage.class));
   }
+  
+  private void installView(int viewId,InternalDistributedMember coordinator, List<InternalDistributedMember> members) throws IOException { // test helper: delivers an InstallViewMessage for the given view to gmsJoinLeave
+    Set<InternalDistributedMember> shutdowns = new HashSet<>(); // left empty
+    Set<InternalDistributedMember> crashes = new HashSet<>(); // left empty
+    // make the mocked services hand out the mock messenger
+    when(services.getMessenger()).thenReturn(messenger);
+    
+    // build the view and wrap it in an install-view message
+    NetView netView = new NetView(coordinator, viewId, members, shutdowns, crashes);
+    InstallViewMessage installViewMessage = new InstallViewMessage(netView, credentials, false); // NOTE(review): 'false' presumably marks this as a final (non-prepare) install - confirm
+    gmsJoinLeave.processMessage(installViewMessage);
+   // NOTE(review): ack verification is disabled - re-enable or delete: verify(messenger).send(any(ViewAckMessage.class));
+  }
+  
+  @Test
+  public void testIgnoreoldView() throws Exception { // a view with a lower id must not replace the installed one
+    initMocks(false);
+    installView(3, mockMembers[0], createMemberList(mockMembers[0], mockMembers[1], mockMembers[2], gmsJoinLeaveMemberId, mockMembers[3]));
+    // now try to install an older view; it is expected to be ignored
+    installView(1, mockMembers[0], createMemberList(mockMembers[0], mockMembers[1], mockMembers[2], gmsJoinLeaveMemberId, mockMembers[3]));
+    // assert the id the message promises (3), not merely "anything other than 1"
+    assertTrue("Expected view id is 3 but found " + gmsJoinLeave.getView().getViewId(), gmsJoinLeave.getView().getViewId() == 3);
+  }
+  
+  @Test
+  public void testClearViewRequests() throws Exception { // a queued join request must be drained from getViewRequests() after the collection interval
+    try {
+    initMocks(false);
+    System.setProperty(GMSJoinLeave.BYPASS_DISCOVERY_PROPERTY, "true"); // removed again in the finally block
+    gmsJoinLeave.join();
+    gmsJoinLeave.processMessage(new JoinRequestMessage(mockMembers[0], mockMembers[0], credentials, -1));
+    int viewRequests = gmsJoinLeave.getViewRequests().size();
+    // the request just delivered should be the only one queued
+    assertTrue( "There should be 1 viewRequest but found " + viewRequests, viewRequests == 1);
+    Thread.sleep(2 * GMSJoinLeave.MEMBER_REQUEST_COLLECTION_INTERVAL); // NOTE(review): fixed sleep; timing-sensitive on slow machines
+    // after twice the collection interval the queue is expected to be empty
+    viewRequests = gmsJoinLeave.getViewRequests().size();
+    assertTrue( "There should be 0 viewRequest but found " + viewRequests, viewRequests == 0);
+    }finally {
+      System.getProperties().remove(GMSJoinLeave.BYPASS_DISCOVERY_PROPERTY);
+    }
+  }
+  
+  /**
+   * Exercises ViewReplyProcessor's memberSuspected, processLeaveRequest, processRemoveRequest
+   * and processViewResponse: one call per recipient, after which the processor must stop waiting.
+   */
+  @Test
+  public void testViewReplyProcessor() throws Exception {
+    try {
+      initMocks(false);
+      System.setProperty(GMSJoinLeave.BYPASS_DISCOVERY_PROPERTY, "true"); // removed again in the finally block
+      gmsJoinLeave.join();
+      Set<InternalDistributedMember> recips = new HashSet<>(); // the four members a reply is awaited from
+      recips.add(mockMembers[0]);
+      recips.add(mockMembers[1]);
+      recips.add(mockMembers[2]);
+      recips.add(mockMembers[3]);
+      ViewReplyProcessor prepareProcessor = gmsJoinLeave.getPrepareViewReplyProcessor(); 
+      prepareProcessor.initialize( 1, recips); // view id 1
+      assertTrue("Prepare processor should be waiting ", gmsJoinLeave.testPrepareProcessorWaiting());
+      // each recipient is handled through a different ViewReplyProcessor entry point
+      prepareProcessor.memberSuspected(gmsJoinLeaveMemberId, mockMembers[0]);
+      prepareProcessor.processLeaveRequest(mockMembers[1]);
+      prepareProcessor.processRemoveRequest(mockMembers[2]);
+      prepareProcessor.processViewResponse(1, mockMembers[3], null);
+      // all four recipients have now been passed to the processor
+      assertFalse("Prepare processor should not be waiting ", gmsJoinLeave.testPrepareProcessorWaiting());
+      }finally {
+        System.getProperties().remove(GMSJoinLeave.BYPASS_DISCOVERY_PROPERTY);
+      }
+  }
+  
+  /**
+   * Exercises ViewReplyProcessor.processPendingRequests; the remaining two recipients reply via processViewResponse.
+   */
+  @Test
+  public void testViewReplyProcessor2() throws Exception {
+    try {
+      initMocks(false);
+      System.setProperty(GMSJoinLeave.BYPASS_DISCOVERY_PROPERTY, "true"); // removed again in the finally block
+      gmsJoinLeave.join();
+      Set<InternalDistributedMember> recips = new HashSet<>(); // the four members a reply is awaited from
+      recips.add(mockMembers[0]);
+      recips.add(mockMembers[1]);
+      recips.add(mockMembers[2]);
+      recips.add(mockMembers[3]);
+      ViewReplyProcessor prepareProcessor = gmsJoinLeave.getPrepareViewReplyProcessor();
+      prepareProcessor.initialize(1, recips); // view id 1
+      assertTrue("Prepare processor should be waiting ", gmsJoinLeave.testPrepareProcessorWaiting());
+      Set<InternalDistributedMember> pendingLeaves = new HashSet<>();
+      pendingLeaves.add(mockMembers[0]); // member 0 is reported as leaving
+      Set<InternalDistributedMember> pendingRemovals = new HashSet<>();
+      pendingRemovals.add(mockMembers[1]); // member 1 is reported as being removed
+      // members 0 and 1 are handed over in one call as pending leave / pending removal
+      prepareProcessor.processPendingRequests(pendingLeaves, pendingRemovals);
+      // members 2 and 3 reply to view id 1 with a null third argument
+      prepareProcessor.processViewResponse(1, mockMembers[2], null);
+      prepareProcessor.processViewResponse(1, mockMembers[3], null);
+      // all four recipients have now been passed to the processor
+      assertFalse("Prepare processor should not be waiting ", gmsJoinLeave.testPrepareProcessorWaiting());
+      }finally {
+        System.getProperties().remove(GMSJoinLeave.BYPASS_DISCOVERY_PROPERTY);
+      }
+  }
+  
+  @Test
+  public void testJoinResponseMsgWithBecomeCoordinator() throws Exception { // the reply to a join request must carry becomeCoordinator == true
+    initMocks(false);
+    gmsJoinLeaveMemberId.getNetMember().setPreferredForCoordinator(false);
+    JoinRequestMessage reqMsg = new JoinRequestMessage(gmsJoinLeaveMemberId, mockMembers[0], null, 56734);
+    InternalDistributedMember ids = new InternalDistributedMember("localhost", 97898); // NOTE(review): 'ids' is never passed to anything below - confirm it can be dropped
+    ids.getNetMember().setPreferredForCoordinator(true);
+    gmsJoinLeave.processMessage(reqMsg);
+    ArgumentCaptor<JoinResponseMessage> ac = ArgumentCaptor.forClass(JoinResponseMessage.class); // factory method; the public constructor is deprecated
+    verify(messenger).send(ac.capture());
+    // inspect the single message captured from messenger.send(...)
+    assertTrue("Should have asked for becoming a coordinator", ac.getValue().getBecomeCoordinator());
+  }
+  
+  @Test
+  public void testNetworkPartionMessage() throws Exception { // removing three of five members must result in one NetworkPartitionMessage being sent
+    try {
+      initMocks(true); // NOTE(review): 'true' presumably enables network-partition detection - confirm against initMocks
+      System.setProperty(GMSJoinLeave.BYPASS_DISCOVERY_PROPERTY, "true"); // removed again in the finally block
+      gmsJoinLeave.join();
+      installView(1, gmsJoinLeaveMemberId, createMemberList(mockMembers[0], mockMembers[1], mockMembers[2], gmsJoinLeaveMemberId, mockMembers[3]));
+      for(int i = 1; i < 4; i++) { // request removal of mockMembers[1..3]
+        RemoveMemberMessage msg = new RemoveMemberMessage(gmsJoinLeaveMemberId, mockMembers[i], "crashed");
+        msg.setSender(gmsJoinLeaveMemberId);
+        gmsJoinLeave.processMessage(msg);
+      }
+      Timeout to = new Timeout(2 * GMSJoinLeave.MEMBER_REQUEST_COLLECTION_INTERVAL, new Times(1)); // NOTE(review): Times is imported from org.mockito.internal; consider the public Mockito.timeout(..) API
+      verify(messenger, to).send( isA(NetworkPartitionMessage.class));
+      // verification above allows up to twice the collection interval for the send to happen
+    }finally {
+      System.getProperties().remove(GMSJoinLeave.BYPASS_DISCOVERY_PROPERTY);
+    }    
+  }
 }