You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by gd...@apache.org on 2009/12/30 23:20:32 UTC
svn commit: r894721 - in /incubator/cassandra/trunk/src/java/org/apache/cassandra/net: MessagingService.java TcpConnection.java TcpConnectionManager.java
Author: gdusbabek
Date: Wed Dec 30 22:20:32 2009
New Revision: 894721
URL: http://svn.apache.org/viewvc?rev=894721&view=rev
Log:
CASSANDRA-651: TcpConnectionManager was holding on to disconnected connections, giving the false indication that they were still in use.
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnectionManager.java
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=894721&r1=894720&r2=894721&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Wed Dec 30 22:20:32 2009
@@ -20,6 +20,8 @@
import org.apache.cassandra.concurrent.*;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.gms.IFailureDetectionEventListener;
import org.apache.cassandra.net.io.SerializerType;
import org.apache.cassandra.net.sink.SinkManager;
import org.apache.cassandra.utils.*;
@@ -27,7 +29,6 @@
import java.io.IOException;
import java.net.ServerSocket;
-import java.net.SocketException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
@@ -40,7 +41,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
-public class MessagingService
+public class MessagingService implements IFailureDetectionEventListener
{
private static int version_ = 1;
//TODO: make this parameter dynamic somehow. Not sure if config is appropriate.
@@ -182,7 +183,14 @@
}
return result;
}
-
+
+ /** called by failure detection code to notify that housekeeping should be performed on downed sockets. */
+ public void convict(InetAddress ep)
+ {
+ logger_.debug("Canceling pool for " + ep);
+ getConnectionPool(FBUtilities.getLocalAddress(), ep).reset();
+ }
+
/**
* Listen on the specified port.
* @param localEp InetAddress whose port to listen on.
@@ -199,7 +207,8 @@
SelectionKey key = SelectorManager.getSelectorManager().register(serverChannel, handler, SelectionKey.OP_ACCEPT);
endPoints_.add(localEp);
- listenSockets_.put(localEp, key);
+ listenSockets_.put(localEp, key);
+ FailureDetector.instance().registerFailureDetectionEventListener(this);
}
/**
@@ -410,12 +419,6 @@
connection = MessagingService.getConnection(processedMessage.getFrom(), to, message);
connection.write(message);
}
- catch (SocketException se)
- {
- // Shutting down the entire pool. May be too conservative an approach.
- MessagingService.getConnectionPool(message.getFrom(), to).shutdown();
- logger_.error("socket error writing to " + to, se);
- }
catch (IOException e)
{
if (connection != null)
@@ -492,6 +495,7 @@
logger_.info("Shutting down ...");
synchronized (MessagingService.class)
{
+ FailureDetector.instance().unregisterFailureDetectionEventListener(MessagingService.instance());
/* Stop listening on any TCP socket */
for (SelectionKey skey : listenSockets_.values())
{
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java?rev=894721&r1=894720&r2=894721&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java Wed Dec 30 22:20:32 2009
@@ -316,7 +316,7 @@
cancel(key_);
pendingWrites_.clear();
if (pool_ != null)
- pool_.destroy(this);
+ pool_.reset();
}
private void cancel(SelectionKey key)
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnectionManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnectionManager.java?rev=894721&r1=894720&r2=894721&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnectionManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnectionManager.java Wed Dec 30 22:20:32 2009
@@ -64,24 +64,12 @@
}
}
- synchronized void shutdown()
+ synchronized void reset()
{
for (TcpConnection con : new TcpConnection[] { cmdCon, ackCon })
if (con != null)
con.closeSocket();
- }
-
- synchronized void destroy(TcpConnection con)
- {
- assert con != null;
- if (cmdCon == con)
- {
- cmdCon = null;
- }
- else
- {
- assert ackCon == con;
- ackCon = null;
- }
+ cmdCon = null;
+ ackCon = null;
}
}