You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/07/30 17:30:27 UTC

svn commit: r799331 [21/29] - in /incubator/cassandra/trunk: ./ src/java/org/apache/cassandra/concurrent/ src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/dht/ src/java/org/apache/cassandra/gms/ src/...

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MultiAsyncResult.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MultiAsyncResult.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MultiAsyncResult.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MultiAsyncResult.java Thu Jul 30 15:30:21 2009
@@ -1,133 +1,133 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.net;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import org.apache.cassandra.utils.LogUtil;
-import org.apache.log4j.Logger;
-
-public class MultiAsyncResult implements IAsyncResult
-{
-    private static Logger logger_ = Logger.getLogger( AsyncResult.class );
-    private int expectedResults_;
-    private List<byte[]> result_ = new ArrayList<byte[]>();
-    private AtomicBoolean done_ = new AtomicBoolean(false);
-    private Lock lock_ = new ReentrantLock();
-    private Condition condition_;
-    
-    MultiAsyncResult(int expectedResults)
-    {
-        expectedResults_ = expectedResults;
-        condition_ = lock_.newCondition();
-    }
-    
-    public byte[] get()
-    {
-        throw new UnsupportedOperationException("This operation is not supported in the AsyncResult abstraction.");
-    }
-    
-    public byte[] get(long timeout, TimeUnit tu) throws TimeoutException
-    {
-        throw new UnsupportedOperationException("This operation is not supported in the AsyncResult abstraction.");
-    }
-    
-    public List<byte[]> multiget()
-    {
-        lock_.lock();
-        try
-        {
-            if ( !done_.get() )
-            {
-                condition_.await();                    
-            }
-        }
-        catch ( InterruptedException ex )
-        {
-            logger_.warn( LogUtil.throwableToString(ex) );
-        }
-        finally
-        {
-            lock_.unlock();            
-        }        
-        return result_;
-    }
-    
-    public boolean isDone()
-    {
-        return done_.get();
-    }
-    
-    public List<byte[]> multiget(long timeout, TimeUnit tu) throws TimeoutException
-    {
-        lock_.lock();
-        try
-        {            
-            boolean bVal = true;
-            try
-            {
-                if ( !done_.get() )
-                {                    
-                    bVal = condition_.await(timeout, tu);
-                }
-            }
-            catch ( InterruptedException ex )
-            {
-                logger_.warn( LogUtil.throwableToString(ex) );
-            }
-            
-            if ( !bVal && !done_.get() )
-            {                                           
-                throw new TimeoutException("Operation timed out.");
-            }
-        }
-        finally
-        {
-            lock_.unlock();      
-        }
-        return result_;
-    }
-    
-    public void result(Message result)
-    {        
-        try
-        {
-            lock_.lock();
-            if ( !done_.get() )
-            {
-                result_.add(result.getMessageBody());
-                if ( result_.size() == expectedResults_ )
-                {
-                    done_.set(true);
-                    condition_.signal();
-                }
-            }
-        }
-        finally
-        {
-            lock_.unlock();
-        }        
-    }    
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+public class MultiAsyncResult implements IAsyncResult
+{
+    private static Logger logger_ = Logger.getLogger( AsyncResult.class );
+    private int expectedResults_;
+    private List<byte[]> result_ = new ArrayList<byte[]>();
+    private AtomicBoolean done_ = new AtomicBoolean(false);
+    private Lock lock_ = new ReentrantLock();
+    private Condition condition_;
+    
+    MultiAsyncResult(int expectedResults)
+    {
+        expectedResults_ = expectedResults;
+        condition_ = lock_.newCondition();
+    }
+    
+    public byte[] get()
+    {
+        throw new UnsupportedOperationException("This operation is not supported in the AsyncResult abstraction.");
+    }
+    
+    public byte[] get(long timeout, TimeUnit tu) throws TimeoutException
+    {
+        throw new UnsupportedOperationException("This operation is not supported in the AsyncResult abstraction.");
+    }
+    
+    public List<byte[]> multiget()
+    {
+        lock_.lock();
+        try
+        {
+            if ( !done_.get() )
+            {
+                condition_.await();                    
+            }
+        }
+        catch ( InterruptedException ex )
+        {
+            logger_.warn( LogUtil.throwableToString(ex) );
+        }
+        finally
+        {
+            lock_.unlock();            
+        }        
+        return result_;
+    }
+    
+    public boolean isDone()
+    {
+        return done_.get();
+    }
+    
+    public List<byte[]> multiget(long timeout, TimeUnit tu) throws TimeoutException
+    {
+        lock_.lock();
+        try
+        {            
+            boolean bVal = true;
+            try
+            {
+                if ( !done_.get() )
+                {                    
+                    bVal = condition_.await(timeout, tu);
+                }
+            }
+            catch ( InterruptedException ex )
+            {
+                logger_.warn( LogUtil.throwableToString(ex) );
+            }
+            
+            if ( !bVal && !done_.get() )
+            {                                           
+                throw new TimeoutException("Operation timed out.");
+            }
+        }
+        finally
+        {
+            lock_.unlock();      
+        }
+        return result_;
+    }
+    
+    public void result(Message result)
+    {        
+        try
+        {
+            lock_.lock();
+            if ( !done_.get() )
+            {
+                result_.add(result.getMessageBody());
+                if ( result_.size() == expectedResults_ )
+                {
+                    done_.set(true);
+                    condition_.signal();
+                }
+            }
+        }
+        finally
+        {
+            lock_.unlock();
+        }        
+    }    
+}

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/ProtocolHeader.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/ProtocolHeader.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/ProtocolHeader.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/ProtocolHeader.java Thu Jul 30 15:30:21 2009
@@ -1,36 +1,36 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.net;
-
-/**
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-public final class ProtocolHeader
-{
-    public static final String SERIALIZER = "SERIALIZER";
-    public static final String COMPRESSION = "COMPRESSION";
-    public static final String VERSION = "VERSION";
-    
-    public int serializerType_;
-    public boolean isCompressed_;
-    public boolean isStreamingMode_;
-    public boolean isListening_;
-    public int version_;
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public final class ProtocolHeader
+{
+    public static final String SERIALIZER = "SERIALIZER";
+    public static final String COMPRESSION = "COMPRESSION";
+    public static final String VERSION = "VERSION";
+    
+    public int serializerType_;
+    public boolean isCompressed_;
+    public boolean isStreamingMode_;
+    public boolean isListening_;
+    public int version_;
+}

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/ResponseVerbHandler.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/ResponseVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/ResponseVerbHandler.java Thu Jul 30 15:30:21 2009
@@ -1,52 +1,52 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.net;
-
-import org.apache.log4j.Logger;
-
-/**
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-class ResponseVerbHandler implements IVerbHandler
-{
-    private static final Logger logger_ = Logger.getLogger( ResponseVerbHandler.class );
-    
-    public void doVerb(Message message)
-    {     
-        String messageId = message.getMessageId();        
-        IAsyncCallback cb = MessagingService.getRegisteredCallback(messageId);
-        if ( cb != null )
-        {
-            if (logger_.isDebugEnabled())
-              logger_.debug("Processing response on a callback from " + message.getMessageId() + "@" + message.getFrom());
-            cb.response(message);
-        }
-        else
-        {            
-            IAsyncResult ar = MessagingService.getAsyncResult(messageId);
-            if ( ar != null )
-            {
-                if (logger_.isDebugEnabled())
-                  logger_.debug("Processing response on an async result from " + message.getMessageId() + "@" + message.getFrom());
-                ar.result(message);
-            }
-        }
-    }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+class ResponseVerbHandler implements IVerbHandler
+{
+    private static final Logger logger_ = Logger.getLogger( ResponseVerbHandler.class );
+    
+    public void doVerb(Message message)
+    {     
+        String messageId = message.getMessageId();        
+        IAsyncCallback cb = MessagingService.getRegisteredCallback(messageId);
+        if ( cb != null )
+        {
+            if (logger_.isDebugEnabled())
+              logger_.debug("Processing response on a callback from " + message.getMessageId() + "@" + message.getFrom());
+            cb.response(message);
+        }
+        else
+        {            
+            IAsyncResult ar = MessagingService.getAsyncResult(messageId);
+            if ( ar != null )
+            {
+                if (logger_.isDebugEnabled())
+                  logger_.debug("Processing response on an async result from " + message.getMessageId() + "@" + message.getFrom());
+                ar.result(message);
+            }
+        }
+    }
+}

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/SelectionKeyHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/SelectionKeyHandler.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/SelectionKeyHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/SelectionKeyHandler.java Thu Jul 30 15:30:21 2009
@@ -1,84 +1,84 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.net;
-
-import java.nio.channels.*;
-
-/**
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-public class SelectionKeyHandler 
-{
-    /**
-     * Method which is called when the key becomes acceptable.
-     *
-     * @param key The key which is acceptable.
-     */
-    public void accept(SelectionKey key)
-    {
-         throw new UnsupportedOperationException("accept() cannot be called on " + getClass().getName() + "!");
-    }
-    
-    /**
-     * Method which is called when the key becomes connectable.
-     *
-     * @param key The key which is connectable.
-     */
-    public void connect(SelectionKey key)
-    {
-        throw new UnsupportedOperationException("connect() cannot be called on " + getClass().getName() + "!");
-    }
-    
-    /**
-     * Method which is called when the key becomes readable.
-     *
-     * @param key The key which is readable.
-     */
-    public void read(SelectionKey key)
-    {
-        throw new UnsupportedOperationException("read() cannot be called on " + getClass().getName() + "!");
-    }
-    
-    /**
-     * Method which is called when the key becomes writable.
-     *
-     * @param key The key which is writable.
-     */
-    public void write(SelectionKey key)
-    {
-        throw new UnsupportedOperationException("write() cannot be called on " + getClass().getName() + "!");
-    }
-    
-    protected static void turnOnInterestOps(SelectionKey key, int ops)
-    {
-        synchronized(key)
-        {
-            key.interestOps(key.interestOps() | ops);
-        }
-    }
-    
-    protected static void turnOffInterestOps(SelectionKey key, int ops)
-    {
-        synchronized(key)
-        {
-            key.interestOps(key.interestOps() & (~ops) );
-        }
-    }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.nio.channels.*;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class SelectionKeyHandler 
+{
+    /**
+     * Method which is called when the key becomes acceptable.
+     *
+     * @param key The key which is acceptable.
+     */
+    public void accept(SelectionKey key)
+    {
+         throw new UnsupportedOperationException("accept() cannot be called on " + getClass().getName() + "!");
+    }
+    
+    /**
+     * Method which is called when the key becomes connectable.
+     *
+     * @param key The key which is connectable.
+     */
+    public void connect(SelectionKey key)
+    {
+        throw new UnsupportedOperationException("connect() cannot be called on " + getClass().getName() + "!");
+    }
+    
+    /**
+     * Method which is called when the key becomes readable.
+     *
+     * @param key The key which is readable.
+     */
+    public void read(SelectionKey key)
+    {
+        throw new UnsupportedOperationException("read() cannot be called on " + getClass().getName() + "!");
+    }
+    
+    /**
+     * Method which is called when the key becomes writable.
+     *
+     * @param key The key which is writable.
+     */
+    public void write(SelectionKey key)
+    {
+        throw new UnsupportedOperationException("write() cannot be called on " + getClass().getName() + "!");
+    }
+    
+    protected static void turnOnInterestOps(SelectionKey key, int ops)
+    {
+        synchronized(key)
+        {
+            key.interestOps(key.interestOps() | ops);
+        }
+    }
+    
+    protected static void turnOffInterestOps(SelectionKey key, int ops)
+    {
+        synchronized(key)
+        {
+            key.interestOps(key.interestOps() & (~ops) );
+        }
+    }
+}

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/SelectorManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/SelectorManager.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/SelectorManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/SelectorManager.java Thu Jul 30 15:30:21 2009
@@ -1,178 +1,178 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.net;
-
-import java.io.IOException;
-import java.nio.channels.SelectableChannel;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-
-import org.apache.log4j.Logger;
-
-public class SelectorManager extends Thread
-{
-    private static final Logger logger = Logger.getLogger(SelectorManager.class); 
-
-    // the underlying selector used
-    protected Selector selector;
-
-    // workaround JDK select/register bug
-    Object gate = new Object();
-
-    // The static selector manager which is used by all applications
-    private static SelectorManager manager;
-    
-    // The static UDP selector manager which is used by all applications
-    private static SelectorManager udpManager;
-
-    private SelectorManager(String name)
-    {
-        super(name);
-
-        try
-        {
-            selector = Selector.open();
-        }
-        catch (IOException e)
-        {
-            throw new RuntimeException(e);
-        }
-
-        setDaemon(false);
-    }
-
-    /**
-     * Registers a new channel with the selector, and attaches the given
-     * SelectionKeyHandler as the handler for the newly created key. Operations
-     * which the handler is interested in will be called as available.
-     * 
-     * @param channel
-     *            The channel to register with the selector
-     * @param handler
-     *            The handler to use for the callbacks
-     * @param ops
-     *            The initial interest operations
-     * @return The SelectionKey which uniquely identifies this channel
-     * @exception IOException if the channel is closed
-     */
-    public SelectionKey register(SelectableChannel channel,
-            SelectionKeyHandler handler, int ops) throws IOException
-    {
-        assert channel != null;
-        assert handler != null;
-
-        synchronized(gate)
-        {
-            selector.wakeup();
-            return channel.register(selector, ops, handler);
-        }
-    }      
-
-    /**
-     * This method starts the socket manager listening for events. It is
-     * designed to be started when this thread's start() method is invoked.
-     */
-    public void run()
-    {
-        while (true)
-        {
-            try
-            {
-                selector.select(1);
-                doProcess();
-                synchronized(gate) {}
-            }
-            catch (IOException e)
-            {
-                throw new RuntimeException(e);
-            }
-        }
-    }
-
-    protected void doProcess() throws IOException
-    {
-        SelectionKey[] keys = selector.selectedKeys().toArray(new SelectionKey[0]);
-
-        for (SelectionKey key : keys)
-        {
-            selector.selectedKeys().remove(key);
-
-            synchronized (key)
-            {
-                SelectionKeyHandler skh = (SelectionKeyHandler) key.attachment();
-
-                if (skh != null)
-                {
-                    // accept
-                    if (key.isValid() && key.isAcceptable())
-                    {
-                        skh.accept(key);
-                    }
-
-                    // connect
-                    if (key.isValid() && key.isConnectable())
-                    {
-                        skh.connect(key);
-                    }
-
-                    // read
-                    if (key.isValid() && key.isReadable())
-                    {
-                        skh.read(key);
-                    }
-
-                    // write
-                    if (key.isValid() && key.isWritable())
-                    {
-                        skh.write(key);
-                    }
-                }
-            }
-        }
-    }
-
-    /**
-     * Returns the SelectorManager applications should use.
-     * 
-     * @return The SelectorManager which applications should use
-     */
-    public static SelectorManager getSelectorManager()
-    {
-        synchronized (SelectorManager.class)
-        {
-            if (manager == null)
-            {
-                manager = new SelectorManager("TCP Selector Manager");
-            }            
-        }
-        return manager;
-    }
-    
-    public static SelectorManager getUdpSelectorManager()
-    {
-        synchronized (SelectorManager.class)
-        {
-            if (udpManager == null)
-            {
-                udpManager = new SelectorManager("UDP Selector Manager");
-            }            
-        }
-        return udpManager;
-    }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.io.IOException;
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+
+import org.apache.log4j.Logger;
+
+public class SelectorManager extends Thread
+{
+    private static final Logger logger = Logger.getLogger(SelectorManager.class); 
+
+    // the underlying selector used
+    protected Selector selector;
+
+    // workaround JDK select/register bug
+    Object gate = new Object();
+
+    // The static selector manager which is used by all applications
+    private static SelectorManager manager;
+    
+    // The static UDP selector manager which is used by all applications
+    private static SelectorManager udpManager;
+
+    private SelectorManager(String name)
+    {
+        super(name);
+
+        try
+        {
+            selector = Selector.open();
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
+
+        setDaemon(false);
+    }
+
+    /**
+     * Registers a new channel with the selector, and attaches the given
+     * SelectionKeyHandler as the handler for the newly created key. Operations
+     * which the handler is interested in will be called as available.
+     * 
+     * @param channel
+     *            The channel to register with the selector
+     * @param handler
+     *            The handler to use for the callbacks
+     * @param ops
+     *            The initial interest operations
+     * @return The SelectionKey which uniquely identifies this channel
+     * @exception IOException if the channel is closed
+     */
+    public SelectionKey register(SelectableChannel channel,
+            SelectionKeyHandler handler, int ops) throws IOException
+    {
+        assert channel != null;
+        assert handler != null;
+
+        synchronized(gate)
+        {
+            selector.wakeup();
+            return channel.register(selector, ops, handler);
+        }
+    }      
+
+    /**
+     * This method starts the socket manager listening for events. It is
+     * designed to be started when this thread's start() method is invoked.
+     */
+    public void run()
+    {
+        while (true)
+        {
+            try
+            {
+                selector.select(1);
+                doProcess();
+                synchronized(gate) {}
+            }
+            catch (IOException e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    protected void doProcess() throws IOException
+    {
+        SelectionKey[] keys = selector.selectedKeys().toArray(new SelectionKey[0]);
+
+        for (SelectionKey key : keys)
+        {
+            selector.selectedKeys().remove(key);
+
+            synchronized (key)
+            {
+                SelectionKeyHandler skh = (SelectionKeyHandler) key.attachment();
+
+                if (skh != null)
+                {
+                    // accept
+                    if (key.isValid() && key.isAcceptable())
+                    {
+                        skh.accept(key);
+                    }
+
+                    // connect
+                    if (key.isValid() && key.isConnectable())
+                    {
+                        skh.connect(key);
+                    }
+
+                    // read
+                    if (key.isValid() && key.isReadable())
+                    {
+                        skh.read(key);
+                    }
+
+                    // write
+                    if (key.isValid() && key.isWritable())
+                    {
+                        skh.write(key);
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Returns the SelectorManager applications should use.
+     * 
+     * @return The SelectorManager which applications should use
+     */
+    public static SelectorManager getSelectorManager()
+    {
+        synchronized (SelectorManager.class)
+        {
+            if (manager == null)
+            {
+                manager = new SelectorManager("TCP Selector Manager");
+            }            
+        }
+        return manager;
+    }
+    
+    public static SelectorManager getUdpSelectorManager()
+    {
+        synchronized (SelectorManager.class)
+        {
+            if (udpManager == null)
+            {
+                udpManager = new SelectorManager("UDP Selector Manager");
+            }            
+        }
+        return udpManager;
+    }
+}

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java Thu Jul 30 15:30:21 2009
@@ -1,520 +1,520 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.net;
-
-import java.io.*;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.SocketChannel;
-import java.util.*;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.net.io.FastSerializer;
-import org.apache.cassandra.net.io.ISerializer;
-import org.apache.cassandra.net.io.ProtocolState;
-import org.apache.cassandra.net.io.StartState;
-import org.apache.cassandra.net.io.TcpReader;
-import org.apache.cassandra.utils.LogUtil;
-import org.apache.log4j.Logger;
-
-/**
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-public class TcpConnection extends SelectionKeyHandler implements Comparable
-{
-    // logging and profiling.
-    private static Logger logger_ = Logger.getLogger(TcpConnection.class);  
-    private static ISerializer serializer_ = new FastSerializer();
-    private SocketChannel socketChannel_;
-    private SelectionKey key_;
-    private TcpConnectionManager pool_;
-    private boolean isIncoming_ = false;       
-    private TcpReader tcpReader_;    
-    private ReadWorkItem readWork_ = new ReadWorkItem(); 
-    private Queue<ByteBuffer> pendingWrites_ = new ConcurrentLinkedQueue<ByteBuffer>();
-    private EndPoint localEp_;
-    private EndPoint remoteEp_;
-    boolean inUse_ = false;
-         
-    /* 
-     * Added for streaming support. We need the boolean
-     * to indicate that this connection is used for 
-     * streaming. The Condition and the Lock are used
-     * to signal the stream() that it can continue
-     * streaming when the socket becomes writable.
-    */
-    private boolean bStream_ = false;
-    private Lock lock_;
-    private Condition condition_;
-    
-    // used from getConnection - outgoing
-    TcpConnection(TcpConnectionManager pool, EndPoint from, EndPoint to) throws IOException
-    {          
-        socketChannel_ = SocketChannel.open();            
-        socketChannel_.configureBlocking(false);        
-        pool_ = pool;
-        
-        localEp_ = from;
-        remoteEp_ = to;
-        
-        if ( !socketChannel_.connect( remoteEp_.getInetAddress() ) )
-        {
-            key_ = SelectorManager.getSelectorManager().register(socketChannel_, this, SelectionKey.OP_CONNECT);
-        }
-        else
-        {
-            key_ = SelectorManager.getSelectorManager().register(socketChannel_, this, SelectionKey.OP_READ);
-        }
-    }
-    
-    /*
-     * Used for streaming purposes has no pooling semantics.
-    */
-    TcpConnection(EndPoint from, EndPoint to) throws IOException
-    {
-        socketChannel_ = SocketChannel.open();               
-        socketChannel_.configureBlocking(false);       
-        
-        localEp_ = from;
-        remoteEp_ = to;
-        
-        if ( !socketChannel_.connect( remoteEp_.getInetAddress() ) )
-        {
-            key_ = SelectorManager.getSelectorManager().register(socketChannel_, this, SelectionKey.OP_CONNECT);
-        }
-        else
-        {
-            key_ = SelectorManager.getSelectorManager().register(socketChannel_, this, SelectionKey.OP_READ);
-        }
-        bStream_ = true;
-        lock_ = new ReentrantLock();
-        condition_ = lock_.newCondition();
-    }
-    
-    /*
-     * This method is invoked by the TcpConnectionHandler to accept incoming TCP connections.
-     * Accept the connection and then register interest for reads.
-    */
-    static void acceptConnection(SocketChannel socketChannel, EndPoint localEp, boolean isIncoming) throws IOException
-    {
-        TcpConnection tcpConnection = new TcpConnection(socketChannel, localEp, true);
-        tcpConnection.registerReadInterest();
-    }
-    
-    private void registerReadInterest() throws IOException
-    {
-        key_ = SelectorManager.getSelectorManager().register(socketChannel_, this, SelectionKey.OP_READ);
-    }
-    
-    // used for incoming connections
-    TcpConnection(SocketChannel socketChannel, EndPoint localEp, boolean isIncoming) throws IOException
-    {       
-        socketChannel_ = socketChannel;
-        socketChannel_.configureBlocking(false);                           
-        isIncoming_ = isIncoming;
-        localEp_ = localEp;
-    }
-    
-    EndPoint getLocalEp()
-    {
-        return localEp_;
-    }
-    
-    public void setLocalEp(EndPoint localEp)
-    {
-        localEp_ = localEp;
-    }
-
-    public EndPoint getEndPoint() 
-    {
-        return remoteEp_;
-    }
-    
-    public boolean isIncoming()
-    {
-        return isIncoming_;
-    }    
-    
-    public SocketChannel getSocketChannel()
-    {
-        return socketChannel_;
-    }    
-    
-    public void write(Message message) throws IOException
-    {           
-        byte[] data = serializer_.serialize(message);        
-        if ( data.length > 0 )
-        {    
-            boolean listening = !message.getFrom().equals(EndPoint.sentinelLocalEndPoint_);
-            ByteBuffer buffer = MessagingService.packIt( data , false, false, listening);   
-            synchronized(this)
-            {
-                if (!pendingWrites_.isEmpty() || !socketChannel_.isConnected())
-                {                     
-                    pendingWrites_.add(buffer);                
-                    return;
-                }
-                
-                socketChannel_.write(buffer);                
-                
-                if (buffer.remaining() > 0) 
-                {                   
-                    pendingWrites_.add(buffer);
-                    turnOnInterestOps(key_, SelectionKey.OP_WRITE);
-                }
-            }
-        }
-    }
-    
-    public void stream(File file, long startPosition, long endPosition) throws IOException, InterruptedException
-    {
-        if ( !bStream_ )
-            throw new IllegalStateException("Cannot stream since we are not set up to stream data.");
-                
-        lock_.lock();        
-        try
-        {            
-            /* transfer 64MB in each attempt */
-            int limit = 64*1024*1024;  
-            long total = endPosition - startPosition;
-            /* keeps track of total number of bytes transferred */
-            long bytesWritten = 0L;                          
-            RandomAccessFile raf = new RandomAccessFile(file, "r");            
-            FileChannel fc = raf.getChannel();            
-            
-            /* 
-             * If the connection is not yet established then wait for
-             * the timeout period of 2 seconds. Attempt to reconnect 3 times and then 
-             * bail with an IOException.
-            */
-            long waitTime = 2;
-            int retry = 0;
-            while (!socketChannel_.isConnected())
-            {
-                if ( retry == 3 )
-                    throw new IOException("Unable to connect to " + remoteEp_ + " after " + retry + " attempts.");
-                condition_.await(waitTime, TimeUnit.SECONDS);
-                ++retry;
-            }
-            
-            while ( bytesWritten < total )
-            {                                
-                if ( startPosition == 0 )
-                {
-                    ByteBuffer buffer = MessagingService.constructStreamHeader(false, true);                      
-                    socketChannel_.write(buffer);
-                    if (buffer.remaining() > 0)
-                    {
-                        pendingWrites_.add(buffer);
-                        turnOnInterestOps(key_, SelectionKey.OP_WRITE);
-                        condition_.await();
-                    }
-                }
-                
-                /* returns the number of bytes transferred from file to the socket */
-                long bytesTransferred = fc.transferTo(startPosition, limit, socketChannel_);
-                if (logger_.isDebugEnabled())
-                    logger_.debug("Bytes transferred " + bytesTransferred);                
-                bytesWritten += bytesTransferred;
-                startPosition += bytesTransferred; 
-                /*
-                 * If the number of bytes transferred is less than intended 
-                 * then we need to wait till socket becomes writeable again. 
-                */
-                if ( bytesTransferred < limit && bytesWritten != total )
-                {                    
-                    turnOnInterestOps(key_, SelectionKey.OP_WRITE);
-                    condition_.await();
-                }
-            }
-        }
-        finally
-        {
-            lock_.unlock();
-        }        
-    }
-
-    private void resumeStreaming()
-    {
-        /* if not in streaming mode do nothing */
-        if ( !bStream_ )
-            return;
-        
-        lock_.lock();
-        try
-        {
-            condition_.signal();
-        }
-        finally
-        {
-            lock_.unlock();
-        }
-    }
-    
-    public void close()
-    {
-        inUse_ = false;
-        if ( pool_.contains(this) )
-            pool_.decUsed();               
-    }
-
-    public boolean isConnected()
-    {
-        return socketChannel_.isConnected();
-    }
-    
-    public boolean equals(Object o)
-    {
-        if ( !(o instanceof TcpConnection) )
-            return false;
-        
-        TcpConnection rhs = (TcpConnection)o;        
-        if ( localEp_.equals(rhs.localEp_) && remoteEp_.equals(rhs.remoteEp_) )
-            return true;
-        else
-            return false;
-    }
-    
-    public int hashCode()
-    {
-        return (localEp_ + ":" + remoteEp_).hashCode();
-    }
-
-    public String toString()
-    {        
-        return socketChannel_.toString();
-    }
-    
-    void closeSocket()
-    {
-        logger_.warn("Closing down connection " + socketChannel_ + " with " + pendingWrites_.size() + " writes remaining.");            
-        if ( pool_ != null )
-        {
-            pool_.removeConnection(this);
-        }
-        cancel(key_);
-        pendingWrites_.clear();
-    }
-    
-    void errorClose() 
-    {        
-        logger_.warn("Closing down connection " + socketChannel_);
-        pendingWrites_.clear();
-        cancel(key_);
-        pendingWrites_.clear();        
-        if ( pool_ != null )
-        {
-            pool_.removeConnection(this);            
-        }
-    }
-    
-    private void cancel(SelectionKey key)
-    {
-        if ( key != null )
-        {
-            key.cancel();
-            try
-            {
-                key.channel().close();
-            }
-            catch (IOException e) {}
-        }
-    }
-    
-    // called in the selector thread
-    public void connect(SelectionKey key)
-    {       
-        turnOffInterestOps(key, SelectionKey.OP_CONNECT);
-        try
-        {
-            if (socketChannel_.finishConnect())
-            {
-                turnOnInterestOps(key, SelectionKey.OP_READ);
-                
-                synchronized(this)
-                {
-                    // this will flush the pending                
-                    if (!pendingWrites_.isEmpty()) 
-                    {
-                        turnOnInterestOps(key_, SelectionKey.OP_WRITE);
-                    }
-                }
-                resumeStreaming();
-            } 
-            else 
-            {  
-                logger_.error("Closing connection because socket channel could not finishConnect.");;
-                errorClose();
-            }
-        } 
-        catch(IOException e) 
-        {               
-            logger_.error("Encountered IOException on connection: "  + socketChannel_, e);
-            errorClose();
-        }
-    }
-    
-    // called in the selector thread
-    public void write(SelectionKey key)
-    {   
-        turnOffInterestOps(key, SelectionKey.OP_WRITE);                
-        doPendingWrites();
-        /*
-         * This is executed only if we are in streaming mode.
-         * Idea is that we read a chunk of data from a source
-         * and wait to read the next from the source until we 
-         * are siganlled to do so from here. 
-        */
-         resumeStreaming();        
-    }
-    
-    void doPendingWrites()
-    {
-        synchronized(this)
-        {
-            try
-            {                     
-                while(!pendingWrites_.isEmpty()) 
-                {
-                    ByteBuffer buffer = pendingWrites_.peek();
-                    socketChannel_.write(buffer);                    
-                    if (buffer.remaining() > 0) 
-                    {   
-                        break;
-                    }               
-                    pendingWrites_.remove();
-                } 
-            
-            }
-            catch(IOException ex)
-            {
-                logger_.error(LogUtil.throwableToString(ex));
-                // This is to fix the wierd Linux bug with NIO.
-                errorClose();
-            }
-            finally
-            {    
-                if (!pendingWrites_.isEmpty())
-                {                    
-                    turnOnInterestOps(key_, SelectionKey.OP_WRITE);
-                }
-            }
-        }
-    }
-    
-    // called in the selector thread
-    public void read(SelectionKey key)
-    {
-        turnOffInterestOps(key, SelectionKey.OP_READ);
-        // publish this event onto to the TCPReadEvent Queue.
-        MessagingService.getReadExecutor().execute(readWork_);
-    }
-    
-    class ReadWorkItem implements Runnable
-    {                 
-        // called from the TCP READ thread pool
-        public void run()
-        {                         
-            if ( tcpReader_ == null )
-            {
-                tcpReader_ = new TcpReader(TcpConnection.this);    
-                StartState nextState = tcpReader_.getSocketState(TcpReader.TcpReaderState.PREAMBLE);
-                if ( nextState == null )
-                {
-                    nextState = new ProtocolState(tcpReader_);
-                    tcpReader_.putSocketState(TcpReader.TcpReaderState.PREAMBLE, nextState);
-                }
-                tcpReader_.morphState(nextState);
-            }
-            
-            try
-            {           
-                byte[] bytes = new byte[0];
-                while ( (bytes = tcpReader_.read()).length > 0 )
-                {                       
-                    ProtocolHeader pH = tcpReader_.getProtocolHeader();                    
-                    if ( !pH.isStreamingMode_ )
-                    {
-                        /* first message received */
-                        if (remoteEp_ == null)
-                        {             
-                            int port = ( pH.isListening_ ) ? DatabaseDescriptor.getStoragePort() : EndPoint.sentinelPort_;
-                            remoteEp_ = new EndPoint( socketChannel_.socket().getInetAddress().getHostAddress(), port );                            
-                            // put connection into pool if possible
-                            pool_ = MessagingService.getConnectionPool(localEp_, remoteEp_);                            
-                            pool_.addToPool(TcpConnection.this);                            
-                        }
-                        
-                        /* Deserialize and handle the message */
-                        MessagingService.getDeserilizationExecutor().submit( new MessageDeserializationTask(pH.serializerType_, bytes) );                                                  
-                        tcpReader_.resetState();
-                    }
-                    else
-                    {
-                        MessagingService.setStreamingMode(false);
-                        /* Close this socket connection  used for streaming */
-                        closeSocket();
-                    }                    
-                }
-            }
-            catch ( IOException ex )
-            {                   
-                handleException(ex);
-            }
-            catch ( Throwable th )
-            {
-                handleException(th);
-            }
-            finally
-            {
-                turnOnInterestOps(key_, SelectionKey.OP_READ);
-            }
-        }
-        
-        private void handleException(Throwable th)
-        {
-            logger_.warn("Problem reading from socket connected to : " + socketChannel_);
-            logger_.warn(LogUtil.throwableToString(th));
-            // This is to fix the weird Linux bug with NIO.
-            errorClose();
-        }
-    }
-    
-    public int pending()
-    {
-        return pendingWrites_.size();
-    }
-    
-    public int compareTo(Object o)
-    {
-        if (o instanceof TcpConnection) 
-        {
-            return pendingWrites_.size() - ((TcpConnection) o).pendingWrites_.size();            
-        }
-                    
-        throw new IllegalArgumentException();
-    }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.io.*;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.net.io.FastSerializer;
+import org.apache.cassandra.net.io.ISerializer;
+import org.apache.cassandra.net.io.ProtocolState;
+import org.apache.cassandra.net.io.StartState;
+import org.apache.cassandra.net.io.TcpReader;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class TcpConnection extends SelectionKeyHandler implements Comparable
+{
+    // logging and profiling.
+    private static Logger logger_ = Logger.getLogger(TcpConnection.class);  
+    private static ISerializer serializer_ = new FastSerializer();
+    private SocketChannel socketChannel_;
+    private SelectionKey key_;
+    private TcpConnectionManager pool_;
+    private boolean isIncoming_ = false;       
+    private TcpReader tcpReader_;    
+    private ReadWorkItem readWork_ = new ReadWorkItem(); 
+    private Queue<ByteBuffer> pendingWrites_ = new ConcurrentLinkedQueue<ByteBuffer>();
+    private EndPoint localEp_;
+    private EndPoint remoteEp_;
+    boolean inUse_ = false;
+         
+    /* 
+     * Added for streaming support. We need the boolean
+     * to indicate that this connection is used for 
+     * streaming. The Condition and the Lock are used
+     * to signal the stream() that it can continue
+     * streaming when the socket becomes writable.
+    */
+    private boolean bStream_ = false;
+    private Lock lock_;
+    private Condition condition_;
+    
+    /**
+     * Creates an outgoing, pooled connection (used from getConnection).
+     * Opens a non-blocking channel, starts connecting to the remote end point
+     * and registers with the TCP selector: for OP_READ when the connect
+     * completed immediately, otherwise for OP_CONNECT so that
+     * connect(SelectionKey) can finish the handshake.
+     *
+     * @param pool the connection manager this connection belongs to
+     * @param from the local end point
+     * @param to   the remote end point to connect to
+     * @throws IOException if the channel cannot be opened, configured,
+     *                     connected or registered
+     */
+    TcpConnection(TcpConnectionManager pool, EndPoint from, EndPoint to) throws IOException
+    {
+        pool_ = pool;
+        localEp_ = from;
+        remoteEp_ = to;
+
+        socketChannel_ = SocketChannel.open();
+        boolean registered = false;
+        try
+        {
+            socketChannel_.configureBlocking(false);
+            int interest = socketChannel_.connect( remoteEp_.getInetAddress() )
+                         ? SelectionKey.OP_READ
+                         : SelectionKey.OP_CONNECT;
+            key_ = SelectorManager.getSelectorManager().register(socketChannel_, this, interest);
+            registered = true;
+        }
+        finally
+        {
+            /* do not leak the descriptor when construction fails part way (checked or unchecked) */
+            if ( !registered )
+            {
+                try
+                {
+                    socketChannel_.close();
+                }
+                catch (IOException ignored)
+                {
+                    /* the original failure is the one worth reporting */
+                }
+            }
+        }
+    }
+    
+    /**
+     * Creates an outgoing connection used for streaming; it has no pooling
+     * semantics (pool_ stays null).
+     * The streaming lock and condition are created before the channel is
+     * registered with the selector, because connect(SelectionKey) runs on the
+     * selector thread and calls resumeStreaming(), which locks lock_.
+     *
+     * @param from the local end point
+     * @param to   the remote end point to stream to
+     * @throws IOException if the channel cannot be opened, configured,
+     *                     connected or registered
+     */
+    TcpConnection(EndPoint from, EndPoint to) throws IOException
+    {
+        localEp_ = from;
+        remoteEp_ = to;
+
+        /* streaming state must be complete before this object becomes visible to the selector */
+        lock_ = new ReentrantLock();
+        condition_ = lock_.newCondition();
+        bStream_ = true;
+
+        socketChannel_ = SocketChannel.open();
+        boolean registered = false;
+        try
+        {
+            socketChannel_.configureBlocking(false);
+            int interest = socketChannel_.connect( remoteEp_.getInetAddress() )
+                         ? SelectionKey.OP_READ
+                         : SelectionKey.OP_CONNECT;
+            key_ = SelectorManager.getSelectorManager().register(socketChannel_, this, interest);
+            registered = true;
+        }
+        finally
+        {
+            /* do not leak the descriptor when construction fails part way (checked or unchecked) */
+            if ( !registered )
+            {
+                try
+                {
+                    socketChannel_.close();
+                }
+                catch (IOException ignored)
+                {
+                    /* the original failure is the one worth reporting */
+                }
+            }
+        }
+    }
+    
+    /**
+     * Invoked by the TcpConnectionHandler to accept incoming TCP connections:
+     * wraps the accepted channel in a TcpConnection and registers it for reads.
+     *
+     * NOTE(review): the isIncoming argument is not consulted; the connection
+     * is always constructed with isIncoming == true. Confirm against callers
+     * before changing this.
+     *
+     * @param socketChannel the accepted channel
+     * @param localEp       the local end point the channel was accepted on
+     * @param isIncoming    currently ignored (see note above)
+     * @throws IOException if the channel cannot be configured or registered
+     */
+    static void acceptConnection(SocketChannel socketChannel, EndPoint localEp, boolean isIncoming) throws IOException
+    {
+        new TcpConnection(socketChannel, localEp, true).registerReadInterest();
+    }
+    
+    /**
+     * Registers this connection's channel with the TCP selector for OP_READ
+     * and remembers the resulting key in key_.
+     *
+     * @throws IOException if registration fails
+     */
+    private void registerReadInterest() throws IOException
+    {
+        SelectorManager selector = SelectorManager.getSelectorManager();
+        key_ = selector.register(socketChannel_, this, SelectionKey.OP_READ);
+    }
+    
+    /**
+     * Wraps an already accepted channel (used for incoming connections).
+     * The channel is switched to non-blocking mode; it is not registered with
+     * the selector here, and remoteEp_ and pool_ are left unset.
+     *
+     * @param socketChannel the accepted channel
+     * @param localEp       the local end point
+     * @param isIncoming    value stored in isIncoming_
+     * @throws IOException if the channel cannot be made non-blocking
+     */
+    TcpConnection(SocketChannel socketChannel, EndPoint localEp, boolean isIncoming) throws IOException
+    {
+        localEp_ = localEp;
+        isIncoming_ = isIncoming;
+        socketChannel_ = socketChannel;
+        socketChannel_.configureBlocking(false);
+    }
+    
+    /** @return the local end point of this connection */
+    EndPoint getLocalEp()
+    {
+        return this.localEp_;
+    }
+    
+    /**
+     * Replaces the local end point of this connection.
+     *
+     * @param localEp the new local end point
+     */
+    public void setLocalEp(EndPoint localEp)
+    {
+        this.localEp_ = localEp;
+    }
+
+    /**
+     * @return the remote end point; for a connection built from an accepted
+     *         channel this is not assigned by the constructor
+     */
+    public EndPoint getEndPoint()
+    {
+        return this.remoteEp_;
+    }
+    
+    /** @return the isIncoming_ flag given at construction (false for outgoing connections) */
+    public boolean isIncoming()
+    {
+        return this.isIncoming_;
+    }
+    
+    /** @return the underlying socket channel */
+    public SocketChannel getSocketChannel()
+    {
+        return this.socketChannel_;
+    }
+    
+    /**
+     * Serializes and sends a message on this connection.
+     * A message that serializes to zero bytes is dropped. If earlier writes
+     * are still queued, or the channel is not connected yet, the packed
+     * buffer is only queued. Otherwise it is written right away and any
+     * unwritten remainder is queued with OP_WRITE interest turned on.
+     *
+     * @param message the message to send
+     * @throws IOException if serialization or the channel write fails
+     */
+    public void write(Message message) throws IOException
+    {
+        byte[] payload = serializer_.serialize(message);
+        if ( payload.length == 0 )
+            return;
+
+        boolean listening = !message.getFrom().equals(EndPoint.sentinelLocalEndPoint_);
+        ByteBuffer packed = MessagingService.packIt( payload , false, false, listening);
+        synchronized(this)
+        {
+            boolean canWriteNow = pendingWrites_.isEmpty() && socketChannel_.isConnected();
+            if ( !canWriteNow )
+            {
+                /* keep ordering: queue behind whatever is already waiting */
+                pendingWrites_.add(packed);
+            }
+            else
+            {
+                socketChannel_.write(packed);
+                if ( packed.hasRemaining() )
+                {
+                    /* partial write: let the selector finish it */
+                    pendingWrites_.add(packed);
+                    turnOnInterestOps(key_, SelectionKey.OP_WRITE);
+                }
+            }
+        }
+    }
+    
+    /**
+     * Streams the byte range [startPosition, endPosition) of a file over this
+     * connection. Only valid on a connection created for streaming.
+     *
+     * The method holds lock_ for its whole duration and waits on condition_
+     * whenever the socket cannot take more data; resumeStreaming() signals it.
+     * NOTE(review): the waits are not re-checked against a predicate, so a
+     * spurious wakeup simply retries the write earlier than necessary.
+     *
+     * @param file          the file to read from
+     * @param startPosition offset of the first byte to send
+     * @param endPosition   offset one past the last byte to send
+     * @throws IllegalStateException if this connection is not in streaming mode
+     * @throws IOException           if the connection is not established after
+     *                               3 waits of 2 seconds, or on any I/O failure
+     * @throws InterruptedException  if interrupted while waiting
+     */
+    public void stream(File file, long startPosition, long endPosition) throws IOException, InterruptedException
+    {
+        if ( !bStream_ )
+            throw new IllegalStateException("Cannot stream since we are not set up to stream data.");
+
+        lock_.lock();
+        try
+        {
+            /* transfer at most 64MB in each attempt */
+            final int limit = 64*1024*1024;
+            final long total = endPosition - startPosition;
+            /* keeps track of total number of bytes transferred */
+            long bytesWritten = 0L;
+            /*
+             * The stream header is sent only when streaming starts at offset 0.
+             * Decide this once: re-testing startPosition inside the loop would
+             * send the header again whenever a transfer moved zero bytes.
+            */
+            boolean sendHeader = ( startPosition == 0 );
+
+            RandomAccessFile raf = new RandomAccessFile(file, "r");
+            try
+            {
+                FileChannel fc = raf.getChannel();
+
+                /*
+                 * If the connection is not yet established then wait for
+                 * the timeout period of 2 seconds. Wait up to 3 times and then
+                 * bail with an IOException.
+                */
+                long waitTime = 2;
+                int retry = 0;
+                while (!socketChannel_.isConnected())
+                {
+                    if ( retry == 3 )
+                        throw new IOException("Unable to connect to " + remoteEp_ + " after " + retry + " attempts.");
+                    condition_.await(waitTime, TimeUnit.SECONDS);
+                    ++retry;
+                }
+
+                while ( bytesWritten < total )
+                {
+                    if ( sendHeader )
+                    {
+                        sendHeader = false;
+                        ByteBuffer buffer = MessagingService.constructStreamHeader(false, true);
+                        socketChannel_.write(buffer);
+                        if (buffer.remaining() > 0)
+                        {
+                            /* let the selector thread flush the rest of the header */
+                            pendingWrites_.add(buffer);
+                            turnOnInterestOps(key_, SelectionKey.OP_WRITE);
+                            condition_.await();
+                        }
+                    }
+
+                    /* never ask for more than what is left of the requested range */
+                    long count = Math.min(limit, total - bytesWritten);
+                    /* returns the number of bytes transferred from file to the socket */
+                    long bytesTransferred = fc.transferTo(startPosition, count, socketChannel_);
+                    if (logger_.isDebugEnabled())
+                        logger_.debug("Bytes transferred " + bytesTransferred);
+                    bytesWritten += bytesTransferred;
+                    startPosition += bytesTransferred;
+                    /*
+                     * If the number of bytes transferred is less than intended
+                     * then we need to wait till socket becomes writeable again.
+                    */
+                    if ( bytesTransferred < count )
+                    {
+                        turnOnInterestOps(key_, SelectionKey.OP_WRITE);
+                        condition_.await();
+                    }
+                }
+            }
+            finally
+            {
+                /* closes the FileChannel obtained from it as well */
+                raf.close();
+            }
+        }
+        finally
+        {
+            lock_.unlock();
+        }
+    }
+
+    /**
+     * Wakes up one thread waiting on condition_ (see stream()).
+     * Does nothing unless this connection is in streaming mode.
+     */
+    private void resumeStreaming()
+    {
+        if ( bStream_ )
+        {
+            lock_.lock();
+            try
+            {
+                condition_.signal();
+            }
+            finally
+            {
+                lock_.unlock();
+            }
+        }
+    }
+    
+    /**
+     * Marks this connection as no longer in use and, when it belongs to a
+     * pool that contains it, calls decUsed() on that pool. The socket itself
+     * is not closed here.
+     * Connections without a pool (pool_ == null, e.g. those created for
+     * streaming) are only marked as unused.
+     */
+    public void close()
+    {
+        inUse_ = false;
+        if ( pool_ != null && pool_.contains(this) )
+            pool_.decUsed();
+    }
+
+    public boolean isConnected()
+    {
+        return socketChannel_.isConnected();
+    }
+    
+    public boolean equals(Object o)
+    {
+        if ( !(o instanceof TcpConnection) )
+            return false;
+        
+        TcpConnection rhs = (TcpConnection)o;        
+        if ( localEp_.equals(rhs.localEp_) && remoteEp_.equals(rhs.remoteEp_) )
+            return true;
+        else
+            return false;
+    }
+    
+    /** @return the hash of the string "local:remote" built from both end points */
+    public int hashCode()
+    {
+        String endPointPair = localEp_ + ":" + remoteEp_;
+        return endPointPair.hashCode();
+    }
+
+    /** @return the string form of the underlying socket channel */
+    public String toString()
+    {
+        return String.valueOf(socketChannel_.toString());
+    }
+    
+    /**
+     * Shuts this connection down: logs how many writes are still queued,
+     * removes the connection from its pool (if it has one), cancels the
+     * selection key and closes its channel, then drops the queued writes.
+     */
+    void closeSocket()
+    {
+        int unsent = pendingWrites_.size();
+        logger_.warn("Closing down connection " + socketChannel_ + " with " + unsent + " writes remaining.");
+        if ( pool_ != null )
+            pool_.removeConnection(this);
+        cancel(key_);
+        pendingWrites_.clear();
+    }
+    
+    /**
+     * Shuts this connection down after an error: cancels the selection key
+     * and closes its channel, drops the queued writes, and removes the
+     * connection from its pool (if it has one).
+     */
+    void errorClose()
+    {
+        logger_.warn("Closing down connection " + socketChannel_);
+        cancel(key_);
+        /* cleared once, after the channel is closed, so late additions are dropped as well */
+        pendingWrites_.clear();
+        if ( pool_ != null )
+        {
+            pool_.removeConnection(this);
+        }
+    }
+    
+    /**
+     * Cancels a selection key and closes the channel it belongs to.
+     * Closing is best effort: a failure is logged and otherwise ignored so
+     * that the caller's shutdown sequence can continue.
+     *
+     * @param key the key to cancel; null is ignored
+     */
+    private void cancel(SelectionKey key)
+    {
+        if ( key == null )
+            return;
+
+        key.cancel();
+        try
+        {
+            key.channel().close();
+        }
+        catch (IOException e)
+        {
+            logger_.warn("Failed to close channel " + key.channel(), e);
+        }
+    }
+    
+    /**
+     * Completes a pending non-blocking connect. Called in the selector thread.
+     * On success read interest is turned on, write interest is turned on if
+     * writes were queued while connecting, and a waiting stream() is
+     * signalled. If the connect cannot be finished the connection is closed
+     * via errorClose().
+     *
+     * @param key the selection key that became connectable
+     */
+    public void connect(SelectionKey key)
+    {
+        turnOffInterestOps(key, SelectionKey.OP_CONNECT);
+
+        boolean established;
+        try
+        {
+            established = socketChannel_.finishConnect();
+        }
+        catch(IOException e)
+        {
+            logger_.error("Encountered IOException on connection: "  + socketChannel_, e);
+            errorClose();
+            return;
+        }
+
+        if ( !established )
+        {
+            logger_.error("Closing connection because socket channel could not finishConnect.");
+            errorClose();
+            return;
+        }
+
+        turnOnInterestOps(key, SelectionKey.OP_READ);
+        synchronized(this)
+        {
+            // anything queued before the connect finished gets flushed by the selector
+            if (!pendingWrites_.isEmpty())
+                turnOnInterestOps(key_, SelectionKey.OP_WRITE);
+        }
+        resumeStreaming();
+    }
+    
+    /**
+     * Handles a writable key. Called in the selector thread.
+     * Write interest is switched off, queued buffers are flushed (which
+     * switches it back on if some data is still left), and finally a thread
+     * blocked in stream() is signalled.
+     *
+     * @param key the selection key that became writable
+     */
+    public void write(SelectionKey key)
+    {
+        turnOffInterestOps(key, SelectionKey.OP_WRITE);
+        doPendingWrites();
+        /*
+         * Only has an effect in streaming mode: stream() sends one chunk
+         * and then waits until it is signalled from here before sending
+         * the next one.
+        */
+        resumeStreaming();
+    }
+    
+    /**
+     * Writes queued buffers to the channel in order until the queue is empty
+     * or the socket stops accepting data. A buffer leaves the queue only once
+     * it has been written completely. On an IOException the connection is
+     * closed via errorClose(). If buffers remain afterwards, write interest
+     * is turned on again so the selector calls back.
+     */
+    void doPendingWrites()
+    {
+        synchronized(this)
+        {
+            try
+            {
+                /*
+                 * Use the peeked head as the loop condition and poll() to drop
+                 * it: errorClose() may clear the queue without holding this
+                 * monitor, so isEmpty()/peek()/remove() as separate steps
+                 * could see the head vanish in between.
+                */
+                ByteBuffer head;
+                while ( (head = pendingWrites_.peek()) != null )
+                {
+                    socketChannel_.write(head);
+                    if ( head.hasRemaining() )
+                    {
+                        /* socket buffer is full; keep the partially written head queued */
+                        break;
+                    }
+                    pendingWrites_.poll();
+                }
+            }
+            catch(IOException ex)
+            {
+                logger_.error(LogUtil.throwableToString(ex));
+                // This is to fix the weird Linux bug with NIO.
+                errorClose();
+            }
+            finally
+            {
+                if (!pendingWrites_.isEmpty())
+                {
+                    turnOnInterestOps(key_, SelectionKey.OP_WRITE);
+                }
+            }
+        }
+    }
+    
+    /**
+     * Handles a read event for this connection. Called in the selector thread.
+     * Interest in OP_READ is turned off and the read work item is handed to
+     * the executor returned by MessagingService.getReadExecutor().
+     *
+     * @param key selection key on which the read event was signalled
+     */
+    public void read(SelectionKey key)
+    {
+        turnOffInterestOps(key, SelectionKey.OP_READ);
+        // queue the read work; the selector thread does not do the reading itself
+        MessagingService.getReadExecutor().execute(readWork_);
+    }
+    
+    class ReadWorkItem implements Runnable
+    {                 
+        // called from the TCP READ thread pool
+        public void run()
+        {                         
+            if ( tcpReader_ == null )
+            {
+                tcpReader_ = new TcpReader(TcpConnection.this);    
+                StartState nextState = tcpReader_.getSocketState(TcpReader.TcpReaderState.PREAMBLE);
+                if ( nextState == null )
+                {
+                    nextState = new ProtocolState(tcpReader_);
+                    tcpReader_.putSocketState(TcpReader.TcpReaderState.PREAMBLE, nextState);
+                }
+                tcpReader_.morphState(nextState);
+            }
+            
+            try
+            {           
+                byte[] bytes = new byte[0];
+                while ( (bytes = tcpReader_.read()).length > 0 )
+                {                       
+                    ProtocolHeader pH = tcpReader_.getProtocolHeader();                    
+                    if ( !pH.isStreamingMode_ )
+                    {
+                        /* first message received */
+                        if (remoteEp_ == null)
+                        {             
+                            int port = ( pH.isListening_ ) ? DatabaseDescriptor.getStoragePort() : EndPoint.sentinelPort_;
+                            remoteEp_ = new EndPoint( socketChannel_.socket().getInetAddress().getHostAddress(), port );                            
+                            // put connection into pool if possible
+                            pool_ = MessagingService.getConnectionPool(localEp_, remoteEp_);                            
+                            pool_.addToPool(TcpConnection.this);                            
+                        }
+                        
+                        /* Deserialize and handle the message */
+                        MessagingService.getDeserilizationExecutor().submit( new MessageDeserializationTask(pH.serializerType_, bytes) );                                                  
+                        tcpReader_.resetState();
+                    }
+                    else
+                    {
+                        MessagingService.setStreamingMode(false);
+                        /* Close this socket connection  used for streaming */
+                        closeSocket();
+                    }                    
+                }
+            }
+            catch ( IOException ex )
+            {                   
+                handleException(ex);
+            }
+            catch ( Throwable th )
+            {
+                handleException(th);
+            }
+            finally
+            {
+                turnOnInterestOps(key_, SelectionKey.OP_READ);
+            }
+        }
+        
+        private void handleException(Throwable th)
+        {
+            logger_.warn("Problem reading from socket connected to : " + socketChannel_);
+            logger_.warn(LogUtil.throwableToString(th));
+            // This is to fix the weird Linux bug with NIO.
+            errorClose();
+        }
+    }
+    
+    /**
+     * @return number of entries currently held in the pending-write queue
+     */
+    public int pending()
+    {
+        final int queued = pendingWrites_.size();
+        return queued;
+    }
+    
+    /**
+     * Orders connections by the size of their pending-write queues, so a
+     * connection with fewer queued writes sorts first.
+     *
+     * @param o the object to compare with; must be a TcpConnection
+     * @return this connection's queue size minus the other connection's
+     * @throws IllegalArgumentException if o is not a TcpConnection
+     */
+    public int compareTo(Object o)
+    {
+        if (!(o instanceof TcpConnection))
+        {
+            throw new IllegalArgumentException();
+        }
+
+        TcpConnection other = (TcpConnection) o;
+        // both sizes are non-negative, so the difference cannot overflow
+        return pendingWrites_.size() - other.pendingWrites_.size();
+    }
+}

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnectionHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnectionHandler.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnectionHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnectionHandler.java Thu Jul 30 15:30:21 2009
@@ -1,60 +1,60 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.net;
-
-import java.nio.channels.*;
-import java.io.IOException;
-import java.net.*;
-
-import org.apache.cassandra.utils.LogUtil;
-import org.apache.log4j.Logger;
-
-/**
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-public class TcpConnectionHandler extends SelectionKeyHandler
-{
-    private static Logger logger_ = Logger.getLogger(TcpConnectionHandler.class);
-    EndPoint localEp_;    
-    
-    public TcpConnectionHandler(EndPoint localEp) 
-    {
-        localEp_ = localEp;
-    }
-
-    public void accept(SelectionKey key)
-    {
-        try 
-        {            
-            ServerSocketChannel serverChannel = (ServerSocketChannel)key.channel();
-            SocketChannel client = serverChannel.accept();
-            
-            if ( client != null )
-            {                        
-                //new TcpConnection(client, localEp_, true);
-                TcpConnection.acceptConnection(client, localEp_, true);                
-            }            
-        } 
-        catch(IOException e) 
-        {
-            logger_.warn(LogUtil.throwableToString(e));
-        }
-    }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.nio.channels.*;
+import java.io.IOException;
+import java.net.*;
+
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class TcpConnectionHandler extends SelectionKeyHandler
+{
+    private static Logger logger_ = Logger.getLogger(TcpConnectionHandler.class);
+    EndPoint localEp_;
+
+    /**
+     * @param localEp local end point handed to every connection accepted by this handler
+     */
+    public TcpConnectionHandler(EndPoint localEp)
+    {
+        localEp_ = localEp;
+    }
+
+    /**
+     * Accepts a connection on the server socket channel of the given key and,
+     * when one was actually accepted, passes it to TcpConnection.acceptConnection
+     * together with the local end point. An IOException is logged as a warning
+     * and not propagated.
+     *
+     * @param key selection key whose channel is a ServerSocketChannel
+     */
+    public void accept(SelectionKey key)
+    {
+        try
+        {
+            SocketChannel client = ((ServerSocketChannel) key.channel()).accept();
+            if ( client == null )
+            {
+                // nothing was accepted
+                return;
+            }
+            TcpConnection.acceptConnection(client, localEp_, true);
+        }
+        catch(IOException e)
+        {
+            logger_.warn(LogUtil.throwableToString(e));
+        }
+    }
+}

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnectionManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnectionManager.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnectionManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnectionManager.java Thu Jul 30 15:30:21 2009
@@ -1,217 +1,217 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.net;
-
-import java.io.IOException;
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.concurrent.locks.*;
-
-import org.apache.log4j.Logger;
-/**
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-class TcpConnectionManager
-{
-    private Lock lock_ = new ReentrantLock();
-    private List<TcpConnection> allConnections_;
-    private EndPoint localEp_;
-    private EndPoint remoteEp_;
-    private int initialSize_;
-    private int growthFactor_;
-    private int maxSize_;
-    private long lastTimeUsed_;
-    private boolean isShut_;
-    
-    private int inUse_;
-
-    TcpConnectionManager(int initialSize, int growthFactor, int maxSize, EndPoint localEp, EndPoint remoteEp)
-    {
-        initialSize_ = initialSize;
-        growthFactor_ = growthFactor;
-        maxSize_ = maxSize;
-        localEp_ = localEp;
-        remoteEp_ = remoteEp;     
-        isShut_ = false;                
-        lastTimeUsed_ = System.currentTimeMillis();        
-        allConnections_ = new Vector<TcpConnection>(); 
-    }
-    
-    TcpConnection getConnection() throws IOException
-    {
-        lock_.lock();
-        try
-        {
-            if (allConnections_.isEmpty()) 
-            {                
-                TcpConnection conn = new TcpConnection(this, localEp_, remoteEp_);
-                addToPool(conn);
-                conn.inUse_ = true;
-                incUsed();
-                return conn;
-            }
-            
-            TcpConnection least = getLeastLoaded();
-            
-            if ( (least != null && least.pending() == 0) || allConnections_.size() == maxSize_) {
-                least.inUse_ = true;
-                incUsed();
-                return least;
-            }
-                                    
-            TcpConnection connection = new TcpConnection(this, localEp_, remoteEp_);
-            if ( connection != null && !contains(connection) )
-            {
-                addToPool(connection);
-                connection.inUse_ = true;
-                incUsed();
-                return connection;
-            }
-            else
-            {
-                if ( connection != null )
-                {                
-                    connection.closeSocket();
-                }
-                return getLeastLoaded();
-            }
-        }
-        finally
-        {
-            lock_.unlock();
-        }
-    }
-    
-    protected TcpConnection getLeastLoaded() 
-    {  
-        TcpConnection connection = null;
-        lock_.lock();
-        try
-        {
-            Collections.sort(allConnections_);
-            connection = (allConnections_.size() > 0 ) ? allConnections_.get(0) : null;
-        }
-        finally
-        {
-            lock_.unlock();
-        }
-        return connection;
-    }
-    
-    void removeConnection(TcpConnection connection)
-    {
-        allConnections_.remove(connection);        
-    }
-    
-    void incUsed()
-    {
-        inUse_++;
-    }
-    
-    void decUsed()
-    {        
-        inUse_--;
-    }
-    
-    int getConnectionsInUse()
-    {
-        return inUse_;
-    }
-
-    void addToPool(TcpConnection connection)
-    { 
-        
-        if ( contains(connection) )
-            return;
-        
-        lock_.lock();
-        try
-        {
-            if ( allConnections_.size() < maxSize_ )
-            {                 
-                allConnections_.add(connection);                
-            }
-            else
-            {                
-                connection.closeSocket();
-            }
-        }
-        finally
-        {
-            lock_.unlock();
-        }
-    }
-    
-    void shutdown()
-    {    
-        lock_.lock();
-        try
-        {
-            while ( allConnections_.size() > 0 )
-            {
-                TcpConnection connection = allConnections_.remove(0);                        
-                connection.closeSocket();
-            }
-        }
-        finally
-        {
-            lock_.unlock();
-        }
-        isShut_ = true;
-    }
-
-    int getPoolSize()
-    {
-        return allConnections_.size();
-    }
-
-    EndPoint getLocalEndPoint()
-    {
-        return localEp_;
-    }
-    
-    EndPoint getRemoteEndPoint()
-    {
-        return remoteEp_;
-    }
-    
-    int getPendingWrites()
-    {
-        int total = 0;
-        lock_.lock();
-        try
-        {
-            for ( TcpConnection connection : allConnections_ )
-            {
-                total += connection.pending();
-            }
-        }
-        finally
-        {
-            lock_.unlock();
-        }
-        return total;
-    }
-    
-    boolean contains(TcpConnection connection)
-    {
-        return allConnections_.contains(connection);
-    }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.locks.*;
+
+import org.apache.log4j.Logger;
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+/**
+ * Pool of TcpConnection objects between one local and one remote end point.
+ * getConnection(), getLeastLoaded(), addToPool(), shutdown() and
+ * getPendingWrites() work on the connection list while holding lock_.
+ *
+ * NOTE(review): removeConnection(), contains(), getPoolSize() and the in-use
+ * counter methods do not take lock_ and rely only on the synchronization of
+ * the underlying Vector. Whether lock_ can be taken there safely depends on
+ * the locks held by their callers, which are not visible from this class.
+ */
+class TcpConnectionManager
+{
+    private Lock lock_ = new ReentrantLock();
+    private List<TcpConnection> allConnections_;
+    private EndPoint localEp_;
+    private EndPoint remoteEp_;
+    private int initialSize_;
+    private int growthFactor_;
+    private int maxSize_;
+    private long lastTimeUsed_;
+    private boolean isShut_;
+
+    private int inUse_;
+
+    /**
+     * @param initialSize stored, not otherwise used by this class
+     * @param growthFactor stored, not otherwise used by this class
+     * @param maxSize upper bound on the number of pooled connections
+     * @param localEp local end point of every connection in this pool
+     * @param remoteEp remote end point of every connection in this pool
+     */
+    TcpConnectionManager(int initialSize, int growthFactor, int maxSize, EndPoint localEp, EndPoint remoteEp)
+    {
+        initialSize_ = initialSize;
+        growthFactor_ = growthFactor;
+        maxSize_ = maxSize;
+        localEp_ = localEp;
+        remoteEp_ = remoteEp;
+        isShut_ = false;
+        lastTimeUsed_ = System.currentTimeMillis();
+        allConnections_ = new Vector<TcpConnection>();
+    }
+
+    /**
+     * Hands out a connection from the pool, opening a new one when the pool is
+     * empty or when the least loaded connection still has pending writes and
+     * the pool has room. Every connection returned is flagged as in use and
+     * counted in the in-use counter.
+     *
+     * @return a connection, or null when a freshly opened connection had to be
+     *         discarded and the pool holds no connection to fall back on
+     * @throws IOException declared because opening a TcpConnection may fail
+     */
+    TcpConnection getConnection() throws IOException
+    {
+        lock_.lock();
+        try
+        {
+            if (allConnections_.isEmpty())
+            {
+                TcpConnection conn = new TcpConnection(this, localEp_, remoteEp_);
+                addToPool(conn);
+                return markInUse(conn);
+            }
+
+            TcpConnection least = getLeastLoaded();
+            // least may be null here: removeConnection() does not take lock_, so the
+            // pool can have been emptied since the isEmpty() check. Guard the
+            // dereference for both halves of the condition, not just the first.
+            if ( least != null && (least.pending() == 0 || allConnections_.size() >= maxSize_) )
+            {
+                return markInUse(least);
+            }
+
+            TcpConnection connection = new TcpConnection(this, localEp_, remoteEp_);
+            if ( !contains(connection) )
+            {
+                addToPool(connection);
+                return markInUse(connection);
+            }
+
+            // The pool already holds an equal connection: drop the new one and fall
+            // back to the least loaded, with the same bookkeeping as every other path.
+            connection.closeSocket();
+            TcpConnection fallback = getLeastLoaded();
+            return (fallback != null) ? markInUse(fallback) : null;
+        }
+        finally
+        {
+            lock_.unlock();
+        }
+    }
+
+    /**
+     * Flags the connection as in use and bumps the in-use counter.
+     *
+     * @param connection connection about to be handed out; must not be null
+     * @return the same connection
+     */
+    private TcpConnection markInUse(TcpConnection connection)
+    {
+        connection.inUse_ = true;
+        incUsed();
+        return connection;
+    }
+
+    /**
+     * Sorts the pooled connections and returns the first one.
+     *
+     * @return the first connection after sorting, or null when the pool is empty
+     */
+    protected TcpConnection getLeastLoaded()
+    {
+        lock_.lock();
+        try
+        {
+            Collections.sort(allConnections_);
+            return allConnections_.isEmpty() ? null : allConnections_.get(0);
+        }
+        finally
+        {
+            lock_.unlock();
+        }
+    }
+
+    /**
+     * Removes the connection from the pool. Does not take lock_.
+     *
+     * @param connection connection to remove
+     */
+    void removeConnection(TcpConnection connection)
+    {
+        allConnections_.remove(connection);
+    }
+
+    /** Increments the in-use counter. */
+    void incUsed()
+    {
+        inUse_++;
+    }
+
+    /** Decrements the in-use counter. */
+    void decUsed()
+    {
+        inUse_--;
+    }
+
+    /**
+     * @return current value of the in-use counter
+     */
+    int getConnectionsInUse()
+    {
+        return inUse_;
+    }
+
+    /**
+     * Adds the connection to the pool unless it is already pooled. When the
+     * pool has reached maxSize_ the connection is closed instead of added.
+     *
+     * @param connection connection to add
+     */
+    void addToPool(TcpConnection connection)
+    {
+        lock_.lock();
+        try
+        {
+            // The membership check runs under the lock so that two concurrent
+            // callers cannot both pass it and add the same connection twice.
+            if ( contains(connection) )
+            {
+                return;
+            }
+
+            if ( allConnections_.size() < maxSize_ )
+            {
+                allConnections_.add(connection);
+            }
+            else
+            {
+                connection.closeSocket();
+            }
+        }
+        finally
+        {
+            lock_.unlock();
+        }
+    }
+
+    /**
+     * Removes every connection from the pool, closing each one, and then
+     * marks this manager as shut.
+     */
+    void shutdown()
+    {
+        lock_.lock();
+        try
+        {
+            while ( !allConnections_.isEmpty() )
+            {
+                TcpConnection connection = allConnections_.remove(0);
+                connection.closeSocket();
+            }
+        }
+        finally
+        {
+            lock_.unlock();
+        }
+        isShut_ = true;
+    }
+
+    /**
+     * @return number of connections currently in the pool
+     */
+    int getPoolSize()
+    {
+        return allConnections_.size();
+    }
+
+    /**
+     * @return local end point this pool was created with
+     */
+    EndPoint getLocalEndPoint()
+    {
+        return localEp_;
+    }
+
+    /**
+     * @return remote end point this pool was created with
+     */
+    EndPoint getRemoteEndPoint()
+    {
+        return remoteEp_;
+    }
+
+    /**
+     * @return sum of pending() over all pooled connections
+     */
+    int getPendingWrites()
+    {
+        int total = 0;
+        lock_.lock();
+        try
+        {
+            for ( TcpConnection connection : allConnections_ )
+            {
+                total += connection.pending();
+            }
+        }
+        finally
+        {
+            lock_.unlock();
+        }
+        return total;
+    }
+
+    /**
+     * @param connection connection to look for
+     * @return true when the pool holds the connection
+     */
+    boolean contains(TcpConnection connection)
+    {
+        return allConnections_.contains(connection);
+    }
+}