You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/06/08 17:35:35 UTC
svn commit: r782678 -
/incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/net/TcpConnection.java
Author: jbellis
Date: Mon Jun 8 15:35:34 2009
New Revision: 782678
URL: http://svn.apache.org/viewvc?rev=782678&view=rev
Log:
cleanup for TcpConnection. patch by jbellis; reviewed by Jun Rao for CASSANDRA-220
Modified:
incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/net/TcpConnection.java
Modified: incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/net/TcpConnection.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/net/TcpConnection.java?rev=782678&r1=782677&r2=782678&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/net/TcpConnection.java (original)
+++ incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/net/TcpConnection.java Mon Jun 8 15:35:34 2009
@@ -19,31 +19,25 @@
package org.apache.cassandra.net;
import java.io.*;
-import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.*;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.net.io.FastSerializer;
import org.apache.cassandra.net.io.ISerializer;
import org.apache.cassandra.net.io.ProtocolState;
import org.apache.cassandra.net.io.StartState;
import org.apache.cassandra.net.io.TcpReader;
-import org.apache.cassandra.net.io.TcpReader.TcpReaderState;
import org.apache.cassandra.utils.LogUtil;
import org.apache.log4j.Logger;
-import org.apache.cassandra.net.io.*;
-import org.apache.cassandra.net.sink.*;
/**
* Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
@@ -60,7 +54,7 @@
private boolean isIncoming_ = false;
private TcpReader tcpReader_;
private ReadWorkItem readWork_ = new ReadWorkItem();
- private List<ByteBuffer> pendingWrites_ = new Vector<ByteBuffer>();
+ private Queue<ByteBuffer> pendingWrites_ = new ConcurrentLinkedQueue<ByteBuffer>();
private EndPoint localEp_;
private EndPoint remoteEp_;
boolean inUse_ = false;
@@ -195,7 +189,7 @@
}
}
- public void stream(File file, long startPosition, long endPosition) throws IOException
+ public void stream(File file, long startPosition, long endPosition) throws IOException, InterruptedException
{
if ( !bStream_ )
throw new IllegalStateException("Cannot stream since we are not set up to stream data.");
@@ -222,7 +216,7 @@
{
if ( retry == 3 )
throw new IOException("Unable to connect to " + remoteEp_ + " after " + retry + " attempts.");
- waitToContinueStreaming(waitTime, TimeUnit.SECONDS);
+ condition_.await(waitTime, TimeUnit.SECONDS);
++retry;
}
@@ -232,7 +226,12 @@
{
ByteBuffer buffer = MessagingService.constructStreamHeader(false, true);
socketChannel_.write(buffer);
- handleIncompleteWrite(buffer);
+ if (buffer.remaining() > 0)
+ {
+ pendingWrites_.add(buffer);
+ key_.interestOps(key_.interestOps() | SelectionKey.OP_WRITE);
+ condition_.await();
+ }
}
/* returns the number of bytes transferred from file to the socket */
@@ -247,7 +246,7 @@
if ( bytesTransferred < limit && bytesWritten != total )
{
key_.interestOps(key_.interestOps() | SelectionKey.OP_WRITE);
- waitToContinueStreaming();
+ condition_.await();
}
}
}
@@ -256,41 +255,7 @@
lock_.unlock();
}
}
-
- private void handleIncompleteWrite(ByteBuffer buffer)
- {
- if (buffer.remaining() > 0)
- {
- pendingWrites_.add(buffer);
- key_.interestOps(key_.interestOps() | SelectionKey.OP_WRITE);
- waitToContinueStreaming();
- }
- }
-
- private void waitToContinueStreaming()
- {
- try
- {
- condition_.await();
- }
- catch ( InterruptedException ex )
- {
- logger_.warn( LogUtil.throwableToString(ex) );
- }
- }
-
- private void waitToContinueStreaming(long waitTime, TimeUnit tu)
- {
- try
- {
- condition_.await(waitTime, tu);
- }
- catch ( InterruptedException ex )
- {
- logger_.warn( LogUtil.throwableToString(ex) );
- }
- }
-
+
private void resumeStreaming()
{
/* if not in streaming mode do nothing */
@@ -397,14 +362,13 @@
}
else
{
- logger_.warn("Closing connection because socket channel could not finishConnect.");;
+ logger_.error("Closing connection because socket channel could not finishConnect.");;
errorClose();
}
}
catch(IOException e)
{
- logger_.warn("Encountered IOException on connection: " + socketChannel_);
- logger_.warn( LogUtil.throwableToString(e) );
+ logger_.error("Encountered IOException on connection: " + socketChannel_, e);
errorClose();
}
}
@@ -431,19 +395,19 @@
{
while(!pendingWrites_.isEmpty())
{
- ByteBuffer buffer = pendingWrites_.get(0);
+ ByteBuffer buffer = pendingWrites_.peek();
socketChannel_.write(buffer);
if (buffer.remaining() > 0)
{
break;
}
- pendingWrites_.remove(0);
+ pendingWrites_.remove();
}
}
catch(IOException ex)
{
- logger_.warn(LogUtil.throwableToString(ex));
+ logger_.error(LogUtil.throwableToString(ex));
// This is to fix the wierd Linux bug with NIO.
errorClose();
}