You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2009/03/02 07:13:17 UTC

svn commit: r749207 [2/12] - in /incubator/cassandra/src/org/apache/cassandra: loader/ locator/ net/ net/http/ net/io/ net/sink/ procedures/ service/ test/ tools/

Added: incubator/cassandra/src/org/apache/cassandra/locator/RackAwareStrategy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/locator/RackAwareStrategy.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/locator/RackAwareStrategy.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/locator/RackAwareStrategy.java Mon Mar  2 06:13:14 2009
@@ -0,0 +1,205 @@
+package org.apache.cassandra.locator;
+
+import java.math.BigInteger;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.LogUtil;
+
+
+/*
+ * This class returns the nodes responsible for a given
+ * key but does respects rack awareness. It makes a best
+ * effort to get a node from a different data center and
+ * a node in a different rack in the same datacenter as
+ * the primary.
+ */
+public class RackAwareStrategy extends AbstractStrategy
+{
+    public RackAwareStrategy(TokenMetadata tokenMetadata)
+    {
+        super(tokenMetadata);
+    }
+    
+    public EndPoint[] getStorageEndPoints(BigInteger token)
+    {
+        int startIndex = 0 ;
+        List<EndPoint> list = new ArrayList<EndPoint>();
+        boolean bDataCenter = false;
+        boolean bOtherRack = false;
+        int foundCount = 0;
+        int N = DatabaseDescriptor.getReplicationFactor();
+        Map<BigInteger, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
+        List<BigInteger> tokens = new ArrayList<BigInteger>(tokenToEndPointMap.keySet());
+        Collections.sort(tokens);
+        int index = Collections.binarySearch(tokens, token);
+        if(index < 0)
+        {
+            index = (index + 1) * (-1);
+            if (index >= tokens.size())
+                index = 0;
+        }
+        int totalNodes = tokens.size();
+        // Add the node at the index by default
+        list.add(tokenToEndPointMap.get(tokens.get(index)));
+        foundCount++;
+        if( N == 1 )
+        {
+            return list.toArray(new EndPoint[0]);
+        }
+        startIndex = (index + 1)%totalNodes;
+        IEndPointSnitch endPointSnitch = StorageService.instance().getEndPointSnitch();
+        
+        for (int i = startIndex, count = 1; count < totalNodes && foundCount < N; ++count, i = (i+1)%totalNodes)
+        {
+            try
+            {
+                // First try to find one in a different data center
+                if(!endPointSnitch.isInSameDataCenter(tokenToEndPointMap.get(tokens.get(index)), tokenToEndPointMap.get(tokens.get(i))))
+                {
+                    // If we have already found something in a diff datacenter no need to find another
+                    if( !bDataCenter )
+                    {
+                        list.add(tokenToEndPointMap.get(tokens.get(i)));
+                        bDataCenter = true;
+                        foundCount++;
+                    }
+                    continue;
+                }
+                // Now  try to find one on a different rack
+                if(!endPointSnitch.isOnSameRack(tokenToEndPointMap.get(tokens.get(index)), tokenToEndPointMap.get(tokens.get(i))) &&
+                        endPointSnitch.isInSameDataCenter(tokenToEndPointMap.get(tokens.get(index)), tokenToEndPointMap.get(tokens.get(i))))
+                {
+                    // If we have already found something in a diff rack no need to find another
+                    if( !bOtherRack )
+                    {
+                        list.add(tokenToEndPointMap.get(tokens.get(i)));
+                        bOtherRack = true;
+                        foundCount++;
+                    }
+                    continue;
+                }
+            }
+            catch (UnknownHostException e)
+            {
+                logger_.debug(LogUtil.throwableToString(e));
+            }
+
+        }
+        // If we found N number of nodes we are good. This loop wil just exit. Otherwise just
+        // loop through the list and add until we have N nodes.
+        for (int i = startIndex, count = 1; count < totalNodes && foundCount < N; ++count, i = (i+1)%totalNodes)
+        {
+            if( ! list.contains(tokenToEndPointMap.get(tokens.get(i))))
+            {
+                list.add(tokenToEndPointMap.get(tokens.get(i)));
+                foundCount++;
+                continue;
+            }
+        }
+        retrofitPorts(list);
+        return list.toArray(new EndPoint[0]);
+    }
+    
+    public Map<String, EndPoint[]> getStorageEndPoints(String[] keys)
+    {
+    	Map<String, EndPoint[]> results = new HashMap<String, EndPoint[]>();
+    	List<EndPoint> list = new ArrayList<EndPoint>();
+    	int startIndex = 0 ;
+    	int foundCount = 0;
+    	boolean bDataCenter = false;
+        boolean bOtherRack = false;
+    	
+    	Map<BigInteger, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
+    	int N = DatabaseDescriptor.getReplicationFactor();
+        List<BigInteger> tokens = new ArrayList<BigInteger>(tokenToEndPointMap.keySet());
+        Collections.sort(tokens);
+        
+        for ( String key : keys )
+        {
+        	BigInteger token = StorageService.hash(key);
+        	int index = Collections.binarySearch(tokens, token);
+            if(index < 0)
+            {
+                index = (index + 1) * (-1);
+                if (index >= tokens.size())
+                    index = 0;
+            }
+            int totalNodes = tokens.size();
+            // Add the node at the index by default
+            list.add(tokenToEndPointMap.get(tokens.get(index)));
+            foundCount++;
+            if( N == 1 )
+            {
+            	results.put( key, list.toArray(new EndPoint[0]) );
+                return results;
+            }
+            startIndex = (index + 1)%totalNodes;
+            IEndPointSnitch endPointSnitch = StorageService.instance().getEndPointSnitch();
+            
+            for (int i = startIndex, count = 1; count < totalNodes && foundCount < N; ++count, i = (i+1)%totalNodes)
+            {
+                try
+                {
+                    // First try to find one in a different data center
+                    if(!endPointSnitch.isInSameDataCenter(tokenToEndPointMap.get(tokens.get(index)), tokenToEndPointMap.get(tokens.get(i))))
+                    {
+                        // If we have already found something in a diff datacenter no need to find another
+                        if( !bDataCenter )
+                        {
+                            list.add(tokenToEndPointMap.get(tokens.get(i)));
+                            bDataCenter = true;
+                            foundCount++;
+                        }
+                        continue;
+                    }
+                    // Now  try to find one on a different rack
+                    if(!endPointSnitch.isOnSameRack(tokenToEndPointMap.get(tokens.get(index)), tokenToEndPointMap.get(tokens.get(i))) &&
+                            endPointSnitch.isInSameDataCenter(tokenToEndPointMap.get(tokens.get(index)), tokenToEndPointMap.get(tokens.get(i))))
+                    {
+                        // If we have already found something in a diff rack no need to find another
+                        if( !bOtherRack )
+                        {
+                            list.add(tokenToEndPointMap.get(tokens.get(i)));
+                            bOtherRack = true;
+                            foundCount++;
+                        }
+                        continue;
+                    }
+                }
+                catch (UnknownHostException e)
+                {
+                    logger_.debug(LogUtil.throwableToString(e));
+                }
+
+            }
+            // If we found N number of nodes we are good. This loop wil just exit. Otherwise just
+            // loop through the list and add until we have N nodes.
+            for (int i = startIndex, count = 1; count < totalNodes && foundCount < N; ++count, i = (i+1)%totalNodes)
+            {
+                if( ! list.contains(tokenToEndPointMap.get(tokens.get(i))))
+                {
+                    list.add(tokenToEndPointMap.get(tokens.get(i)));
+                    foundCount++;
+                    continue;
+                }
+            }
+            retrofitPorts(list);
+            results.put(key, list.toArray(new EndPoint[0]));
+        }
+        
+        return results;
+    }
+    
+    public EndPoint[] getStorageEndPoints(BigInteger token, Map<BigInteger, EndPoint> tokenToEndPointMap)
+    {
+        throw new UnsupportedOperationException("This operation is not currently supported");
+    }
+}
\ No newline at end of file

Added: incubator/cassandra/src/org/apache/cassandra/locator/RackUnawareStrategy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/locator/RackUnawareStrategy.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/locator/RackUnawareStrategy.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/locator/RackUnawareStrategy.java Mon Mar  2 06:13:14 2009
@@ -0,0 +1,154 @@
+package org.apache.cassandra.locator;
+
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.service.StorageService;
+
+
+/**
+ * This class returns the nodes responsible for a given
+ * key but does not respect rack awareness. Basically
+ * returns the 3 nodes that lie right next to each other
+ * on the ring.
+ */
+public class RackUnawareStrategy extends AbstractStrategy
+{   
+    /* Use this flag to check if initialization is in order. */
+    private AtomicBoolean initialized_ = new AtomicBoolean(false);
+    private Map<Range, List<EndPoint>> rangeToEndPointMap_;
+    
+    public RackUnawareStrategy(TokenMetadata tokenMetadata)
+    {
+        super(tokenMetadata);
+    }
+    
+    public EndPoint[] getStorageEndPoints(BigInteger token)
+    {
+        return getStorageEndPoints(token, tokenMetadata_.cloneTokenEndPointMap());            
+    }
+    
+    public EndPoint[] getStorageEndPoints(BigInteger token, Map<BigInteger, EndPoint> tokenToEndPointMap)
+    {
+        int startIndex = 0 ;
+        List<EndPoint> list = new ArrayList<EndPoint>();
+        int foundCount = 0;
+        int N = DatabaseDescriptor.getReplicationFactor();
+        List<BigInteger> tokens = new ArrayList<BigInteger>(tokenToEndPointMap.keySet());
+        Collections.sort(tokens);
+        int index = Collections.binarySearch(tokens, token);
+        if(index < 0)
+        {
+            index = (index + 1) * (-1);
+            if (index >= tokens.size())
+                index = 0;
+        }
+        int totalNodes = tokens.size();
+        // Add the node at the index by default
+        list.add(tokenToEndPointMap.get(tokens.get(index)));
+        foundCount++;
+        startIndex = (index + 1)%totalNodes;
+        // If we found N number of nodes we are good. This loop will just exit. Otherwise just
+        // loop through the list and add until we have N nodes.
+        for (int i = startIndex, count = 1; count < totalNodes && foundCount < N; ++count, i = (i+1)%totalNodes)
+        {
+            if( ! list.contains(tokenToEndPointMap.get(tokens.get(i))))
+            {
+                list.add(tokenToEndPointMap.get(tokens.get(i)));
+                foundCount++;
+                continue;
+            }
+        }
+        retrofitPorts(list);
+        return list.toArray(new EndPoint[0]);
+    }
+    
+    private void doInitialization()
+    {
+        if ( !initialized_.get() )
+        {
+            /* construct the mapping from the ranges to the replicas responsible for them */
+            rangeToEndPointMap_ = StorageService.instance().getRangeToEndPointMap();            
+            initialized_.set(true);
+        }
+    }
+    
+    /**
+     * This method determines which range in the array actually contains
+     * the hash of the key
+     * @param ranges
+     * @param key
+     * @return
+     */
+    private int findRangeIndexForKey(Range[] ranges, String key)
+    {
+        int index = 0;
+        BigInteger hash = StorageService.hash(key);
+        for ( int i = 0; i < ranges.length; ++i )
+        {
+            if ( ranges[i].contains(hash) )
+            {
+                index = i;
+                break;
+            }
+        }
+        
+        return index;
+    }
+    
+    public Map<String, EndPoint[]> getStorageEndPoints(String[] keys)
+    {              
+        Arrays.sort(keys);
+        Range[] ranges = StorageService.instance().getAllRanges();
+        
+    	Map<String, EndPoint[]> results = new HashMap<String, EndPoint[]>();
+    	List<EndPoint> list = new ArrayList<EndPoint>();
+    	int startIndex = 0 ;
+    	int foundCount = 0;
+    	
+    	Map<BigInteger, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
+    	int N = DatabaseDescriptor.getReplicationFactor();
+        List<BigInteger> tokens = new ArrayList<BigInteger>(tokenToEndPointMap.keySet());
+        Collections.sort(tokens);
+        for ( String key : keys )
+        {
+        	BigInteger token = StorageService.hash(key);
+        	int index = Collections.binarySearch(tokens, token);
+            if(index < 0)
+            {
+                index = (index + 1) * (-1);
+                if (index >= tokens.size())
+                    index = 0;
+            }
+            int totalNodes = tokens.size();
+            // Add the node at the index by default
+            list.add(tokenToEndPointMap.get(tokens.get(index)));
+            foundCount++;
+            startIndex = (index + 1)%totalNodes;
+            // If we found N number of nodes we are good. This loop will just exit. Otherwise just
+            // loop through the list and add until we have N nodes.
+            for (int i = startIndex, count = 1; count < totalNodes && foundCount < N; ++count, i = (i+1)%totalNodes)
+            {
+                if( ! list.contains(tokenToEndPointMap.get(tokens.get(i))))
+                {
+                    list.add(tokenToEndPointMap.get(tokens.get(i)));
+                    foundCount++;
+                    continue;
+                }
+            }
+            retrofitPorts(list);
+            results.put(key, list.toArray(new EndPoint[0]));
+        }
+        
+        return results;
+    }
+}

Added: incubator/cassandra/src/org/apache/cassandra/locator/TokenMetadata.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/locator/TokenMetadata.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/locator/TokenMetadata.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/locator/TokenMetadata.java Mon Mar  2 06:13:14 2009
@@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.math.BigInteger;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.net.CompactEndPointSerializationHelper;
+import org.apache.cassandra.net.EndPoint;
+
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class TokenMetadata
+{
+    private static ICompactSerializer<TokenMetadata> serializer_ = new TokenMetadataSerializer();
+    
+    public static ICompactSerializer<TokenMetadata> serializer()
+    {
+        return serializer_;
+    }
+    
+    /* Maintains token to endpoint map of every node in the cluster. */    
+    private Map<BigInteger, EndPoint> tokenToEndPointMap_ = new HashMap<BigInteger, EndPoint>();    
+    /* Maintains a reverse index of endpoint to token in the cluster. */
+    private Map<EndPoint, BigInteger> endPointToTokenMap_ = new HashMap<EndPoint, BigInteger>();
+    
+    /* Use this lock for manipulating the token map */
+    private ReadWriteLock lock_ = new ReentrantReadWriteLock(true);
+    
+    /*
+     * For JAXB purposes. 
+    */
+    public TokenMetadata()
+    {
+    }
+    
+    protected TokenMetadata(Map<BigInteger, EndPoint> tokenToEndPointMap, Map<EndPoint, BigInteger> endPointToTokenMap)
+    {
+        tokenToEndPointMap_ = tokenToEndPointMap;
+        endPointToTokenMap_ = endPointToTokenMap;
+    }
+    
+    public TokenMetadata cloneMe()
+    {
+        Map<BigInteger, EndPoint> tokenToEndPointMap = cloneTokenEndPointMap();
+        Map<EndPoint, BigInteger> endPointToTokenMap = cloneEndPointTokenMap();
+        return new TokenMetadata( tokenToEndPointMap, endPointToTokenMap );
+    }
+    
+    /**
+     * Update the two maps in an safe mode. 
+    */
+    public void update(BigInteger token, EndPoint endpoint)
+    {
+        lock_.writeLock().lock();
+        try
+        {            
+            BigInteger oldToken = endPointToTokenMap_.get(endpoint);
+            if ( oldToken != null )
+                tokenToEndPointMap_.remove(oldToken);
+            tokenToEndPointMap_.put(token, endpoint);
+            endPointToTokenMap_.put(endpoint, token);
+        }
+        finally
+        {
+            lock_.writeLock().unlock();
+        }
+    }
+    
+    /**
+     * Remove the entries in the two maps.
+     * @param endpoint
+     */
+    public void remove(EndPoint endpoint)
+    {
+        lock_.writeLock().lock();
+        try
+        {            
+            BigInteger oldToken = endPointToTokenMap_.get(endpoint);
+            if ( oldToken != null )
+                tokenToEndPointMap_.remove(oldToken);            
+            endPointToTokenMap_.remove(endpoint);
+        }
+        finally
+        {
+            lock_.writeLock().unlock();
+        }
+    }
+    
+    public BigInteger getToken(EndPoint endpoint)
+    {
+        lock_.readLock().lock();
+        try
+        {
+            return endPointToTokenMap_.get(endpoint);
+        }
+        finally
+        {
+            lock_.readLock().unlock();
+        }
+    }
+    
+    public boolean isKnownEndPoint(EndPoint ep)
+    {
+        lock_.readLock().lock();
+        try
+        {
+            return endPointToTokenMap_.containsKey(ep);
+        }
+        finally
+        {
+            lock_.readLock().unlock();
+        }
+    }
+    
+    /*
+     * Returns a safe clone of tokenToEndPointMap_.
+    */
+    public Map<BigInteger, EndPoint> cloneTokenEndPointMap()
+    {
+        lock_.readLock().lock();
+        try
+        {            
+            return new HashMap<BigInteger, EndPoint>( tokenToEndPointMap_ );
+        }
+        finally
+        {
+            lock_.readLock().unlock();
+        }
+    }
+    
+    /*
+     * Returns a safe clone of endPointTokenMap_.
+    */
+    public Map<EndPoint, BigInteger> cloneEndPointTokenMap()
+    {
+        lock_.readLock().lock();
+        try
+        {            
+            return new HashMap<EndPoint, BigInteger>( endPointToTokenMap_ );
+        }
+        finally
+        {
+            lock_.readLock().unlock();
+        }
+    }
+    
+    public String toString()
+    {
+        StringBuilder sb = new StringBuilder();
+        Set<EndPoint> eps = endPointToTokenMap_.keySet();
+        
+        for ( EndPoint ep : eps )
+        {
+            sb.append(ep);
+            sb.append(":");
+            sb.append(endPointToTokenMap_.get(ep));
+            sb.append(System.getProperty("line.separator"));
+        }
+        
+        return sb.toString();
+    }
+}
+
+class TokenMetadataSerializer implements ICompactSerializer<TokenMetadata>
+{
+    public void serialize(TokenMetadata tkMetadata, DataOutputStream dos) throws IOException
+    {        
+        Map<BigInteger, EndPoint> tokenToEndPointMap = tkMetadata.cloneTokenEndPointMap();
+        Set<BigInteger> tokens = tokenToEndPointMap.keySet();
+        /* write the size */
+        dos.writeInt(tokens.size());        
+        for ( BigInteger token : tokens )
+        {
+            byte[] bytes = token.toByteArray();
+            /* Convert the BigInteger to byte[] and persist */
+            dos.writeInt(bytes.length);
+            dos.write(bytes); 
+            /* Write the endpoint out */
+            CompactEndPointSerializationHelper.serialize(tokenToEndPointMap.get(token), dos);
+        }
+    }
+    
+    public TokenMetadata deserialize(DataInputStream dis) throws IOException
+    {
+        TokenMetadata tkMetadata = null;
+        int size = dis.readInt();
+        
+        if ( size > 0 )
+        {
+            Map<BigInteger, EndPoint> tokenToEndPointMap = new HashMap<BigInteger, EndPoint>();
+            Map<EndPoint, BigInteger> endPointToTokenMap = new HashMap<EndPoint, BigInteger>();
+            
+            for ( int i = 0; i < size; ++i )
+            {
+                /* Read the byte[] and convert to BigInteger */
+                byte[] bytes = new byte[dis.readInt()];
+                dis.readFully(bytes);
+                BigInteger token = new BigInteger(bytes);
+                /* Read the endpoint out */
+                EndPoint endpoint = CompactEndPointSerializationHelper.deserialize(dis);
+                tokenToEndPointMap.put(token, endpoint);
+                endPointToTokenMap.put(endpoint, token);
+            }
+            
+            tkMetadata = new TokenMetadata( tokenToEndPointMap, endPointToTokenMap );
+        }
+        
+        return tkMetadata;
+    }
+}

Added: incubator/cassandra/src/org/apache/cassandra/net/AsyncResult.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/AsyncResult.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/AsyncResult.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/AsyncResult.java Mon Mar  2 06:13:14 2009
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.util.List;
+import java.util.Hashtable;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.service.QuorumResponseHandler;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+class AsyncResult implements IAsyncResult
+{
+    private static Logger logger_ = Logger.getLogger( AsyncResult.class );
+    private Object[] result_ = new Object[0];    
+    private AtomicBoolean done_ = new AtomicBoolean(false);
+    private Lock lock_ = new ReentrantLock();
+    private Condition condition_;
+
+    public AsyncResult()
+    {        
+        condition_ = lock_.newCondition();
+    }    
+    
+    public Object[] get()
+    {
+        lock_.lock();
+        try
+        {
+            if ( !done_.get() )
+            {
+                condition_.await();                    
+            }
+        }
+        catch ( InterruptedException ex )
+        {
+            logger_.warn( LogUtil.throwableToString(ex) );
+        }
+        finally
+        {
+            lock_.unlock();            
+        }        
+        return result_;
+    }
+    
+    public boolean isDone()
+    {
+        return done_.get();
+    }
+    
+    public Object[] get(long timeout, TimeUnit tu) throws TimeoutException
+    {
+        lock_.lock();
+        try
+        {            
+            boolean bVal = true;
+            try
+            {
+                if ( !done_.get() )
+                {                    
+                    bVal = condition_.await(timeout, tu);
+                }
+            }
+            catch ( InterruptedException ex )
+            {
+                logger_.warn( LogUtil.throwableToString(ex) );
+            }
+            
+            if ( !bVal && !done_.get() )
+            {                                           
+                throw new TimeoutException("Operation timed out.");
+            }
+        }
+        finally
+        {
+            lock_.unlock();      
+        }
+        return result_;
+    }
+    
+    void result(Object[] result)
+    {        
+        try
+        {
+            lock_.lock();
+            if ( !done_.get() )
+            {
+                result_ = result;
+                done_.set(true);
+                condition_.signal();
+            }
+        }
+        finally
+        {
+            lock_.unlock();
+        }        
+    }    
+}

Added: incubator/cassandra/src/org/apache/cassandra/net/CompactEndPointSerializationHelper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/CompactEndPointSerializationHelper.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/CompactEndPointSerializationHelper.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/CompactEndPointSerializationHelper.java Mon Mar  2 06:13:14 2009
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.io.*;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class CompactEndPointSerializationHelper
+{
+    public static void serialize(EndPoint endPoint, DataOutputStream dos) throws IOException
+    {        
+        dos.write(EndPoint.toBytes(endPoint));
+    }
+    
+    public static EndPoint deserialize(DataInputStream dis) throws IOException
+    {     
+        byte[] bytes = new byte[6];
+        dis.readFully(bytes, 0, bytes.length);
+        return EndPoint.fromBytes(bytes);       
+    }
+    
+    private static byte[] getIPAddress(String host) throws UnknownHostException
+    {
+        InetAddress ia = InetAddress.getByName(host);         
+        return ia.getAddress();
+    }
+    
+    private static String getHostName(byte[] ipAddr) throws UnknownHostException
+    {
+        InetAddress ia = InetAddress.getByAddress(ipAddr);
+        return ia.getCanonicalHostName();
+    }
+    
+    public static void main(String[] args) throws Throwable
+    {
+        EndPoint ep = new EndPoint(7000);
+        byte[] bytes = EndPoint.toBytes(ep);
+        System.out.println(bytes.length);
+        EndPoint ep2 = EndPoint.fromBytes(bytes);
+        System.out.println(ep2);
+    }
+}
\ No newline at end of file

Added: incubator/cassandra/src/org/apache/cassandra/net/ConnectionStatistics.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/ConnectionStatistics.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/ConnectionStatistics.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/ConnectionStatistics.java Mon Mar  2 06:13:14 2009
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class ConnectionStatistics
+{
+    private String localHost;
+    private int localPort;
+    private String remoteHost;
+    private int remotePort;
+    private int totalConnections;
+    private int connectionsInUse;
+
+    ConnectionStatistics(EndPoint localEp, EndPoint remoteEp, int tc, int ciu)
+    {
+        localHost = localEp.getHost();
+        localPort = localEp.getPort();
+        remoteHost = remoteEp.getHost();
+        remotePort = remoteEp.getPort();
+        totalConnections = tc;
+        connectionsInUse = ciu;
+    }
+    
+    public String getLocalHost()
+    {
+        return localHost;
+    }
+    
+    public int getLocalPort()
+    {
+        return localPort;
+    }
+    
+    public String getRemoteHost()
+    {
+        return remoteHost;
+    }
+    
+    public int getRemotePort()
+    {
+        return remotePort;
+    }
+    
+    public int getTotalConnections()
+    {
+        return totalConnections;
+    }
+    
+    public int getConnectionInUse()
+    {
+        return connectionsInUse;
+    }
+
+    public String toString()
+    {
+        return localHost + ":" + localPort + "->" + remoteHost + ":" + remotePort + " Total Connections open : " + totalConnections + " Connections in use : " + connectionsInUse;
+    }
+}
\ No newline at end of file

Added: incubator/cassandra/src/org/apache/cassandra/net/EndPoint.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/EndPoint.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/EndPoint.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/EndPoint.java Mon Mar  2 06:13:14 2009
@@ -0,0 +1 @@
+/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.cassandra.net;


import java.io.IOException;
import java.io.Serializable;
import java.net.*;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.u
 til.HashMap;
import java.util.Map;

import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.LogUtil;
import org.apache.log4j.Logger;

/**
 * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
 */

public class EndPoint implements Serializable, Comparable<EndPoint>
{
	// logging and profiling.
	private static Logger logger_ = Logger.getLogger(EndPoint.class);
	private static final long serialVersionUID = -4962625949179835907L;
	private static Map<CharBuffer, String> hostNames_ = new HashMap<CharBuffer, String>();
    protected static final int randomPort_ = 5555;
    public static EndPoint randomLocalEndPoint_;
    
    static
    {
        try
        {
            randomLocalEndPoint_ = new EndPoint(FBUtilities.getHostName(), EndPoint.randomPort_);
        }        
        catch ( IOException ex )
        {
            logger_.warn(LogUtil.throwableToString(ex));
        }
    }

	private String host_;
	pri
 vate int port_;

	private transient InetSocketAddress ia_;

	/* Ctor for JAXB. DO NOT DELETE */
	private EndPoint()
	{
	}

	public EndPoint(String host, int port)
	{
		/*
		 * Attempts to resolve the host, but does not fail if it cannot.
		 */
		host_ = host;
		port_ = port;
	}

	// create a local endpoint id
	public EndPoint(int port)
	{
		try
		{
			host_ = FBUtilities.getLocalHostName();
			port_ = port;
		}
		catch (UnknownHostException e)
		{
			logger_.warn(LogUtil.throwableToString(e));
		}
	}

	public String getHost()
	{
		return host_;
	}

	public int getPort()
	{
		return port_;
	}

	public void setPort(int port)
	{
		port_ = port;
	}

	public InetSocketAddress getInetAddress()
	{
		if (ia_ == null || ia_.isUnresolved())
		{
			ia_ = new InetSocketAddress(host_, port_);
		}
		return ia_;
	}

	public boolean equals(Object o)
	{
		if (!(o instanceof EndPoint))
			return false;

		EndPoint rhs = (EndPoint) o;
		return (host_.equals(rhs.host_) && port_ == rhs.port_);
	
 }

	public int hashCode()
	{
		return (host_ + port_).hashCode();
	}

	public int compareTo(EndPoint rhs)
	{
		return host_.compareTo(rhs.host_);
	}

	public String toString()
	{
		return (host_ + ":" + port_);
	}

	public static EndPoint fromString(String str)
	{
		String[] values = str.split(":");
		return new EndPoint(values[0], Integer.parseInt(values[1]));
	}

	public static byte[] toBytes(EndPoint ep)
	{
		ByteBuffer buffer = ByteBuffer.allocate(6);
		byte[] iaBytes = ep.getInetAddress().getAddress().getAddress();
		buffer.put(iaBytes);
		buffer.put(MessagingService.toByteArray((short) ep.getPort()));
		buffer.flip();
		return buffer.array();
	}

	public static EndPoint fromBytes(byte[] bytes)
	{
		ByteBuffer buffer = ByteBuffer.allocate(4);
		System.arraycopy(bytes, 0, buffer.array(), 0, 4);
		byte[] portBytes = new byte[2];
		System.arraycopy(bytes, 4, portBytes, 0, portBytes.length);
		try
		{
			CharBuffer charBuffer = buffer.asCharBuffer();
			String host = hostNa
 mes_.get(charBuffer);
			if (host == null)
			{				
				host = InetAddress.getByAddress(buffer.array()).getHostName();				
				hostNames_.put(charBuffer, host);
			}
			int port = (int) MessagingService.byteArrayToShort(portBytes);
			return new EndPoint(host, port);
		}
		catch (UnknownHostException e)
		{
			throw new IllegalArgumentException(e);
		}
	}
}
\ No newline at end of file

Added: incubator/cassandra/src/org/apache/cassandra/net/FileStreamTask.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/FileStreamTask.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/FileStreamTask.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/FileStreamTask.java Mon Mar  2 06:13:14 2009
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.io.*;
+import java.net.SocketException;
+
+import org.apache.cassandra.net.sink.SinkManager;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+class FileStreamTask implements Runnable
+{
+    private static Logger logger_ = Logger.getLogger( FileStreamTask.class );
+    
+    private String file_;
+    private long startPosition_;
+    private long total_;
+    private EndPoint from_;
+    private EndPoint to_;
+    
+    FileStreamTask(String file, long startPosition, long total, EndPoint from, EndPoint to)
+    {
+        file_ = file;
+        startPosition_ = startPosition;
+        total_ = total;
+        from_ = from;
+        to_ = to;
+    }
+    
+    public void run()
+    {
+        TcpConnection connection = null;
+        try
+        {                        
+            connection = new TcpConnection(from_, to_);
+            File file = new File(file_);             
+            connection.stream(file, startPosition_, total_);
+            MessagingService.setStreamingMode(false);
+            logger_.debug("Done streaming " + file);
+        }            
+        catch ( SocketException se )
+        {                        
+            logger_.info(LogUtil.throwableToString(se));
+        }
+        catch ( IOException e )
+        {
+            logConnectAndIOException(e, connection);
+        }
+        catch (Throwable th)
+        {
+            logger_.warn(LogUtil.throwableToString(th));
+        }        
+    }
+    
+    private void logConnectAndIOException(IOException ex, TcpConnection connection)
+    {                    
+        if ( connection != null )
+        {
+            connection.errorClose();
+        }
+        logger_.info(LogUtil.throwableToString(ex));
+    }
+}

Added: incubator/cassandra/src/org/apache/cassandra/net/Header.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/Header.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/Header.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/Header.java Mon Mar  2 06:13:14 2009
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.utils.GuidGenerator;
+
+
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class Header implements java.io.Serializable
+{
+    static final long serialVersionUID = -3194851946523170022L;
+    private static ICompactSerializer<Header> serializer_;
+    private static AtomicInteger idGen_ = new AtomicInteger(0);
+    
+    static
+    {
+        serializer_ = new HeaderSerializer();        
+    }
+    
+    static ICompactSerializer<Header> serializer()
+    {
+        return serializer_;
+    }
+
+    private EndPoint from_;
+    private String type_;
+    private String verb_;
+    private String messageId_;
+    protected Map<String, byte[]> details_ = new Hashtable<String, byte[]>();
+    
+    Header(String id, EndPoint from, String messageType, String verb)
+    {
+        messageId_ = id;
+        from_ = from;
+        type_ = messageType;
+        verb_ = verb;        
+    }
+    
+    Header(String id, EndPoint from, String messageType, String verb, Map<String, byte[]> details)
+    {
+        this(id, from, messageType, verb);
+        details_ = details;
+    }
+
+    Header(EndPoint from, String messageType, String verb)
+    {
+        messageId_ = Integer.toString(idGen_.incrementAndGet());
+        from_ = from;
+        type_ = messageType;
+        verb_ = verb;
+    }        
+
+    EndPoint getFrom()
+    {
+        return from_;
+    }
+
+    String getMessageType()
+    {
+        return type_;
+    }
+
+    String getVerb()
+    {
+        return verb_;
+    }
+
+    String getMessageId()
+    {
+        return messageId_;
+    }
+
+    void setMessageId(String id)
+    {
+        messageId_ = id;
+    }
+    
+    void setMessageType(String type)
+    {
+        type_ = type;
+    }
+    
+    void setMessageVerb(String verb)
+    {
+        verb_ = verb;
+    }
+    
+    byte[] getDetail(Object key)
+    {
+        return details_.get(key);
+    }
+    
+    void removeDetail(Object key)
+    {
+        details_.remove(key);
+    }
+    
+    void addDetail(String key, byte[] value)
+    {
+        details_.put(key, value);
+    }
+    
+    Map<String, byte[]> getDetails()
+    {
+        return details_;
+    }
+}
+
+class HeaderSerializer implements ICompactSerializer<Header>
+{
+    public void serialize(Header t, DataOutputStream dos) throws IOException
+    {           
+        dos.writeUTF(t.getMessageId());
+        CompactEndPointSerializationHelper.serialize(t.getFrom(), dos);
+        dos.writeUTF(t.getMessageType());
+        dos.writeUTF( t.getVerb() );
+        
+        /* Serialize the message header */
+        int size = t.details_.size();
+        dos.writeInt(size);
+        Set<String> keys = t.details_.keySet();
+        
+        for( String key : keys )
+        {
+            dos.writeUTF(key);
+            byte[] value = t.details_.get(key);
+            dos.writeInt(value.length);
+            dos.write(value);
+        }
+    }
+
+    public Header deserialize(DataInputStream dis) throws IOException
+    {
+        String id = dis.readUTF();
+        EndPoint from = CompactEndPointSerializationHelper.deserialize(dis);
+        String type = dis.readUTF();
+        String verb = dis.readUTF();
+        
+        /* Deserializing the message header */
+        int size = dis.readInt();
+        Map<String, byte[]> details = new Hashtable<String, byte[]>(size);
+        for ( int i = 0; i < size; ++i )
+        {
+            String key = dis.readUTF();
+            int length = dis.readInt();
+            byte[] bytes = new byte[length];
+            dis.readFully(bytes);
+            details.put(key, bytes);
+        }
+        
+        return new Header(id, from, type, verb, details);
+    }
+}
+
+

Added: incubator/cassandra/src/org/apache/cassandra/net/HeaderTypes.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/HeaderTypes.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/HeaderTypes.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/HeaderTypes.java Mon Mar  2 06:13:14 2009
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class HeaderTypes 
+{
+    public final static String TASK_PROFILE_CHAIN = "TASK_PROFILE_CHAIN";
+    public static String TASK_ID = "TASK_ID";
+}

Added: incubator/cassandra/src/org/apache/cassandra/net/IAsyncCallback.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/IAsyncCallback.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/IAsyncCallback.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/IAsyncCallback.java Mon Mar  2 06:13:14 2009
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public interface IAsyncCallback 
+{
+	/**
+	 * @param response responses to be returned
+	 */
+	public void response(Message msg);
+}

Added: incubator/cassandra/src/org/apache/cassandra/net/IAsyncResult.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/IAsyncResult.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/IAsyncResult.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/IAsyncResult.java Mon Mar  2 06:13:14 2009
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public interface IAsyncResult
+{
+    public Object[] get();
+    public boolean isDone();
+    public Object[] get(long timeout, TimeUnit tu) throws TimeoutException; 
+}

Added: incubator/cassandra/src/org/apache/cassandra/net/IMessagingService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/IMessagingService.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/IMessagingService.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/IMessagingService.java Mon Mar  2 06:13:14 2009
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.io.IOException;
+
+import javax.xml.bind.JAXBException;
+
+import org.apache.cassandra.concurrent.IStage;
+
+
+/**
+ * An IMessagingService provides the methods for sending messages to remote
+ * endpoints. IMessagingService enables the sending of request-response style
+ * messages and fire-forget style messages.
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public interface IMessagingService
+{   
+	/**
+     * Register a verb and the corresponding verb handler with the
+     * Messaging Service.
+     * @param type name of the verb.     
+     * @param verbHandler handler for the specified verb
+     */
+    public void registerVerbHandlers(String type, IVerbHandler verbHandler);
+    
+    /**
+     * Deregister all verbhandlers corresponding to localEndPoint.
+     * @param localEndPoint
+     */
+    public void deregisterAllVerbHandlers(EndPoint localEndPoint);
+    
+    /**
+     * Deregister a verbhandler corresponding to the verb from the
+     * Messaging Service.
+     * @param type name of the verb.      
+     */
+    public void deregisterVerbHandlers(String type);
+    
+    /**
+     * Listen on the specified port.
+     * @param ep EndPoint whose port to listen on.
+     * @param isHttp specify if the port is an Http port.     
+     */
+    public void listen(EndPoint ep, boolean isHttp) throws IOException;
+    
+    /**
+     * Listen on the specified port.
+     * @param ep EndPoint whose port to listen on.     
+     */
+    public void listenUDP(EndPoint ep);
+    
+    /**
+     * Send a message to a given endpoint. 
+     * @param message message to be sent.
+     * @param to endpoint to which the message needs to be sent
+     * @return an reference to an IAsyncResult which can be queried for the
+     * response
+     */
+    public IAsyncResult sendRR(Message message, EndPoint to);
+
+    /**
+     * Send a message to the given set of endpoints and informs the MessagingService
+     * to wait for at least <code>howManyResults</code> responses to determine success
+     * of failure.
+     * @param message message to be sent.
+     * @param to endpoints to which the message needs to be sent
+     * @param cb callback interface which is used to pass the responses
+     * @return an reference to message id used to match with the result
+     */
+    public String sendRR(Message message, EndPoint[] to, IAsyncCallback cb);
+    
+    /**
+     * Send a message to a given endpoint. This method specifies a callback
+     * which is invoked with the actual response.
+     * @param message message to be sent.
+     * @param to endpoint to which the message needs to be sent
+     * @param cb callback interface which is used to pass the responses or
+     *           suggest that a timeout occured to the invoker of the send().
+     *           suggest that a timeout occured to the invoker of the send().
+     * @return an reference to message id used to match with the result
+     */
+    public String sendRR(Message message, EndPoint to, IAsyncCallback cb);
+
+    /**
+     * Send a message to a given endpoint. The ith element in the <code>messages</code>
+     * array is sent to the ith element in the <code>to</code> array.This method assumes
+     * there is a one-one mapping between the <code>messages</code> array and
+     * the <code>to</code> array. Otherwise an  IllegalArgumentException will be thrown.
+     * This method also informs the MessagingService to wait for at least
+     * <code>howManyResults</code> responses to determine success of failure.
+     * @param messages messages to be sent.
+     * @param to endpoints to which the message needs to be sent
+     * @param cb callback interface which is used to pass the responses or
+     *           suggest that a timeout occured to the invoker of the send().
+     *           suggest that a timeout occured to the invoker of the send().
+     * @return an reference to message id used to match with the result
+     */
+    public String sendRR(Message[] messages, EndPoint[] to, IAsyncCallback cb);
+
+    /**
+     * Send a message to a given endpoint. This method adheres to the fire and forget
+     * style messaging.
+     * @param message messages to be sent.
+     * @param to endpoint to which the message needs to be sent
+     */
+    public void sendOneWay(Message message, EndPoint to);
+        
+    /**
+     * Send a message to a given endpoint. This method adheres to the fire and forget
+     * style messaging.
+     * @param message messages to be sent.
+     * @param to endpoint to which the message needs to be sent
+     */
+    public void sendUdpOneWay(Message message, EndPoint to);
+    
+    /**
+     * Stream a file from source to destination. This is highly optimized
+     * to not hold any of the contents of the file in memory.
+     * @param file name of file to stream.
+     * param start position inside the file
+     * param total number of bytes to stream
+     * param to endpoint to which we need to stream the file.
+    */
+    public void stream(String file, long startPosition, long total, EndPoint from, EndPoint to);
+
+    /**
+     * This method returns the verb handler associated with the registered
+     * verb. If no handler has been registered then null is returned.
+     * @param verb for which the verb handler is sought
+     * @return a reference to IVerbHandler which is the handler for the specified verb
+     */
+    public IVerbHandler getVerbHandler(String verb);    
+}

Added: incubator/cassandra/src/org/apache/cassandra/net/IVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/IVerbHandler.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/IVerbHandler.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/IVerbHandler.java Mon Mar  2 06:13:14 2009
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+/**
+ * IVerbHandler provides the method that all verb handlers need to implement.
+ * The concrete implementation of this interface would provide the functionality
+ * for a given verb.
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public interface IVerbHandler
+{
+    /**
+     * This method delivers a message to the implementing class (if the implementing
+     * class was registered by a call to MessagingService.registerVerbHandlers).
+     * Note that the caller should not be holding any locks when calling this method
+     * because the implementation may be synchronized.
+     * 
+     * @param message - incoming message that needs handling.     
+     */
+    public void doVerb(Message message);
+}

Added: incubator/cassandra/src/org/apache/cassandra/net/Message.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/Message.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/Message.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/Message.java Mon Mar  2 06:13:14 2009
@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.lang.reflect.Array;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.*;
+
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.log4j.Logger;
+import org.apache.cassandra.utils.*;
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class Message implements java.io.Serializable
+{    
+    static final long serialVersionUID = 6329198792470413221L;
+    private static ICompactSerializer<Message> serializer_;
+    
+    static
+    {
+        serializer_ = new MessageSerializer();        
+    }
+    
+    public static ICompactSerializer<Message> serializer()
+    {
+        return serializer_;
+    }
+    
+    Header header_;
+    private Object[] body_ = new Object[0];
+    
+    /* Ctor for JAXB. DO NOT DELETE */
+    private Message()
+    {
+    }
+
+    protected Message(String id, EndPoint from, String messageType, String verb, Object[] body)
+    {
+        header_ = new Header(id, from, messageType, verb);
+        body_ = body;
+    }
+    
+    protected Message(Header header, Object[] body)
+    {
+        header_ = header;
+        body_ = body;
+    }
+
+    public Message(EndPoint from, String messageType, String verb, Object[] body)
+    {
+        header_ = new Header(from, messageType, verb);
+        body_ = body;
+    }    
+    
+    public byte[] getHeader(Object key)
+    {
+        return header_.getDetail(key);
+    }
+    
+    public void removeHeader(Object key)
+    {
+        header_.removeDetail(key);
+    }
+    
+    public void setMessageType(String type)
+    {
+        header_.setMessageType(type);
+    }
+
+    public void setMessageVerb(String verb)
+    {
+        header_.setMessageVerb(verb);
+    }
+
+    public void addHeader(String key, byte[] value)
+    {
+        header_.addDetail(key, value);
+    }
+    
+    public Map<String, byte[]> getHeaders()
+    {
+        return header_.getDetails();
+    }
+
+    public Object[] getMessageBody()
+    {
+        return body_;
+    }
+    
+    public void setMessageBody(Object[] body)
+    {
+        body_ = body;
+    }
+
+    public EndPoint getFrom()
+    {
+        return header_.getFrom();
+    }
+
+    public String getMessageType()
+    {
+        return header_.getMessageType();
+    }
+
+    public String getVerb()
+    {
+        return header_.getVerb();
+    }
+
+    public String getMessageId()
+    {
+        return header_.getMessageId();
+    }
+    
+    public Class[] getTypes()
+    {
+        List<Class> types = new ArrayList<Class>();
+        
+        for ( int i = 0; i < body_.length; ++i )
+        {
+            if ( body_[i].getClass().isArray() )
+            {
+                int size = Array.getLength(body_[i]);
+                if ( size > 0 )
+                {
+                    types.add( Array.get( body_[i], 0).getClass() );
+                }
+            }
+            else
+            {
+                types.add(body_[i].getClass());
+            }
+        }
+        
+        return types.toArray( new Class[0] );
+    }    
+
+    void setMessageId(String id)
+    {
+        header_.setMessageId(id);
+    }    
+
+    public Message getReply(EndPoint from, Object[] args)
+    {        
+        Message response = new Message(getMessageId(),
+                                       from,
+                                       MessagingService.responseStage_,
+                                       MessagingService.responseVerbHandler_,
+                                       args);
+        return response;
+    }
+    
+    public String toString()
+    {
+        StringBuffer sbuf = new StringBuffer("");
+        String separator = System.getProperty("line.separator");
+        sbuf.append("ID:" + getMessageId());
+        sbuf.append(separator);
+        sbuf.append("FROM:" + getFrom());
+        sbuf.append(separator);
+        sbuf.append("TYPE:" + getMessageType());
+        sbuf.append(separator);
+        sbuf.append("VERB:" + getVerb());
+        sbuf.append(separator);
+        sbuf.append("BODY TYPE:" + getBodyTypes());        
+        sbuf.append(separator);
+        return sbuf.toString();
+    }
+    
+    private String getBodyTypes()
+    {
+        StringBuffer sbuf = new StringBuffer("");
+        Class[] types = getTypes();
+        for ( int i = 0; i < types.length; ++i )
+        {
+            sbuf.append(types[i].getName());
+            sbuf.append(" ");         
+        }
+        return sbuf.toString();
+    }    
+}
+
+class MessageSerializer implements ICompactSerializer<Message>
+{
+    public void serialize(Message t, DataOutputStream dos) throws IOException
+    {
+        Header.serializer().serialize( t.header_, dos);
+        byte[] bytes = (byte[])t.getMessageBody()[0];
+        dos.writeInt(bytes.length);
+        dos.write(bytes);
+    }
+
+    public Message deserialize(DataInputStream dis) throws IOException
+    {
+        Header header = Header.serializer().deserialize(dis);
+        int size = dis.readInt();
+        byte[] bytes = new byte[size];
+        dis.readFully(bytes);
+        // return new Message(header.getMessageId(), header.getFrom(), header.getMessageType(), header.getVerb(), new Object[]{bytes});
+        return new Message(header, new Object[]{bytes});
+    }
+}

Added: incubator/cassandra/src/org/apache/cassandra/net/MessageDeliveryTask.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/MessageDeliveryTask.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/MessageDeliveryTask.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/MessageDeliveryTask.java Mon Mar  2 06:13:14 2009
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import org.apache.cassandra.continuations.Suspendable;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class MessageDeliveryTask implements Runnable
+{
+    private Message message_;
+    private static Logger logger_ = Logger.getLogger(MessageDeliveryTask.class);    
+    
+    public MessageDeliveryTask(Message message)
+    {
+        message_ = message;    
+    }
+    
+    public void run()
+    { 
+        try
+        {            
+            String verb = message_.getVerb();                               
+            IVerbHandler verbHandler = MessagingService.getMessagingInstance().getVerbHandler(verb);           
+            if ( verbHandler != null )
+            {
+                verbHandler.doVerb(message_);        
+            }
+        }
+        catch (Throwable th)
+        {
+            logger_.warn( LogUtil.throwableToString(th) );
+        }
+    }
+}

Added: incubator/cassandra/src/org/apache/cassandra/net/MessageDeserializationTask.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/MessageDeserializationTask.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/MessageDeserializationTask.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/MessageDeserializationTask.java Mon Mar  2 06:13:14 2009
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.io.IOException;
+
+import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.net.io.FastSerializer;
+import org.apache.cassandra.net.io.ISerializer;
+import org.apache.cassandra.net.sink.SinkManager;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+class MessageDeserializationTask implements Runnable
+{
+    private static Logger logger_ = Logger.getLogger(MessageDeserializationTask.class); 
+    private static ISerializer serializer_ = new FastSerializer();
+    private int serializerType_;
+    private byte[] bytes_ = new byte[0];    
+    
+    MessageDeserializationTask(int serializerType, byte[] bytes)
+    {
+        serializerType_ = serializerType;
+        bytes_ = bytes;        
+    }
+    
+    public void run()
+    {
+    	/* For DEBUG only. Printing queue length */   
+    	DebuggableThreadPoolExecutor es = (DebuggableThreadPoolExecutor)MessagingService.getDeserilizationExecutor();
+        logger_.debug( "Message Deserialization Task: " + (es.getTaskCount() - es.getCompletedTaskCount()) );
+        /* END DEBUG */
+        try
+        {                        
+            Message message = (Message)serializer_.deserialize(bytes_);                                                           
+            
+            if ( message != null )
+            {
+                message = SinkManager.processServerMessageSink(message);
+                MessagingService.receive(message);                                                                                                    
+            }
+        }
+        catch ( IOException ex )
+        {            
+            logger_.warn(LogUtil.throwableToString(ex));              
+        }
+    }
+
+}

Added: incubator/cassandra/src/org/apache/cassandra/net/MessageSerializationTask.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/MessageSerializationTask.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/MessageSerializationTask.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/MessageSerializationTask.java Mon Mar  2 06:13:14 2009
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.io.IOException;
+import java.net.SocketException;
+
+import org.apache.cassandra.concurrent.Context;
+import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
+import org.apache.cassandra.concurrent.ThreadLocalContext;
+import org.apache.cassandra.net.sink.SinkManager;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+class MessageSerializationTask implements Runnable
+{
+    private static Logger logger_ = Logger.getLogger(MessageSerializationTask.class);
+    private Message message_;
+    private EndPoint to_;    
+    
+    public MessageSerializationTask(Message message, EndPoint to)
+    {
+        message_ = message;
+        to_ = to;        
+    }
+    
+    public Message getMessage()
+    {
+        return message_;
+    }
+
+    public void run()
+    {        
+    	/* For DEBUG only. Printing queue length */   
+    	DebuggableThreadPoolExecutor es = (DebuggableThreadPoolExecutor)MessagingService.getWriteExecutor();
+        logger_.debug( "Message Serialization Task: " + (es.getTaskCount() - es.getCompletedTaskCount()) );
+        /* END DEBUG */
+        
+        /* Adding the message to be serialized in the TLS. For accessing in the afterExecute() */
+        Context ctx = new Context();
+        ctx.put(this.getClass().getName(), message_);
+        ThreadLocalContext.put(ctx);
+        
+        TcpConnection connection = null;
+        try
+        {
+            Message message = SinkManager.processClientMessageSink(message_);
+            if(null == message) 
+                return;
+            connection = MessagingService.getConnection(message_.getFrom(), to_);
+            connection.write(message);            
+        }            
+        catch ( SocketException se )
+        {            
+            // Shutting down the entire pool. May be too conservative an approach.
+            MessagingService.getConnectionPool(message_.getFrom(), to_).shutdown();
+            logger_.warn(LogUtil.throwableToString(se));
+        }
+        catch ( IOException e )
+        {
+            logConnectAndIOException(e, connection);
+        }
+        catch (Throwable th)
+        {
+            logger_.warn(LogUtil.throwableToString(th));
+        }
+        finally
+        {
+            if ( connection != null )
+            {
+                connection.close();
+            }            
+        }
+    }
+    
+    private void logConnectAndIOException(IOException ex, TcpConnection connection)
+    {                    
+        if ( connection != null )
+        {
+            connection.errorClose();
+        }
+        logger_.warn(LogUtil.throwableToString(ex));
+    }
+}
+

Added: incubator/cassandra/src/org/apache/cassandra/net/MessagingConfig.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/MessagingConfig.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/MessagingConfig.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/MessagingConfig.java Mon Mar  2 06:13:14 2009
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class MessagingConfig
+{
+    // The expected time for one message round trip.  It does not reflect message processing
+    // time at the receiver.
+    private static int expectedRoundTripTime_ = 400;
+    private static int numberOfPorts_ = 2;
+    private static int threadCount_ = 4;
+
+    public static int getMessagingThreadCount()
+    {
+        return threadCount_;
+    }
+
+    public static void setMessagingThreadCount(int threadCount)
+    {
+        threadCount_ = threadCount;
+    }
+
+    public static void setExpectedRoundTripTime(int roundTripTimeMillis) {
+    	if(roundTripTimeMillis > 0 )
+    		expectedRoundTripTime_ = roundTripTimeMillis;
+    }
+
+    public static int getExpectedRoundTripTime()
+    {
+        return expectedRoundTripTime_;
+    }
+
+    public static int getConnectionPoolInitialSize()
+    {
+        return ConnectionPoolConfiguration.initialSize_;
+    }
+
+    public static int getConnectionPoolGrowthFactor()
+    {
+        return ConnectionPoolConfiguration.growthFactor_;
+    }
+
+    public static int getConnectionPoolMaxSize()
+    {
+        return ConnectionPoolConfiguration.maxSize_;
+    }
+
+    public static int getConnectionPoolWaitTimeout()
+    {
+        return ConnectionPoolConfiguration.waitTimeout_;
+    }
+
+    public static int getConnectionPoolMonitorInterval()
+    {
+        return ConnectionPoolConfiguration.monitorInterval_;
+    }
+
+    public static void setNumberOfPorts(int n)
+    {
+        numberOfPorts_ = n;
+    }
+
+    public static int getNumberOfPorts()
+    {
+        return numberOfPorts_;
+    }
+}
+
+class ConnectionPoolConfiguration
+{
+    public static int initialSize_ = 1;
+    public static int growthFactor_ = 1;
+    public static int maxSize_ = 1;
+    public static int waitTimeout_ = 10;
+    public static int monitorInterval_ = 300;
+}