You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bs...@apache.org on 2019/01/28 18:15:23 UTC
[geode] branch feature/GEODE-2113e updated: added PIDs to log
messages to make stress-test artifacts more useful
This is an automated email from the ASF dual-hosted git repository.
bschuchardt pushed a commit to branch feature/GEODE-2113e
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/feature/GEODE-2113e by this push:
new 8b37afe added PIDs to log messages to make stress-test artifacts more useful
8b37afe is described below
commit 8b37afe944fa6991be05fa60a3f0e453cdef9aab
Author: Bruce Schuchardt <bs...@pivotal.io>
AuthorDate: Mon Jan 28 10:13:56 2019 -0800
added PIDs to log messages to make stress-test artifacts more useful
I also removed the time-out parameters in Awaitility calls - they'll use
the default 5 minute timeout.
---
.../geode/ClusterCommunicationsDUnitTest.java | 2 +-
.../internal/net/SSLSocketIntegrationTest.java | 4 +-
.../distributed/internal/tcpserver/TcpClient.java | 4 ++
.../apache/geode/internal/net/NioSslEngine.java | 58 +++++++++++++---------
.../org/apache/geode/internal/tcp/MsgReader.java | 26 ++++++----
5 files changed, 57 insertions(+), 37 deletions(-)
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/ClusterCommunicationsDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/ClusterCommunicationsDUnitTest.java
index 6a1e0a5..b4d106e 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/ClusterCommunicationsDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/ClusterCommunicationsDUnitTest.java
@@ -184,7 +184,7 @@ public class ClusterCommunicationsDUnitTest implements java.io.Serializable {
VM.getVM(2).invoke(() -> cache.getDistributedSystem().getDistributedMember());
VM.getVM(1).invoke("receive a large direct-reply message", () -> {
SerialAckedMessageWithBigReply messageWithBigReply = new SerialAckedMessageWithBigReply();
- await().atMost(3, TimeUnit.MINUTES).until(() -> {
+ await().until(() -> {
messageWithBigReply.send(Collections.<DistributedMember>singleton(vm2ID));
return true;
});
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketIntegrationTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketIntegrationTest.java
index 5fa7697..5e81b83 100755
--- a/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketIntegrationTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketIntegrationTest.java
@@ -202,10 +202,10 @@ public class SSLSocketIntegrationTest {
SocketCreatorFactory.getSocketCreatorForComponent(SecurableCommunicationChannel.CLUSTER);
this.serverThread = startServerNIO(serverSocket, 15000);
- await().atMost(5, TimeUnit.MINUTES).until(() -> serverThread.isAlive());
+ await().until(() -> serverThread.isAlive());
SocketChannel clientChannel = SocketChannel.open();
- await().atMost(5, TimeUnit.MINUTES).until(
+ await().until(
() -> clientChannel.connect(new InetSocketAddress(localHost, serverPort)));
clientSocket = clientChannel.socket();
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpClient.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpClient.java
index 44263f0..4ff707b 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpClient.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpClient.java
@@ -326,6 +326,10 @@ public class TcpClient {
} finally {
try {
sock.setSoLinger(true, 0); // initiate an abort on close to shut down the server's socket
+ } catch (Exception e) {
+ logger.error("Error aborting socket ", e);
+ }
+ try {
sock.close();
} catch (Exception e) {
logger.error("Error closing socket ", e);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java b/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
index e9313ba..84d4f54 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
@@ -42,6 +42,7 @@ import org.apache.logging.log4j.Logger;
import org.apache.geode.GemFireIOException;
import org.apache.geode.distributed.internal.DMStats;
+import org.apache.geode.internal.OSProcess;
import org.apache.geode.internal.logging.LogService;
@@ -308,6 +309,11 @@ public class NioSslEngine implements NioFilter {
}
}
+ /*
+ * NioSslEngine doesn't need to ensure capacity in network buffers because they
+ * are fixed in size by the SslContext. The size recommended by the context is
+ * big enough for the SslEngine to do its work.
+ */
@Override
public ByteBuffer ensureWrappedCapacity(int amount, ByteBuffer wrappedBuffer,
Buffers.BufferType bufferType, DMStats stats) {
@@ -317,32 +323,38 @@ public class NioSslEngine implements NioFilter {
@Override
public ByteBuffer readAtLeast(SocketChannel channel, int bytes,
ByteBuffer wrappedBuffer, DMStats stats) throws IOException {
-
- if (peerAppData.capacity() > bytes) {
- // we already have a buffer that's big enough
- if (peerAppData.capacity() - peerAppData.position() < bytes) {
- peerAppData.compact();
- peerAppData.flip();
+ try {
+ if (peerAppData.capacity() > bytes) {
+ // we already have a buffer that's big enough
+ if (peerAppData.capacity() - peerAppData.position() < bytes) {
+ peerAppData.compact();
+ peerAppData.flip();
+ }
+ } else {
+ peerAppData =
+ Buffers.expandReadBufferIfNeeded(TRACKED_RECEIVER, peerAppData, bytes, this.stats);
}
- } else {
- peerAppData =
- Buffers.expandReadBufferIfNeeded(TRACKED_RECEIVER, peerAppData, bytes, this.stats);
- }
- while (peerAppData.remaining() < bytes) {
- wrappedBuffer.limit(wrappedBuffer.capacity());
- int amountRead = channel.read(wrappedBuffer);
- if (amountRead < 0) {
- throw new EOFException();
- }
- if (amountRead > 0) {
- wrappedBuffer.flip();
- // prep the decoded buffer for writing
- peerAppData.compact();
- peerAppData = unwrap(wrappedBuffer);
- // done writing to the decoded buffer - prep it for reading again
- peerAppData.flip();
+ while (peerAppData.remaining() < bytes) {
+ wrappedBuffer.limit(wrappedBuffer.capacity());
+ int amountRead = channel.read(wrappedBuffer);
+ if (amountRead < 0) {
+ throw new EOFException();
+ }
+ if (amountRead > 0) {
+ wrappedBuffer.flip();
+ // prep the decoded buffer for writing
+ peerAppData.compact();
+ peerAppData = unwrap(wrappedBuffer);
+ // done writing to the decoded buffer - prep it for reading again
+ peerAppData.flip();
+ }
}
+ } catch (IOException | RuntimeException | Error e) {
+ logger.warn(
+ "BRUCE: PID " + OSProcess.getId() + ", MsgReader.readAtLeast() is throwing an exception",
+ e);
+ throw e;
}
return peerAppData;
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java
index 15a6389..85c8d93 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java
@@ -25,6 +25,7 @@ import org.apache.geode.distributed.internal.DistributionMessage;
import org.apache.geode.distributed.internal.ReplyProcessor21;
import org.apache.geode.internal.Assert;
import org.apache.geode.internal.InternalDataSerializer;
+import org.apache.geode.internal.OSProcess;
import org.apache.geode.internal.Version;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.net.Buffers;
@@ -109,7 +110,7 @@ public class MsgReader {
try {
byteBufferInputStream.setBuffer(nioInputBuffer);
ReplyProcessor21.initMessageRPId();
-// dumpState("readMessage ready to deserialize", null, nioInputBuffer, position, limit);
+ // dumpState("readMessage ready to deserialize", null, nioInputBuffer, position, limit);
return (DistributionMessage) InternalDataSerializer.readDSFID(byteBufferInputStream);
} catch (RuntimeException e) {
dumpState("readMessage(1)", e, nioInputBuffer, position, limit);
@@ -126,16 +127,19 @@ public class MsgReader {
private void dumpState(String whereFrom, Throwable e, ByteBuffer inputBuffer, int position,
int limit) {
- logger.info("BRUCE: {}, Connection to {}", whereFrom, conn.getRemoteAddress());
- logger.info("BRUCE: {}, Message length {}; type {}; id {}",
- whereFrom, header.messageLength, header.messageId, header.messageId);
- logger.info("BRUCE: {}, starting buffer position {}; buffer limit {} buffer hash {}",
- whereFrom, position, limit, Integer.toHexString(System.identityHashCode(inputBuffer)));
- logger.info("BRUCE: {}, current buffer position {}; buffer limit {}",
- whereFrom, inputBuffer.position(), inputBuffer.limit());
- if (e != null) {
- logger.info("BRUCE: Exception reading message", e);
- }
+ logger.info("BRUCE: {}, PID {} Connection to {}, {} SSL", whereFrom, OSProcess.getId(),
+ conn.getRemoteAddress(), conn.getConduit().useSSL());
+ logger.info("BRUCE: {}, Message length {}; type {}; id {}",
+ whereFrom, header.messageLength, header.messageType, header.messageId);
+ logger.info(
+ "BRUCE: {}, starting buffer position {}; buffer limit {} capacity {} buffer hash {}",
+ whereFrom, position, limit, inputBuffer.capacity(),
+ Integer.toHexString(System.identityHashCode(inputBuffer)));
+ logger.info("BRUCE: {}, current buffer position {}; buffer limit {}",
+ whereFrom, inputBuffer.position(), inputBuffer.limit());
+ if (e != null) {
+ logger.info("BRUCE: Exception reading message", e);
+ }
}
void readChunk(Header header, MsgDestreamer md)