You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bs...@apache.org on 2020/04/01 22:28:41 UTC

[geode] branch feature/GEODE-7921 created (now 0400983)

This is an automated email from the ASF dual-hosted git repository.

bschuchardt pushed a change to branch feature/GEODE-7921
in repository https://gitbox.apache.org/repos/asf/geode.git.


      at 0400983  GEODE-7921: NullPointerExceptions logged during auto-reconnect

This branch includes the following new commits:

     new 0400983  GEODE-7921: NullPointerExceptions logged during auto-reconnect

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[geode] 01/01: GEODE-7921: NullPointerExceptions logged during auto-reconnect

Posted by bs...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

bschuchardt pushed a commit to branch feature/GEODE-7921
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 0400983ae2dd81898c2a416d41d57e89148ad8e2
Author: Bruce Schuchardt <bs...@pivotal.io>
AuthorDate: Wed Apr 1 15:24:22 2020 -0700

    GEODE-7921: NullPointerExceptions logged during auto-reconnect
    
    Don't deliver cache-level messages that were queued while disconnected
    for auto-reconnect.  During auto-reconnect there is a QuorumChecker that
    receives messages on the jgroups channel and tries to establish
    communications with a quorum of the old membership view.  It may also
    get JoinRequest messages and other membership-level messages but I
    observed one case where it also queued cache-level messages when the
    property disable-tcp was set to true (funnelling all comms through
    jgroups).
    
    I also added some null checks to the LatestLastAccessTimeMessage and a
    small test for that.
---
 .../cache/LatestLastAccessTimeMessage.java         | 26 +++++++-----
 .../cache/LatestLastAccessTimeMessageTest.java     | 43 +++++++++++++++++++
 .../membership/gms/GMSMembershipJUnitTest.java     | 30 --------------
 .../internal/membership/gms/TestMessage.java       | 48 ++++++++++++++++++++++
 .../gms/messenger/JGroupsMessengerJUnitTest.java   | 21 ++++++++++
 .../membership/gms/messenger/JGroupsMessenger.java | 12 ++++--
 6 files changed, 136 insertions(+), 44 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LatestLastAccessTimeMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LatestLastAccessTimeMessage.java
index cfaa24d..f012085 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/LatestLastAccessTimeMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LatestLastAccessTimeMessage.java
@@ -62,17 +62,23 @@ public class LatestLastAccessTimeMessage<K> extends PooledDistributionMessage
   @Override
   protected void process(ClusterDistributionManager dm) {
     long latestLastAccessTime = 0L;
+    InternalCache cache = dm.getCache();
+    if (cache == null) {
+      return;
+    }
     InternalDistributedRegion region =
-        (InternalDistributedRegion) dm.getCache().getRegion(this.regionName);
-    if (region != null) {
-      RegionEntry entry = region.getRegionEntry(this.key);
-      if (entry != null) {
-        try {
-          latestLastAccessTime = entry.getLastAccessed();
-        } catch (InternalStatisticsDisabledException ignored) {
-          // last access time is not available
-        }
-      }
+        (InternalDistributedRegion) cache.getRegion(this.regionName);
+    if (region == null) {
+      return;
+    }
+    RegionEntry entry = region.getRegionEntry(this.key);
+    if (entry == null) {
+      return;
+    }
+    try {
+      latestLastAccessTime = entry.getLastAccessed();
+    } catch (InternalStatisticsDisabledException ignored) {
+      // last access time is not available
     }
     ReplyMessage.send(getSender(), this.processorId, latestLastAccessTime, dm);
   }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/LatestLastAccessTimeMessageTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/LatestLastAccessTimeMessageTest.java
new file mode 100644
index 0000000..1ddd31e
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/LatestLastAccessTimeMessageTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static org.mockito.Mockito.mock;
+
+import java.util.Collections;
+import java.util.Set;
+
+import org.junit.Test;
+
+import org.apache.geode.distributed.internal.ClusterDistributionManager;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.inet.LocalHostUtil;
+
+public class LatestLastAccessTimeMessageTest {
+
+  @Test
+  public void processMessageShouldLookForNullCache() throws Exception {
+    final DistributionManager distributionManager = mock(DistributionManager.class);
+    final LatestLastAccessTimeReplyProcessor replyProcessor =
+        mock(LatestLastAccessTimeReplyProcessor.class);
+    final InternalDistributedRegion region = mock(InternalDistributedRegion.class);
+    Set<InternalDistributedMember> recipients = Collections.singleton(new InternalDistributedMember(
+        LocalHostUtil.getLocalHost(), 1234));
+    final LatestLastAccessTimeMessage<String> lastAccessTimeMessage =
+        new LatestLastAccessTimeMessage<>(replyProcessor, recipients, region, "foo");
+    lastAccessTimeMessage.process(mock(ClusterDistributionManager.class));
+  }
+}
diff --git a/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/GMSMembershipJUnitTest.java b/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/GMSMembershipJUnitTest.java
index 0e19f70..49b8b3a 100644
--- a/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/GMSMembershipJUnitTest.java
+++ b/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/GMSMembershipJUnitTest.java
@@ -28,9 +28,6 @@ import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -61,11 +58,7 @@ import org.apache.geode.distributed.internal.membership.gms.Services.Stopper;
 import org.apache.geode.distributed.internal.membership.gms.interfaces.HealthMonitor;
 import org.apache.geode.distributed.internal.membership.gms.interfaces.JoinLeave;
 import org.apache.geode.distributed.internal.membership.gms.interfaces.Messenger;
-import org.apache.geode.distributed.internal.membership.gms.messages.AbstractGMSMessage;
 import org.apache.geode.internal.serialization.DSFIDSerializer;
-import org.apache.geode.internal.serialization.DeserializationContext;
-import org.apache.geode.internal.serialization.SerializationContext;
-import org.apache.geode.internal.serialization.Version;
 import org.apache.geode.internal.serialization.internal.DSFIDSerializerImpl;
 import org.apache.geode.test.junit.categories.MembershipTest;
 
@@ -335,27 +328,4 @@ public class GMSMembershipJUnitTest {
     assertThat(spy.getStartupEvents()).isEmpty();
   }
 
-  public static class TestMessage extends AbstractGMSMessage {
-
-    @Override
-    public int getDSFID() {
-      return HIGH_PRIORITY_ACKED_MESSAGE;
-    }
-
-    @Override
-    public void toData(DataOutput out, SerializationContext context) throws IOException {
-
-    }
-
-    @Override
-    public void fromData(DataInput in, DeserializationContext context)
-        throws IOException, ClassNotFoundException {
-
-    }
-
-    @Override
-    public Version[] getSerializationVersions() {
-      return null;
-    }
-  }
 }
diff --git a/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/TestMessage.java b/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/TestMessage.java
new file mode 100644
index 0000000..8da2dac
--- /dev/null
+++ b/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/TestMessage.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.distributed.internal.membership.gms;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.geode.distributed.internal.membership.gms.messages.AbstractGMSMessage;
+import org.apache.geode.internal.serialization.DeserializationContext;
+import org.apache.geode.internal.serialization.SerializationContext;
+import org.apache.geode.internal.serialization.Version;
+
+public class TestMessage extends AbstractGMSMessage {
+
+  @Override
+  public int getDSFID() {
+    return HIGH_PRIORITY_ACKED_MESSAGE;
+  }
+
+  @Override
+  public void toData(DataOutput out, SerializationContext context) throws IOException {
+
+  }
+
+  @Override
+  public void fromData(DataInput in, DeserializationContext context)
+      throws IOException, ClassNotFoundException {
+
+  }
+
+  @Override
+  public Version[] getSerializationVersions() {
+    return null;
+  }
+}
diff --git a/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java b/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java
index 0e0748d..fe52eb8 100755
--- a/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java
+++ b/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java
@@ -25,6 +25,7 @@ import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.isA;
 import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.doCallRealMethod;
+import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
@@ -61,6 +62,7 @@ import org.junit.After;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
 
 import org.apache.geode.distributed.internal.membership.api.MemberDisconnectedException;
@@ -74,6 +76,7 @@ import org.apache.geode.distributed.internal.membership.gms.GMSMembershipView;
 import org.apache.geode.distributed.internal.membership.gms.MemberIdentifierImpl;
 import org.apache.geode.distributed.internal.membership.gms.Services;
 import org.apache.geode.distributed.internal.membership.gms.Services.Stopper;
+import org.apache.geode.distributed.internal.membership.gms.TestMessage;
 import org.apache.geode.distributed.internal.membership.gms.interfaces.HealthMonitor;
 import org.apache.geode.distributed.internal.membership.gms.interfaces.JoinLeave;
 import org.apache.geode.distributed.internal.membership.gms.interfaces.Manager;
@@ -823,6 +826,24 @@ public class JGroupsMessengerJUnitTest {
     assertTrue(pinger.isPongMessage(m.getBuffer()));
   }
 
+  /**
+   * messages for the Manager that were queued by a quorum checker shouldn't be delivered to
+   * a Manager
+   */
+  @Test
+  public void testIgnoreManagerMessagesFromQuorumChecker() throws Exception {
+    initMocks(false);
+    MemberIdentifier memberIdentifier = createAddress(8888);
+    JGAddress jgAddress = new JGAddress(memberIdentifier);
+
+    ArgumentCaptor<Message> valueCapture = ArgumentCaptor.forClass(Message.class);
+    doNothing().when(manager).processMessage(valueCapture.capture());
+    org.jgroups.Message jgroupsMessage = messenger.createJGMessage(new TestMessage(), jgAddress,
+        memberIdentifier, Version.CURRENT_ORDINAL);
+    messenger.jgroupsReceiver.receive(jgroupsMessage, true);
+    assertThat(valueCapture.getAllValues()).isEmpty();
+  }
+
   @Test
   public void testJGroupsIOExceptionHandler() throws Exception {
     initMocks(false);
diff --git a/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessenger.java b/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
index ca53906..da67d36 100644
--- a/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
+++ b/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
@@ -82,6 +82,7 @@ import org.apache.geode.distributed.internal.membership.gms.GMSMembershipView;
 import org.apache.geode.distributed.internal.membership.gms.GMSUtil;
 import org.apache.geode.distributed.internal.membership.gms.Services;
 import org.apache.geode.distributed.internal.membership.gms.interfaces.HealthMonitor;
+import org.apache.geode.distributed.internal.membership.gms.interfaces.Manager;
 import org.apache.geode.distributed.internal.membership.gms.interfaces.MessageHandler;
 import org.apache.geode.distributed.internal.membership.gms.interfaces.Messenger;
 import org.apache.geode.distributed.internal.membership.gms.locator.FindCoordinatorRequest;
@@ -178,7 +179,7 @@ public class JGroupsMessenger<ID extends MemberIdentifier> implements Messenger<
    * The JGroupsReceiver is handed messages by the JGroups Channel. It is responsible
    * for deserializating and dispatching those messages to the appropriate handler
    */
-  private JGroupsReceiver jgroupsReceiver;
+  protected JGroupsReceiver jgroupsReceiver;
 
   public static void setChannelReceiver(JChannel channel, Receiver r) {
     try {
@@ -1266,7 +1267,7 @@ public class JGroupsMessenger<ID extends MemberIdentifier> implements Messenger<
       receive(jgmsg, false);
     }
 
-    private void receive(org.jgroups.Message jgmsg, boolean fromQuorumChecker) {
+    protected void receive(org.jgroups.Message jgmsg, boolean fromQuorumChecker) {
       long startTime = services.getStatistics().startUDPDispatchRequest();
       try {
         if (services.getManager().shutdownInProgress()) {
@@ -1320,9 +1321,12 @@ public class JGroupsMessenger<ID extends MemberIdentifier> implements Messenger<
           }
           filterIncomingMessage(msg);
           MessageHandler<Message<ID>> handler = getMessageHandler(msg);
-          if (fromQuorumChecker && handler instanceof HealthMonitor) {
+          if (fromQuorumChecker
+              && (handler instanceof HealthMonitor || handler instanceof Manager)) {
             // ignore suspect / heartbeat messages that happened during
-            // auto-reconnect because they very likely have old member IDs in them
+            // auto-reconnect because they very likely have old member IDs in them.
+            // Also ignore non-membership messages because we weren't a member when we received
+            // them.
           } else {
             handler.processMessage(msg);
           }