You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bs...@apache.org on 2020/03/20 22:57:08 UTC

[geode] 01/01: GEODE-6008 work in progress

This is an automated email from the ASF dual-hosted git repository.

bschuchardt pushed a commit to branch feature/GEODE-6008
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 071c4e4c98144741e30adcee19d72bec2cdc2c34
Author: Bruce Schuchardt <bs...@pivotal.io>
AuthorDate: Fri Mar 20 15:56:22 2020 -0700

    GEODE-6008 work in progress
---
 ...tServerHostNameVerificationDistributedTest.java |  2 +
 .../internal/tcp/TCPConduitBareBonesDUnitTest.java | 76 ++++++++++++++++++++++
 .../apache/geode/distributed/internal/DMStats.java |  2 +
 .../distributed/internal/DistributionStats.java    |  9 +++
 .../internal/LonerDistributionManager.java         |  5 ++
 .../org/apache/geode/internal/tcp/Connection.java  | 29 +++++++--
 .../org/apache/geode/internal/tcp/TCPConduit.java  |  4 +-
 7 files changed, 118 insertions(+), 9 deletions(-)

diff --git a/geode-core/src/distributedTest/java/org/apache/geode/cache/client/internal/ClientServerHostNameVerificationDistributedTest.java b/geode-core/src/distributedTest/java/org/apache/geode/cache/client/internal/ClientServerHostNameVerificationDistributedTest.java
index 8f1db5f..8ce2b34 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/cache/client/internal/ClientServerHostNameVerificationDistributedTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/cache/client/internal/ClientServerHostNameVerificationDistributedTest.java
@@ -39,6 +39,7 @@ import org.apache.geode.cache.client.NoAvailableServersException;
 import org.apache.geode.cache.ssl.CertStores;
 import org.apache.geode.cache.ssl.CertificateBuilder;
 import org.apache.geode.cache.ssl.CertificateMaterial;
+import org.apache.geode.internal.inet.LocalHostUtil;
 import org.apache.geode.internal.net.SocketCreatorFactory;
 import org.apache.geode.test.dunit.IgnoredException;
 import org.apache.geode.test.dunit.rules.ClusterStartupRule;
@@ -162,6 +163,7 @@ public class ClientServerHostNameVerificationDistributedTest {
         .sanDnsName(InetAddress.getLoopbackAddress().getHostName())
         .sanDnsName(InetAddress.getLocalHost().getHostName())
         .sanDnsName(InetAddress.getLocalHost().getCanonicalHostName())
+        .sanIpAddress(LocalHostUtil.getLocalHost())
         .sanIpAddress(InetAddress.getLocalHost())
         .generate();
 
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/TCPConduitBareBonesDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/TCPConduitBareBonesDUnitTest.java
new file mode 100644
index 0000000..77eb02e
--- /dev/null
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/TCPConduitBareBonesDUnitTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.tcp;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.Properties;
+
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.geode.distributed.ConfigurationProperties;
+import org.apache.geode.distributed.internal.DistributionConfig;
+import org.apache.geode.distributed.internal.DistributionConfigImpl;
+import org.apache.geode.distributed.internal.DistributionImpl;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.test.dunit.DistributedTestUtils;
+import org.apache.geode.test.dunit.rules.DistributedRule;
+
+public class TCPConduitBareBonesDUnitTest {
+
+  @Rule
+  public DistributedRule distributedRule = new DistributedRule(0);
+
+  @Test
+  public void testShortTermSharedConnections() throws Exception {
+    // create and close many shared, ordered connections. Also see CloseConnectionTest,
+    // which ensures that the outgoing socket in a shared connection is properly closed
+    Properties properties = new Properties();
+    properties.put(ConfigurationProperties.LOG_LEVEL, "fine");
+    properties.put(ConfigurationProperties.LOCATORS, DistributedTestUtils.getLocators());
+    DistributionConfig configuration = new DistributionConfigImpl(properties);
+    InternalDistributedSystem distributedSystem =
+        (InternalDistributedSystem) InternalDistributedSystem.connect(configuration.toProperties());
+    TCPConduit conduit =
+        ((DistributionImpl) distributedSystem.getDistributionManager().getDistribution())
+            .getDirectChannel().getConduit();
+
+    InternalDistributedMember memberID =
+        distributedSystem.getDistributionManager().getDistribution().getView().getCreator();
+    ConnectionTable.threadWantsSharedResources();
+
+    try {
+      for (int i = 0; i < 1000; i++) {
+        long buffersSize =
+            distributedSystem.getDistributionManager().getStats().getSenderBufferSize(true);
+        // connect to the dunit locator. The first iteration will get the Connection that was formed
+        // during startup, but we'll close it and start creating new ones.
+        Connection connection =
+            conduit.getConnection(memberID, true, false, System.currentTimeMillis(), 15000, 0);
+        System.out.println("Test iteration " + i + ": " + connection);
+        connection.requestClose("for test");
+        // the connection should be stopped at this point
+        assertThat(connection.isReceiverStopped()).isTrue();
+        // make sure there are no double releases of ByteBuffers
+        assertThat(distributedSystem.getDistributionManager().getStats().getSenderBufferSize(true))
+            .isEqualTo(buffersSize);
+      }
+    } finally {
+      distributedSystem.disconnect();
+    }
+  }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/DMStats.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/DMStats.java
index 0f880b2..f910e9d 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/DMStats.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/DMStats.java
@@ -475,6 +475,8 @@ public interface DMStats extends MembershipStatistics {
    */
   void incSenderBufferSize(int inc, boolean direct);
 
+  long getSenderBufferSize(boolean direct);
+
   /**
    * @since GemFire 5.0.2.4
    */
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionStats.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionStats.java
index b08a512..c44f29a 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionStats.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionStats.java
@@ -2287,6 +2287,15 @@ public class DistributionStats implements DMStats {
   }
 
   @Override
+  public long getSenderBufferSize(boolean direct) {
+    if (direct) {
+      return stats.getLong(senderDirectBufferSizeId);
+    } else {
+      return stats.getLong(senderHeapBufferSizeId);
+    }
+  }
+
+  @Override
   public void incMessagesBeingReceived(boolean newMsg, int bytes) {
     if (newMsg) {
       stats.incInt(messagesBeingReceivedId, 1);
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/LonerDistributionManager.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/LonerDistributionManager.java
index f2e9f51..9cb34d2 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/LonerDistributionManager.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/LonerDistributionManager.java
@@ -800,6 +800,11 @@ public class LonerDistributionManager implements DistributionManager {
     public void incSenderBufferSize(int inc, boolean direct) {}
 
     @Override
+    public long getSenderBufferSize(boolean direct) {
+      return 0;
+    }
+
+    @Override
     public long startSocketLock() {
       return 0;
     }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
index 8c8a2fc..893d42d 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
@@ -471,6 +471,12 @@ public class Connection implements Runnable {
   private volatile boolean ackTimedOut;
 
   /**
+   * a Reader thread for an shared Connection will remain around in order to
+   * ensure that the socket is properly closed.
+   */
+  private volatile boolean isResidualReaderThread;
+
+  /**
    * creates a "reader" connection that we accepted (it was initiated by an explicit connect being
    * done on the other side).
    */
@@ -1326,7 +1332,7 @@ public class Connection implements Runnable {
         }
         // make sure our socket is closed
         asyncClose(false);
-        if (!isReceiver) {
+        if (!isReceiver && !isResidualReaderThread) {
           // receivers release the input buffer when exiting run(). Senders use the
           // inputBuffer for reading direct-reply responses
           releaseInputBuffer();
@@ -1465,6 +1471,7 @@ public class Connection implements Runnable {
           asyncClose(false);
         }
       }
+
       releaseInputBuffer();
 
       // make sure that if the reader thread exits we notify a thread waiting for the handshake.
@@ -1508,6 +1515,9 @@ public class Connection implements Runnable {
   }
 
   private void readMessages() {
+    if (closing.get()) {
+      return;
+    }
     // take a snapshot of uniqueId to detect reconnect attempts
     SocketChannel channel;
     try {
@@ -1552,7 +1562,7 @@ public class Connection implements Runnable {
     }
     // we should not change the state of the connection if we are a handshake reader thread
     // as there is a race between this thread and the application thread doing direct ack
-    boolean isHandShakeReader = false;
+    boolean handshakeHasBeenRead = false;
     // if we're using SSL/TLS the input buffer may already have data to process
     boolean skipInitialRead = getInputBuffer().position() > 0;
     try {
@@ -1607,7 +1617,7 @@ public class Connection implements Runnable {
           }
           processInputBuffer();
 
-          if (!isHandShakeReader && !isReceiver && (handshakeRead || handshakeCancelled)) {
+          if (!handshakeHasBeenRead && !isReceiver && (handshakeRead || handshakeCancelled)) {
             if (logger.isDebugEnabled()) {
               if (handshakeRead) {
                 logger.debug("handshake has been read {}", this);
@@ -1615,12 +1625,16 @@ public class Connection implements Runnable {
                 logger.debug("handshake has been cancelled {}", this);
               }
             }
-            isHandShakeReader = true;
+            handshakeHasBeenRead = true;
 
             // Once we have read the handshake for unshared connections, the reader can skip
             // processing messages
             if (!sharedResource || asyncMode) {
               break;
+            } else {
+              // not exiting and not a Reader spawned from a ServerSocket.accept(), so
+              // let's set some state noting that this is happening
+              isResidualReaderThread = true;
             }
 
           }
@@ -1663,7 +1677,7 @@ public class Connection implements Runnable {
           return;
 
         } catch (Exception e) {
-          owner.getConduit().getCancelCriterion().checkCancelInProgress(null);
+          owner.getConduit().getCancelCriterion().checkCancelInProgress(e);
           if (!stopped && !isSocketClosed()) {
             logger.fatal(String.format("%s exception in channel read", p2pReaderName()), e);
           }
@@ -1676,14 +1690,15 @@ public class Connection implements Runnable {
         }
       }
     } finally {
-      if (!isHandShakeReader || (sharedResource && !asyncMode)) {
+      isResidualReaderThread = false;
+      if (!handshakeHasBeenRead || (sharedResource && !asyncMode)) {
         synchronized (stateLock) {
           connectionState = STATE_IDLE;
         }
       }
       if (logger.isDebugEnabled()) {
         logger.debug("readMessages terminated id={} from {} isHandshakeReader={}", conduitIdStr,
-            remoteAddr, isHandShakeReader);
+            remoteAddr, handshakeHasBeenRead);
       }
     }
   }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java
index 6f5edda..51af0de 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java
@@ -305,14 +305,14 @@ public class TCPConduit implements Runnable {
         logger.warn("exception parsing p2p.idleConnectionTimeout", e);
       }
 
-      s = p.getProperty("membership_port_range_start");
+      s = p.getProperty("membership_port_range_start", "" + DEFAULT_MEMBERSHIP_PORT_RANGE[0]);
       try {
         tcpPortRange[0] = Integer.parseInt(s);
       } catch (Exception e) {
         logger.warn("Exception parsing membership-port-range start port.", e);
       }
 
-      s = p.getProperty("membership_port_range_end");
+      s = p.getProperty("membership_port_range_end", "" + DEFAULT_MEMBERSHIP_PORT_RANGE[1]);
       try {
         tcpPortRange[1] = Integer.parseInt(s);
       } catch (Exception e) {