You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bs...@apache.org on 2020/04/01 22:28:42 UTC

[geode] 01/01: GEODE-7921: NullPointerExceptions logged during auto-reconnect

This is an automated email from the ASF dual-hosted git repository.

bschuchardt pushed a commit to branch feature/GEODE-7921
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 0400983ae2dd81898c2a416d41d57e89148ad8e2
Author: Bruce Schuchardt <bs...@pivotal.io>
AuthorDate: Wed Apr 1 15:24:22 2020 -0700

    GEODE-7921: NullPointerExceptions logged during auto-reconnect
    
    Don't deliver cache-level messages that were queued while disconnected
    for auto-reconnect.  During auto-reconnect, there is a QuorumChecker that
    receives messages on the jgroups channel and tries to establish
    communications with a quorum of the old membership view.  It may also
    get JoinRequest messages and other membership-level messages, but I
    observed one case where it also queued cache-level messages when the
    property disable-tcp was set to true (funnelling all communications
    through jgroups).
    
    I also added some null checks to the LatestLastAccessTimeMessage and a
    small test for that.
---
 .../cache/LatestLastAccessTimeMessage.java         | 26 +++++++-----
 .../cache/LatestLastAccessTimeMessageTest.java     | 43 +++++++++++++++++++
 .../membership/gms/GMSMembershipJUnitTest.java     | 30 --------------
 .../internal/membership/gms/TestMessage.java       | 48 ++++++++++++++++++++++
 .../gms/messenger/JGroupsMessengerJUnitTest.java   | 21 ++++++++++
 .../membership/gms/messenger/JGroupsMessenger.java | 12 ++++--
 6 files changed, 136 insertions(+), 44 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LatestLastAccessTimeMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LatestLastAccessTimeMessage.java
index cfaa24d..f012085 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/LatestLastAccessTimeMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LatestLastAccessTimeMessage.java
@@ -62,17 +62,23 @@ public class LatestLastAccessTimeMessage<K> extends PooledDistributionMessage
   @Override
   protected void process(ClusterDistributionManager dm) {
     long latestLastAccessTime = 0L;
+    InternalCache cache = dm.getCache();
+    if (cache == null) {
+      return;
+    }
     InternalDistributedRegion region =
-        (InternalDistributedRegion) dm.getCache().getRegion(this.regionName);
-    if (region != null) {
-      RegionEntry entry = region.getRegionEntry(this.key);
-      if (entry != null) {
-        try {
-          latestLastAccessTime = entry.getLastAccessed();
-        } catch (InternalStatisticsDisabledException ignored) {
-          // last access time is not available
-        }
-      }
+        (InternalDistributedRegion) cache.getRegion(this.regionName);
+    if (region == null) {
+      return;
+    }
+    RegionEntry entry = region.getRegionEntry(this.key);
+    if (entry == null) {
+      return;
+    }
+    try {
+      latestLastAccessTime = entry.getLastAccessed();
+    } catch (InternalStatisticsDisabledException ignored) {
+      // last access time is not available
     }
     ReplyMessage.send(getSender(), this.processorId, latestLastAccessTime, dm);
   }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/LatestLastAccessTimeMessageTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/LatestLastAccessTimeMessageTest.java
new file mode 100644
index 0000000..1ddd31e
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/LatestLastAccessTimeMessageTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static org.mockito.Mockito.mock;
+
+import java.util.Collections;
+import java.util.Set;
+
+import org.junit.Test;
+
+import org.apache.geode.distributed.internal.ClusterDistributionManager;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.inet.LocalHostUtil;
+
+public class LatestLastAccessTimeMessageTest {
+
+  @Test
+  public void processMessageShouldLookForNullCache() throws Exception {
+    final DistributionManager distributionManager = mock(DistributionManager.class);
+    final LatestLastAccessTimeReplyProcessor replyProcessor =
+        mock(LatestLastAccessTimeReplyProcessor.class);
+    final InternalDistributedRegion region = mock(InternalDistributedRegion.class);
+    Set<InternalDistributedMember> recipients = Collections.singleton(new InternalDistributedMember(
+        LocalHostUtil.getLocalHost(), 1234));
+    final LatestLastAccessTimeMessage<String> lastAccessTimeMessage =
+        new LatestLastAccessTimeMessage<>(replyProcessor, recipients, region, "foo");
+    lastAccessTimeMessage.process(mock(ClusterDistributionManager.class));
+  }
+}
diff --git a/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/GMSMembershipJUnitTest.java b/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/GMSMembershipJUnitTest.java
index 0e19f70..49b8b3a 100644
--- a/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/GMSMembershipJUnitTest.java
+++ b/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/GMSMembershipJUnitTest.java
@@ -28,9 +28,6 @@ import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -61,11 +58,7 @@ import org.apache.geode.distributed.internal.membership.gms.Services.Stopper;
 import org.apache.geode.distributed.internal.membership.gms.interfaces.HealthMonitor;
 import org.apache.geode.distributed.internal.membership.gms.interfaces.JoinLeave;
 import org.apache.geode.distributed.internal.membership.gms.interfaces.Messenger;
-import org.apache.geode.distributed.internal.membership.gms.messages.AbstractGMSMessage;
 import org.apache.geode.internal.serialization.DSFIDSerializer;
-import org.apache.geode.internal.serialization.DeserializationContext;
-import org.apache.geode.internal.serialization.SerializationContext;
-import org.apache.geode.internal.serialization.Version;
 import org.apache.geode.internal.serialization.internal.DSFIDSerializerImpl;
 import org.apache.geode.test.junit.categories.MembershipTest;
 
@@ -335,27 +328,4 @@ public class GMSMembershipJUnitTest {
     assertThat(spy.getStartupEvents()).isEmpty();
   }
 
-  public static class TestMessage extends AbstractGMSMessage {
-
-    @Override
-    public int getDSFID() {
-      return HIGH_PRIORITY_ACKED_MESSAGE;
-    }
-
-    @Override
-    public void toData(DataOutput out, SerializationContext context) throws IOException {
-
-    }
-
-    @Override
-    public void fromData(DataInput in, DeserializationContext context)
-        throws IOException, ClassNotFoundException {
-
-    }
-
-    @Override
-    public Version[] getSerializationVersions() {
-      return null;
-    }
-  }
 }
diff --git a/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/TestMessage.java b/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/TestMessage.java
new file mode 100644
index 0000000..8da2dac
--- /dev/null
+++ b/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/TestMessage.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.distributed.internal.membership.gms;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.geode.distributed.internal.membership.gms.messages.AbstractGMSMessage;
+import org.apache.geode.internal.serialization.DeserializationContext;
+import org.apache.geode.internal.serialization.SerializationContext;
+import org.apache.geode.internal.serialization.Version;
+
+public class TestMessage extends AbstractGMSMessage {
+
+  @Override
+  public int getDSFID() {
+    return HIGH_PRIORITY_ACKED_MESSAGE;
+  }
+
+  @Override
+  public void toData(DataOutput out, SerializationContext context) throws IOException {
+
+  }
+
+  @Override
+  public void fromData(DataInput in, DeserializationContext context)
+      throws IOException, ClassNotFoundException {
+
+  }
+
+  @Override
+  public Version[] getSerializationVersions() {
+    return null;
+  }
+}
diff --git a/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java b/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java
index 0e0748d..fe52eb8 100755
--- a/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java
+++ b/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java
@@ -25,6 +25,7 @@ import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.isA;
 import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.doCallRealMethod;
+import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
@@ -61,6 +62,7 @@ import org.junit.After;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
 
 import org.apache.geode.distributed.internal.membership.api.MemberDisconnectedException;
@@ -74,6 +76,7 @@ import org.apache.geode.distributed.internal.membership.gms.GMSMembershipView;
 import org.apache.geode.distributed.internal.membership.gms.MemberIdentifierImpl;
 import org.apache.geode.distributed.internal.membership.gms.Services;
 import org.apache.geode.distributed.internal.membership.gms.Services.Stopper;
+import org.apache.geode.distributed.internal.membership.gms.TestMessage;
 import org.apache.geode.distributed.internal.membership.gms.interfaces.HealthMonitor;
 import org.apache.geode.distributed.internal.membership.gms.interfaces.JoinLeave;
 import org.apache.geode.distributed.internal.membership.gms.interfaces.Manager;
@@ -823,6 +826,24 @@ public class JGroupsMessengerJUnitTest {
     assertTrue(pinger.isPongMessage(m.getBuffer()));
   }
 
+  /**
+   * messages for the Manager that were queued by a quorum checker shouldn't be delivered to
+   * a Manager
+   */
+  @Test
+  public void testIgnoreManagerMessagesFromQuorumChecker() throws Exception {
+    initMocks(false);
+    MemberIdentifier memberIdentifier = createAddress(8888);
+    JGAddress jgAddress = new JGAddress(memberIdentifier);
+
+    ArgumentCaptor<Message> valueCapture = ArgumentCaptor.forClass(Message.class);
+    doNothing().when(manager).processMessage(valueCapture.capture());
+    org.jgroups.Message jgroupsMessage = messenger.createJGMessage(new TestMessage(), jgAddress,
+        memberIdentifier, Version.CURRENT_ORDINAL);
+    messenger.jgroupsReceiver.receive(jgroupsMessage, true);
+    assertThat(valueCapture.getAllValues()).isEmpty();
+  }
+
   @Test
   public void testJGroupsIOExceptionHandler() throws Exception {
     initMocks(false);
diff --git a/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessenger.java b/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
index ca53906..da67d36 100644
--- a/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
+++ b/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
@@ -82,6 +82,7 @@ import org.apache.geode.distributed.internal.membership.gms.GMSMembershipView;
 import org.apache.geode.distributed.internal.membership.gms.GMSUtil;
 import org.apache.geode.distributed.internal.membership.gms.Services;
 import org.apache.geode.distributed.internal.membership.gms.interfaces.HealthMonitor;
+import org.apache.geode.distributed.internal.membership.gms.interfaces.Manager;
 import org.apache.geode.distributed.internal.membership.gms.interfaces.MessageHandler;
 import org.apache.geode.distributed.internal.membership.gms.interfaces.Messenger;
 import org.apache.geode.distributed.internal.membership.gms.locator.FindCoordinatorRequest;
@@ -178,7 +179,7 @@ public class JGroupsMessenger<ID extends MemberIdentifier> implements Messenger<
    * The JGroupsReceiver is handed messages by the JGroups Channel. It is responsible
    * for deserializating and dispatching those messages to the appropriate handler
    */
-  private JGroupsReceiver jgroupsReceiver;
+  protected JGroupsReceiver jgroupsReceiver;
 
   public static void setChannelReceiver(JChannel channel, Receiver r) {
     try {
@@ -1266,7 +1267,7 @@ public class JGroupsMessenger<ID extends MemberIdentifier> implements Messenger<
       receive(jgmsg, false);
     }
 
-    private void receive(org.jgroups.Message jgmsg, boolean fromQuorumChecker) {
+    protected void receive(org.jgroups.Message jgmsg, boolean fromQuorumChecker) {
       long startTime = services.getStatistics().startUDPDispatchRequest();
       try {
         if (services.getManager().shutdownInProgress()) {
@@ -1320,9 +1321,12 @@ public class JGroupsMessenger<ID extends MemberIdentifier> implements Messenger<
           }
           filterIncomingMessage(msg);
           MessageHandler<Message<ID>> handler = getMessageHandler(msg);
-          if (fromQuorumChecker && handler instanceof HealthMonitor) {
+          if (fromQuorumChecker
+              && (handler instanceof HealthMonitor || handler instanceof Manager)) {
             // ignore suspect / heartbeat messages that happened during
-            // auto-reconnect because they very likely have old member IDs in them
+            // auto-reconnect because they very likely have old member IDs in them.
+            // Also ignore non-membership messages because we weren't a member when we received
+            // them.
           } else {
             handler.processMessage(msg);
           }