You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/01/21 00:38:40 UTC
svn commit: r901438 - in /incubator/cassandra/trunk/src/java/org/apache/cassandra/net: MessagingService.java OutboundTcpConnection.java OutboundTcpConnectionPool.java
Author: jbellis
Date: Wed Jan 20 23:38:39 2010
New Revision: 901438
URL: http://svn.apache.org/viewvc?rev=901438&view=rev
Log:
handle node failure
patch by jbellis; reviewed by Brandon Williams for CASSANDRA-705
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=901438&r1=901437&r2=901438&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Wed Jan 20 23:38:39 2010
@@ -68,7 +68,7 @@
/* Thread pool to handle messaging write activities */
private static ExecutorService streamExecutor_;
- private static NonBlockingHashMap<String, OutboundTcpConnectionPool> connectionManagers_ = new NonBlockingHashMap<String, OutboundTcpConnectionPool>();
+ private static NonBlockingHashMap<InetAddress, OutboundTcpConnectionPool> connectionManagers_ = new NonBlockingHashMap<InetAddress, OutboundTcpConnectionPool>();
private static Logger logger_ = Logger.getLogger(MessagingService.class);
@@ -132,8 +132,8 @@
/** called by failure detection code to notify that housekeeping should be performed on downed sockets. */
public void convict(InetAddress ep)
{
- logger_.debug("Canceling pool for " + ep);
- getConnectionPool(FBUtilities.getLocalAddress(), ep).reset();
+ logger_.debug("Resetting pool for " + ep);
+ getConnectionPool(ep).reset();
}
/**
@@ -167,21 +167,20 @@
}, "ACCEPT-" + localEp).start();
}
- public static OutboundTcpConnectionPool getConnectionPool(InetAddress from, InetAddress to)
+ public static OutboundTcpConnectionPool getConnectionPool(InetAddress to)
{
- String key = from + ":" + to;
- OutboundTcpConnectionPool cp = connectionManagers_.get(key);
+ OutboundTcpConnectionPool cp = connectionManagers_.get(to);
if (cp == null)
{
- connectionManagers_.putIfAbsent(key, new OutboundTcpConnectionPool(from, to));
- cp = connectionManagers_.get(key);
+ connectionManagers_.putIfAbsent(to, new OutboundTcpConnectionPool(to));
+ cp = connectionManagers_.get(to);
}
return cp;
}
- public static OutboundTcpConnection getConnection(InetAddress from, InetAddress to, Message msg)
+ public static OutboundTcpConnection getConnection(InetAddress to, Message msg)
{
- return getConnectionPool(from, to).getConnection(msg);
+ return getConnectionPool(to).getConnection(msg);
}
/**
@@ -292,12 +291,20 @@
return;
}
+ // message sinks are a testing hook
Message processedMessage = SinkManager.processClientMessageSink(message);
if (processedMessage == null)
{
return;
}
+ // get pooled connection (really, connection queue)
+ OutboundTcpConnection connection = null;
+ connection = getConnection(to, message);
+ if (connection == null)
+ return;
+
+ // pack message with header in a bytebuffer
byte[] data;
try
{
@@ -312,8 +319,7 @@
assert data.length > 0;
ByteBuffer buffer = packIt(data , false, false);
- OutboundTcpConnection connection = null;
- connection = getConnection(processedMessage.getFrom(), to, message);
+ // write it
connection.write(buffer);
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java?rev=901438&r1=901437&r2=901438&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java Wed Jan 20 23:38:39 2010
@@ -5,6 +5,7 @@
import java.io.IOException;
import java.net.InetAddress;
import java.net.Socket;
+import java.net.SocketException;
import java.nio.ByteBuffer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@@ -22,19 +23,16 @@
public DataOutputStream output;
public Socket socket;
- // TODO localEp is ignored, get rid of it
- public OutboundTcpConnection(final OutboundTcpConnectionPool pool, InetAddress localEp, final InetAddress remoteEp)
+ public OutboundTcpConnection(final OutboundTcpConnectionPool pool, final InetAddress remoteEp)
+ throws IOException
{
- try
- {
- socket = new Socket(remoteEp, DatabaseDescriptor.getStoragePort());
- socket.setTcpNoDelay(true);
- output = new DataOutputStream(socket.getOutputStream());
- }
- catch (IOException e)
- {
- throw new IOError(e);
- }
+ if (logger.isDebugEnabled())
+ logger.debug("attempting to connect to " + remoteEp);
+
+ socket = new Socket(remoteEp, DatabaseDescriptor.getStoragePort());
+ socket.setTcpNoDelay(true);
+ output = new DataOutputStream(socket.getOutputStream());
+
new Thread(new Runnable()
{
public void run()
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java?rev=901438&r1=901437&r2=901438&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java Wed Jan 20 23:38:39 2010
@@ -21,29 +21,29 @@
import java.io.IOException;
import java.net.InetAddress;
+import org.apache.log4j.Logger;
+
import org.apache.cassandra.concurrent.StageManager;
class OutboundTcpConnectionPool
{
- private InetAddress localEp_;
+ private static Logger logger = Logger.getLogger(OutboundTcpConnectionPool.class);
+
+ private final int OPEN_RETRY_DELAY = 100; // ms between retries
+
private InetAddress remoteEp_;
private OutboundTcpConnection cmdCon;
private OutboundTcpConnection ackCon;
+ private long lastFailedAttempt = Long.MIN_VALUE;
- // TODO localEp is ignored, get rid of it
- OutboundTcpConnectionPool(InetAddress localEp, InetAddress remoteEp)
+ OutboundTcpConnectionPool(InetAddress remoteEp)
{
- localEp_ = localEp;
remoteEp_ = remoteEp;
}
- private OutboundTcpConnection newCon()
- {
- return new OutboundTcpConnection(this, localEp_, remoteEp_);
- }
-
/**
* returns the appropriate connection based on message type.
+ * returns null if a connection could not be established.
*/
synchronized OutboundTcpConnection getConnection(Message msg)
{
@@ -51,13 +51,39 @@
|| StageManager.GOSSIP_STAGE.equals(msg.getMessageType()))
{
if (ackCon == null)
- ackCon = newCon();
+ {
+ if (System.currentTimeMillis() < lastFailedAttempt + OPEN_RETRY_DELAY)
+ return null;
+ try
+ {
+ ackCon = new OutboundTcpConnection(this, remoteEp_);
+ }
+ catch (IOException e)
+ {
+ lastFailedAttempt = System.currentTimeMillis();
+ if (logger.isDebugEnabled())
+ logger.debug("unable to connect to " + remoteEp_, e);
+ }
+ }
return ackCon;
}
else
{
if (cmdCon == null)
- cmdCon = newCon();
+ {
+ if (System.currentTimeMillis() < lastFailedAttempt + OPEN_RETRY_DELAY)
+ return null;
+ try
+ {
+ cmdCon = new OutboundTcpConnection(this, remoteEp_);
+ }
+ catch (IOException e)
+ {
+ lastFailedAttempt = System.currentTimeMillis();
+ if (logger.isDebugEnabled())
+ logger.debug("unable to connect to " + remoteEp_, e);
+ }
+ }
return cmdCon;
}
}