You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2013/12/17 23:38:54 UTC

[2/5] git commit: cleanup + debug logging

cleanup + debug logging


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1d605281
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1d605281
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1d605281

Branch: refs/heads/trunk
Commit: 1d6052810df9363ed8dee308444b8466be112b5d
Parents: ecec863
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue Dec 17 16:34:53 2013 -0600
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Dec 17 16:37:05 2013 -0600

----------------------------------------------------------------------
 .../apache/cassandra/net/MessagingService.java  | 48 +++++++++-----------
 1 file changed, 22 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/1d605281/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 20cad82..b2c8014 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -37,7 +37,6 @@ import com.google.common.collect.Lists;
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.concurrent.TracingAwareExecutorService;
@@ -391,7 +390,7 @@ public final class MessagingService implements MessagingServiceMBean
     public void listen(InetAddress localEp) throws ConfigurationException
     {
         callbacks.reset(); // hack to allow tests to stop/restart MS
-        for (ServerSocket ss : getServerSocket(localEp))
+        for (ServerSocket ss : getServerSockets(localEp))
         {
             SocketThread th = new SocketThread(ss, "ACCEPT-" + localEp);
             th.start();
@@ -400,7 +399,7 @@ public final class MessagingService implements MessagingServiceMBean
         listenGate.signalAll();
     }
 
-    private List<ServerSocket> getServerSocket(InetAddress localEp) throws ConfigurationException
+    private List<ServerSocket> getServerSockets(InetAddress localEp) throws ConfigurationException
     {
         final List<ServerSocket> ss = new ArrayList<ServerSocket>(2);
         if (DatabaseDescriptor.getServerEncryptionOptions().internode_encryption != ServerEncryptionOptions.InternodeEncryption.none)
@@ -834,36 +833,31 @@ public final class MessagingService implements MessagingServiceMBean
                 try
                 {
                     socket = server.accept();
-                    if (authenticate(socket))
-                    {
-                        socket.setKeepAlive(true);
-                        // determine the connection type to decide whether to buffer
-                        DataInputStream in = new DataInputStream(socket.getInputStream());
-                        MessagingService.validateMagic(in.readInt());
-                        int header = in.readInt();
-                        boolean isStream = MessagingService.getBits(header, 3, 1) == 1;
-                        int version = MessagingService.getBits(header, 15, 8);
-                        logger.debug("Connection version {} from {}", version, socket.getInetAddress());
-
-                        if (isStream)
-                        {
-                            new IncomingStreamingConnection(version, socket).start();
-                        }
-                        else
-                        {
-                            boolean compressed = MessagingService.getBits(header, 2, 1) == 1;
-                            new IncomingTcpConnection(version, compressed, socket).start();
-                        }
-                    }
-                    else
+                    if (!authenticate(socket))
                     {
+                        logger.debug("remote failed to authenticate");
                         socket.close();
+                        continue;
                     }
+
+                    socket.setKeepAlive(true);
+                    // determine the connection type to decide whether to buffer
+                    DataInputStream in = new DataInputStream(socket.getInputStream());
+                    MessagingService.validateMagic(in.readInt());
+                    int header = in.readInt();
+                    boolean isStream = MessagingService.getBits(header, 3, 1) == 1;
+                    int version = MessagingService.getBits(header, 15, 8);
+                    logger.debug("Connection version {} from {}", version, socket.getInetAddress());
+
+                    Thread thread = isStream
+                                  ? new IncomingStreamingConnection(version, socket)
+                                  : new IncomingTcpConnection(version, MessagingService.getBits(header, 2, 1) == 1, socket);
+                    thread.start();
                 }
                 catch (AsynchronousCloseException e)
                 {
                     // this happens when another thread calls close().
-                    logger.info("MessagingService shutting down server thread");
+                    logger.debug("Asynchronous close seen by server thread");
                     break;
                 }
                 catch (ClosedChannelException e)
@@ -877,10 +871,12 @@ public final class MessagingService implements MessagingServiceMBean
                     FileUtils.closeQuietly(socket);
                 }
             }
+            logger.info("MessagingService has terminated the accept() thread");
         }
 
         void close() throws IOException
         {
+            logger.debug("Closing accept() thread");
             server.close();
         }