You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bs...@apache.org on 2019/01/28 18:15:23 UTC

[geode] branch feature/GEODE-2113e updated: added PIDs to log messages to make stress-test artifacts more useful

This is an automated email from the ASF dual-hosted git repository.

bschuchardt pushed a commit to branch feature/GEODE-2113e
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/feature/GEODE-2113e by this push:
     new 8b37afe  added PIDs to log messages to make stress-test artifacts more useful
8b37afe is described below

commit 8b37afe944fa6991be05fa60a3f0e453cdef9aab
Author: Bruce Schuchardt <bs...@pivotal.io>
AuthorDate: Mon Jan 28 10:13:56 2019 -0800

    added PIDs to log messages to make stress-test artifacts more useful
    
    I also removed the explicit timeout parameters from the Awaitility calls;
    they will now use the default 5-minute timeout.
---
 .../geode/ClusterCommunicationsDUnitTest.java      |  2 +-
 .../internal/net/SSLSocketIntegrationTest.java     |  4 +-
 .../distributed/internal/tcpserver/TcpClient.java  |  4 ++
 .../apache/geode/internal/net/NioSslEngine.java    | 58 +++++++++++++---------
 .../org/apache/geode/internal/tcp/MsgReader.java   | 26 ++++++----
 5 files changed, 57 insertions(+), 37 deletions(-)

diff --git a/geode-core/src/distributedTest/java/org/apache/geode/ClusterCommunicationsDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/ClusterCommunicationsDUnitTest.java
index 6a1e0a5..b4d106e 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/ClusterCommunicationsDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/ClusterCommunicationsDUnitTest.java
@@ -184,7 +184,7 @@ public class ClusterCommunicationsDUnitTest implements java.io.Serializable {
           VM.getVM(2).invoke(() -> cache.getDistributedSystem().getDistributedMember());
       VM.getVM(1).invoke("receive a large direct-reply message", () -> {
         SerialAckedMessageWithBigReply messageWithBigReply = new SerialAckedMessageWithBigReply();
-        await().atMost(3, TimeUnit.MINUTES).until(() -> {
+        await().until(() -> {
           messageWithBigReply.send(Collections.<DistributedMember>singleton(vm2ID));
           return true;
         });
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketIntegrationTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketIntegrationTest.java
index 5fa7697..5e81b83 100755
--- a/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketIntegrationTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketIntegrationTest.java
@@ -202,10 +202,10 @@ public class SSLSocketIntegrationTest {
         SocketCreatorFactory.getSocketCreatorForComponent(SecurableCommunicationChannel.CLUSTER);
     this.serverThread = startServerNIO(serverSocket, 15000);
 
-    await().atMost(5, TimeUnit.MINUTES).until(() -> serverThread.isAlive());
+    await().until(() -> serverThread.isAlive());
 
     SocketChannel clientChannel = SocketChannel.open();
-    await().atMost(5, TimeUnit.MINUTES).until(
+    await().until(
         () -> clientChannel.connect(new InetSocketAddress(localHost, serverPort)));
 
     clientSocket = clientChannel.socket();
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpClient.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpClient.java
index 44263f0..4ff707b 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpClient.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpClient.java
@@ -326,6 +326,10 @@ public class TcpClient {
     } finally {
       try {
         sock.setSoLinger(true, 0); // initiate an abort on close to shut down the server's socket
+      } catch (Exception e) {
+        logger.error("Error aborting socket ", e);
+      }
+      try {
         sock.close();
       } catch (Exception e) {
         logger.error("Error closing socket ", e);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java b/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
index e9313ba..84d4f54 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
@@ -42,6 +42,7 @@ import org.apache.logging.log4j.Logger;
 
 import org.apache.geode.GemFireIOException;
 import org.apache.geode.distributed.internal.DMStats;
+import org.apache.geode.internal.OSProcess;
 import org.apache.geode.internal.logging.LogService;
 
 
@@ -308,6 +309,11 @@ public class NioSslEngine implements NioFilter {
     }
   }
 
+  /*
+   * NioSslEngine doesn't need to ensure capacity in network buffers because they
+   * are fixed in size by the SslContext. The size recommended by the context is
+   * big enough for the SslEngine to do its work.
+   */
   @Override
   public ByteBuffer ensureWrappedCapacity(int amount, ByteBuffer wrappedBuffer,
       Buffers.BufferType bufferType, DMStats stats) {
@@ -317,32 +323,38 @@ public class NioSslEngine implements NioFilter {
   @Override
   public ByteBuffer readAtLeast(SocketChannel channel, int bytes,
       ByteBuffer wrappedBuffer, DMStats stats) throws IOException {
-
-    if (peerAppData.capacity() > bytes) {
-      // we already have a buffer that's big enough
-      if (peerAppData.capacity() - peerAppData.position() < bytes) {
-        peerAppData.compact();
-        peerAppData.flip();
+    try {
+      if (peerAppData.capacity() > bytes) {
+        // we already have a buffer that's big enough
+        if (peerAppData.capacity() - peerAppData.position() < bytes) {
+          peerAppData.compact();
+          peerAppData.flip();
+        }
+      } else {
+        peerAppData =
+            Buffers.expandReadBufferIfNeeded(TRACKED_RECEIVER, peerAppData, bytes, this.stats);
       }
-    } else {
-      peerAppData =
-          Buffers.expandReadBufferIfNeeded(TRACKED_RECEIVER, peerAppData, bytes, this.stats);
-    }
 
-    while (peerAppData.remaining() < bytes) {
-      wrappedBuffer.limit(wrappedBuffer.capacity());
-      int amountRead = channel.read(wrappedBuffer);
-      if (amountRead < 0) {
-        throw new EOFException();
-      }
-      if (amountRead > 0) {
-        wrappedBuffer.flip();
-        // prep the decoded buffer for writing
-        peerAppData.compact();
-        peerAppData = unwrap(wrappedBuffer);
-        // done writing to the decoded buffer - prep it for reading again
-        peerAppData.flip();
+      while (peerAppData.remaining() < bytes) {
+        wrappedBuffer.limit(wrappedBuffer.capacity());
+        int amountRead = channel.read(wrappedBuffer);
+        if (amountRead < 0) {
+          throw new EOFException();
+        }
+        if (amountRead > 0) {
+          wrappedBuffer.flip();
+          // prep the decoded buffer for writing
+          peerAppData.compact();
+          peerAppData = unwrap(wrappedBuffer);
+          // done writing to the decoded buffer - prep it for reading again
+          peerAppData.flip();
+        }
       }
+    } catch (IOException | RuntimeException | Error e) {
+      logger.warn(
+          "BRUCE: PID " + OSProcess.getId() + ", MsgReader.readAtLeast() is throwing an exception",
+          e);
+      throw e;
     }
     return peerAppData;
   }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java
index 15a6389..85c8d93 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java
@@ -25,6 +25,7 @@ import org.apache.geode.distributed.internal.DistributionMessage;
 import org.apache.geode.distributed.internal.ReplyProcessor21;
 import org.apache.geode.internal.Assert;
 import org.apache.geode.internal.InternalDataSerializer;
+import org.apache.geode.internal.OSProcess;
 import org.apache.geode.internal.Version;
 import org.apache.geode.internal.logging.LogService;
 import org.apache.geode.internal.net.Buffers;
@@ -109,7 +110,7 @@ public class MsgReader {
     try {
       byteBufferInputStream.setBuffer(nioInputBuffer);
       ReplyProcessor21.initMessageRPId();
-//      dumpState("readMessage ready to deserialize", null, nioInputBuffer, position, limit);
+      // dumpState("readMessage ready to deserialize", null, nioInputBuffer, position, limit);
       return (DistributionMessage) InternalDataSerializer.readDSFID(byteBufferInputStream);
     } catch (RuntimeException e) {
       dumpState("readMessage(1)", e, nioInputBuffer, position, limit);
@@ -126,16 +127,19 @@ public class MsgReader {
 
   private void dumpState(String whereFrom, Throwable e, ByteBuffer inputBuffer, int position,
       int limit) {
-     logger.info("BRUCE: {}, Connection to {}", whereFrom, conn.getRemoteAddress());
-     logger.info("BRUCE: {}, Message length {}; type {}; id {}",
-     whereFrom, header.messageLength, header.messageId, header.messageId);
-     logger.info("BRUCE: {}, starting buffer position {}; buffer limit {} buffer hash {}",
-     whereFrom, position, limit, Integer.toHexString(System.identityHashCode(inputBuffer)));
-     logger.info("BRUCE: {}, current buffer position {}; buffer limit {}",
-     whereFrom, inputBuffer.position(), inputBuffer.limit());
-     if (e != null) {
-     logger.info("BRUCE: Exception reading message", e);
-     }
+    logger.info("BRUCE: {}, PID {} Connection to {}, {} SSL", whereFrom, OSProcess.getId(),
+        conn.getRemoteAddress(), conn.getConduit().useSSL());
+    logger.info("BRUCE: {}, Message length {}; type {}; id {}",
+        whereFrom, header.messageLength, header.messageType, header.messageId);
+    logger.info(
+        "BRUCE: {}, starting buffer position {}; buffer limit {} capacity {} buffer hash {}",
+        whereFrom, position, limit, inputBuffer.capacity(),
+        Integer.toHexString(System.identityHashCode(inputBuffer)));
+    logger.info("BRUCE: {}, current buffer position {}; buffer limit {}",
+        whereFrom, inputBuffer.position(), inputBuffer.limit());
+    if (e != null) {
+      logger.info("BRUCE: Exception reading message", e);
+    }
   }
 
   void readChunk(Header header, MsgDestreamer md)