You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2016/06/28 08:28:03 UTC

[08/21] cassandra git commit: Remove finished incoming streaming connections from MessagingService

Remove finished incoming streaming connections from MessagingService

patch by Paulo Motta; reviewed by Marcus Eriksson for CASSANDRA-11854


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2811f15b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2811f15b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2811f15b

Branch: refs/heads/cassandra-2.1
Commit: 2811f15bc9117fed4fb38de490d25d68df4e85b7
Parents: 341b3fb
Author: Paulo Motta <pa...@gmail.com>
Authored: Mon Jun 27 12:17:33 2016 -0300
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Tue Jun 28 10:13:24 2016 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../net/IncomingStreamingConnection.java        |  4 +-
 .../apache/cassandra/net/MessagingService.java  | 12 +++++-
 .../cassandra/streaming/ConnectionHandler.java  | 39 +++++++++++++++-----
 .../cassandra/streaming/StreamResultFuture.java | 27 +++++++-------
 .../streaming/StreamingTransferTest.java        | 23 ++++++++++++
 6 files changed, 79 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/2811f15b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 620568d..5741241 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.15
+ * Remove finished incoming streaming connections from MessagingService (CASSANDRA-11854)
  * Don't try to get sstables for non-repairing column families (CASSANDRA-12077)
  * Prevent select statements with clustering key > 64k (CASSANDRA-11882)
  * Avoid marking too many sstables as repaired (CASSANDRA-11696)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2811f15b/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java b/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
index 5ced786..bfe92f9 100644
--- a/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
+++ b/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
@@ -40,7 +40,7 @@ public class IncomingStreamingConnection extends Thread implements Closeable
     private static final Logger logger = LoggerFactory.getLogger(IncomingStreamingConnection.class);
 
     private final int version;
-    private final Socket socket;
+    public final Socket socket;
     private final Set<Closeable> group;
 
     public IncomingStreamingConnection(int version, Socket socket, Set<Closeable> group)
@@ -71,7 +71,7 @@ public class IncomingStreamingConnection extends Thread implements Closeable
             // The receiving side distinguish two connections by looking at StreamInitMessage#isForOutgoing.
             // Note: we cannot use the same socket for incoming and outgoing streams because we want to
             // parallelize said streams and the socket is blocking, so we might deadlock.
-            StreamResultFuture.initReceivingSide(init.sessionIndex, init.planId, init.description, init.from, socket, init.isForOutgoing, version);
+            StreamResultFuture.initReceivingSide(init.sessionIndex, init.planId, init.description, init.from, this, init.isForOutgoing, version);
         }
         catch (IOException e)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2811f15b/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 82320b1..ac8ad79 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -955,10 +955,12 @@ public final class MessagingService implements MessagingServiceMBean
         return ret;
     }
 
-    private static class SocketThread extends Thread
+    @VisibleForTesting
+    public static class SocketThread extends Thread
     {
         private final ServerSocket server;
-        private final Set<Closeable> connections = Sets.newConcurrentHashSet();
+        @VisibleForTesting
+        public final Set<Closeable> connections = Sets.newConcurrentHashSet();
 
         SocketThread(ServerSocket server, String name)
         {
@@ -1145,4 +1147,10 @@ public final class MessagingService implements MessagingServiceMBean
         }
         return result;
     }
+
+    @VisibleForTesting
+    public List<SocketThread> getSocketThreads()
+    {
+        return socketThreads;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2811f15b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
index 52268b2..60ce11e 100644
--- a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
+++ b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
@@ -37,6 +37,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
+import org.apache.cassandra.net.IncomingStreamingConnection;
 import org.apache.cassandra.streaming.messages.StreamInitMessage;
 import org.apache.cassandra.streaming.messages.StreamMessage;
 import org.apache.cassandra.utils.FBUtilities;
@@ -89,16 +90,16 @@ public class ConnectionHandler
     /**
      * Set up outgoing message handler on receiving side.
      *
-     * @param socket socket to use for {@link org.apache.cassandra.streaming.ConnectionHandler.OutgoingMessageHandler}.
+     * @param connection Incoming connection handed to the outgoing or the incoming message handler, depending on {@code isForOutgoing}.
      * @param version Streaming message version
      * @throws IOException
      */
-    public void initiateOnReceivingSide(Socket socket, boolean isForOutgoing, int version) throws IOException
+    public void initiateOnReceivingSide(IncomingStreamingConnection connection, boolean isForOutgoing, int version) throws IOException
     {
         if (isForOutgoing)
-            outgoing.start(socket, version);
+            outgoing.start(connection, version);
         else
-            incoming.start(socket, version);
+            incoming.start(connection, version);
     }
 
     public ListenableFuture<?> close()
@@ -156,6 +157,7 @@ public class ConnectionHandler
         protected Socket socket;
 
         private final AtomicReference<SettableFuture<?>> closeFuture = new AtomicReference<>();
+        private IncomingStreamingConnection incomingConnection;
 
         protected MessageHandler(StreamSession session)
         {
@@ -191,6 +193,12 @@ public class ConnectionHandler
             getWriteChannel(socket).write(messageBuf);
         }
 
+        public void start(IncomingStreamingConnection connection, int protocolVersion)
+        {
+            this.incomingConnection = connection;
+            start(connection.socket, protocolVersion);
+        }
+
         public void start(Socket socket, int protocolVersion)
         {
             this.socket = socket;
@@ -218,15 +226,26 @@ public class ConnectionHandler
             closeFuture.get().set(null);
 
             // We can now close the socket
-            try
+            if (incomingConnection != null)
             {
-                socket.close();
+                //this will close the underlying socket and remove it
+                //from active MessagingService connections (CASSANDRA-11854)
+                incomingConnection.close();
             }
-            catch (IOException e)
+            else
             {
-                // Erroring out while closing shouldn't happen but is not really a big deal, so just log
-                // it at DEBUG and ignore otherwise.
-                logger.debug("Unexpected error while closing streaming connection", e);
+                //this is an outgoing connection not registered in the MessagingService
+                //so we can close the socket directly
+                try
+                {
+                    socket.close();
+                }
+                catch (IOException e)
+                {
+                    // Erroring out while closing shouldn't happen but is not really a big deal, so just log
+                    // it at DEBUG and ignore otherwise.
+                    logger.debug("Unexpected error while closing streaming connection", e);
+                }
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2811f15b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
index 6a6f2b9..5c9c6de 100644
--- a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
+++ b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
@@ -19,7 +19,6 @@ package org.apache.cassandra.streaming;
 
 import java.io.IOException;
 import java.net.InetAddress;
-import java.net.Socket;
 import java.util.*;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
@@ -28,6 +27,8 @@ import com.google.common.util.concurrent.Futures;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.net.IncomingStreamingConnection;
+
 /**
  * A future on the result ({@link StreamState}) of a streaming plan.
  *
@@ -83,7 +84,7 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
                 future.addEventListener(listener);
         }
 
-        logger.info("[Stream #{}] Executing streaming plan for {}", planId,  description);
+        logger.info("[Stream #{}] Executing streaming plan for {}", planId, description);
 
         // Initialize and start all sessions
         for (final StreamSession session : coordinator.getAllStreamSessions())
@@ -99,7 +100,7 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
                                                                     UUID planId,
                                                                     String description,
                                                                     InetAddress from,
-                                                                    Socket socket,
+                                                                    IncomingStreamingConnection connection,
                                                                     boolean isForOutgoing,
                                                                     int version) throws IOException
     {
@@ -112,7 +113,7 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
             future = new StreamResultFuture(planId, description);
             StreamManager.instance.registerReceiving(future);
         }
-        future.attachSocket(from, sessionIndex, socket, isForOutgoing, version);
+        future.attachConnection(from, sessionIndex, connection, isForOutgoing, version);
         logger.info("[Stream #{}, ID#{}] Received streaming plan for {}", planId, sessionIndex, description);
         return future;
     }
@@ -124,11 +125,11 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
         return future;
     }
 
-    private void attachSocket(InetAddress from, int sessionIndex, Socket socket, boolean isForOutgoing, int version) throws IOException
+    private void attachConnection(InetAddress from, int sessionIndex, IncomingStreamingConnection connection, boolean isForOutgoing, int version) throws IOException
     {
-        StreamSession session = coordinator.getOrCreateSessionById(from, sessionIndex, socket.getInetAddress());
+        StreamSession session = coordinator.getOrCreateSessionById(from, sessionIndex, connection.socket.getInetAddress());
         session.init(this);
-        session.handler.initiateOnReceivingSide(socket, isForOutgoing, version);
+        session.handler.initiateOnReceivingSide(connection, isForOutgoing, version);
     }
 
     public void addEventListener(StreamEventHandler listener)
@@ -164,12 +165,12 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
     {
         SessionInfo sessionInfo = session.getSessionInfo();
         logger.info("[Stream #{} ID#{}] Prepare completed. Receiving {} files({} bytes), sending {} files({} bytes)",
-                              session.planId(),
-                              session.sessionIndex(),
-                              sessionInfo.getTotalFilesToReceive(),
-                              sessionInfo.getTotalSizeToReceive(),
-                              sessionInfo.getTotalFilesToSend(),
-                              sessionInfo.getTotalSizeToSend());
+                    session.planId(),
+                    session.sessionIndex(),
+                    sessionInfo.getTotalFilesToReceive(),
+                    sessionInfo.getTotalSizeToReceive(),
+                    sessionInfo.getTotalFilesToSend(),
+                    sessionInfo.getTotalSizeToSend());
         StreamEvent.SessionPreparedEvent event = new StreamEvent.SessionPreparedEvent(planId, sessionInfo);
         coordinator.addSessionInfo(sessionInfo);
         fireStreamEvent(event);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2811f15b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
index 31dc492..abff812 100644
--- a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
@@ -52,6 +52,7 @@ import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.sstable.SSTableUtils;
+import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.CounterId;
@@ -206,11 +207,33 @@ public class StreamingTransferTest extends SchemaLoader
         // wrapped range
         ranges.add(new Range<Token>(p.getToken(ByteBufferUtil.bytes("key1")), p.getToken(ByteBufferUtil.bytes("key0"))));
         new StreamPlan("StreamingTransferTest").transferRanges(LOCAL, cfs.keyspace.getName(), ranges, cfs.getColumnFamilyName()).execute().get();
+        verifyConnectionsAreClosed();
     }
 
     private void transfer(SSTableReader sstable, List<Range<Token>> ranges) throws Exception
     {
         new StreamPlan("StreamingTransferTest").transferFiles(LOCAL, makeStreamingDetails(ranges, Refs.tryRef(Arrays.asList(sstable)))).execute().get();
+        verifyConnectionsAreClosed();
+    }
+
+    /**
+     * Test that finished incoming connections are removed from MessagingService (CASSANDRA-11854)
+     */
+    private void verifyConnectionsAreClosed() throws InterruptedException
+    {
+        //after stream session is finished, message handlers may take several milliseconds to be closed
+        outer:
+        for (int i = 0; i <= 10; i++)
+        {
+            for (MessagingService.SocketThread socketThread : MessagingService.instance().getSocketThreads())
+                if (!socketThread.connections.isEmpty())
+                {
+                    Thread.sleep(100);
+                    continue outer;
+                }
+            return;
+        }
+        fail("Streaming connections remain registered in MessagingService");
     }
 
     private Collection<StreamSession.SSTableStreamingSections> makeStreamingDetails(List<Range<Token>> ranges, Refs<SSTableReader> sstables)