You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2013/12/17 23:38:54 UTC
[2/5] git commit: cleanup + debug logging
cleanup + debug logging
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1d605281
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1d605281
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1d605281
Branch: refs/heads/trunk
Commit: 1d6052810df9363ed8dee308444b8466be112b5d
Parents: ecec863
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue Dec 17 16:34:53 2013 -0600
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Dec 17 16:37:05 2013 -0600
----------------------------------------------------------------------
.../apache/cassandra/net/MessagingService.java | 48 +++++++++-----------
1 file changed, 22 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1d605281/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 20cad82..b2c8014 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -37,7 +37,6 @@ import com.google.common.collect.Lists;
import org.cliffc.high_scale_lib.NonBlockingHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.concurrent.TracingAwareExecutorService;
@@ -391,7 +390,7 @@ public final class MessagingService implements MessagingServiceMBean
public void listen(InetAddress localEp) throws ConfigurationException
{
callbacks.reset(); // hack to allow tests to stop/restart MS
- for (ServerSocket ss : getServerSocket(localEp))
+ for (ServerSocket ss : getServerSockets(localEp))
{
SocketThread th = new SocketThread(ss, "ACCEPT-" + localEp);
th.start();
@@ -400,7 +399,7 @@ public final class MessagingService implements MessagingServiceMBean
listenGate.signalAll();
}
- private List<ServerSocket> getServerSocket(InetAddress localEp) throws ConfigurationException
+ private List<ServerSocket> getServerSockets(InetAddress localEp) throws ConfigurationException
{
final List<ServerSocket> ss = new ArrayList<ServerSocket>(2);
if (DatabaseDescriptor.getServerEncryptionOptions().internode_encryption != ServerEncryptionOptions.InternodeEncryption.none)
@@ -834,36 +833,31 @@ public final class MessagingService implements MessagingServiceMBean
try
{
socket = server.accept();
- if (authenticate(socket))
- {
- socket.setKeepAlive(true);
- // determine the connection type to decide whether to buffer
- DataInputStream in = new DataInputStream(socket.getInputStream());
- MessagingService.validateMagic(in.readInt());
- int header = in.readInt();
- boolean isStream = MessagingService.getBits(header, 3, 1) == 1;
- int version = MessagingService.getBits(header, 15, 8);
- logger.debug("Connection version {} from {}", version, socket.getInetAddress());
-
- if (isStream)
- {
- new IncomingStreamingConnection(version, socket).start();
- }
- else
- {
- boolean compressed = MessagingService.getBits(header, 2, 1) == 1;
- new IncomingTcpConnection(version, compressed, socket).start();
- }
- }
- else
+ if (!authenticate(socket))
{
+ logger.debug("remote failed to authenticate");
socket.close();
+ continue;
}
+
+ socket.setKeepAlive(true);
+ // determine the connection type to decide whether to buffer
+ DataInputStream in = new DataInputStream(socket.getInputStream());
+ MessagingService.validateMagic(in.readInt());
+ int header = in.readInt();
+ boolean isStream = MessagingService.getBits(header, 3, 1) == 1;
+ int version = MessagingService.getBits(header, 15, 8);
+ logger.debug("Connection version {} from {}", version, socket.getInetAddress());
+
+ Thread thread = isStream
+ ? new IncomingStreamingConnection(version, socket)
+ : new IncomingTcpConnection(version, MessagingService.getBits(header, 2, 1) == 1, socket);
+ thread.start();
}
catch (AsynchronousCloseException e)
{
// this happens when another thread calls close().
- logger.info("MessagingService shutting down server thread");
+ logger.debug("Asynchronous close seen by server thread");
break;
}
catch (ClosedChannelException e)
@@ -877,10 +871,12 @@ public final class MessagingService implements MessagingServiceMBean
FileUtils.closeQuietly(socket);
}
}
+ logger.info("MessagingService has terminated the accept() thread");
}
void close() throws IOException
{
+ logger.debug("Closing accept() thread");
server.close();
}