You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2016/06/28 08:28:03 UTC
[08/21] cassandra git commit: Remove finished incoming streaming connections from MessagingService
Remove finished incoming streaming connections from MessagingService
patch by Paulo Motta; reviewed by Marcus Eriksson for CASSANDRA-11854
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2811f15b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2811f15b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2811f15b
Branch: refs/heads/cassandra-2.1
Commit: 2811f15bc9117fed4fb38de490d25d68df4e85b7
Parents: 341b3fb
Author: Paulo Motta <pa...@gmail.com>
Authored: Mon Jun 27 12:17:33 2016 -0300
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Tue Jun 28 10:13:24 2016 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../net/IncomingStreamingConnection.java | 4 +-
.../apache/cassandra/net/MessagingService.java | 12 +++++-
.../cassandra/streaming/ConnectionHandler.java | 39 +++++++++++++++-----
.../cassandra/streaming/StreamResultFuture.java | 27 +++++++-------
.../streaming/StreamingTransferTest.java | 23 ++++++++++++
6 files changed, 79 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2811f15b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 620568d..5741241 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.15
+ * Remove finished incoming streaming connections from MessagingService (CASSANDRA-11854)
* Don't try to get sstables for non-repairing column families (CASSANDRA-12077)
* Prevent select statements with clustering key > 64k (CASSANDRA-11882)
* Avoid marking too many sstables as repaired (CASSANDRA-11696)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2811f15b/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java b/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
index 5ced786..bfe92f9 100644
--- a/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
+++ b/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
@@ -40,7 +40,7 @@ public class IncomingStreamingConnection extends Thread implements Closeable
private static final Logger logger = LoggerFactory.getLogger(IncomingStreamingConnection.class);
private final int version;
- private final Socket socket;
+ public final Socket socket;
private final Set<Closeable> group;
public IncomingStreamingConnection(int version, Socket socket, Set<Closeable> group)
@@ -71,7 +71,7 @@ public class IncomingStreamingConnection extends Thread implements Closeable
// The receiving side distinguish two connections by looking at StreamInitMessage#isForOutgoing.
// Note: we cannot use the same socket for incoming and outgoing streams because we want to
// parallelize said streams and the socket is blocking, so we might deadlock.
- StreamResultFuture.initReceivingSide(init.sessionIndex, init.planId, init.description, init.from, socket, init.isForOutgoing, version);
+ StreamResultFuture.initReceivingSide(init.sessionIndex, init.planId, init.description, init.from, this, init.isForOutgoing, version);
}
catch (IOException e)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2811f15b/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 82320b1..ac8ad79 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -955,10 +955,12 @@ public final class MessagingService implements MessagingServiceMBean
return ret;
}
- private static class SocketThread extends Thread
+ @VisibleForTesting
+ public static class SocketThread extends Thread
{
private final ServerSocket server;
- private final Set<Closeable> connections = Sets.newConcurrentHashSet();
+ @VisibleForTesting
+ public final Set<Closeable> connections = Sets.newConcurrentHashSet();
SocketThread(ServerSocket server, String name)
{
@@ -1145,4 +1147,10 @@ public final class MessagingService implements MessagingServiceMBean
}
return result;
}
+
+ @VisibleForTesting
+ public List<SocketThread> getSocketThreads()
+ {
+ return socketThreads;
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2811f15b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
index 52268b2..60ce11e 100644
--- a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
+++ b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
@@ -37,6 +37,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
+import org.apache.cassandra.net.IncomingStreamingConnection;
import org.apache.cassandra.streaming.messages.StreamInitMessage;
import org.apache.cassandra.streaming.messages.StreamMessage;
import org.apache.cassandra.utils.FBUtilities;
@@ -89,16 +90,16 @@ public class ConnectionHandler
/**
* Set up outgoing message handler on receiving side.
*
- * @param socket socket to use for {@link org.apache.cassandra.streaming.ConnectionHandler.OutgoingMessageHandler}.
+ * @param connection Incoming connection to use for {@link OutgoingMessageHandler}.
* @param version Streaming message version
* @throws IOException
*/
- public void initiateOnReceivingSide(Socket socket, boolean isForOutgoing, int version) throws IOException
+ public void initiateOnReceivingSide(IncomingStreamingConnection connection, boolean isForOutgoing, int version) throws IOException
{
if (isForOutgoing)
- outgoing.start(socket, version);
+ outgoing.start(connection, version);
else
- incoming.start(socket, version);
+ incoming.start(connection, version);
}
public ListenableFuture<?> close()
@@ -156,6 +157,7 @@ public class ConnectionHandler
protected Socket socket;
private final AtomicReference<SettableFuture<?>> closeFuture = new AtomicReference<>();
+ private IncomingStreamingConnection incomingConnection;
protected MessageHandler(StreamSession session)
{
@@ -191,6 +193,12 @@ public class ConnectionHandler
getWriteChannel(socket).write(messageBuf);
}
+ public void start(IncomingStreamingConnection connection, int protocolVersion)
+ {
+ this.incomingConnection = connection;
+ start(connection.socket, protocolVersion);
+ }
+
public void start(Socket socket, int protocolVersion)
{
this.socket = socket;
@@ -218,15 +226,26 @@ public class ConnectionHandler
closeFuture.get().set(null);
// We can now close the socket
- try
+ if (incomingConnection != null)
{
- socket.close();
+ //this will close the underlying socket and remove it
+ //from active MessagingService connections (CASSANDRA-11854)
+ incomingConnection.close();
}
- catch (IOException e)
+ else
{
- // Erroring out while closing shouldn't happen but is not really a big deal, so just log
- // it at DEBUG and ignore otherwise.
- logger.debug("Unexpected error while closing streaming connection", e);
+ //this is an outgoing connection not registered in the MessagingService
+ //so we can close the socket directly
+ try
+ {
+ socket.close();
+ }
+ catch (IOException e)
+ {
+ // Erroring out while closing shouldn't happen but is not really a big deal, so just log
+ // it at DEBUG and ignore otherwise.
+ logger.debug("Unexpected error while closing streaming connection", e);
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2811f15b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
index 6a6f2b9..5c9c6de 100644
--- a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
+++ b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
@@ -19,7 +19,6 @@ package org.apache.cassandra.streaming;
import java.io.IOException;
import java.net.InetAddress;
-import java.net.Socket;
import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -28,6 +27,8 @@ import com.google.common.util.concurrent.Futures;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.net.IncomingStreamingConnection;
+
/**
* A future on the result ({@link StreamState}) of a streaming plan.
*
@@ -83,7 +84,7 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
future.addEventListener(listener);
}
- logger.info("[Stream #{}] Executing streaming plan for {}", planId, description);
+ logger.info("[Stream #{}] Executing streaming plan for {}", planId, description);
// Initialize and start all sessions
for (final StreamSession session : coordinator.getAllStreamSessions())
@@ -99,7 +100,7 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
UUID planId,
String description,
InetAddress from,
- Socket socket,
+ IncomingStreamingConnection connection,
boolean isForOutgoing,
int version) throws IOException
{
@@ -112,7 +113,7 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
future = new StreamResultFuture(planId, description);
StreamManager.instance.registerReceiving(future);
}
- future.attachSocket(from, sessionIndex, socket, isForOutgoing, version);
+ future.attachConnection(from, sessionIndex, connection, isForOutgoing, version);
logger.info("[Stream #{}, ID#{}] Received streaming plan for {}", planId, sessionIndex, description);
return future;
}
@@ -124,11 +125,11 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
return future;
}
- private void attachSocket(InetAddress from, int sessionIndex, Socket socket, boolean isForOutgoing, int version) throws IOException
+ private void attachConnection(InetAddress from, int sessionIndex, IncomingStreamingConnection connection, boolean isForOutgoing, int version) throws IOException
{
- StreamSession session = coordinator.getOrCreateSessionById(from, sessionIndex, socket.getInetAddress());
+ StreamSession session = coordinator.getOrCreateSessionById(from, sessionIndex, connection.socket.getInetAddress());
session.init(this);
- session.handler.initiateOnReceivingSide(socket, isForOutgoing, version);
+ session.handler.initiateOnReceivingSide(connection, isForOutgoing, version);
}
public void addEventListener(StreamEventHandler listener)
@@ -164,12 +165,12 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
{
SessionInfo sessionInfo = session.getSessionInfo();
logger.info("[Stream #{} ID#{}] Prepare completed. Receiving {} files({} bytes), sending {} files({} bytes)",
- session.planId(),
- session.sessionIndex(),
- sessionInfo.getTotalFilesToReceive(),
- sessionInfo.getTotalSizeToReceive(),
- sessionInfo.getTotalFilesToSend(),
- sessionInfo.getTotalSizeToSend());
+ session.planId(),
+ session.sessionIndex(),
+ sessionInfo.getTotalFilesToReceive(),
+ sessionInfo.getTotalSizeToReceive(),
+ sessionInfo.getTotalFilesToSend(),
+ sessionInfo.getTotalSizeToSend());
StreamEvent.SessionPreparedEvent event = new StreamEvent.SessionPreparedEvent(planId, sessionInfo);
coordinator.addSessionInfo(sessionInfo);
fireStreamEvent(event);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2811f15b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
index 31dc492..abff812 100644
--- a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
@@ -52,6 +52,7 @@ import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.io.sstable.SSTableUtils;
+import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.CounterId;
@@ -206,11 +207,33 @@ public class StreamingTransferTest extends SchemaLoader
// wrapped range
ranges.add(new Range<Token>(p.getToken(ByteBufferUtil.bytes("key1")), p.getToken(ByteBufferUtil.bytes("key0"))));
new StreamPlan("StreamingTransferTest").transferRanges(LOCAL, cfs.keyspace.getName(), ranges, cfs.getColumnFamilyName()).execute().get();
+ verifyConnectionsAreClosed();
}
private void transfer(SSTableReader sstable, List<Range<Token>> ranges) throws Exception
{
new StreamPlan("StreamingTransferTest").transferFiles(LOCAL, makeStreamingDetails(ranges, Refs.tryRef(Arrays.asList(sstable)))).execute().get();
+ verifyConnectionsAreClosed();
+ }
+
+ /**
+ * Test that finished incoming connections are removed from MessagingService (CASSANDRA-11854)
+ */
+ private void verifyConnectionsAreClosed() throws InterruptedException
+ {
+ //after stream session is finished, message handlers may take several milliseconds to be closed
+ outer:
+ for (int i = 0; i <= 10; i++)
+ {
+ for (MessagingService.SocketThread socketThread : MessagingService.instance().getSocketThreads())
+ if (!socketThread.connections.isEmpty())
+ {
+ Thread.sleep(100);
+ continue outer;
+ }
+ return;
+ }
+ fail("Streaming connections remain registered in MessagingService");
}
private Collection<StreamSession.SSTableStreamingSections> makeStreamingDetails(List<Range<Token>> ranges, Refs<SSTableReader> sstables)