You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by zh...@apache.org on 2020/11/05 22:32:39 UTC

[geode] branch support/1.12 updated: GEODE-7727: modify sender thread to detect release of connection (#4751)

This is an automated email from the ASF dual-hosted git repository.

zhouxj pushed a commit to branch support/1.12
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/support/1.12 by this push:
     new 798a245  GEODE-7727: modify sender thread to detect release of connection (#4751)
798a245 is described below

commit 798a245147835c1e1b0026e863b9816a3ce2c551
Author: Mario Ivanac <48...@users.noreply.github.com>
AuthorDate: Tue Mar 10 06:47:09 2020 +0100

    GEODE-7727: modify sender thread to detect release of connection (#4751)
    
    * GEODE-7727: modify sender thread to detect release of connection
    
    * GEODE-7727: Update solution only for shared connections
    
    * GEODE-7727: added test
    
    * GEODE-7727: update after comments
    
    * GEODE-7727: update test
    
    * GEODE-7727: fix for async write hanging
    
    * GEODE-7727: Test of region operations in the face of closed connections
    
    Adding a test for what happens to region operations when a connection is closed
    out from under the system. This test hangs without the changes to let the
    reader thread keep running.
    
    Fix to test
    
    * GEODE-7727: Preventing a double release of the input buffer
    
    The releaseInputBuffer method was not thread safe. If it is called
    concurrently, it will end up being released twice, which will add the buffer
    to the buffer pool twice. Later, this could result in two threads using the
    same buffer, resulting in corruption of the buffer.
    
    With the changes for GEODE-7727, we made it likely that releaseInputBuffer
    would be called concurrently. If a member departs, one thread will call
    Connection.close. Connection.close will close the socket and call
    releaseInputBuffer. However, closing the socket will wake up the reader thread,
    which will also call releaseInputBuffer concurrently.
    
    Making releaseInputBuffer thread safe by introducing a lock.
    
    * GEODE-7727: update after merge
    
    * GEODE-7727: update test name
    
    Co-authored-by: Dan Smith <up...@apache.org>
    (cherry picked from commit c8413592e5573f675c538c63ef9ee9f97a349e73)
---
 .../geode/internal/tcp/CloseConnectionTest.java    | 76 ++++++++++++++++++++++
 .../geode/internal/tcp/TCPConduitDUnitTest.java    |  4 +-
 ...erStartupWhenAsyncDistributionTimeoutIsSet.java | 71 ++++++++++++++++++++
 ...butedSystemMXBeanWithAlertsDistributedTest.java |  1 +
 .../org/apache/geode/internal/tcp/Connection.java  | 38 ++++++++---
 .../apache/geode/internal/tcp/ConnectionTable.java |  2 +-
 6 files changed, 181 insertions(+), 11 deletions(-)

diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/CloseConnectionTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/CloseConnectionTest.java
new file mode 100644
index 0000000..154e908
--- /dev/null
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/CloseConnectionTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.tcp;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.junit.Test;
+
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.distributed.internal.ClusterDistributionManager;
+import org.apache.geode.distributed.internal.DistributionImpl;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.cache.CacheTestCase;
+
+public class CloseConnectionTest extends CacheTestCase {
+  /** Region operations from VM0 must still succeed after VM1 closes its receivers. */
+  @Test(timeout = 60_000)
+  public void sharedSenderShouldRecoverFromClosedSocket() {
+    VM vm0 = VM.getVM(0);
+    VM vm1 = VM.getVM(1);
+
+    // Create a region in each member. VM0 has a proxy region, so state must be in VM1.
+    vm0.invoke(() -> {
+      getCache().createRegionFactory(RegionShortcut.REPLICATE_PROXY).create("region");
+    });
+    vm1.invoke(() -> {
+      getCache().createRegionFactory(RegionShortcut.REPLICATE).create("region");
+    });
+
+    // Force VM1 to close its receiver connections, so peers sending to VM1 (e.g. VM0) find
+    // their sockets closed out from under them. The test expects exactly 2 receivers first.
+    vm1.invoke(() -> {
+      ConnectionTable conTable = getConnectionTable();
+      assertThat(conTable.getNumberOfReceivers()).isEqualTo(2);
+      conTable.closeReceivers(false);
+      assertThat(conTable.getNumberOfReceivers()).isEqualTo(0);
+    });
+
+    // VM0 must notice the closed connections and reconnect for these region operations to
+    // complete. The @Test timeout guards against them hanging instead.
+    vm0.invoke(() -> {
+      Region<Object, Object> region = getCache().getRegion("region");
+      region.put("1", "1");
+
+      assertThat(region.get("1")).isEqualTo("1");
+    });
+
+    // VM1 should again have two receivers, i.e. the connections were reestablished.
+    vm1.invoke(() -> {
+      ConnectionTable conTable = getConnectionTable();
+      assertThat(conTable.getNumberOfReceivers()).isEqualTo(2);
+    });
+  }
+
+  /** Returns this VM's ConnectionTable, reached through the direct channel's conduit. */
+  private ConnectionTable getConnectionTable() {
+    ClusterDistributionManager cdm =
+        (ClusterDistributionManager) getSystem().getDistributionManager();
+    DistributionImpl distribution = (DistributionImpl) cdm.getDistribution();
+    return distribution.getDirectChannel().getConduit().getConTable();
+  }
+
+}
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/TCPConduitDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/TCPConduitDUnitTest.java
index 698e7fe..41d64c6 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/TCPConduitDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/TCPConduitDUnitTest.java
@@ -94,7 +94,9 @@ public class TCPConduitDUnitTest extends DistributedTestCase {
     vm2.invoke(() -> startServer(properties));
     vm3.invoke(() -> startServer(properties));
 
-    Thread.sleep(5000);
+    await().untilAsserted(() -> {
+      assertThat(ConnectionTable.getNumSenderSharedConnections()).isEqualTo(3);
+    });
 
     // ensure that the closing of a shared/unordered connection to another node does not
     // remove all connections for that node
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/TestServerStartupWhenAsyncDistributionTimeoutIsSet.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/TestServerStartupWhenAsyncDistributionTimeoutIsSet.java
new file mode 100644
index 0000000..be57e26
--- /dev/null
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/TestServerStartupWhenAsyncDistributionTimeoutIsSet.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.tcp;
+
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.Serializable;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.distributed.internal.SerialAckedMessage;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+
+public class TestServerStartupWhenAsyncDistributionTimeoutIsSet implements Serializable {
+  final int serversToStart = 3;
+  // Not used by the test: captured when each JVM first loads this class, so it may be null.
+  protected static InternalDistributedSystem system =
+      InternalDistributedSystem.getConnectedInstance();
+
+  @Rule
+  public ClusterStartupRule cluster = new ClusterStartupRule(serversToStart + 1);
+
+  MemberVM locator;
+  MemberVM server1;
+  MemberVM server2;
+  MemberVM server3;
+
+  @Before
+  public void setUp() throws Exception {
+    locator = cluster.startLocatorVM(0);
+  }
+
+  private MemberVM startServer(final int vmIndex) {
+    return cluster.startServerVM(
+        vmIndex, s -> s.withConnectionToLocator(locator.getPort())
+            .withProperty("async-distribution-timeout", "5"));
+  }
+
+  @Test
+  public void testServerStartupDoesNotHangWhenAsyncDistributionTimeoutIsSet() {
+    server1 = startServer(1);
+    server2 = startServer(2);
+    server3 = startServer(3);
+    locator.invoke(() -> await().untilAsserted(() -> {
+      assertThat(ConnectionTable.getNumSenderSharedConnections()).isEqualTo(3);
+    }));
+    locator.invoke(() -> await("for message to be sent").until(() -> {
+      // Resolve the locator's system when this lambda runs in the locator JVM.
+      InternalDistributedSystem locatorSystem = InternalDistributedSystem.getConnectedInstance();
+      new SerialAckedMessage().send(locatorSystem.getAllOtherMembers(), false);
+      return true;
+    }));
+  }
+}
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/management/DistributedSystemMXBeanWithAlertsDistributedTest.java b/geode-core/src/distributedTest/java/org/apache/geode/management/DistributedSystemMXBeanWithAlertsDistributedTest.java
index c549fe4..8aa3ada 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/management/DistributedSystemMXBeanWithAlertsDistributedTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/management/DistributedSystemMXBeanWithAlertsDistributedTest.java
@@ -137,6 +137,7 @@ public class DistributedSystemMXBeanWithAlertsDistributedTest implements Seriali
     memberVM3 = getVM(3);
 
     managerMember = managerVM.invoke(() -> createManager());
+    IgnoredException.addIgnoredException("Cannot form connection to alert listener");
 
     for (VM memberVM : toArray(memberVM1, memberVM2, memberVM3)) {
       memberVM.invoke(() -> {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
index d93b75c..306eb4e 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
@@ -311,6 +311,9 @@ public class Connection implements Runnable {
   /** the buffer used for message receipt */
   private ByteBuffer inputBuffer;
 
+  /** Lock used to protect the input buffer */
+  public final Object inputBufferLock = new Object();
+
   /** the length of the next message to be dispatched */
   private int messageLength;
 
@@ -339,6 +342,7 @@ public class Connection implements Runnable {
 
   private boolean directAck;
 
+  private boolean asyncMode;
 
   /** is this connection used for serial message delivery? */
   boolean preserveOrder;
@@ -483,6 +487,7 @@ public class Connection implements Runnable {
     handshakeRead = false;
     handshakeCancelled = false;
     connected = true;
+    asyncMode = false;
 
     try {
       socket.setTcpNoDelay(true);
@@ -1137,6 +1142,7 @@ public class Connection implements Runnable {
     handshakeRead = false;
     handshakeCancelled = false;
     connected = true;
+    asyncMode = false;
 
     uniqueId = ID_COUNTER.getAndIncrement();
 
@@ -1470,6 +1476,10 @@ public class Connection implements Runnable {
         }
         asyncClose(false);
         owner.removeAndCloseThreadOwnedSockets();
+      } else {
+        if (sharedResource && !asyncMode) {
+          asyncClose(false);
+        }
       }
       releaseInputBuffer();
 
@@ -1484,10 +1494,12 @@ public class Connection implements Runnable {
   }
 
   private void releaseInputBuffer() {
-    ByteBuffer tmp = inputBuffer;
-    if (tmp != null) {
-      inputBuffer = null;
-      getBufferPool().releaseReceiveBuffer(tmp);
+    synchronized (inputBufferLock) {
+      ByteBuffer tmp = inputBuffer;
+      if (tmp != null) {
+        inputBuffer = null;
+        getBufferPool().releaseReceiveBuffer(tmp);
+      }
     }
   }
 
@@ -1609,10 +1621,9 @@ public class Connection implements Runnable {
             }
             return;
           }
-
           processInputBuffer();
 
-          if (!isReceiver && (handshakeRead || handshakeCancelled)) {
+          if (!isHandShakeReader && !isReceiver && (handshakeRead || handshakeCancelled)) {
             if (logger.isDebugEnabled()) {
               if (handshakeRead) {
                 logger.debug("handshake has been read {}", this);
@@ -1621,8 +1632,13 @@ public class Connection implements Runnable {
               }
             }
             isHandShakeReader = true;
-            // Once we have read the handshake the reader can go away
-            break;
+
+            // Once we have read the handshake for unshared connections, the reader can skip
+            // processing messages
+            if (!sharedResource || asyncMode) {
+              break;
+            }
+
           }
         } catch (CancelException e) {
           if (logger.isDebugEnabled()) {
@@ -1676,7 +1692,7 @@ public class Connection implements Runnable {
         }
       }
     } finally {
-      if (!isHandShakeReader) {
+      if (!isHandShakeReader || (sharedResource && !asyncMode)) {
         synchronized (stateLock) {
           connectionState = STATE_IDLE;
         }
@@ -3107,6 +3123,10 @@ public class Connection implements Runnable {
           remoteVersion = Version.readVersion(dis, true);
           ioFilter.doneReading(peerDataBuffer);
           notifyHandshakeWaiter(true);
+          if (preserveOrder && asyncDistributionTimeout != 0) {
+            asyncMode = true;
+          }
+
           return;
         default:
           String err =
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
index c4796a4..113d91a 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
@@ -715,7 +715,7 @@ public class ConnectionTable {
    *
    * @param beingSick a test hook to simulate a sick process
    */
-  private void closeReceivers(boolean beingSick) {
+  void closeReceivers(boolean beingSick) {
     synchronized (receivers) {
       for (Iterator it = receivers.iterator(); it.hasNext();) {
         Connection con = (Connection) it.next();