You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by mc...@apache.org on 2018/11/21 02:50:01 UTC

[geode] branch develop updated: GEODE-6065: Continue event processing when hostname lookup fails (#2883)

This is an automated email from the ASF dual-hosted git repository.

mcmellawatt pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 7cd2f0c  GEODE-6065: Continue event processing when hostname lookup fails (#2883)
7cd2f0c is described below

commit 7cd2f0c5dfc982148352acbfbb303afaa1358c2a
Author: Ryan McMahon <rm...@pivotal.io>
AuthorDate: Tue Nov 20 18:49:51 2018 -0800

    GEODE-6065: Continue event processing when hostname lookup fails (#2883)
    
    Co-authored-by: Ryan McMahon <rm...@pivotal.io>
    Co-authored-by: Bill Burcham <bb...@pivotal.io>
---
 .../client/internal/pooling/PooledConnection.java  |   2 +-
 .../membership/InternalDistributedMember.java      |  22 ++-
 ...SenderEventRemoteDispatcherIntegrationTest.java | 189 +++++++++++++++++++++
 3 files changed, 208 insertions(+), 5 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/PooledConnection.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/PooledConnection.java
index cb155e6..3cfbb31 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/PooledConnection.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/PooledConnection.java
@@ -37,7 +37,7 @@ import org.apache.geode.internal.cache.tier.sockets.ServerQueueStatus;
  * @since GemFire 5.7
  *
  */
-class PooledConnection implements Connection {
+public class PooledConnection implements Connection {
 
   /*
    * connection is volatile because we may asynchronously destroy the pooled connection while
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/InternalDistributedMember.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/InternalDistributedMember.java
index 5aaf5c6..cb86b02 100755
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/InternalDistributedMember.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/InternalDistributedMember.java
@@ -30,9 +30,9 @@ import java.util.List;
 import java.util.Set;
 
 import org.apache.geode.DataSerializer;
-import org.apache.geode.GemFireConfigException;
 import org.apache.geode.InternalGemFireError;
 import org.apache.geode.cache.UnsupportedVersionException;
+import org.apache.geode.cache.client.ServerConnectivityException;
 import org.apache.geode.distributed.DistributedMember;
 import org.apache.geode.distributed.DurableClientAttributes;
 import org.apache.geode.distributed.Role;
@@ -94,6 +94,19 @@ public class InternalDistributedMember implements DistributedMember, Externaliza
   /** product version bit flag */
   private static final int VERSION_BIT = 0x8;
 
+  @FunctionalInterface
+  public interface HostnameResolver {
+    InetAddress getInetAddress(ServerLocation location) throws UnknownHostException;
+  }
+
+  public static void setHostnameResolver(final HostnameResolver hostnameResolver) {
+    InternalDistributedMember.hostnameResolver = hostnameResolver;
+  }
+
+  /** Retrieves an InetAddress given the provided hostname */
+  private static HostnameResolver hostnameResolver =
+      (location) -> InetAddress.getByName(location.getHostName());
+
   /**
    * Representing the host name of this member.
    */
@@ -213,12 +226,13 @@ public class InternalDistributedMember implements DistributedMember, Externaliza
 
   public InternalDistributedMember(ServerLocation location) {
     this.hostName = location.getHostName();
-    InetAddress addr = null;
+    final InetAddress addr;
     try {
-      addr = InetAddress.getByName(this.hostName);
+      addr = hostnameResolver.getInetAddress(location);
     } catch (UnknownHostException e) {
-      throw new GemFireConfigException("Unable to resolve server location " + location, e);
+      throw new ServerConnectivityException("Unable to resolve server location " + location, e);
     }
+
     netMbr = MemberFactory.newNetMember(addr, location.getPort());
     netMbr.setVmKind(ClusterDistributionManager.NORMAL_DM_TYPE);
     versionObj = Version.CURRENT;
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcherIntegrationTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcherIntegrationTest.java
new file mode 100644
index 0000000..a48c401
--- /dev/null
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcherIntegrationTest.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.wan;
+
+import static org.apache.geode.distributed.ConfigurationProperties.DURABLE_CLIENT_ID;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.LinkedList;
+import java.util.Properties;
+
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import org.apache.geode.CancelCriterion;
+import org.apache.geode.Statistics;
+import org.apache.geode.cache.client.internal.Connection;
+import org.apache.geode.cache.client.internal.Endpoint;
+import org.apache.geode.cache.client.internal.EndpointManager;
+import org.apache.geode.cache.client.internal.PoolImpl;
+import org.apache.geode.cache.client.internal.pooling.PooledConnection;
+import org.apache.geode.distributed.internal.DistributionConfig;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.distributed.internal.ServerLocation;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.distributed.internal.membership.gms.membership.HostAddress;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.PoolFactoryImpl;
+import org.apache.geode.internal.cache.PoolManagerImpl;
+import org.apache.geode.internal.monitoring.ThreadsMonitoring;
+import org.apache.geode.internal.net.SSLConfigurationFactory;
+import org.apache.geode.internal.security.SecurableCommunicationChannel;
+
+public class GatewaySenderEventRemoteDispatcherIntegrationTest {
+
+  /*
+   * Sometimes hostname lookup is flaky. We don't want such a failure to cripple our
+   * event processor.
+   *
+   * This test assumes hostname lookup (of IP number) succeeds when establishing the initial
+   * connection, but fails when constructing the InternalDistributedSystem object in response to a
+   * remote server crash.
+   */
+  @Test
+  public void canProcessesEventAfterHostnameLookupFailsInNotifyServerCrashed() throws Exception {
+
+    final PoolImpl pool = getPool();
+
+    final ServerLocation serverLocation = mock(ServerLocation.class);
+
+    final AbstractGatewaySenderEventProcessor eventProcessor =
+        getMockedAbstractGatewaySenderEventProcessor(pool, serverLocation);
+
+    final Endpoint endpoint = getMockedEndpoint(serverLocation);
+    final Connection connection = getMockedConnection(serverLocation, endpoint);
+
+    /*
+     * In order for listeners to be notified, the endpoint must be referenced by the
+     * endpointManager so that it can be removed when the RuntimeException() is thrown by the
+     * connection
+     */
+    final EndpointManager endpointManager = pool.getEndpointManager();
+    endpointManager.referenceEndpoint(serverLocation, mock(InternalDistributedMember.class));
+
+    final GatewaySenderEventRemoteDispatcher dispatcher =
+        new GatewaySenderEventRemoteDispatcher(eventProcessor, connection);
+
+    /*
+     * Set a HostnameResolver which simulates a failed
+     * hostname lookup resulting in an UnknownHostException
+     */
+    InternalDistributedMember.setHostnameResolver(ignored -> {
+      throw new UnknownHostException("a.b.c");
+    });
+
+    /*
+     * We have mocked our connection to throw a RuntimeException when readAcknowledgement() is
+     * called, then in the exception handling for that RuntimeException, the UnknownHostException
+     * will be thrown when trying to notify listeners of the crash.
+     */
+    dispatcher.readAcknowledgement();
+
+    /*
+     * Need to reset the hostname resolver to a real InetAddress resolver as it is static state and
+     * we do not want it to throw an UnknownHostException in subsequent test runs.
+     */
+    InternalDistributedMember
+        .setHostnameResolver((location) -> InetAddress.getByName(location.getHostName()));
+
+    /*
+     * The handling of the UnknownHostException should not result in the event processor being
+     * stopped, so assert that setIsStopped(true) was never called.
+     */
+    verify(eventProcessor, Mockito.times(0)).setIsStopped(true);
+  }
+
+  private PoolImpl getPool() {
+    final DistributionConfig distributionConfig = mock(DistributionConfig.class);
+    doReturn(new SecurableCommunicationChannel[] {}).when(distributionConfig)
+        .getSecurableCommunicationChannels();
+
+    SSLConfigurationFactory.setDistributionConfig(distributionConfig);
+
+    final Properties properties = new Properties();
+    properties.put(DURABLE_CLIENT_ID, "1");
+
+    final Statistics statistics = mock(Statistics.class);
+
+    final PoolFactoryImpl.PoolAttributes poolAttributes =
+        mock(PoolFactoryImpl.PoolAttributes.class);
+    /*
+     * These are the minimum pool attributes required
+     * so that basic validation and setup completes successfully. The values of
+     * these attributes have no importance to the assertions of the test itself.
+     */
+    doReturn(1).when(poolAttributes).getMaxConnections();
+    doReturn((long) 10e8).when(poolAttributes).getPingInterval();
+
+    final CancelCriterion cancelCriterion = mock(CancelCriterion.class);
+
+    final InternalCache internalCache = mock(InternalCache.class);
+    doReturn(cancelCriterion).when(internalCache).getCancelCriterion();
+
+    final InternalDistributedSystem internalDistributedSystem =
+        mock(InternalDistributedSystem.class);
+    doReturn(distributionConfig).when(internalDistributedSystem).getConfig();
+    doReturn(properties).when(internalDistributedSystem).getProperties();
+    doReturn(statistics).when(internalDistributedSystem).createAtomicStatistics(any(), anyString());
+
+    final PoolManagerImpl poolManager = mock(PoolManagerImpl.class);
+    doReturn(true).when(poolManager).isNormal();
+
+    final ThreadsMonitoring tMonitoring = mock(ThreadsMonitoring.class);
+
+    return PoolImpl.create(poolManager, "pool", poolAttributes, new LinkedList<HostAddress>(),
+        internalDistributedSystem, internalCache, tMonitoring);
+  }
+
+  private Connection getMockedConnection(ServerLocation serverLocation, Endpoint endpoint)
+      throws Exception {
+    /*
+     * Mock the connection to throw a RuntimeException() when connection.Execute() is called,
+     * so that we attempt to notify listeners in the exception handling logic in
+     * OpExecutorImpl.executeWithPossibleReAuthentication()
+     */
+    final Connection connection = mock(PooledConnection.class);
+    doReturn(serverLocation).when(connection).getServer();
+    doReturn(endpoint).when(connection).getEndpoint();
+    doThrow(new RuntimeException()).when(connection).execute(any());
+    return connection;
+  }
+
+  private AbstractGatewaySenderEventProcessor getMockedAbstractGatewaySenderEventProcessor(
+      PoolImpl pool, ServerLocation serverLocation) {
+    final AbstractGatewaySender abstractGatewaySender = mock(AbstractGatewaySender.class);
+    doReturn(serverLocation).when(abstractGatewaySender).getServerLocation();
+    doReturn(pool).when(abstractGatewaySender).getProxy();
+
+    final AbstractGatewaySenderEventProcessor eventProcessor =
+        mock(AbstractGatewaySenderEventProcessor.class);
+    doReturn(abstractGatewaySender).when(eventProcessor).getSender();
+    return eventProcessor;
+  }
+
+  private Endpoint getMockedEndpoint(ServerLocation serverLocation) {
+    final Endpoint endpoint = mock(Endpoint.class);
+    doReturn(serverLocation).when(endpoint).getLocation();
+    return endpoint;
+  }
+
+}