You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by mc...@apache.org on 2018/11/21 02:50:01 UTC
[geode] branch develop updated: GEODE-6065: Continue event
processing when hostname lookup fails (#2883)
This is an automated email from the ASF dual-hosted git repository.
mcmellawatt pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 7cd2f0c GEODE-6065: Continue event processing when hostname lookup fails (#2883)
7cd2f0c is described below
commit 7cd2f0c5dfc982148352acbfbb303afaa1358c2a
Author: Ryan McMahon <rm...@pivotal.io>
AuthorDate: Tue Nov 20 18:49:51 2018 -0800
GEODE-6065: Continue event processing when hostname lookup fails (#2883)
Co-authored-by: Ryan McMahon <rm...@pivotal.io>
Co-authored-by: Bill Burcham <bb...@pivotal.io>
---
.../client/internal/pooling/PooledConnection.java | 2 +-
.../membership/InternalDistributedMember.java | 22 ++-
...SenderEventRemoteDispatcherIntegrationTest.java | 189 +++++++++++++++++++++
3 files changed, 208 insertions(+), 5 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/PooledConnection.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/PooledConnection.java
index cb155e6..3cfbb31 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/PooledConnection.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/PooledConnection.java
@@ -37,7 +37,7 @@ import org.apache.geode.internal.cache.tier.sockets.ServerQueueStatus;
* @since GemFire 5.7
*
*/
-class PooledConnection implements Connection {
+public class PooledConnection implements Connection {
/*
* connection is volatile because we may asynchronously destroy the pooled connection while
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/InternalDistributedMember.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/InternalDistributedMember.java
index 5aaf5c6..cb86b02 100755
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/InternalDistributedMember.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/InternalDistributedMember.java
@@ -30,9 +30,9 @@ import java.util.List;
import java.util.Set;
import org.apache.geode.DataSerializer;
-import org.apache.geode.GemFireConfigException;
import org.apache.geode.InternalGemFireError;
import org.apache.geode.cache.UnsupportedVersionException;
+import org.apache.geode.cache.client.ServerConnectivityException;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.DurableClientAttributes;
import org.apache.geode.distributed.Role;
@@ -94,6 +94,19 @@ public class InternalDistributedMember implements DistributedMember, Externaliza
/** product version bit flag */
private static final int VERSION_BIT = 0x8;
+ @FunctionalInterface
+ public interface HostnameResolver {
+ InetAddress getInetAddress(ServerLocation location) throws UnknownHostException;
+ }
+
+ public static void setHostnameResolver(final HostnameResolver hostnameResolver) {
+ InternalDistributedMember.hostnameResolver = hostnameResolver;
+ }
+
+ /** Retrieves an InetAddress given the provided hostname */
+ private static HostnameResolver hostnameResolver =
+ (location) -> InetAddress.getByName(location.getHostName());
+
/**
* Representing the host name of this member.
*/
@@ -213,12 +226,13 @@ public class InternalDistributedMember implements DistributedMember, Externaliza
public InternalDistributedMember(ServerLocation location) {
this.hostName = location.getHostName();
- InetAddress addr = null;
+ final InetAddress addr;
try {
- addr = InetAddress.getByName(this.hostName);
+ addr = hostnameResolver.getInetAddress(location);
} catch (UnknownHostException e) {
- throw new GemFireConfigException("Unable to resolve server location " + location, e);
+ throw new ServerConnectivityException("Unable to resolve server location " + location, e);
}
+
netMbr = MemberFactory.newNetMember(addr, location.getPort());
netMbr.setVmKind(ClusterDistributionManager.NORMAL_DM_TYPE);
versionObj = Version.CURRENT;
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcherIntegrationTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcherIntegrationTest.java
new file mode 100644
index 0000000..a48c401
--- /dev/null
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcherIntegrationTest.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.wan;
+
+import static org.apache.geode.distributed.ConfigurationProperties.DURABLE_CLIENT_ID;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.LinkedList;
+import java.util.Properties;
+
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import org.apache.geode.CancelCriterion;
+import org.apache.geode.Statistics;
+import org.apache.geode.cache.client.internal.Connection;
+import org.apache.geode.cache.client.internal.Endpoint;
+import org.apache.geode.cache.client.internal.EndpointManager;
+import org.apache.geode.cache.client.internal.PoolImpl;
+import org.apache.geode.cache.client.internal.pooling.PooledConnection;
+import org.apache.geode.distributed.internal.DistributionConfig;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.distributed.internal.ServerLocation;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.distributed.internal.membership.gms.membership.HostAddress;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.PoolFactoryImpl;
+import org.apache.geode.internal.cache.PoolManagerImpl;
+import org.apache.geode.internal.monitoring.ThreadsMonitoring;
+import org.apache.geode.internal.net.SSLConfigurationFactory;
+import org.apache.geode.internal.security.SecurableCommunicationChannel;
+
+public class GatewaySenderEventRemoteDispatcherIntegrationTest {
+
+ /*
+ * Sometimes hostname lookup is flaky. We don't want such a failure to cripple our
+ * event processor.
+ *
+ * This test assumes hostname lookup (of IP number) succeeds when establishing the initial
+ * connection, but fails when constructing the InternalDistributedSystem object in response to a
+ * remote server crash.
+ */
+ @Test
+ public void canProcessesEventAfterHostnameLookupFailsInNotifyServerCrashed() throws Exception {
+
+ final PoolImpl pool = getPool();
+
+ final ServerLocation serverLocation = mock(ServerLocation.class);
+
+ final AbstractGatewaySenderEventProcessor eventProcessor =
+ getMockedAbstractGatewaySenderEventProcessor(pool, serverLocation);
+
+ final Endpoint endpoint = getMockedEndpoint(serverLocation);
+ final Connection connection = getMockedConnection(serverLocation, endpoint);
+
+ /*
+ * In order for listeners to be notified, the endpoint must be referenced by the
+ * endpointManager so that it can be removed when the RuntimeException() is thrown by the
+ * connection
+ */
+ final EndpointManager endpointManager = pool.getEndpointManager();
+ endpointManager.referenceEndpoint(serverLocation, mock(InternalDistributedMember.class));
+
+ final GatewaySenderEventRemoteDispatcher dispatcher =
+ new GatewaySenderEventRemoteDispatcher(eventProcessor, connection);
+
+ /*
+ * Set a HostnameResolver which simulates a failed
+ * hostname lookup resulting in an UnknownHostException
+ */
+ InternalDistributedMember.setHostnameResolver(ignored -> {
+ throw new UnknownHostException("a.b.c");
+ });
+
+ /*
+ * We have mocked our connection to throw a RuntimeException when readAcknowledgement() is
+ * called, then in the exception handling for that RuntimeException, the UnknownHostException
+ * will be thrown when trying to notify listeners of the crash.
+ */
+ dispatcher.readAcknowledgement();
+
+ /*
+ * Need to reset the hostname resolver to a real InetAddress resolver as it is static state and
+ * we do not want it to throw an UnknownHostException in subsequent test runs.
+ */
+ InternalDistributedMember
+ .setHostnameResolver((location) -> InetAddress.getByName(location.getHostName()));
+
+ /*
+ * The handling of the UnknownHostException should not result in the event processor being
+ * stopped, so assert that setIsStopped(true) was never called.
+ */
+ verify(eventProcessor, Mockito.times(0)).setIsStopped(true);
+ }
+
+ private PoolImpl getPool() {
+ final DistributionConfig distributionConfig = mock(DistributionConfig.class);
+ doReturn(new SecurableCommunicationChannel[] {}).when(distributionConfig)
+ .getSecurableCommunicationChannels();
+
+ SSLConfigurationFactory.setDistributionConfig(distributionConfig);
+
+ final Properties properties = new Properties();
+ properties.put(DURABLE_CLIENT_ID, "1");
+
+ final Statistics statistics = mock(Statistics.class);
+
+ final PoolFactoryImpl.PoolAttributes poolAttributes =
+ mock(PoolFactoryImpl.PoolAttributes.class);
+ /*
+ * These are the minimum pool attributes required
+ * so that basic validation and setup completes successfully. The values of
+ * these attributes have no importance to the assertions of the test itself.
+ */
+ doReturn(1).when(poolAttributes).getMaxConnections();
+ doReturn((long) 10e8).when(poolAttributes).getPingInterval();
+
+ final CancelCriterion cancelCriterion = mock(CancelCriterion.class);
+
+ final InternalCache internalCache = mock(InternalCache.class);
+ doReturn(cancelCriterion).when(internalCache).getCancelCriterion();
+
+ final InternalDistributedSystem internalDistributedSystem =
+ mock(InternalDistributedSystem.class);
+ doReturn(distributionConfig).when(internalDistributedSystem).getConfig();
+ doReturn(properties).when(internalDistributedSystem).getProperties();
+ doReturn(statistics).when(internalDistributedSystem).createAtomicStatistics(any(), anyString());
+
+ final PoolManagerImpl poolManager = mock(PoolManagerImpl.class);
+ doReturn(true).when(poolManager).isNormal();
+
+ final ThreadsMonitoring tMonitoring = mock(ThreadsMonitoring.class);
+
+ return PoolImpl.create(poolManager, "pool", poolAttributes, new LinkedList<HostAddress>(),
+ internalDistributedSystem, internalCache, tMonitoring);
+ }
+
+ private Connection getMockedConnection(ServerLocation serverLocation, Endpoint endpoint)
+ throws Exception {
+ /*
+ * Mock the connection to throw a RuntimeException() when connection.Execute() is called,
+ * so that we attempt to notify listeners in the exception handling logic in
+ * OpExecutorImpl.executeWithPossibleReAuthentication()
+ */
+ final Connection connection = mock(PooledConnection.class);
+ doReturn(serverLocation).when(connection).getServer();
+ doReturn(endpoint).when(connection).getEndpoint();
+ doThrow(new RuntimeException()).when(connection).execute(any());
+ return connection;
+ }
+
+ private AbstractGatewaySenderEventProcessor getMockedAbstractGatewaySenderEventProcessor(
+ PoolImpl pool, ServerLocation serverLocation) {
+ final AbstractGatewaySender abstractGatewaySender = mock(AbstractGatewaySender.class);
+ doReturn(serverLocation).when(abstractGatewaySender).getServerLocation();
+ doReturn(pool).when(abstractGatewaySender).getProxy();
+
+ final AbstractGatewaySenderEventProcessor eventProcessor =
+ mock(AbstractGatewaySenderEventProcessor.class);
+ doReturn(abstractGatewaySender).when(eventProcessor).getSender();
+ return eventProcessor;
+ }
+
+ private Endpoint getMockedEndpoint(ServerLocation serverLocation) {
+ final Endpoint endpoint = mock(Endpoint.class);
+ doReturn(serverLocation).when(endpoint).getLocation();
+ return endpoint;
+ }
+
+}