You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ka...@apache.org on 2022/04/13 07:32:18 UTC

[geode] branch develop updated: GEODE-9512: close receiver connection if membership check timed out (#7409)

This is an automated email from the ASF dual-hosted git repository.

kamilla pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new ee3fb5b985 GEODE-9512: close receiver connection if membership check timed out (#7409)
ee3fb5b985 is described below

commit ee3fb5b9859396583a1666c5890b014522657d67
Author: Kamilla Aslami <ka...@vmware.com>
AuthorDate: Wed Apr 13 00:32:10 2022 -0700

    GEODE-9512: close receiver connection if membership check timed out (#7409)
    
    If a reader thread performs membership check and it fails, we should close the connection as the sender didn't pass the authentication check.
---
 .../org/apache/geode/internal/tcp/Connection.java  |  26 +--
 .../internal/tcp/ConnectionTransmissionTest.java   | 257 +++++++++++++++++++++
 2 files changed, 267 insertions(+), 16 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
index 57bb4880ca..44205d4d63 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
@@ -2910,7 +2910,10 @@ public class Connection implements Runnable {
     inputBuffer.limit(inputBuffer.capacity());
   }
 
-  private boolean readHandshakeForReceiver(final DataInput dis) {
+  /**
+   * Returns true if handshake was read successfully, false otherwise.
+   */
+  boolean readHandshakeForReceiver(final DataInput dis) {
     try {
       checkHandshakeInitialByte(dis);
       checkHandshakeVersion(dis);
@@ -2960,24 +2963,15 @@ public class Connection implements Runnable {
       final boolean isSecure = authInit != null && !authInit.isEmpty();
 
       if (isSecure) {
-        if (owner.getConduit().waitForMembershipCheck(remoteMember)) {
-          sendOKHandshakeReply();
-          notifyHandshakeWaiter(true);
-        } else {
-          // check if we need notifyHandshakeWaiter() call.
+        if (!owner.getConduit().waitForMembershipCheck(remoteMember)) {
           notifyHandshakeWaiter(false);
-          logger.warn("{} timed out during a membership check.",
-              p2pReaderName());
+          logger.warn("{} timed out during a membership check.", p2pReaderName());
+          requestClose("timed out during a membership check");
           return true;
         }
-      } else {
-        sendOKHandshakeReply();
-        try {
-          notifyHandshakeWaiter(true);
-        } catch (Exception e) {
-          logger.fatal("Uncaught exception from listener", e);
-        }
       }
+      sendOKHandshakeReply();
+      notifyHandshakeWaiter(true);
       finishedConnecting = true;
     } catch (IOException ex) {
       final String err = "Failed sending handshake reply";
@@ -3047,7 +3041,7 @@ public class Connection implements Runnable {
     return false;
   }
 
-  private void readMessage(ByteBuffer peerDataBuffer, AbstractExecutor threadMonitorExecutor) {
+  void readMessage(ByteBuffer peerDataBuffer, AbstractExecutor threadMonitorExecutor) {
     if (messageType == NORMAL_MSG_TYPE) {
       owner.getConduit().getStats().incMessagesBeingReceived(true, messageLength);
       try (ByteBufferInputStream bbis =
diff --git a/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTransmissionTest.java b/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTransmissionTest.java
new file mode 100644
index 0000000000..5a041eb167
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTransmissionTest.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.tcp;
+
+import static org.apache.geode.distributed.ConfigurationProperties.SECURITY_PEER_AUTH_INIT;
+import static org.apache.geode.distributed.internal.DistributionConfig.DEFAULT_SOCKET_BUFFER_SIZE;
+import static org.apache.geode.distributed.internal.DistributionConfigImpl.SECURITY_SYSTEM_PREFIX;
+import static org.apache.geode.internal.inet.LocalHostUtil.getLocalHost;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.CancelCriterion;
+import org.apache.geode.distributed.internal.DMStats;
+import org.apache.geode.distributed.internal.Distribution;
+import org.apache.geode.distributed.internal.DistributionConfig;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.ReplyMessage;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.distributed.internal.membership.api.Membership;
+import org.apache.geode.internal.monitoring.ThreadsMonitoring;
+import org.apache.geode.internal.monitoring.executor.AbstractExecutor;
+import org.apache.geode.internal.net.BufferPool;
+import org.apache.geode.internal.net.SocketCloser;
+import org.apache.geode.test.junit.categories.MembershipTest;
+
+@Category(MembershipTest.class)
+public class ConnectionTransmissionTest {
+
+  /**
+   * Create a sender connection and a receiver connection and pass data from
+   * one to the other.
+   *
+   * This test uses a real socket, but mocks all other collaborators
+   * of connection, such as the InternalDistributedSystem.
+   */
+  @Test
+  public void testDataTransmittedBetweenSenderAndReceiverIfMembershipCheckPassed()
+      throws Exception {
+    final Connection reader = createConnectionsAndWriteMessage(true, false, false);
+
+    await().untilAsserted(() -> verify(reader, times(1)).readMessage(any(), any()));
+    assertThat(reader.isClosing()).isFalse();
+    verify(reader).readHandshakeForReceiver(any());
+    verify(reader).readMessage(any(), any());
+    verify(reader, times(0)).requestClose(any());
+  }
+
+  @Test
+  public void testReceiverClosesConnectionIfMembershipCheckFailed() throws Exception {
+    final Connection reader = createConnectionsAndWriteMessage(false, true, true);
+
+    await().untilAsserted(() -> assertThat(assertThat(reader.isClosing()).isTrue()));
+    verify(reader).readHandshakeForReceiver(any());
+    verify(reader).requestClose("timed out during a membership check");
+  }
+
+  private Connection createConnectionsAndWriteMessage(final boolean isSenderInView,
+      final boolean isCancelInProgress, final boolean waitUntilReaderExits)
+      throws IOException, InterruptedException, ExecutionException {
+    final DMStats stats = mock(DMStats.class);
+    final BufferPool bufferPool = new BufferPool(stats);
+    final ServerSocketChannel acceptorSocket = createReceiverSocket();
+
+    final int serverSocketPort = acceptorSocket.socket().getLocalPort();
+    final CompletableFuture<Connection> readerFuture =
+        createReaderFuture(acceptorSocket, isSenderInView);
+
+    final Connection sender =
+        createWriter(serverSocketPort, isCancelInProgress, waitUntilReaderExits);
+
+    final Connection reader = readerFuture.get();
+    final ReplyMessage msg = createReplyMessage(sender);
+
+    final List<Connection> connections = new ArrayList<>();
+    connections.add(sender);
+
+    final BaseMsgStreamer streamer = MsgStreamer.create(connections, msg, false, stats, bufferPool);
+    streamer.writeMessage();
+    return reader;
+  }
+
+  /**
+   * Start an asynchronous runnable that is waiting for a sender to connect to the socket
+   * When the sender connects, this runnable will create a receiver connection and
+   * return it to the future.
+   */
+  private CompletableFuture<Connection> createReaderFuture(final ServerSocketChannel acceptorSocket,
+      final boolean isSenderInView) {
+    return CompletableFuture.supplyAsync(
+        () -> createReceiverConnectionOnFirstAccept(acceptorSocket, isSenderInView));
+  }
+
+  /**
+   * Creates a socket for the receiver side. This is the server socket listening for connections.
+   */
+  private ServerSocketChannel createReceiverSocket() throws IOException {
+    final ServerSocketChannel acceptorSocket = ServerSocketChannel.open();
+    acceptorSocket.bind(new InetSocketAddress(InetAddress.getLocalHost(), 0));
+    return acceptorSocket;
+  }
+
+  /**
+   * Creates a dummy reply message.
+   */
+  private ReplyMessage createReplyMessage(final Connection sender) {
+    final ReplyMessage msg = new ReplyMessage();
+    msg.setProcessorId(1);
+    msg.setRecipient(sender.getRemoteAddress());
+    return msg;
+  }
+
+  /**
+   * Create a sender that connects to the server socket.
+   */
+  private Connection createWriter(final int serverSocketPort, final boolean isCancelInProgress,
+      boolean waitUntilReaderExits)
+      throws IOException {
+    final ConnectionTable writerTable = mockConnectionTable(waitUntilReaderExits);
+
+    final Membership<InternalDistributedMember> membership = mock(Membership.class);
+    final TCPConduit conduit = writerTable.getConduit();
+    final InternalDistributedMember remoteAddr =
+        new InternalDistributedMember(InetAddress.getLocalHost(), 0, true, true);
+    final InternalDistributedMember senderAddr =
+        new InternalDistributedMember(InetAddress.getLocalHost(), 1, true, true);
+
+    when(conduit.getCancelCriterion().isCancelInProgress()).thenReturn(isCancelInProgress);
+    when(conduit.getMembership()).thenReturn(membership);
+    when(conduit.getDM().getCanonicalId(remoteAddr)).thenReturn(remoteAddr);
+    when(conduit.getDM().getCanonicalId(senderAddr)).thenReturn(senderAddr);
+    when(conduit.getMemberId()).thenReturn(senderAddr);
+    when(membership.memberExists(any())).thenReturn(true);
+
+    remoteAddr.setDirectChannelPort(serverSocketPort);
+    senderAddr.setDirectChannelPort(conduit.getPort());
+
+    return spy(Connection.createSender(membership, writerTable, true, remoteAddr, true,
+        System.currentTimeMillis(), 1000, 1000));
+  }
+
+  private Connection createReceiverConnectionOnFirstAccept(final ServerSocketChannel acceptorSocket,
+      final boolean isSenderInView) {
+    try {
+      final SocketChannel readerSocket = acceptorSocket.accept();
+      final ConnectionTable readerTable = mockConnectionTable(false);
+      if (isSenderInView) {
+        when(readerTable.getConduit().waitForMembershipCheck(any())).thenReturn(true);
+      }
+
+      final Connection reader = spy(new Connection(readerTable, readerSocket.socket()));
+      CompletableFuture.runAsync(() -> {
+        try {
+          reader.initReceiver();
+        } catch (final RuntimeException e) {
+          e.printStackTrace();
+          throw e;
+        }
+      });
+      return reader;
+    } catch (final Exception e) {
+      e.printStackTrace();
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * @param waitUntilReaderExits if true, start reader thread and wait until it exits,
+   *        otherwise run it asynchronously.
+   */
+  private ConnectionTable mockConnectionTable(final boolean waitUntilReaderExits)
+      throws UnknownHostException {
+    final ConnectionTable connectionTable = mock(ConnectionTable.class);
+    final Distribution distribution = mock(Distribution.class);
+    final DistributionManager distributionManager = mock(DistributionManager.class);
+    final DMStats dmStats = mock(DMStats.class);
+    final CancelCriterion stopper = mock(CancelCriterion.class);
+    final SocketCloser socketCloser = mock(SocketCloser.class);
+    final TCPConduit tcpConduit = mock(TCPConduit.class);
+    final ThreadsMonitoring threadMonitoring = mock(ThreadsMonitoring.class);
+    final AbstractExecutor threadMonitoringExecutor = mock(AbstractExecutor.class);
+    final DistributionConfig config = mock(DistributionConfig.class);
+
+    System.setProperty(SECURITY_SYSTEM_PREFIX + SECURITY_PEER_AUTH_INIT, "true");
+    tcpConduit.tcpBufferSize = DEFAULT_SOCKET_BUFFER_SIZE;
+
+    when(connectionTable.getBufferPool()).thenReturn(new BufferPool(dmStats));
+    when(connectionTable.getConduit()).thenReturn(tcpConduit);
+    when(connectionTable.getDM()).thenReturn(distributionManager);
+    when(connectionTable.getSocketCloser()).thenReturn(socketCloser);
+
+    when(distributionManager.getConfig()).thenReturn(config);
+    when(distributionManager.getDistribution()).thenReturn(distribution);
+    when(distributionManager.getThreadMonitoring()).thenReturn(threadMonitoring);
+
+    when(tcpConduit.getDM()).thenReturn(distributionManager);
+    when(tcpConduit.getCancelCriterion()).thenReturn(stopper);
+    when(tcpConduit.getSocketId()).thenReturn(new InetSocketAddress(getLocalHost(), 10337));
+    when(tcpConduit.getStats()).thenReturn(dmStats);
+    when(tcpConduit.getConfig()).thenReturn(config);
+
+    when(stopper.cancelInProgress()).thenReturn(null);
+    when(threadMonitoring.createAbstractExecutor(any())).thenReturn(threadMonitoringExecutor);
+
+    doAnswer(invocationOnMock -> {
+      final Runnable runnable = (invocationOnMock.getArgument(0));
+      if (waitUntilReaderExits) {
+        startReader(runnable);
+      } else {
+        CompletableFuture.runAsync(() -> startReader(runnable));
+      }
+      return null;
+    }).when(connectionTable).executeCommand(any());
+    return connectionTable;
+  }
+
+  private void startReader(Runnable runnable) {
+    try {
+      runnable.run();
+    } catch (final Exception e) {
+      e.printStackTrace();
+      throw e;
+    }
+  }
+}