You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2013/12/17 23:38:53 UTC

[1/5] git commit: cleanup + debug logging

Updated Branches:
  refs/heads/cassandra-2.0 ecec863d1 -> 53af91e65
  refs/heads/trunk 90e585dde -> 6635cde3a


cleanup + debug logging


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1d605281
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1d605281
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1d605281

Branch: refs/heads/cassandra-2.0
Commit: 1d6052810df9363ed8dee308444b8466be112b5d
Parents: ecec863
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue Dec 17 16:34:53 2013 -0600
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Dec 17 16:37:05 2013 -0600

----------------------------------------------------------------------
 .../apache/cassandra/net/MessagingService.java  | 48 +++++++++-----------
 1 file changed, 22 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/1d605281/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 20cad82..b2c8014 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -37,7 +37,6 @@ import com.google.common.collect.Lists;
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.concurrent.TracingAwareExecutorService;
@@ -391,7 +390,7 @@ public final class MessagingService implements MessagingServiceMBean
     public void listen(InetAddress localEp) throws ConfigurationException
     {
         callbacks.reset(); // hack to allow tests to stop/restart MS
-        for (ServerSocket ss : getServerSocket(localEp))
+        for (ServerSocket ss : getServerSockets(localEp))
         {
             SocketThread th = new SocketThread(ss, "ACCEPT-" + localEp);
             th.start();
@@ -400,7 +399,7 @@ public final class MessagingService implements MessagingServiceMBean
         listenGate.signalAll();
     }
 
-    private List<ServerSocket> getServerSocket(InetAddress localEp) throws ConfigurationException
+    private List<ServerSocket> getServerSockets(InetAddress localEp) throws ConfigurationException
     {
         final List<ServerSocket> ss = new ArrayList<ServerSocket>(2);
         if (DatabaseDescriptor.getServerEncryptionOptions().internode_encryption != ServerEncryptionOptions.InternodeEncryption.none)
@@ -834,36 +833,31 @@ public final class MessagingService implements MessagingServiceMBean
                 try
                 {
                     socket = server.accept();
-                    if (authenticate(socket))
-                    {
-                        socket.setKeepAlive(true);
-                        // determine the connection type to decide whether to buffer
-                        DataInputStream in = new DataInputStream(socket.getInputStream());
-                        MessagingService.validateMagic(in.readInt());
-                        int header = in.readInt();
-                        boolean isStream = MessagingService.getBits(header, 3, 1) == 1;
-                        int version = MessagingService.getBits(header, 15, 8);
-                        logger.debug("Connection version {} from {}", version, socket.getInetAddress());
-
-                        if (isStream)
-                        {
-                            new IncomingStreamingConnection(version, socket).start();
-                        }
-                        else
-                        {
-                            boolean compressed = MessagingService.getBits(header, 2, 1) == 1;
-                            new IncomingTcpConnection(version, compressed, socket).start();
-                        }
-                    }
-                    else
+                    if (!authenticate(socket))
                     {
+                        logger.debug("remote failed to authenticate");
                         socket.close();
+                        continue;
                     }
+
+                    socket.setKeepAlive(true);
+                    // determine the connection type to decide whether to buffer
+                    DataInputStream in = new DataInputStream(socket.getInputStream());
+                    MessagingService.validateMagic(in.readInt());
+                    int header = in.readInt();
+                    boolean isStream = MessagingService.getBits(header, 3, 1) == 1;
+                    int version = MessagingService.getBits(header, 15, 8);
+                    logger.debug("Connection version {} from {}", version, socket.getInetAddress());
+
+                    Thread thread = isStream
+                                  ? new IncomingStreamingConnection(version, socket)
+                                  : new IncomingTcpConnection(version, MessagingService.getBits(header, 2, 1) == 1, socket);
+                    thread.start();
                 }
                 catch (AsynchronousCloseException e)
                 {
                     // this happens when another thread calls close().
-                    logger.info("MessagingService shutting down server thread");
+                    logger.debug("Asynchronous close seen by server thread");
                     break;
                 }
                 catch (ClosedChannelException e)
@@ -877,10 +871,12 @@ public final class MessagingService implements MessagingServiceMBean
                     FileUtils.closeQuietly(socket);
                 }
             }
+            logger.info("MessagingService has terminated the accept() thread");
         }
 
         void close() throws IOException
         {
+            logger.debug("Closing accept() thread");
             server.close();
         }
 


[3/5] git commit: loop based on isClosed to accomodate SSL sockets patch by Mikhail Stepura; reviewed by jbellis for CASSANDRA-6349

Posted by jb...@apache.org.
loop based on isClosed to accomodate SSL sockets
patch by Mikhail Stepura; reviewed by jbellis for CASSANDRA-6349


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/53af91e6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/53af91e6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/53af91e6

Branch: refs/heads/trunk
Commit: 53af91e650d3fd881df6e811f5d4c5e46a039119
Parents: 1d60528
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue Dec 17 16:37:43 2013 -0600
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Dec 17 16:38:32 2013 -0600

----------------------------------------------------------------------
 CHANGES.txt                                             | 1 +
 src/java/org/apache/cassandra/net/MessagingService.java | 2 +-
 2 files changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/53af91e6/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c2cd052..b8757d7 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.0.4
+ * Fix accept() loop for SSL sockets post-shutdown (CASSANDRA-6468)
  * Fix size-tiered compaction in LCS L0 (CASSANDRA-6496)
  * Fix assertion failure in filterColdSSTables (CASSANDRA-6483)
  * Fix row tombstones in larger-than-memory compactions (CASSANDRA-6008)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/53af91e6/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index b2c8014..21c9345 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -827,7 +827,7 @@ public final class MessagingService implements MessagingServiceMBean
 
         public void run()
         {
-            while (true)
+            while (!server.isClosed())
             {
                 Socket socket = null;
                 try


[2/5] git commit: cleanup + debug logging

Posted by jb...@apache.org.
cleanup + debug logging


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1d605281
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1d605281
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1d605281

Branch: refs/heads/trunk
Commit: 1d6052810df9363ed8dee308444b8466be112b5d
Parents: ecec863
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue Dec 17 16:34:53 2013 -0600
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Dec 17 16:37:05 2013 -0600

----------------------------------------------------------------------
 .../apache/cassandra/net/MessagingService.java  | 48 +++++++++-----------
 1 file changed, 22 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/1d605281/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 20cad82..b2c8014 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -37,7 +37,6 @@ import com.google.common.collect.Lists;
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.concurrent.TracingAwareExecutorService;
@@ -391,7 +390,7 @@ public final class MessagingService implements MessagingServiceMBean
     public void listen(InetAddress localEp) throws ConfigurationException
     {
         callbacks.reset(); // hack to allow tests to stop/restart MS
-        for (ServerSocket ss : getServerSocket(localEp))
+        for (ServerSocket ss : getServerSockets(localEp))
         {
             SocketThread th = new SocketThread(ss, "ACCEPT-" + localEp);
             th.start();
@@ -400,7 +399,7 @@ public final class MessagingService implements MessagingServiceMBean
         listenGate.signalAll();
     }
 
-    private List<ServerSocket> getServerSocket(InetAddress localEp) throws ConfigurationException
+    private List<ServerSocket> getServerSockets(InetAddress localEp) throws ConfigurationException
     {
         final List<ServerSocket> ss = new ArrayList<ServerSocket>(2);
         if (DatabaseDescriptor.getServerEncryptionOptions().internode_encryption != ServerEncryptionOptions.InternodeEncryption.none)
@@ -834,36 +833,31 @@ public final class MessagingService implements MessagingServiceMBean
                 try
                 {
                     socket = server.accept();
-                    if (authenticate(socket))
-                    {
-                        socket.setKeepAlive(true);
-                        // determine the connection type to decide whether to buffer
-                        DataInputStream in = new DataInputStream(socket.getInputStream());
-                        MessagingService.validateMagic(in.readInt());
-                        int header = in.readInt();
-                        boolean isStream = MessagingService.getBits(header, 3, 1) == 1;
-                        int version = MessagingService.getBits(header, 15, 8);
-                        logger.debug("Connection version {} from {}", version, socket.getInetAddress());
-
-                        if (isStream)
-                        {
-                            new IncomingStreamingConnection(version, socket).start();
-                        }
-                        else
-                        {
-                            boolean compressed = MessagingService.getBits(header, 2, 1) == 1;
-                            new IncomingTcpConnection(version, compressed, socket).start();
-                        }
-                    }
-                    else
+                    if (!authenticate(socket))
                     {
+                        logger.debug("remote failed to authenticate");
                         socket.close();
+                        continue;
                     }
+
+                    socket.setKeepAlive(true);
+                    // determine the connection type to decide whether to buffer
+                    DataInputStream in = new DataInputStream(socket.getInputStream());
+                    MessagingService.validateMagic(in.readInt());
+                    int header = in.readInt();
+                    boolean isStream = MessagingService.getBits(header, 3, 1) == 1;
+                    int version = MessagingService.getBits(header, 15, 8);
+                    logger.debug("Connection version {} from {}", version, socket.getInetAddress());
+
+                    Thread thread = isStream
+                                  ? new IncomingStreamingConnection(version, socket)
+                                  : new IncomingTcpConnection(version, MessagingService.getBits(header, 2, 1) == 1, socket);
+                    thread.start();
                 }
                 catch (AsynchronousCloseException e)
                 {
                     // this happens when another thread calls close().
-                    logger.info("MessagingService shutting down server thread");
+                    logger.debug("Asynchronous close seen by server thread");
                     break;
                 }
                 catch (ClosedChannelException e)
@@ -877,10 +871,12 @@ public final class MessagingService implements MessagingServiceMBean
                     FileUtils.closeQuietly(socket);
                 }
             }
+            logger.info("MessagingService has terminated the accept() thread");
         }
 
         void close() throws IOException
         {
+            logger.debug("Closing accept() thread");
             server.close();
         }
 


[5/5] git commit: Merge branch 'cassandra-2.0' into trunk

Posted by jb...@apache.org.
Merge branch 'cassandra-2.0' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6635cde3
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6635cde3
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6635cde3

Branch: refs/heads/trunk
Commit: 6635cde3a0eb6fa2e0599aa9d970596a8664cd63
Parents: 90e585d 53af91e
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue Dec 17 16:38:46 2013 -0600
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Dec 17 16:38:46 2013 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/net/MessagingService.java  | 50 +++++++++-----------
 2 files changed, 24 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/6635cde3/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 6c9f2e1,b8757d7..1c88431
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,25 -1,5 +1,26 @@@
 +2.1
 + * Multithreaded commitlog (CASSANDRA-3578)
 + * allocate fixed index summary memory pool and resample cold index summaries 
 +   to use less memory (CASSANDRA-5519)
 + * Removed multithreaded compaction (CASSANDRA-6142)
 + * Parallelize fetching rows for low-cardinality indexes (CASSANDRA-1337)
 + * change logging from log4j to logback (CASSANDRA-5883)
 + * switch to LZ4 compression for internode communication (CASSANDRA-5887)
 + * Stop using Thrift-generated Index* classes internally (CASSANDRA-5971)
 + * Remove 1.2 network compatibility code (CASSANDRA-5960)
 + * Remove leveled json manifest migration code (CASSANDRA-5996)
 + * Remove CFDefinition (CASSANDRA-6253)
 + * Use AtomicIntegerFieldUpdater in RefCountedMemory (CASSANDRA-6278)
 + * User-defined types for CQL3 (CASSANDRA-5590)
 + * Use of o.a.c.metrics in nodetool (CASSANDRA-5871, 6406)
 + * Batch read from OTC's queue and cleanup (CASSANDRA-1632)
 + * Secondary index support for collections (CASSANDRA-4511)
 + * SSTable metadata(Stats.db) format change (CASSANDRA-6356)
 + * Push composites support in the storage engine (CASSANDRA-5417)
 +
 +
  2.0.4
+  * Fix accept() loop for SSL sockets post-shutdown (CASSANDRA-6468)
   * Fix size-tiered compaction in LCS L0 (CASSANDRA-6496)
   * Fix assertion failure in filterColdSSTables (CASSANDRA-6483)
   * Fix row tombstones in larger-than-memory compactions (CASSANDRA-6008)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6635cde3/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------


[4/5] git commit: loop based on isClosed to accomodate SSL sockets patch by Mikhail Stepura; reviewed by jbellis for CASSANDRA-6349

Posted by jb...@apache.org.
loop based on isClosed to accomodate SSL sockets
patch by Mikhail Stepura; reviewed by jbellis for CASSANDRA-6349


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/53af91e6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/53af91e6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/53af91e6

Branch: refs/heads/cassandra-2.0
Commit: 53af91e650d3fd881df6e811f5d4c5e46a039119
Parents: 1d60528
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue Dec 17 16:37:43 2013 -0600
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Dec 17 16:38:32 2013 -0600

----------------------------------------------------------------------
 CHANGES.txt                                             | 1 +
 src/java/org/apache/cassandra/net/MessagingService.java | 2 +-
 2 files changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/53af91e6/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c2cd052..b8757d7 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.0.4
+ * Fix accept() loop for SSL sockets post-shutdown (CASSANDRA-6468)
  * Fix size-tiered compaction in LCS L0 (CASSANDRA-6496)
  * Fix assertion failure in filterColdSSTables (CASSANDRA-6483)
  * Fix row tombstones in larger-than-memory compactions (CASSANDRA-6008)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/53af91e6/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index b2c8014..21c9345 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -827,7 +827,7 @@ public final class MessagingService implements MessagingServiceMBean
 
         public void run()
         {
-            while (true)
+            while (!server.isClosed())
             {
                 Socket socket = null;
                 try