You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by mi...@apache.org on 2020/03/10 05:47:43 UTC

[geode] branch develop updated: GEODE-7727: modify sender thread to detect release of connection (#4751)

This is an automated email from the ASF dual-hosted git repository.

mivanac pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new c841359  GEODE-7727: modify sender thread to detect release of connection (#4751)
c841359 is described below

commit c8413592e5573f675c538c63ef9ee9f97a349e73
Author: Mario Ivanac <48...@users.noreply.github.com>
AuthorDate: Tue Mar 10 06:47:09 2020 +0100

    GEODE-7727: modify sender thread to detect release of connection (#4751)
    
    * GEODE-7727: modify sender thread to detect release of connection
    
    * GEODE-7727: Update solution only for shared connections
    
    * GEODE-7727: added test
    
    * GEODE-7727: update after comments
    
    * GEODE-7727: update test
    
    * GEODE-7727: fix for async write hanging
    
    * GEODE-7727: Test of region operations in the face of closed connections
    
    Adding a test for what happens to region operations when a connection is closed
    out from under the system. This test hangs without the changes to let the
    reader thread keep running.
    
    Fix to test
    
    * GEODE-7727: Preventing a double release of the input buffer
    
    The releaseInputBuffer method was not thread safe. If it is called
    concurrently, it will end up being released twice, which will add the buffer
    to the buffer pool twice. Later, this could result in two threads using the
    same buffer, resulting in corruption of the buffer.
    
    With the changes for GEODE-7727, we made it likely that releaseInputBuffer
    would be called concurrently. If a member departs, one thread will call
    Connection.close. Connection.close will close the socket and call
    releaseInputBuffer. However, closing the socket will wake up the reader thread,
    which will also call releaseInputBuffer concurrently.
    
    Making releaseInputBuffer thread safe by introducing a lock.
    
    * GEODE-7727: update after merge
    
    * GEODE-7727: update test name
    
    Co-authored-by: Dan Smith <up...@apache.org>
---
 .../geode/internal/tcp/CloseConnectionTest.java    | 76 ++++++++++++++++++++++
 .../geode/internal/tcp/TCPConduitDUnitTest.java    |  5 +-
 ...erStartupWhenAsyncDistributionTimeoutIsSet.java | 71 ++++++++++++++++++++
 ...butedSystemMXBeanWithAlertsDistributedTest.java |  1 +
 .../distributed/internal/DistributionImpl.java     |  4 ++
 .../org/apache/geode/internal/tcp/Connection.java  | 38 ++++++++---
 .../apache/geode/internal/tcp/ConnectionTable.java | 14 +++-
 .../org/apache/geode/internal/tcp/TCPConduit.java  |  2 +-
 8 files changed, 199 insertions(+), 12 deletions(-)

diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/CloseConnectionTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/CloseConnectionTest.java
new file mode 100644
index 0000000..154e908
--- /dev/null
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/CloseConnectionTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.tcp;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.junit.Test;
+
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.distributed.internal.ClusterDistributionManager;
+import org.apache.geode.distributed.internal.DistributionImpl;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.cache.CacheTestCase;
+
+public class CloseConnectionTest extends CacheTestCase {
+
+  @Test(timeout = 60_000)
+  public void sharedSenderShouldRecoverFromClosedSocket() {
+    VM vm0 = VM.getVM(0);
+    VM vm1 = VM.getVM(1);
+
+    // Create a region in each member. VM0 has a proxy region, so state must be in VM1
+    vm0.invoke(() -> {
+      getCache().createRegionFactory(RegionShortcut.REPLICATE_PROXY).create("region");
+    });
+    vm1.invoke(() -> {
+      getCache().createRegionFactory(RegionShortcut.REPLICATE).create("region");
+    });
+
+
+    // Force VM1 to close it's connections.
+    vm1.invoke(() -> {
+      ConnectionTable conTable = getConnectionTable();
+      assertThat(conTable.getNumberOfReceivers()).isEqualTo(2);
+      conTable.closeReceivers(false);
+      assertThat(conTable.getNumberOfReceivers()).isEqualTo(0);
+    });
+
+    // See if VM0 noticed the closed connections. Try to do a couple of region
+    // operations
+    vm0.invoke(() -> {
+      Region<Object, Object> region = getCache().getRegion("region");
+      region.put("1", "1");
+
+      assertThat(region.get("1")).isEqualTo("1");
+    });
+
+    // Make sure connections were reestablished
+    vm1.invoke(() -> {
+      ConnectionTable conTable = getConnectionTable();
+      assertThat(conTable.getNumberOfReceivers()).isEqualTo(2);
+    });
+  }
+
+  private ConnectionTable getConnectionTable() {
+    ClusterDistributionManager cdm =
+        (ClusterDistributionManager) getSystem().getDistributionManager();
+    DistributionImpl distribution = (DistributionImpl) cdm.getDistribution();
+    return distribution.getDirectChannel().getConduit().getConTable();
+  }
+
+
+}
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/TCPConduitDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/TCPConduitDUnitTest.java
index 9462c72..9ffb76c 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/TCPConduitDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/TCPConduitDUnitTest.java
@@ -16,6 +16,7 @@ package org.apache.geode.internal.tcp;
 
 import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
 import static org.apache.geode.test.util.ResourceUtils.createTempFileFromResource;
+import static org.assertj.core.api.Assertions.assertThat;
 
 import java.io.File;
 import java.io.IOException;
@@ -91,7 +92,9 @@ public class TCPConduitDUnitTest extends DistributedTestCase {
     vm2.invoke(() -> startServer(properties));
     vm3.invoke(() -> startServer(properties));
 
-    Thread.sleep(5000);
+    await().untilAsserted(() -> {
+      assertThat(ConnectionTable.getNumSenderSharedConnections()).isEqualTo(3);
+    });
 
     try {
       await("for message to be sent").until(() -> {
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/TestServerStartupWhenAsyncDistributionTimeoutIsSet.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/TestServerStartupWhenAsyncDistributionTimeoutIsSet.java
new file mode 100644
index 0000000..be57e26
--- /dev/null
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/TestServerStartupWhenAsyncDistributionTimeoutIsSet.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.tcp;
+
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.Serializable;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.distributed.internal.SerialAckedMessage;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+
+public class TestServerStartupWhenAsyncDistributionTimeoutIsSet implements Serializable {
+  int serversToStart = 3;
+
+  protected static InternalDistributedSystem system =
+      InternalDistributedSystem.getConnectedInstance();
+
+  @Rule
+  public ClusterStartupRule cluster = new ClusterStartupRule(serversToStart + 1);
+
+  MemberVM locator;
+  MemberVM server1;
+  MemberVM server2;
+  MemberVM server3;
+
+  @Before
+  public void setUp() throws Exception {
+    locator = cluster.startLocatorVM(0);
+  }
+
+  private MemberVM startServer(final int vmIndex) {
+    return cluster.startServerVM(
+        vmIndex, s -> s.withConnectionToLocator(locator.getPort())
+            .withProperty("async-distribution-timeout", "5"));
+  }
+
+  @Test
+  public void testServerStartupDoesNotHangWhenAsyncDistributionTimeoutIsSet() {
+    server1 = startServer(1);
+    server2 = startServer(2);
+    server3 = startServer(3);
+    locator.invoke(() -> await().untilAsserted(() -> {
+      assertThat(ConnectionTable.getNumSenderSharedConnections()).isEqualTo(3);
+    }));
+
+    locator.invoke(() -> await("for message to be sent").until(() -> {
+      final SerialAckedMessage serialAckedMessage = new SerialAckedMessage();
+      serialAckedMessage.send(system.getAllOtherMembers(), false);
+      return true;
+    }));
+  }
+}
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/management/DistributedSystemMXBeanWithAlertsDistributedTest.java b/geode-core/src/distributedTest/java/org/apache/geode/management/DistributedSystemMXBeanWithAlertsDistributedTest.java
index c549fe4..8aa3ada 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/management/DistributedSystemMXBeanWithAlertsDistributedTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/management/DistributedSystemMXBeanWithAlertsDistributedTest.java
@@ -137,6 +137,7 @@ public class DistributedSystemMXBeanWithAlertsDistributedTest implements Seriali
     memberVM3 = getVM(3);
 
     managerMember = managerVM.invoke(() -> createManager());
+    IgnoredException.addIgnoredException("Cannot form connection to alert listener");
 
     for (VM memberVM : toArray(memberVM1, memberVM2, memberVM3)) {
       memberVM.invoke(() -> {
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionImpl.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionImpl.java
index a89d23f..2fad2e6 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionImpl.java
@@ -794,6 +794,10 @@ public class DistributionImpl implements Distribution {
     return result;
   }
 
+  public DirectChannel getDirectChannel() {
+    return directChannel;
+  }
+
 
   /**
    * Insert our own MessageReceiver between us and the direct channel, in order to correctly filter
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
index ce83ad4..8c8a2fc 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
@@ -311,6 +311,9 @@ public class Connection implements Runnable {
   /** the buffer used for message receipt */
   private ByteBuffer inputBuffer;
 
+  /** Lock used to protect the input buffer */
+  public final Object inputBufferLock = new Object();
+
   /** the length of the next message to be dispatched */
   private int messageLength;
 
@@ -339,6 +342,7 @@ public class Connection implements Runnable {
 
   private boolean directAck;
 
+  private boolean asyncMode;
 
   /** is this connection used for serial message delivery? */
   boolean preserveOrder;
@@ -483,6 +487,7 @@ public class Connection implements Runnable {
     handshakeRead = false;
     handshakeCancelled = false;
     connected = true;
+    asyncMode = false;
 
     try {
       socket.setTcpNoDelay(true);
@@ -1121,6 +1126,7 @@ public class Connection implements Runnable {
     handshakeRead = false;
     handshakeCancelled = false;
     connected = true;
+    asyncMode = false;
 
     uniqueId = ID_COUNTER.getAndIncrement();
 
@@ -1454,6 +1460,10 @@ public class Connection implements Runnable {
         }
         asyncClose(false);
         owner.removeAndCloseThreadOwnedSockets();
+      } else {
+        if (sharedResource && !asyncMode) {
+          asyncClose(false);
+        }
       }
       releaseInputBuffer();
 
@@ -1468,10 +1478,12 @@ public class Connection implements Runnable {
   }
 
   private void releaseInputBuffer() {
-    ByteBuffer tmp = inputBuffer;
-    if (tmp != null) {
-      inputBuffer = null;
-      getBufferPool().releaseReceiveBuffer(tmp);
+    synchronized (inputBufferLock) {
+      ByteBuffer tmp = inputBuffer;
+      if (tmp != null) {
+        inputBuffer = null;
+        getBufferPool().releaseReceiveBuffer(tmp);
+      }
     }
   }
 
@@ -1593,10 +1605,9 @@ public class Connection implements Runnable {
             }
             return;
           }
-
           processInputBuffer();
 
-          if (!isReceiver && (handshakeRead || handshakeCancelled)) {
+          if (!isHandShakeReader && !isReceiver && (handshakeRead || handshakeCancelled)) {
             if (logger.isDebugEnabled()) {
               if (handshakeRead) {
                 logger.debug("handshake has been read {}", this);
@@ -1605,8 +1616,13 @@ public class Connection implements Runnable {
               }
             }
             isHandShakeReader = true;
-            // Once we have read the handshake the reader can go away
-            break;
+
+            // Once we have read the handshake for unshared connections, the reader can skip
+            // processing messages
+            if (!sharedResource || asyncMode) {
+              break;
+            }
+
           }
         } catch (CancelException e) {
           if (logger.isDebugEnabled()) {
@@ -1660,7 +1676,7 @@ public class Connection implements Runnable {
         }
       }
     } finally {
-      if (!isHandShakeReader) {
+      if (!isHandShakeReader || (sharedResource && !asyncMode)) {
         synchronized (stateLock) {
           connectionState = STATE_IDLE;
         }
@@ -3069,6 +3085,10 @@ public class Connection implements Runnable {
           remoteVersion = Version.readVersion(dis, true);
           ioFilter.doneReading(peerDataBuffer);
           notifyHandshakeWaiter(true);
+          if (preserveOrder && asyncDistributionTimeout != 0) {
+            asyncMode = true;
+          }
+
           return;
         default:
           String err =
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
index 6f5bf47..0c098d1 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
@@ -37,6 +37,7 @@ import org.apache.logging.log4j.Logger;
 
 import org.apache.geode.SystemFailure;
 import org.apache.geode.alerting.internal.spi.AlertingAction;
+import org.apache.geode.annotations.VisibleForTesting;
 import org.apache.geode.annotations.internal.MakeNotStatic;
 import org.apache.geode.distributed.DistributedMember;
 import org.apache.geode.distributed.DistributedSystemDisconnectedException;
@@ -707,7 +708,7 @@ public class ConnectionTable {
    *
    * @param beingSick a test hook to simulate a sick process
    */
-  private void closeReceivers(boolean beingSick) {
+  void closeReceivers(boolean beingSick) {
     synchronized (receivers) {
       for (Iterator it = receivers.iterator(); it.hasNext();) {
         Connection con = (Connection) it.next();
@@ -917,6 +918,17 @@ public class ConnectionTable {
     }
   }
 
+  @VisibleForTesting
+  public static int getNumSenderSharedConnections() {
+    ConnectionTable ct = (ConnectionTable) lastInstance.get();
+    if (ct == null) {
+      return 0;
+    }
+    return (ct.getConduit().getStats().getSendersSU());
+  }
+
+
+
   /**
    * Clears lastInstance. Does not yet close underlying sockets, but probably not strictly
    * necessary.
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java
index 6130420..6f5edda 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java
@@ -613,7 +613,7 @@ public class TCPConduit implements Runnable {
     }
   }
 
-  private ConnectionTable getConTable() {
+  ConnectionTable getConTable() {
     ConnectionTable result = conTable;
     if (result == null) {
       stopper.checkCancelInProgress(null);