You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bs...@apache.org on 2019/07/31 21:09:58 UTC

[geode] branch feature/GEODE-7038 created (now 2372796)

This is an automated email from the ASF dual-hosted git repository.

bschuchardt pushed a change to branch feature/GEODE-7038
in repository https://gitbox.apache.org/repos/asf/geode.git.


      at 2372796  GEODE-7038: After auto-reconnect a server's multicat communications aren't working correctly

This branch includes the following new commits:

     new 2372796  GEODE-7038: After auto-reconnect a server's multicat communications aren't working correctly

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[geode] 01/01: GEODE-7038: After auto-reconnect a server's multicat communications aren't working correctly

Posted by bs...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

bschuchardt pushed a commit to branch feature/GEODE-7038
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 23727962252d5cbdf5dd7cccbd91ab65eca3ab19
Author: Bruce Schuchardt <bs...@pivotal.io>
AuthorDate: Wed Jul 31 14:05:03 2019 -0700

    GEODE-7038: After auto-reconnect a server's multicat communications aren't working correctly
    
    Ensure that a JoinResponseMessage is sent if multicast is enabled.  This
    allows JGroupsMessenger to piggy-back a multicast message digest on the
    response that the new process can install in its JGroups stack to ensure
    that multicast messaging is properly initialized.
    
    I've also replaced complex checks for whether UDP security is enabled
    with a simpler check on ServiceConfig.  When UDP security is enabled we
    are already sending a JoinResponseMessage and so we don't need to send
    another one if multicast is enabled.
---
 .../DistributedMulticastRegionDUnitTest.java       | 94 ++++++++++++++++++----
 .../internal/membership/gms/ServiceConfig.java     |  4 +
 .../membership/gms/membership/GMSJoinLeave.java    | 24 ++++--
 .../gms/messages/JoinRequestMessage.java           |  9 +++
 .../gms/messages/JoinResponseMessage.java          |  1 -
 .../membership/gms/messenger/JGroupsMessenger.java |  4 +-
 6 files changed, 114 insertions(+), 22 deletions(-)

diff --git a/geode-core/src/distributedTest/java/org/apache/geode/cache30/DistributedMulticastRegionDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/cache30/DistributedMulticastRegionDUnitTest.java
index 22561c0..9cfaa47 100755
--- a/geode-core/src/distributedTest/java/org/apache/geode/cache30/DistributedMulticastRegionDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/cache30/DistributedMulticastRegionDUnitTest.java
@@ -14,21 +14,26 @@
  */
 package org.apache.geode.cache30;
 
+import static org.apache.geode.distributed.ConfigurationProperties.DISABLE_AUTO_RECONNECT;
 import static org.apache.geode.distributed.ConfigurationProperties.ENABLE_TIME_STATISTICS;
 import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
 import static org.apache.geode.distributed.ConfigurationProperties.LOG_LEVEL;
+import static org.apache.geode.distributed.ConfigurationProperties.MAX_WAIT_TIME_RECONNECT;
 import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
 import static org.apache.geode.distributed.ConfigurationProperties.MCAST_TTL;
 import static org.apache.geode.distributed.ConfigurationProperties.NAME;
 import static org.apache.geode.distributed.ConfigurationProperties.STATISTIC_ARCHIVE_FILE;
 import static org.apache.geode.distributed.ConfigurationProperties.STATISTIC_SAMPLING_ENABLED;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.io.File;
 import java.io.IOException;
 import java.util.Properties;
+import java.util.concurrent.TimeUnit;
 
+import org.junit.Before;
 import org.junit.Test;
 
 import org.apache.geode.cache.AttributesFactory;
@@ -40,11 +45,15 @@ import org.apache.geode.cache.Scope;
 import org.apache.geode.distributed.Locator;
 import org.apache.geode.distributed.internal.InternalLocator;
 import org.apache.geode.internal.AvailablePortHelper;
+import org.apache.geode.internal.OSProcess;
 import org.apache.geode.internal.cache.CachedDeserializableFactory;
+import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.pdx.PdxReader;
 import org.apache.geode.pdx.PdxSerializable;
 import org.apache.geode.pdx.PdxSerializationException;
 import org.apache.geode.pdx.PdxWriter;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+import org.apache.geode.test.dunit.DistributedTestUtils;
 import org.apache.geode.test.dunit.Host;
 import org.apache.geode.test.dunit.Invoke;
 import org.apache.geode.test.dunit.SerializableCallable;
@@ -55,12 +64,17 @@ import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
 
 public class DistributedMulticastRegionDUnitTest extends JUnit4CacheTestCase {
 
-  static int locatorVM = 3;
-  static String mcastport = "42786";
-  static String mcastttl = "0";
+  int locatorVM = 3;
+  String mcastport = "0";
+  String mcastttl = "0";
 
   private int locatorPort;
 
+  @Before
+  public void setup() {
+    mcastport = String.valueOf(AvailablePortHelper.getRandomAvailableUDPPort());
+  }
+
   @Override
   public final void preSetUp() throws Exception {
     clean();
@@ -111,24 +125,74 @@ public class DistributedMulticastRegionDUnitTest extends JUnit4CacheTestCase {
       @Override
       public void run2() throws CacheException {
         final Region region = getRootRegion().getSubregion(name);
-        for (int i = 0; i < 5; i++) {
+        for (int i = 0; i < 50; i++) {
           region.put(i, i);
         }
       }
     };
 
     vm0.invoke(doPuts);
+    vm0.invoke(() -> validateMulticastOpsAfterRegionOps());
+    vm1.invoke(() -> validateMulticastOpsAfterRegionOps());
 
-    SerializableRunnable validateMulticastAfterRegionOps =
-        new CacheSerializableRunnable("validateMulticast after region ops") {
-          @Override
-          public void run2() throws CacheException {
-            validateMulticastOpsAfterRegionOps();
-          }
-        };
+    closeLocator();
+  }
+
+  @Test
+  public void testMulticastAfterReconnect() {
+    final String name = "mcastRegion";
+    SerializableRunnable create = new CacheSerializableRunnable("Create Region") {
+      @Override
+      public void run2() throws CacheException {
+        createRegion(name, getRegionAttributes());
+      }
+    };
+
+    locatorPort = startLocator();
+    Host host = Host.getHost(0);
+    final VM vm0 = host.getVM(0);
+    final VM vm1 = host.getVM(1);
+    // 1. start locator with mcast port
+    vm0.invoke(create);
+    vm1.invoke(create);
+    // There is possibility that you may get this packet from other tests
+    /*
+     * SerializableRunnable validateMulticastBeforeRegionOps = new
+     * CacheSerializableRunnable("validateMulticast before region ops") { public void run2() throws
+     * CacheException { validateMulticastOpsBeforeRegionOps(); } };
+     *
+     * vm0.invoke(validateMulticastBeforeRegionOps); vm1.invoke(validateMulticastBeforeRegionOps);
+     */
+
+    SerializableRunnable doPuts = new CacheSerializableRunnable("do put") {
+      @Override
+      public void run2() throws CacheException {
+        final Region region = getRootRegion().getSubregion(name);
+        for (int i = 0; i < 50; i++) {
+          region.put(i, i);
+        }
+      }
+    };
+
+    vm0.invoke(doPuts);
+
+    DistributedTestUtils.crashDistributedSystem(vm1);
+    vm0.invoke(doPuts);
+    vm1.invoke(() -> {
+      basicGetCache().waitUntilReconnected(30, TimeUnit.SECONDS);
+      assertNotNull(basicGetCache().getReconnectedCache());
+      cache = (InternalCache) basicGetCache().getReconnectedCache();
+      system = cache.getInternalDistributedSystem();
+    });
+    vm0.invoke(doPuts);
+    vm0.invoke(() -> {
+      GeodeAwaitility.await().until(() -> {
+        getCache().close();
+        return true;
+      });
+    });
 
-    vm0.invoke(validateMulticastAfterRegionOps);
-    vm1.invoke(validateMulticastAfterRegionOps);
+    vm1.invoke(() -> validateMulticastOpsAfterRegionOps());
 
     closeLocator();
   }
@@ -220,8 +284,10 @@ public class DistributedMulticastRegionDUnitTest extends JUnit4CacheTestCase {
   @Override
   public Properties getDistributedSystemProperties() {
     Properties p = new Properties();
+    p.put(DISABLE_AUTO_RECONNECT, "false");
+    p.put(MAX_WAIT_TIME_RECONNECT, "20");
     p.put(STATISTIC_SAMPLING_ENABLED, "true");
-    p.put(STATISTIC_ARCHIVE_FILE, "multicast");
+    p.put(STATISTIC_ARCHIVE_FILE, "multicast" + OSProcess.getId());
     p.put(ENABLE_TIME_STATISTICS, "true");
     p.put(MCAST_PORT, mcastport);
     p.put(MCAST_TTL, mcastttl);
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/ServiceConfig.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/ServiceConfig.java
index 0fe55f0..b70e48a 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/ServiceConfig.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/ServiceConfig.java
@@ -85,6 +85,10 @@ public class ServiceConfig {
     return networkPartitionDetectionEnabled;
   }
 
+  public boolean isUDPSecurityEnabled() {
+    return !dconfig.getSecurityUDPDHAlgo().isEmpty();
+  }
+
   public boolean areLocatorsPreferredAsCoordinators() {
     boolean locatorsAreCoordinators;
 
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/membership/GMSJoinLeave.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/membership/GMSJoinLeave.java
index 32452c3..f38b20e 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/membership/GMSJoinLeave.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/membership/GMSJoinLeave.java
@@ -732,13 +732,14 @@ public class GMSJoinLeave implements JoinLeave {
       synchronized (viewRequests) {
         if (request instanceof JoinRequestMessage) {
           if (isCoordinator
-              && !services.getConfig().getDistributionConfig().getSecurityUDPDHAlgo().isEmpty()) {
+              && services.getConfig().isUDPSecurityEnabled()) {
             services.getMessenger().initClusterKey();
             JoinRequestMessage jreq = (JoinRequestMessage) request;
             // this will inform about cluster-secret key, as we have authenticated at this point
             JoinResponseMessage response = new JoinResponseMessage(jreq.getSender(),
                 services.getMessenger().getClusterSecretKey(), jreq.getRequestId());
             services.getMessenger().send(response);
+            jreq.setResponseSent();
           }
         }
         logger.debug("Recording the request to be processed in the next membership view");
@@ -753,17 +754,20 @@ public class GMSJoinLeave implements JoinLeave {
 
   private void sendDHKeys() {
     if (isCoordinator
-        && !services.getConfig().getDistributionConfig().getSecurityUDPDHAlgo().isEmpty()) {
+        && services.getConfig().isUDPSecurityEnabled()) {
       synchronized (viewRequests) {
         for (DistributionMessage request : viewRequests) {
           if (request instanceof JoinRequestMessage) {
 
             services.getMessenger().initClusterKey();
             JoinRequestMessage jreq = (JoinRequestMessage) request;
-            // this will inform about cluster-secret key, as we have authenticated at this point
-            JoinResponseMessage response = new JoinResponseMessage(jreq.getSender(),
-                services.getMessenger().getClusterSecretKey(), jreq.getRequestId());
-            services.getMessenger().send(response);
+            if (!jreq.isResponseSent()) {
+              // this will inform about cluster-secret key, as we have authenticated at this point
+              JoinResponseMessage response = new JoinResponseMessage(jreq.getSender(),
+                  services.getMessenger().getClusterSecretKey(), jreq.getRequestId());
+              services.getMessenger().send(response);
+              jreq.setResponseSent();
+            }
           }
         }
       }
@@ -2459,6 +2463,14 @@ public class GMSJoinLeave implements JoinLeave {
               } else {
                 joinReqs.add(mbr);
                 joinPorts.put(mbr, port);
+                if (!jmsg.isResponseSent()
+                    && services.getConfig().getTransport().isMcastEnabled()) {
+                  // send a join response so the new member can get the multicast messaging digest.
+                  JoinResponseMessage response = new JoinResponseMessage(jmsg.getSender(),
+                      services.getMessenger().getClusterSecretKey(), jmsg.getRequestId());
+                  services.getMessenger().send(response);
+                  jmsg.setResponseSent();
+                }
               }
             }
             break;
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/messages/JoinRequestMessage.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/messages/JoinRequestMessage.java
index 1c565d5..922d0ba 100755
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/messages/JoinRequestMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/messages/JoinRequestMessage.java
@@ -30,6 +30,7 @@ public class JoinRequestMessage extends HighPriorityDistributionMessage {
   private Object credentials;
   private int failureDetectionPort = -1;
   private int requestId;
+  private transient boolean responseSent;
 
   public JoinRequestMessage(InternalDistributedMember coord, InternalDistributedMember id,
       Object credentials, int fdPort, int requestId) {
@@ -133,4 +134,12 @@ public class JoinRequestMessage extends HighPriorityDistributionMessage {
       return false;
     return true;
   }
+
+  public void setResponseSent() {
+    responseSent = true;
+  }
+
+  public boolean isResponseSent() {
+    return responseSent;
+  }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/messages/JoinResponseMessage.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/messages/JoinResponseMessage.java
index b97d11a..e105d6a 100755
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/messages/JoinResponseMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/messages/JoinResponseMessage.java
@@ -174,5 +174,4 @@ public class JoinResponseMessage extends HighPriorityDistributionMessage {
     return true;
   }
 
-
 }
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessenger.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
index c0561f3..7ed141d 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
@@ -412,7 +412,7 @@ public class JGroupsMessenger implements Messenger {
 
   @Override
   public void started() {
-    if (queuedMessagesFromReconnect != null) {
+    if (queuedMessagesFromReconnect != null && !services.getConfig().isUDPSecurityEnabled()) {
       logger.info("Delivering {} messages queued by quorum checker",
           queuedMessagesFromReconnect.size());
       for (Message message : queuedMessagesFromReconnect) {
@@ -1196,6 +1196,8 @@ public class JGroupsMessenger implements Messenger {
             this.myChannel.getProtocolStack().getTopProtocol()
                 .down(new Event(Event.MERGE_DIGEST, digest));
             jrsp.setMessengerData(null);
+            digest = (Digest) this.myChannel.getProtocolStack().getTopProtocol()
+                .down(Event.GET_DIGEST_EVT);
           } catch (Exception e) {
             logger.fatal("Unable to read JGroups messaging digest", e);
           }