You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bs...@apache.org on 2020/06/26 21:32:12 UTC

[geode] branch support/1.12 updated (17c26fe -> 0f95d0b)

This is an automated email from the ASF dual-hosted git repository.

bschuchardt pushed a change to branch support/1.12
in repository https://gitbox.apache.org/repos/asf/geode.git.


    from 17c26fe  First pass at switching to liberica jdk. (#5312)
     new 5d2181d  GEODE-8195: ConcurrentModificationException from LocatorMembershipListenerImpl (#5306)
     new 0f95d0b  fixed compilation error in cherry-pick

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../locator/wan/LocatorMembershipListenerImpl.java | 14 ++--
 .../locator/wan/LocatorMembershipListenerTest.java | 88 ++++++++++++++++------
 2 files changed, 72 insertions(+), 30 deletions(-)


[geode] 02/02: fixed compilation error in cherry-pick

Posted by bs...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

bschuchardt pushed a commit to branch support/1.12
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 0f95d0b4fac88816c80204f7539784d9db88e56a
Author: Bruce Schuchardt <br...@vmware.com>
AuthorDate: Fri Jun 26 14:30:26 2020 -0700

    fixed compilation error in cherry-pick
---
 .../client/internal/locator/wan/LocatorMembershipListenerTest.java      | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/geode-wan/src/test/java/org/apache/geode/cache/client/internal/locator/wan/LocatorMembershipListenerTest.java b/geode-wan/src/test/java/org/apache/geode/cache/client/internal/locator/wan/LocatorMembershipListenerTest.java
index b2b3c14..fabb128 100644
--- a/geode-wan/src/test/java/org/apache/geode/cache/client/internal/locator/wan/LocatorMembershipListenerTest.java
+++ b/geode-wan/src/test/java/org/apache/geode/cache/client/internal/locator/wan/LocatorMembershipListenerTest.java
@@ -356,7 +356,7 @@ public class LocatorMembershipListenerTest {
     // The sendMessage loop in the listener will try to send 4 messages. Two to the remoteLocators
     // and two to the joiningLocator. The retry loop will try to send the messages again and
     // fail (4 more messages) and then it will succeed (4 more messages, for a total of 12).
-    verify(tcpClient, times(12)).requestToServer(isA(HostAndPort.class),
+    verify(tcpClient, times(12)).requestToServer(isA(InetSocketAddress.class),
         isA(LocatorJoinMessage.class), isA(Integer.class), isA(Boolean.class));
   }
 


[geode] 01/02: GEODE-8195: ConcurrentModificationException from LocatorMembershipListenerImpl (#5306)

Posted by bs...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

bschuchardt pushed a commit to branch support/1.12
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 5d2181d6a259035aaa77a73daa0dcbacefb56038
Author: Bruce Schuchardt <bs...@pivotal.io>
AuthorDate: Fri Jun 26 07:47:23 2020 -0700

    GEODE-8195: ConcurrentModificationException from LocatorMembershipListenerImpl (#5306)
    
    I've replaced the "for" loop using an implicit Iterator with one using an
    explicit Iterator so that its safe "remove()" method can be used.  The
    Iterator method is stated as being the only safe way to modify the
    collection while iterating over its contents.
    
    I've also modified a test to validate the fix.  The test forces a
    failure to send two messages to an address.  The failures are then
    handled in the code that was throwing the
    ConcurrentModificationException and, since there are two failures,
    it causes two removals to be performedon the failedMessages collection.
    
    (cherry picked from commit 3cda1b1a213f2195ff0b97361883f6a6c3972b14)
---
 .../locator/wan/LocatorMembershipListenerImpl.java | 14 ++--
 .../locator/wan/LocatorMembershipListenerTest.java | 88 ++++++++++++++++------
 2 files changed, 72 insertions(+), 30 deletions(-)

diff --git a/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/LocatorMembershipListenerImpl.java b/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/LocatorMembershipListenerImpl.java
index 31315ed..434231d 100644
--- a/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/LocatorMembershipListenerImpl.java
+++ b/geode-wan/src/main/java/org/apache/geode/cache/client/internal/locator/wan/LocatorMembershipListenerImpl.java
@@ -18,6 +18,7 @@ package org.apache.geode.cache.client.internal.locator.wan;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -291,15 +292,15 @@ public class LocatorMembershipListenerImpl implements LocatorMembershipListener
     public void run() {
       Map<DistributionLocatorId, Set<LocatorJoinMessage>> failedMessages = new HashMap<>();
       for (Map.Entry<Integer, Set<DistributionLocatorId>> entry : remoteLocators.entrySet()) {
-        for (DistributionLocatorId value : entry.getValue()) {
+        for (DistributionLocatorId remoteLocator : entry.getValue()) {
           // Notify known remote locator about the advertised locator.
           LocatorJoinMessage advertiseNewLocatorMessage = new LocatorJoinMessage(
               joiningLocatorDistributedSystemId, joiningLocator, localLocatorId, "");
-          sendMessage(value, advertiseNewLocatorMessage, failedMessages);
+          sendMessage(remoteLocator, advertiseNewLocatorMessage, failedMessages);
 
           // Notify the advertised locator about remote known locator.
           LocatorJoinMessage advertiseKnownLocatorMessage =
-              new LocatorJoinMessage(entry.getKey(), value, localLocatorId, "");
+              new LocatorJoinMessage(entry.getKey(), remoteLocator, localLocatorId, "");
           sendMessage(joiningLocator, advertiseKnownLocatorMessage, failedMessages);
         }
       }
@@ -313,9 +314,11 @@ public class LocatorMembershipListenerImpl implements LocatorMembershipListener
             DistributionLocatorId targetLocator = entry.getKey();
             Set<LocatorJoinMessage> joinMessages = entry.getValue();
 
-            for (LocatorJoinMessage locatorJoinMessage : joinMessages) {
+            for (Iterator<LocatorJoinMessage> iterator = joinMessages.iterator(); iterator
+                .hasNext();) {
+              LocatorJoinMessage locatorJoinMessage = iterator.next();
               if (retryMessage(targetLocator, locatorJoinMessage, attempt)) {
-                joinMessages.remove(locatorJoinMessage);
+                iterator.remove();
               } else {
                 // Sleep between retries.
                 try {
@@ -324,6 +327,7 @@ public class LocatorMembershipListenerImpl implements LocatorMembershipListener
                   Thread.currentThread().interrupt();
                   logger.warn(
                       "Locator Membership listener permanently failed to exchange locator information due to interruption.");
+                  return;
                 }
               }
             }
diff --git a/geode-wan/src/test/java/org/apache/geode/cache/client/internal/locator/wan/LocatorMembershipListenerTest.java b/geode-wan/src/test/java/org/apache/geode/cache/client/internal/locator/wan/LocatorMembershipListenerTest.java
index f3770a6..b2b3c14 100644
--- a/geode-wan/src/test/java/org/apache/geode/cache/client/internal/locator/wan/LocatorMembershipListenerTest.java
+++ b/geode-wan/src/test/java/org/apache/geode/cache/client/internal/locator/wan/LocatorMembershipListenerTest.java
@@ -21,6 +21,7 @@ import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyMap;
+import static org.mockito.ArgumentMatchers.isA;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
@@ -47,7 +48,9 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.contrib.java.lang.system.SystemOutRule;
 
 import org.apache.geode.distributed.internal.DistributionConfig;
 import org.apache.geode.distributed.internal.tcpserver.TcpClient;
@@ -55,9 +58,13 @@ import org.apache.geode.internal.admin.remote.DistributionLocatorId;
 import org.apache.geode.test.junit.ResultCaptor;
 
 public class LocatorMembershipListenerTest {
+  public static final int TIMEOUT = 500;
   private TcpClient tcpClient;
   private LocatorMembershipListenerImpl locatorMembershipListener;
 
+  @Rule
+  public SystemOutRule systemOutRule = new SystemOutRule();
+
   private DistributionLocatorId buildDistributionLocatorId(int port) {
     return new DistributionLocatorId("localhost[" + port + "]");
   }
@@ -86,10 +93,10 @@ public class LocatorMembershipListenerTest {
       throws IOException, ClassNotFoundException {
     verify(tcpClient).requestToServer(initialTargetLocator.getHost(),
         new LocatorJoinMessage(advertisedLocatorDsId, advertisedLocator, sourceLocator, ""),
-        DistributionConfig.DEFAULT_MEMBER_TIMEOUT, false);
+        TIMEOUT, false);
     verify(tcpClient).requestToServer(advertisedLocator.getHost(),
         new LocatorJoinMessage(initialTargetLocatorDsId, initialTargetLocator, sourceLocator, ""),
-        DistributionConfig.DEFAULT_MEMBER_TIMEOUT, false);
+        TIMEOUT, false);
   }
 
   private void joinLocatorsDistributorThread(ResultCaptor<Thread> resultCaptor) {
@@ -103,7 +110,7 @@ public class LocatorMembershipListenerTest {
     DistributionConfig distributionConfig = mock(DistributionConfig.class);
     when(distributionConfig.getStartLocator()).thenReturn(DistributionConfig.DEFAULT_START_LOCATOR);
     when(distributionConfig.getMemberTimeout())
-        .thenReturn(DistributionConfig.DEFAULT_MEMBER_TIMEOUT);
+        .thenReturn(TIMEOUT);
 
     tcpClient = mock(TcpClient.class);
     locatorMembershipListener = spy(new LocatorMembershipListenerImpl(tcpClient));
@@ -275,7 +282,7 @@ public class LocatorMembershipListenerTest {
     when(locatorMembershipListener.getAllLocatorsInfo()).thenReturn(allLocatorsInfo);
     when(tcpClient.requestToServer(locator3Site1.getHost(),
         new LocatorJoinMessage(1, joiningLocator, locator1Site1, ""),
-        DistributionConfig.DEFAULT_MEMBER_TIMEOUT, false))
+        TIMEOUT, false))
             .thenThrow(new EOFException("Mock Exception"));
 
     ResultCaptor<Thread> resultCaptor = new ResultCaptor<>();
@@ -287,39 +294,70 @@ public class LocatorMembershipListenerTest {
     verify(tcpClient, times(LOCATOR_DISTRIBUTION_RETRY_ATTEMPTS + 1)).requestToServer(
         locator3Site1.getHost(),
         new LocatorJoinMessage(1, joiningLocator, locator1Site1, ""),
-        DistributionConfig.DEFAULT_MEMBER_TIMEOUT, false);
+        TIMEOUT, false);
     verify(tcpClient).requestToServer(joiningLocator.getHost(),
         new LocatorJoinMessage(1, locator3Site1, locator1Site1, ""),
-        DistributionConfig.DEFAULT_MEMBER_TIMEOUT, false);
+        TIMEOUT, false);
   }
 
   @Test
   public void locatorJoinedShouldNotRetryAgainAfterSuccessfulRetryOnConnectionFailures()
       throws IOException, ClassNotFoundException {
+    systemOutRule.enableLog();
     ConcurrentMap<Integer, Set<DistributionLocatorId>> allLocatorsInfo = new ConcurrentHashMap<>();
+    DistributionLocatorId localLocatorID = buildDistributionLocatorId(10101);
     DistributionLocatorId joiningLocator = buildDistributionLocatorId(10102);
-    DistributionLocatorId locator1Site1 = buildDistributionLocatorId(10101);
-    DistributionLocatorId locator3Site1 = buildDistributionLocatorId(10103);
-    allLocatorsInfo.put(1, new HashSet<>(Collections.singletonList(locator3Site1)));
+    DistributionLocatorId remoteLocator1 = buildDistributionLocatorId(10103);
+    DistributionLocatorId remoteLocator2 = buildDistributionLocatorId(10104);
+    final HashSet<DistributionLocatorId> remoteLocators =
+        new HashSet<>(Arrays.asList(new DistributionLocatorId[] {remoteLocator1, remoteLocator2}));
+    allLocatorsInfo.put(1, remoteLocators);
     when(locatorMembershipListener.getAllLocatorsInfo()).thenReturn(allLocatorsInfo);
-    when(tcpClient.requestToServer(locator3Site1.getHost(),
-        new LocatorJoinMessage(1, joiningLocator, locator1Site1, ""),
-        DistributionConfig.DEFAULT_MEMBER_TIMEOUT, false))
+    // have messaging fail twice so that LocatorMembershipListenerImpl's retryMessage logic is
+    // exercised
+    when(tcpClient.requestToServer(remoteLocator1.getHost(),
+        new LocatorJoinMessage(1, joiningLocator, localLocatorID, ""),
+        TIMEOUT, false))
+            .thenThrow(new EOFException("Test Exception"))
+            .thenThrow(new EOFException("Test Exception"))
+            .thenReturn(null);
+    when(tcpClient.requestToServer(remoteLocator2.getHost(),
+        new LocatorJoinMessage(1, joiningLocator, localLocatorID, ""),
+        TIMEOUT, false))
+            .thenThrow(new EOFException("Test Exception"))
+            .thenThrow(new EOFException("Test Exception"))
+            .thenReturn(null);
+    when(tcpClient.requestToServer(joiningLocator.getHost(),
+        new LocatorJoinMessage(1, remoteLocator1, localLocatorID, ""),
+        TIMEOUT, false))
+            .thenThrow(new EOFException("Test Exception"))
+            .thenThrow(new EOFException("Test Exception"))
+            .thenReturn(null);
+    // also have the joining locator fail to receive messages so we can test that code path.
+    // It will fail to receive messages informing it of remoteLocator1 and remoteLocator2, so it
+    // will have
+    // two failed messages to retry. The others will each have one message to retry, informing
+    // them about the joiningLocator.
+    when(tcpClient.requestToServer(joiningLocator.getHost(),
+        new LocatorJoinMessage(1, remoteLocator2, localLocatorID, ""),
+        TIMEOUT, false))
+            .thenThrow(new EOFException("Mock Exception"))
             .thenThrow(new EOFException("Mock Exception"))
             .thenReturn(null);
 
     ResultCaptor<Thread> resultCaptor = new ResultCaptor<>();
     doAnswer(resultCaptor).when(locatorMembershipListener).buildLocatorsDistributorThread(
         any(DistributionLocatorId.class), anyMap(), any(DistributionLocatorId.class), anyInt());
-    locatorMembershipListener.locatorJoined(1, joiningLocator, locator1Site1);
+    locatorMembershipListener.locatorJoined(1, joiningLocator, localLocatorID);
     joinLocatorsDistributorThread(resultCaptor);
 
-    verify(tcpClient, times(2)).requestToServer(locator3Site1.getHost(),
-        new LocatorJoinMessage(1, joiningLocator, locator1Site1, ""),
-        DistributionConfig.DEFAULT_MEMBER_TIMEOUT, false);
-    verify(tcpClient).requestToServer(joiningLocator.getHost(),
-        new LocatorJoinMessage(1, locator3Site1, locator1Site1, ""),
-        DistributionConfig.DEFAULT_MEMBER_TIMEOUT, false);
+    assertThat(systemOutRule.getLog()).doesNotContain("ConcurrentModificationException");
+
+    // The sendMessage loop in the listener will try to send 4 messages. Two to the remoteLocators
+    // and two to the joiningLocator. The retry loop will try to send the messages again and
+    // fail (4 more messages) and then it will succeed (4 more messages, for a total of 12).
+    verify(tcpClient, times(12)).requestToServer(isA(HostAndPort.class),
+        isA(LocatorJoinMessage.class), isA(Integer.class), isA(Boolean.class));
   }
 
   @Test
@@ -339,14 +377,14 @@ public class LocatorMembershipListenerTest {
     // Fail on first 2 attempts and succeed on third attempt.
     when(tcpClient.requestToServer(locator3Site1.getHost(),
         new LocatorJoinMessage(1, joiningLocator, locator1Site1, ""),
-        DistributionConfig.DEFAULT_MEMBER_TIMEOUT, false))
+        TIMEOUT, false))
             .thenThrow(new EOFException("Mock Exception"))
             .thenThrow(new EOFException("Mock Exception")).thenReturn(null);
 
     // Fail always.
     when(tcpClient.requestToServer(joiningLocator.getHost(),
         new LocatorJoinMessage(3, locator1Site3, locator1Site1, ""),
-        DistributionConfig.DEFAULT_MEMBER_TIMEOUT, false))
+        TIMEOUT, false))
             .thenThrow(new EOFException("Mock Exception"));
 
     ResultCaptor<Thread> resultCaptor = new ResultCaptor<>();
@@ -358,17 +396,17 @@ public class LocatorMembershipListenerTest {
     verifyMessagesSentBothWays(locator1Site1, 1, joiningLocator, 2, locator1Site2);
     verify(tcpClient, times(3)).requestToServer(locator3Site1.getHost(),
         new LocatorJoinMessage(1, joiningLocator, locator1Site1, ""),
-        DistributionConfig.DEFAULT_MEMBER_TIMEOUT, false);
+        TIMEOUT, false);
     verify(tcpClient).requestToServer(joiningLocator.getHost(),
         new LocatorJoinMessage(1, locator3Site1, locator1Site1, ""),
-        DistributionConfig.DEFAULT_MEMBER_TIMEOUT, false);
+        TIMEOUT, false);
     verify(tcpClient).requestToServer(locator1Site3.getHost(),
         new LocatorJoinMessage(1, joiningLocator, locator1Site1, ""),
-        DistributionConfig.DEFAULT_MEMBER_TIMEOUT, false);
+        TIMEOUT, false);
     verify(tcpClient, times(LOCATOR_DISTRIBUTION_RETRY_ATTEMPTS + 1)).requestToServer(
         joiningLocator.getHost(),
         new LocatorJoinMessage(3, locator1Site3, locator1Site1, ""),
-        DistributionConfig.DEFAULT_MEMBER_TIMEOUT, false);
+        TIMEOUT, false);
   }
 
   private static class HandlerCallable implements Callable<Object> {