You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bs...@apache.org on 2020/04/03 15:25:10 UTC
[geode] branch develop updated: GEODE-7921: NullPointerExceptions logged during auto-reconnect (#4898)
This is an automated email from the ASF dual-hosted git repository.
bschuchardt pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 2ac3de7 GEODE-7921: NullPointerExceptions logged during auto-reconnect (#4898)
2ac3de7 is described below
commit 2ac3de788502d1a40189bed06ce86779ee47bdf8
Author: Bruce Schuchardt <bs...@pivotal.io>
AuthorDate: Fri Apr 3 08:24:34 2020 -0700
GEODE-7921: NullPointerExceptions logged during auto-reconnect (#4898)
Don't deliver cache-level messages that were queued while the member was
disconnected for auto-reconnect. During auto-reconnect, a QuorumChecker
receives messages on the jgroups channel and tries to establish
communications with a quorum of the old membership view. It may also
receive JoinRequest messages and other membership-level messages, but I
observed one case where it also queued cache-level messages when the
property disable-tcp was set to true (funnelling all comms through
jgroups).
I also added some null checks to the LatestLastAccessTimeMessage and a
small test for that.
---
.../cache/LatestLastAccessTimeMessage.java | 26 +++++++-----
.../cache/LatestLastAccessTimeMessageTest.java | 43 +++++++++++++++++++
.../membership/gms/GMSMembershipJUnitTest.java | 30 --------------
.../internal/membership/gms/TestMessage.java | 48 ++++++++++++++++++++++
.../gms/messenger/JGroupsMessengerJUnitTest.java | 21 ++++++++++
.../membership/gms/messenger/JGroupsMessenger.java | 12 ++++--
6 files changed, 136 insertions(+), 44 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LatestLastAccessTimeMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LatestLastAccessTimeMessage.java
index cfaa24d..f012085 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/LatestLastAccessTimeMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LatestLastAccessTimeMessage.java
@@ -62,17 +62,23 @@ public class LatestLastAccessTimeMessage<K> extends PooledDistributionMessage
@Override
protected void process(ClusterDistributionManager dm) {
long latestLastAccessTime = 0L;
+ InternalCache cache = dm.getCache();
+ if (cache == null) {
+ return;
+ }
InternalDistributedRegion region =
- (InternalDistributedRegion) dm.getCache().getRegion(this.regionName);
- if (region != null) {
- RegionEntry entry = region.getRegionEntry(this.key);
- if (entry != null) {
- try {
- latestLastAccessTime = entry.getLastAccessed();
- } catch (InternalStatisticsDisabledException ignored) {
- // last access time is not available
- }
- }
+ (InternalDistributedRegion) cache.getRegion(this.regionName);
+ if (region == null) {
+ return;
+ }
+ RegionEntry entry = region.getRegionEntry(this.key);
+ if (entry == null) {
+ return;
+ }
+ try {
+ latestLastAccessTime = entry.getLastAccessed();
+ } catch (InternalStatisticsDisabledException ignored) {
+ // last access time is not available
}
ReplyMessage.send(getSender(), this.processorId, latestLastAccessTime, dm);
}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/LatestLastAccessTimeMessageTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/LatestLastAccessTimeMessageTest.java
new file mode 100644
index 0000000..1ddd31e
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/LatestLastAccessTimeMessageTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static org.mockito.Mockito.mock;
+
+import java.util.Collections;
+import java.util.Set;
+
+import org.junit.Test;
+
+import org.apache.geode.distributed.internal.ClusterDistributionManager;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.inet.LocalHostUtil;
+
+public class LatestLastAccessTimeMessageTest {
+
+ @Test
+ public void processMessageShouldLookForNullCache() throws Exception {
+ final DistributionManager distributionManager = mock(DistributionManager.class);
+ final LatestLastAccessTimeReplyProcessor replyProcessor =
+ mock(LatestLastAccessTimeReplyProcessor.class);
+ final InternalDistributedRegion region = mock(InternalDistributedRegion.class);
+ Set<InternalDistributedMember> recipients = Collections.singleton(new InternalDistributedMember(
+ LocalHostUtil.getLocalHost(), 1234));
+ final LatestLastAccessTimeMessage<String> lastAccessTimeMessage =
+ new LatestLastAccessTimeMessage<>(replyProcessor, recipients, region, "foo");
+ lastAccessTimeMessage.process(mock(ClusterDistributionManager.class));
+ }
+}
diff --git a/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/GMSMembershipJUnitTest.java b/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/GMSMembershipJUnitTest.java
index 0e19f70..49b8b3a 100644
--- a/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/GMSMembershipJUnitTest.java
+++ b/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/GMSMembershipJUnitTest.java
@@ -28,9 +28,6 @@ import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -61,11 +58,7 @@ import org.apache.geode.distributed.internal.membership.gms.Services.Stopper;
import org.apache.geode.distributed.internal.membership.gms.interfaces.HealthMonitor;
import org.apache.geode.distributed.internal.membership.gms.interfaces.JoinLeave;
import org.apache.geode.distributed.internal.membership.gms.interfaces.Messenger;
-import org.apache.geode.distributed.internal.membership.gms.messages.AbstractGMSMessage;
import org.apache.geode.internal.serialization.DSFIDSerializer;
-import org.apache.geode.internal.serialization.DeserializationContext;
-import org.apache.geode.internal.serialization.SerializationContext;
-import org.apache.geode.internal.serialization.Version;
import org.apache.geode.internal.serialization.internal.DSFIDSerializerImpl;
import org.apache.geode.test.junit.categories.MembershipTest;
@@ -335,27 +328,4 @@ public class GMSMembershipJUnitTest {
assertThat(spy.getStartupEvents()).isEmpty();
}
- public static class TestMessage extends AbstractGMSMessage {
-
- @Override
- public int getDSFID() {
- return HIGH_PRIORITY_ACKED_MESSAGE;
- }
-
- @Override
- public void toData(DataOutput out, SerializationContext context) throws IOException {
-
- }
-
- @Override
- public void fromData(DataInput in, DeserializationContext context)
- throws IOException, ClassNotFoundException {
-
- }
-
- @Override
- public Version[] getSerializationVersions() {
- return null;
- }
- }
}
diff --git a/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/TestMessage.java b/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/TestMessage.java
new file mode 100644
index 0000000..8da2dac
--- /dev/null
+++ b/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/TestMessage.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.distributed.internal.membership.gms;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.geode.distributed.internal.membership.gms.messages.AbstractGMSMessage;
+import org.apache.geode.internal.serialization.DeserializationContext;
+import org.apache.geode.internal.serialization.SerializationContext;
+import org.apache.geode.internal.serialization.Version;
+
+public class TestMessage extends AbstractGMSMessage {
+
+ @Override
+ public int getDSFID() {
+ return HIGH_PRIORITY_ACKED_MESSAGE;
+ }
+
+ @Override
+ public void toData(DataOutput out, SerializationContext context) throws IOException {
+
+ }
+
+ @Override
+ public void fromData(DataInput in, DeserializationContext context)
+ throws IOException, ClassNotFoundException {
+
+ }
+
+ @Override
+ public Version[] getSerializationVersions() {
+ return null;
+ }
+}
diff --git a/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java b/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java
index 0e0748d..fe52eb8 100755
--- a/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java
+++ b/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java
@@ -25,6 +25,7 @@ import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doCallRealMethod;
+import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
@@ -61,6 +62,7 @@ import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.apache.geode.distributed.internal.membership.api.MemberDisconnectedException;
@@ -74,6 +76,7 @@ import org.apache.geode.distributed.internal.membership.gms.GMSMembershipView;
import org.apache.geode.distributed.internal.membership.gms.MemberIdentifierImpl;
import org.apache.geode.distributed.internal.membership.gms.Services;
import org.apache.geode.distributed.internal.membership.gms.Services.Stopper;
+import org.apache.geode.distributed.internal.membership.gms.TestMessage;
import org.apache.geode.distributed.internal.membership.gms.interfaces.HealthMonitor;
import org.apache.geode.distributed.internal.membership.gms.interfaces.JoinLeave;
import org.apache.geode.distributed.internal.membership.gms.interfaces.Manager;
@@ -823,6 +826,24 @@ public class JGroupsMessengerJUnitTest {
assertTrue(pinger.isPongMessage(m.getBuffer()));
}
+ /**
+ * messages for the Manager that were queued by a quorum checker shouldn't be delivered to
+ * a Manager
+ */
+ @Test
+ public void testIgnoreManagerMessagesFromQuorumChecker() throws Exception {
+ initMocks(false);
+ MemberIdentifier memberIdentifier = createAddress(8888);
+ JGAddress jgAddress = new JGAddress(memberIdentifier);
+
+ ArgumentCaptor<Message> valueCapture = ArgumentCaptor.forClass(Message.class);
+ doNothing().when(manager).processMessage(valueCapture.capture());
+ org.jgroups.Message jgroupsMessage = messenger.createJGMessage(new TestMessage(), jgAddress,
+ memberIdentifier, Version.CURRENT_ORDINAL);
+ messenger.jgroupsReceiver.receive(jgroupsMessage, true);
+ assertThat(valueCapture.getAllValues()).isEmpty();
+ }
+
@Test
public void testJGroupsIOExceptionHandler() throws Exception {
initMocks(false);
diff --git a/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessenger.java b/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
index ca53906..da67d36 100644
--- a/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
+++ b/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
@@ -82,6 +82,7 @@ import org.apache.geode.distributed.internal.membership.gms.GMSMembershipView;
import org.apache.geode.distributed.internal.membership.gms.GMSUtil;
import org.apache.geode.distributed.internal.membership.gms.Services;
import org.apache.geode.distributed.internal.membership.gms.interfaces.HealthMonitor;
+import org.apache.geode.distributed.internal.membership.gms.interfaces.Manager;
import org.apache.geode.distributed.internal.membership.gms.interfaces.MessageHandler;
import org.apache.geode.distributed.internal.membership.gms.interfaces.Messenger;
import org.apache.geode.distributed.internal.membership.gms.locator.FindCoordinatorRequest;
@@ -178,7 +179,7 @@ public class JGroupsMessenger<ID extends MemberIdentifier> implements Messenger<
* The JGroupsReceiver is handed messages by the JGroups Channel. It is responsible
* for deserializating and dispatching those messages to the appropriate handler
*/
- private JGroupsReceiver jgroupsReceiver;
+ protected JGroupsReceiver jgroupsReceiver;
public static void setChannelReceiver(JChannel channel, Receiver r) {
try {
@@ -1266,7 +1267,7 @@ public class JGroupsMessenger<ID extends MemberIdentifier> implements Messenger<
receive(jgmsg, false);
}
- private void receive(org.jgroups.Message jgmsg, boolean fromQuorumChecker) {
+ protected void receive(org.jgroups.Message jgmsg, boolean fromQuorumChecker) {
long startTime = services.getStatistics().startUDPDispatchRequest();
try {
if (services.getManager().shutdownInProgress()) {
@@ -1320,9 +1321,12 @@ public class JGroupsMessenger<ID extends MemberIdentifier> implements Messenger<
}
filterIncomingMessage(msg);
MessageHandler<Message<ID>> handler = getMessageHandler(msg);
- if (fromQuorumChecker && handler instanceof HealthMonitor) {
+ if (fromQuorumChecker
+ && (handler instanceof HealthMonitor || handler instanceof Manager)) {
// ignore suspect / heartbeat messages that happened during
- // auto-reconnect because they very likely have old member IDs in them
+ // auto-reconnect because they very likely have old member IDs in them.
+ // Also ignore non-membership messages because we weren't a member when we received
+ // them.
} else {
handler.processMessage(msg);
}