You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/06/08 17:35:35 UTC

svn commit: r782678 - /incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/net/TcpConnection.java

Author: jbellis
Date: Mon Jun  8 15:35:34 2009
New Revision: 782678

URL: http://svn.apache.org/viewvc?rev=782678&view=rev
Log:
cleanup for TcpConnection.  patch by jbellis; reviewed by Jun Rao for CASSANDRA-220

Modified:
    incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/net/TcpConnection.java

Modified: incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/net/TcpConnection.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/net/TcpConnection.java?rev=782678&r1=782677&r2=782678&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/net/TcpConnection.java (original)
+++ incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/net/TcpConnection.java Mon Jun  8 15:35:34 2009
@@ -19,31 +19,25 @@
 package org.apache.cassandra.net;
 
 import java.io.*;
-import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.SocketChannel;
 import java.util.*;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
-import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.net.io.FastSerializer;
 import org.apache.cassandra.net.io.ISerializer;
 import org.apache.cassandra.net.io.ProtocolState;
 import org.apache.cassandra.net.io.StartState;
 import org.apache.cassandra.net.io.TcpReader;
-import org.apache.cassandra.net.io.TcpReader.TcpReaderState;
 import org.apache.cassandra.utils.LogUtil;
 import org.apache.log4j.Logger;
-import org.apache.cassandra.net.io.*;
-import org.apache.cassandra.net.sink.*;
 
 /**
  * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
@@ -60,7 +54,7 @@
     private boolean isIncoming_ = false;       
     private TcpReader tcpReader_;    
     private ReadWorkItem readWork_ = new ReadWorkItem(); 
-    private List<ByteBuffer> pendingWrites_ = new Vector<ByteBuffer>();  
+    private Queue<ByteBuffer> pendingWrites_ = new ConcurrentLinkedQueue<ByteBuffer>();
     private EndPoint localEp_;
     private EndPoint remoteEp_;
     boolean inUse_ = false;
@@ -195,7 +189,7 @@
         }
     }
     
-    public void stream(File file, long startPosition, long endPosition) throws IOException
+    public void stream(File file, long startPosition, long endPosition) throws IOException, InterruptedException
     {
         if ( !bStream_ )
             throw new IllegalStateException("Cannot stream since we are not set up to stream data.");
@@ -222,7 +216,7 @@
             {
                 if ( retry == 3 )
                     throw new IOException("Unable to connect to " + remoteEp_ + " after " + retry + " attempts.");
-                waitToContinueStreaming(waitTime, TimeUnit.SECONDS);
+                condition_.await(waitTime, TimeUnit.SECONDS);
                 ++retry;
             }
             
@@ -232,7 +226,12 @@
                 {
                     ByteBuffer buffer = MessagingService.constructStreamHeader(false, true);                      
                     socketChannel_.write(buffer);
-                    handleIncompleteWrite(buffer);
+                    if (buffer.remaining() > 0)
+                    {
+                        pendingWrites_.add(buffer);
+                        key_.interestOps(key_.interestOps() | SelectionKey.OP_WRITE);
+                        condition_.await();
+                    }
                 }
                 
                 /* returns the number of bytes transferred from file to the socket */
@@ -247,7 +246,7 @@
                 if ( bytesTransferred < limit && bytesWritten != total )
                 {                    
                     key_.interestOps(key_.interestOps() | SelectionKey.OP_WRITE);
-                    waitToContinueStreaming();
+                    condition_.await();
                 }
             }
         }
@@ -256,41 +255,7 @@
             lock_.unlock();
         }        
     }
-    
-    private void handleIncompleteWrite(ByteBuffer buffer)
-    {
-        if (buffer.remaining() > 0) 
-        {            
-            pendingWrites_.add(buffer);
-            key_.interestOps(key_.interestOps() | SelectionKey.OP_WRITE);
-            waitToContinueStreaming();
-        }     
-    }
-    
-    private void waitToContinueStreaming()
-    {
-        try
-        {
-            condition_.await();
-        }
-        catch ( InterruptedException ex )
-        {
-            logger_.warn( LogUtil.throwableToString(ex) );
-        }
-    }
-    
-    private void waitToContinueStreaming(long waitTime, TimeUnit tu)
-    {
-        try
-        {
-            condition_.await(waitTime, tu);
-        }
-        catch ( InterruptedException ex )
-        {
-            logger_.warn( LogUtil.throwableToString(ex) );
-        }
-    }
-    
+
     private void resumeStreaming()
     {
         /* if not in streaming mode do nothing */
@@ -397,14 +362,13 @@
             } 
             else 
             {  
-                logger_.warn("Closing connection because socket channel could not finishConnect.");;
+                logger_.error("Closing connection because socket channel could not finishConnect.");;
                 errorClose();
             }
         } 
         catch(IOException e) 
         {               
-            logger_.warn("Encountered IOException on connection: "  + socketChannel_);
-            logger_.warn( LogUtil.throwableToString(e) );
+            logger_.error("Encountered IOException on connection: "  + socketChannel_, e);
             errorClose();
         }
     }
@@ -431,19 +395,19 @@
             {                     
                 while(!pendingWrites_.isEmpty()) 
                 {
-                    ByteBuffer buffer = pendingWrites_.get(0);
+                    ByteBuffer buffer = pendingWrites_.peek();
                     socketChannel_.write(buffer);                    
                     if (buffer.remaining() > 0) 
                     {   
                         break;
                     }               
-                    pendingWrites_.remove(0);                    
+                    pendingWrites_.remove();
                 } 
             
             }
             catch(IOException ex)
             {
-                logger_.warn(LogUtil.throwableToString(ex));
+                logger_.error(LogUtil.throwableToString(ex));
                 // This is to fix the wierd Linux bug with NIO.
                 errorClose();
             }