You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/07/30 17:30:27 UTC

svn commit: r799331 [24/29] - in /incubator/cassandra/trunk: ./ src/java/org/apache/cassandra/concurrent/ src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/dht/ src/java/org/apache/cassandra/gms/ src/...

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/MultiQuorumResponseHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/MultiQuorumResponseHandler.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/MultiQuorumResponseHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/MultiQuorumResponseHandler.java Thu Jul 30 15:30:21 2009
@@ -1,252 +1,252 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.service;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.ArrayList;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.*;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.ReadCommand;
-import org.apache.cassandra.db.Row;
-import org.apache.cassandra.net.EndPoint;
-import org.apache.cassandra.net.IAsyncCallback;
-import org.apache.cassandra.net.Message;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.utils.LogUtil;
-import org.apache.log4j.Logger;
-
-/**
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-public class MultiQuorumResponseHandler implements IAsyncCallback
-{ 
-    private static Logger logger_ = Logger.getLogger( QuorumResponseHandler.class );
-    private Lock lock_ = new ReentrantLock();
-    private Condition condition_;
-    /* This maps the keys to the original data read messages */
-    private Map<String, ReadCommand> readMessages_ = new HashMap<String, ReadCommand>();
-    /* This maps the key to its set of replicas */
-    private Map<String, EndPoint[]> endpoints_ = new HashMap<String, EndPoint[]>();
-    /* This maps the groupId to the individual callback for the set of messages */
-    private Map<String, SingleQuorumResponseHandler> handlers_ = new HashMap<String, SingleQuorumResponseHandler>();
-    /* This should hold all the responses for the keys */
-    private List<Row> responses_ = new ArrayList<Row>();
-    private AtomicBoolean done_ = new AtomicBoolean(false);
-    
-    /**
-     * This is used to handle the responses from the individual messages
-     * that are sent out to the replicas.
-     * @author alakshman
-     *
-    */
-    private class SingleQuorumResponseHandler implements IAsyncCallback
-    {
-        private Lock lock_ = new ReentrantLock();
-        private IResponseResolver<Row> responseResolver_;
-        private List<Message> responses_ = new ArrayList<Message>();
-        
-        SingleQuorumResponseHandler(IResponseResolver<Row> responseResolver)
-        {
-            responseResolver_ = responseResolver;
-        }
-        
-        public void attachContext(Object o)
-        {
-            throw new UnsupportedOperationException("This operation is not supported in this implementation");
-        }
-        
-        public void response(Message response)
-        {
-            lock_.lock();
-            try
-            {
-                responses_.add(response);
-                int majority = (DatabaseDescriptor.getReplicationFactor() >> 1) + 1;                            
-                if ( responses_.size() >= majority && responseResolver_.isDataPresent(responses_))
-                {
-                    onCompletion();               
-                }
-            }
-            catch ( IOException ex )
-            {
-                logger_.info( LogUtil.throwableToString(ex) );
-            }
-            finally
-            {
-                lock_.unlock();
-            }
-        }
-        
-        private void onCompletion() throws IOException
-        {
-            try
-            {
-                Row row = responseResolver_.resolve(responses_);
-                MultiQuorumResponseHandler.this.onCompleteResponse(row);
-            }
-            catch ( DigestMismatchException ex )
-            {
-                /* 
-                 * The DigestMismatchException has the key for which the mismatch
-                 * occurred bundled in it as context 
-                */
-                String key = ex.getMessage();
-                onDigestMismatch(key);
-            }
-        }
-        
-        /**
-         * This method is invoked on a digest match. We pass in the key
-         * in order to retrieve the appropriate data message that needs
-         * to be sent out to the replicas. 
-         * 
-         * @param key for which the mismatch occurred.
-        */
-        private void onDigestMismatch(String key) throws IOException
-        {
-            if ( DatabaseDescriptor.getConsistencyCheck())
-            {                                
-                ReadCommand readCommand = readMessages_.get(key);
-                readCommand.setDigestQuery(false);
-                Message messageRepair = readCommand.makeReadMessage();
-                EndPoint[] endpoints = MultiQuorumResponseHandler.this.endpoints_.get(readCommand.key);
-                Message[][] messages = new Message[][]{ {messageRepair, messageRepair, messageRepair} };
-                EndPoint[][] epList = new EndPoint[][]{ endpoints };
-                MessagingService.getMessagingInstance().sendRR(messages, epList, MultiQuorumResponseHandler.this);                
-            }
-        }
-    }
-    
-    public MultiQuorumResponseHandler(Map<String, ReadCommand> readMessages, Map<String, EndPoint[]> endpoints)
-    {        
-        condition_ = lock_.newCondition();
-        readMessages_ = readMessages;
-        endpoints_ = endpoints;
-    }
-    
-    public Row[] get() throws TimeoutException
-    {
-        long startTime = System.currentTimeMillis();
-        lock_.lock();
-        try
-        {            
-            boolean bVal = true;            
-            try
-            {
-                if ( !done_.get() )
-                {                   
-                    bVal = condition_.await(DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
-                }
-            }
-            catch ( InterruptedException ex )
-            {
-                if (logger_.isDebugEnabled())
-                  logger_.debug( LogUtil.throwableToString(ex) );
-            }
-            
-            if ( !bVal && !done_.get() )
-            {
-                StringBuilder sb = new StringBuilder("");
-                for ( Row row : responses_ )
-                {
-                    sb.append(row.key());
-                    sb.append(":");
-                }                
-                throw new TimeoutException("Operation timed out - received only " +  responses_.size() + " responses from " + sb.toString() + " .");
-            }
-        }
-        finally
-        {
-            lock_.unlock();
-        }
-        
-        logger_.info("MultiQuorumResponseHandler: " + (System.currentTimeMillis() - startTime) + " ms.");
-        return responses_.toArray( new Row[0] );
-    }
-    
-    /**
-     * Invoked when a complete response has been obtained
-     * for one of the sub-groups a.k.a keys for the query 
-     * has been performed.
-     * 
-     * @param row obtained as a result of the response.
-     */
-    void onCompleteResponse(Row row)
-    {        
-        if ( !done_.get() )
-        {
-            responses_.add(row);
-            if ( responses_.size() == readMessages_.size() )
-            {
-                done_.set(true);
-                condition_.signal();                
-            }
-        }
-    }
-    
-    /**
-     * The handler of the response message that has been
-     * sent by one of the replicas for one of the keys.
-     * 
-     * @param message the response message for one of the
-     *        message that we sent out.
-     */
-    public void response(Message message)
-    {
-        lock_.lock();
-        try
-        {
-            SingleQuorumResponseHandler handler = handlers_.get(message.getMessageId());
-            handler.response(message);
-        }
-        finally
-        {
-            lock_.unlock();
-        }
-    }
-    
-    /**
-     * The context that is passed in for the query of
-     * multiple keys in the system. For each message 
-     * id in the context register a callback handler 
-     * for the same. This is done so that all responses
-     * for a given key use the same callback handler.
-     * 
-     * @param o the context which is an array of strings
-     *        corresponding to the message id's for each
-     *        key.
-     */
-    public void attachContext(Object o)
-    {
-        String[] gids = (String[])o;
-        for ( String gid : gids )
-        {
-            IResponseResolver<Row> responseResolver = new ReadResponseResolver();
-            SingleQuorumResponseHandler handler = new SingleQuorumResponseHandler(responseResolver);
-            handlers_.put(gid, handler);
-        }
-    }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.*;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ReadCommand;
+import org.apache.cassandra.db.Row;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.IAsyncCallback;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class MultiQuorumResponseHandler implements IAsyncCallback
+{ 
+    private static Logger logger_ = Logger.getLogger( QuorumResponseHandler.class );
+    private Lock lock_ = new ReentrantLock();
+    private Condition condition_;
+    /* This maps the keys to the original data read messages */
+    private Map<String, ReadCommand> readMessages_ = new HashMap<String, ReadCommand>();
+    /* This maps the key to its set of replicas */
+    private Map<String, EndPoint[]> endpoints_ = new HashMap<String, EndPoint[]>();
+    /* This maps the groupId to the individual callback for the set of messages */
+    private Map<String, SingleQuorumResponseHandler> handlers_ = new HashMap<String, SingleQuorumResponseHandler>();
+    /* This should hold all the responses for the keys */
+    private List<Row> responses_ = new ArrayList<Row>();
+    private AtomicBoolean done_ = new AtomicBoolean(false);
+    
+    /**
+     * This is used to handle the responses from the individual messages
+     * that are sent out to the replicas.
+     * @author alakshman
+     *
+    */
+    private class SingleQuorumResponseHandler implements IAsyncCallback
+    {
+        private Lock lock_ = new ReentrantLock();
+        private IResponseResolver<Row> responseResolver_;
+        private List<Message> responses_ = new ArrayList<Message>();
+        
+        SingleQuorumResponseHandler(IResponseResolver<Row> responseResolver)
+        {
+            responseResolver_ = responseResolver;
+        }
+        
+        public void attachContext(Object o)
+        {
+            throw new UnsupportedOperationException("This operation is not supported in this implementation");
+        }
+        
+        public void response(Message response)
+        {
+            lock_.lock();
+            try
+            {
+                responses_.add(response);
+                int majority = (DatabaseDescriptor.getReplicationFactor() >> 1) + 1;                            
+                if ( responses_.size() >= majority && responseResolver_.isDataPresent(responses_))
+                {
+                    onCompletion();               
+                }
+            }
+            catch ( IOException ex )
+            {
+                logger_.info( LogUtil.throwableToString(ex) );
+            }
+            finally
+            {
+                lock_.unlock();
+            }
+        }
+        
+        private void onCompletion() throws IOException
+        {
+            try
+            {
+                Row row = responseResolver_.resolve(responses_);
+                MultiQuorumResponseHandler.this.onCompleteResponse(row);
+            }
+            catch ( DigestMismatchException ex )
+            {
+                /* 
+                 * The DigestMismatchException has the key for which the mismatch
+                 * occurred bundled in it as context 
+                */
+                String key = ex.getMessage();
+                onDigestMismatch(key);
+            }
+        }
+        
+        /**
+         * This method is invoked on a digest match. We pass in the key
+         * in order to retrieve the appropriate data message that needs
+         * to be sent out to the replicas. 
+         * 
+         * @param key for which the mismatch occurred.
+        */
+        private void onDigestMismatch(String key) throws IOException
+        {
+            if ( DatabaseDescriptor.getConsistencyCheck())
+            {                                
+                ReadCommand readCommand = readMessages_.get(key);
+                readCommand.setDigestQuery(false);
+                Message messageRepair = readCommand.makeReadMessage();
+                EndPoint[] endpoints = MultiQuorumResponseHandler.this.endpoints_.get(readCommand.key);
+                Message[][] messages = new Message[][]{ {messageRepair, messageRepair, messageRepair} };
+                EndPoint[][] epList = new EndPoint[][]{ endpoints };
+                MessagingService.getMessagingInstance().sendRR(messages, epList, MultiQuorumResponseHandler.this);                
+            }
+        }
+    }
+    
+    public MultiQuorumResponseHandler(Map<String, ReadCommand> readMessages, Map<String, EndPoint[]> endpoints)
+    {        
+        condition_ = lock_.newCondition();
+        readMessages_ = readMessages;
+        endpoints_ = endpoints;
+    }
+    
+    public Row[] get() throws TimeoutException
+    {
+        long startTime = System.currentTimeMillis();
+        lock_.lock();
+        try
+        {            
+            boolean bVal = true;            
+            try
+            {
+                if ( !done_.get() )
+                {                   
+                    bVal = condition_.await(DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
+                }
+            }
+            catch ( InterruptedException ex )
+            {
+                if (logger_.isDebugEnabled())
+                  logger_.debug( LogUtil.throwableToString(ex) );
+            }
+            
+            if ( !bVal && !done_.get() )
+            {
+                StringBuilder sb = new StringBuilder("");
+                for ( Row row : responses_ )
+                {
+                    sb.append(row.key());
+                    sb.append(":");
+                }                
+                throw new TimeoutException("Operation timed out - received only " +  responses_.size() + " responses from " + sb.toString() + " .");
+            }
+        }
+        finally
+        {
+            lock_.unlock();
+        }
+        
+        logger_.info("MultiQuorumResponseHandler: " + (System.currentTimeMillis() - startTime) + " ms.");
+        return responses_.toArray( new Row[0] );
+    }
+    
+    /**
+     * Invoked when a complete response has been obtained
+     * for one of the sub-groups a.k.a keys for the query 
+     * has been performed.
+     * 
+     * @param row obtained as a result of the response.
+     */
+    void onCompleteResponse(Row row)
+    {        
+        if ( !done_.get() )
+        {
+            responses_.add(row);
+            if ( responses_.size() == readMessages_.size() )
+            {
+                done_.set(true);
+                condition_.signal();                
+            }
+        }
+    }
+    
+    /**
+     * The handler of the response message that has been
+     * sent by one of the replicas for one of the keys.
+     * 
+     * @param message the response message for one of the
+     *        message that we sent out.
+     */
+    public void response(Message message)
+    {
+        lock_.lock();
+        try
+        {
+            SingleQuorumResponseHandler handler = handlers_.get(message.getMessageId());
+            handler.response(message);
+        }
+        finally
+        {
+            lock_.unlock();
+        }
+    }
+    
+    /**
+     * The context that is passed in for the query of
+     * multiple keys in the system. For each message 
+     * id in the context register a callback handler 
+     * for the same. This is done so that all responses
+     * for a given key use the same callback handler.
+     * 
+     * @param o the context which is an array of strings
+     *        corresponding to the message id's for each
+     *        key.
+     */
+    public void attachContext(Object o)
+    {
+        String[] gids = (String[])o;
+        for ( String gid : gids )
+        {
+            IResponseResolver<Row> responseResolver = new ReadResponseResolver();
+            SingleQuorumResponseHandler handler = new SingleQuorumResponseHandler(responseResolver);
+            handlers_.put(gid, handler);
+        }
+    }
+}

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java Thu Jul 30 15:30:21 2009
@@ -1,126 +1,126 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.service;
-
-import java.util.List;
-import java.util.ArrayList;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.*;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.net.IAsyncCallback;
-import org.apache.cassandra.net.Message;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.utils.LogUtil;
-import org.apache.log4j.Logger;
-
-/**
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-public class QuorumResponseHandler<T> implements IAsyncCallback
-{
-    private static Logger logger_ = Logger.getLogger( QuorumResponseHandler.class );
-    private Lock lock_ = new ReentrantLock();
-    private Condition condition_;
-    private int responseCount_;
-    private List<Message> responses_ = new ArrayList<Message>();
-    private IResponseResolver<T> responseResolver_;
-    private AtomicBoolean done_ = new AtomicBoolean(false);
-
-    public QuorumResponseHandler(int responseCount, IResponseResolver<T> responseResolver) throws InvalidRequestException
-    {
-        if (responseCount > DatabaseDescriptor.getReplicationFactor())
-            throw new InvalidRequestException("Cannot block for more than the replication factor of " + DatabaseDescriptor.getReplicationFactor());
-        if (responseCount < 1)
-            throw new InvalidRequestException("Cannot block for less than one replica");
-        condition_ = lock_.newCondition();
-        responseCount_ = responseCount;
-        responseResolver_ =  responseResolver;
-    }
-    
-    public T get() throws TimeoutException, DigestMismatchException
-    {
-    	lock_.lock();
-        try
-        {            
-            boolean bVal = true;            
-            try
-            {
-            	if ( !done_.get() )
-                {            		
-            		bVal = condition_.await(DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
-                }
-            }
-            catch ( InterruptedException ex )
-            {
-                if (logger_.isDebugEnabled())
-                  logger_.debug( LogUtil.throwableToString(ex) );
-            }
-            
-            if ( !bVal && !done_.get() )
-            {
-                StringBuilder sb = new StringBuilder("");
-                for ( Message message : responses_ )
-                {
-                    sb.append(message.getFrom());                    
-                }                
-                throw new TimeoutException("Operation timed out - received only " +  responses_.size() + " responses from " + sb.toString() + " .");
-            }
-        }
-        finally
-        {
-            lock_.unlock();
-            for(Message response : responses_)
-            {
-            	MessagingService.removeRegisteredCallback( response.getMessageId() );
-            }
-        }
-
-    	return responseResolver_.resolve( responses_);
-    }
-    
-    public void response(Message message)
-    {
-        lock_.lock();
-        try
-        {            
-            if ( !done_.get() )
-            {
-            	responses_.add( message );
-            	if ( responses_.size() >= responseCount_ && responseResolver_.isDataPresent(responses_))
-            	{
-            		done_.set(true);
-            		condition_.signal();            	
-            	}
-            }
-        }
-        finally
-        {
-            lock_.unlock();
-        }
-    }
-    
-    public void attachContext(Object o)
-    {
-        throw new UnsupportedOperationException("This operation is not supported in this version of the callback handler");
-    }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.*;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.net.IAsyncCallback;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class QuorumResponseHandler<T> implements IAsyncCallback
+{
+    private static Logger logger_ = Logger.getLogger( QuorumResponseHandler.class );
+    private Lock lock_ = new ReentrantLock();
+    private Condition condition_;
+    private int responseCount_;
+    private List<Message> responses_ = new ArrayList<Message>();
+    private IResponseResolver<T> responseResolver_;
+    private AtomicBoolean done_ = new AtomicBoolean(false);
+
+    public QuorumResponseHandler(int responseCount, IResponseResolver<T> responseResolver) throws InvalidRequestException
+    {
+        if (responseCount > DatabaseDescriptor.getReplicationFactor())
+            throw new InvalidRequestException("Cannot block for more than the replication factor of " + DatabaseDescriptor.getReplicationFactor());
+        if (responseCount < 1)
+            throw new InvalidRequestException("Cannot block for less than one replica");
+        condition_ = lock_.newCondition();
+        responseCount_ = responseCount;
+        responseResolver_ =  responseResolver;
+    }
+    
+    public T get() throws TimeoutException, DigestMismatchException
+    {
+    	lock_.lock();
+        try
+        {            
+            boolean bVal = true;            
+            try
+            {
+            	if ( !done_.get() )
+                {            		
+            		bVal = condition_.await(DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
+                }
+            }
+            catch ( InterruptedException ex )
+            {
+                if (logger_.isDebugEnabled())
+                  logger_.debug( LogUtil.throwableToString(ex) );
+            }
+            
+            if ( !bVal && !done_.get() )
+            {
+                StringBuilder sb = new StringBuilder("");
+                for ( Message message : responses_ )
+                {
+                    sb.append(message.getFrom());                    
+                }                
+                throw new TimeoutException("Operation timed out - received only " +  responses_.size() + " responses from " + sb.toString() + " .");
+            }
+        }
+        finally
+        {
+            lock_.unlock();
+            for(Message response : responses_)
+            {
+            	MessagingService.removeRegisteredCallback( response.getMessageId() );
+            }
+        }
+
+    	return responseResolver_.resolve( responses_);
+    }
+    
+    public void response(Message message)
+    {
+        lock_.lock();
+        try
+        {            
+            if ( !done_.get() )
+            {
+            	responses_.add( message );
+            	if ( responses_.size() >= responseCount_ && responseResolver_.isDataPresent(responses_))
+            	{
+            		done_.set(true);
+            		condition_.signal();            	
+            	}
+            }
+        }
+        finally
+        {
+            lock_.unlock();
+        }
+    }
+    
+    public void attachContext(Object o)
+    {
+        throw new UnsupportedOperationException("This operation is not supported in this version of the callback handler");
+    }
+}

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadRepairManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadRepairManager.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadRepairManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadRepairManager.java Thu Jul 30 15:30:21 2009
@@ -1,125 +1,125 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.service;
-
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.util.concurrent.locks.*;
-
-import org.apache.cassandra.db.Column;
-import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.RowMutation;
-import org.apache.cassandra.db.RowMutationMessage;
-import org.apache.cassandra.db.SuperColumn;
-import org.apache.cassandra.net.EndPoint;
-import org.apache.cassandra.net.Header;
-import org.apache.cassandra.net.Message;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.utils.Cachetable;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.ICacheExpungeHook;
-import org.apache.cassandra.utils.ICachetable;
-import org.apache.cassandra.utils.LogUtil;
-import org.apache.log4j.Logger;
-
-
-
-/*
- * This class manages the read repairs . This is a singleton class
- * it basically uses the cache table construct to schedule writes that have to be 
- * made for read repairs. 
- * A cachetable is created which wakes up every n  milliseconds specified by 
- * expirationTimeInMillis and calls a global hook function on pending entries 
- * This function basically sends the message to the appropriate servers to update them
- * with the latest changes.
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-class ReadRepairManager
-{
-    private static Logger logger_ = Logger.getLogger(ReadRepairManager.class);
-	private static final long expirationTimeInMillis = 2000;
-	private static Lock lock_ = new ReentrantLock();
-	private static ReadRepairManager self_ = null;
-
-	/*
-	 * This is the internal class which actually
-	 * implements the global hook function called by the read repair manager
-	 */
-	static class ReadRepairPerformer implements
-			ICacheExpungeHook<String, Message>
-	{
-		/*
-		 * The hook function which takes the end point and the row mutation that 
-		 * needs to be sent to the end point in order 
-		 * to perform read repair.
-		 */
-		public void callMe(String target,
-				Message message)
-		{
-			String[] pieces = FBUtilities.strip(target, ":");
-			EndPoint to = new EndPoint(pieces[0], Integer.parseInt(pieces[1]));
-			MessagingService.getMessagingInstance().sendOneWay(message, to);			
-		}
-
-	}
-
-	private ICachetable<String, Message> readRepairTable_ = new Cachetable<String, Message>(expirationTimeInMillis, new ReadRepairManager.ReadRepairPerformer());
-
-	protected ReadRepairManager()
-	{
-
-	}
-
-	public  static ReadRepairManager instance()
-	{
-		if (self_ == null)
-		{
-            lock_.lock();
-            try
-            {
-                if ( self_ == null )
-                    self_ = new ReadRepairManager();
-            }
-            finally
-            {
-                lock_.unlock();
-            }
-		}
-		return self_;
-	}
-
-	/*
-	 * Schedules a read repair.
-	 * @param target endpoint on which the read repair should happen
-	 * @param rowMutationMessage the row mutation message that has the repaired row.
-	 */
-	public void schedule(EndPoint target, RowMutationMessage rowMutationMessage)
-	{
-        try
-        {
-            Message message = rowMutationMessage.makeRowMutationMessage(StorageService.readRepairVerbHandler_);
-    		String key = target + ":" + message.getMessageId();
-    		readRepairTable_.put(key, message);
-        }
-        catch ( IOException ex )
-        {
-            logger_.error(LogUtil.throwableToString(ex));
-        }
-	}
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.concurrent.locks.*;
+
+import org.apache.cassandra.db.Column;
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.db.RowMutationMessage;
+import org.apache.cassandra.db.SuperColumn;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.Header;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.Cachetable;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.ICacheExpungeHook;
+import org.apache.cassandra.utils.ICachetable;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+
+
+/*
+ * This class manages the read repairs . This is a singleton class
+ * it basically uses the cache table construct to schedule writes that have to be 
+ * made for read repairs. 
+ * A cachetable is created which wakes up every n  milliseconds specified by 
+ * expirationTimeInMillis and calls a global hook function on pending entries 
+ * This function basically sends the message to the appropriate servers to update them
+ * with the latest changes.
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+class ReadRepairManager
+{
+    private static Logger logger_ = Logger.getLogger(ReadRepairManager.class);
+	private static final long expirationTimeInMillis = 2000;
+	private static Lock lock_ = new ReentrantLock();
+	private static ReadRepairManager self_ = null;
+
+	/*
+	 * This is the internal class which actually
+	 * implements the global hook function called by the read repair manager
+	 */
+	static class ReadRepairPerformer implements
+			ICacheExpungeHook<String, Message>
+	{
+		/*
+		 * The hook function which takes the end point and the row mutation that 
+		 * needs to be sent to the end point in order 
+		 * to perform read repair.
+		 */
+		public void callMe(String target,
+				Message message)
+		{
+			String[] pieces = FBUtilities.strip(target, ":");
+			EndPoint to = new EndPoint(pieces[0], Integer.parseInt(pieces[1]));
+			MessagingService.getMessagingInstance().sendOneWay(message, to);			
+		}
+
+	}
+
+	private ICachetable<String, Message> readRepairTable_ = new Cachetable<String, Message>(expirationTimeInMillis, new ReadRepairManager.ReadRepairPerformer());
+
+	protected ReadRepairManager()
+	{
+
+	}
+
+	public  static ReadRepairManager instance()
+	{
+		if (self_ == null)
+		{
+            lock_.lock();
+            try
+            {
+                if ( self_ == null )
+                    self_ = new ReadRepairManager();
+            }
+            finally
+            {
+                lock_.unlock();
+            }
+		}
+		return self_;
+	}
+
+	/*
+	 * Schedules a read repair.
+	 * @param target endpoint on which the read repair should happen
+	 * @param rowMutationMessage the row mutation message that has the repaired row.
+	 */
+	public void schedule(EndPoint target, RowMutationMessage rowMutationMessage)
+	{
+        try
+        {
+            Message message = rowMutationMessage.makeRowMutationMessage(StorageService.readRepairVerbHandler_);
+    		String key = target + ":" + message.getMessageId();
+    		readRepairTable_.put(key, message);
+        }
+        catch ( IOException ex )
+        {
+            logger_.error(LogUtil.throwableToString(ex));
+        }
+	}
+}

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java Thu Jul 30 15:30:21 2009
@@ -1,177 +1,177 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.service;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.ReadResponse;
-import org.apache.cassandra.db.Row;
-import org.apache.cassandra.db.RowMutation;
-import org.apache.cassandra.db.RowMutationMessage;
-import org.apache.cassandra.io.DataInputBuffer;
-import org.apache.cassandra.net.EndPoint;
-import org.apache.cassandra.net.Message;
-import org.apache.cassandra.utils.LogUtil;
-import org.apache.log4j.Logger;
-
-
-/**
- * This class is used by all read functions and is called by the Quorum 
- * when at least a few of the servers (few is specified in Quorum)
- * have sent the response . The resolve function then schedules read repair 
- * and resolution of read data from the various servers.
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-public class ReadResponseResolver implements IResponseResolver<Row>
-{
-	private static Logger logger_ = Logger.getLogger(WriteResponseResolver.class);
-
-	/*
-	 * This method for resolving read data should look at the timestamps of each
-	 * of the columns that are read and should pick up columns with the latest
-	 * timestamp. For those columns where the timestamp is not the latest a
-	 * repair request should be scheduled.
-	 * 
-	 */
-	public Row resolve(List<Message> responses) throws DigestMismatchException
-	{
-        long startTime = System.currentTimeMillis();
-		Row retRow = null;
-		List<Row> rowList = new ArrayList<Row>();
-		List<EndPoint> endPoints = new ArrayList<EndPoint>();
-		String key = null;
-		String table = null;
-		byte[] digest = new byte[0];
-		boolean isDigestQuery = false;
-        
-        /*
-		 * Populate the list of rows from each of the messages
-		 * Check to see if there is a digest query. If a digest 
-         * query exists then we need to compare the digest with 
-         * the digest of the data that is received.
-        */
-        DataInputBuffer bufIn = new DataInputBuffer();
-		for (Message response : responses)
-		{					            
-            byte[] body = response.getMessageBody();
-            bufIn.reset(body, body.length);
-            try
-            {
-                long start = System.currentTimeMillis();
-                ReadResponse result = ReadResponse.serializer().deserialize(bufIn);
-                if (logger_.isDebugEnabled())
-                  logger_.debug( "Response deserialization time : " + (System.currentTimeMillis() - start) + " ms.");
-    			if(!result.isDigestQuery())
-    			{
-    				rowList.add(result.row());
-    				endPoints.add(response.getFrom());
-    				key = result.row().key();
-    				table = result.row().getTable();
-    			}
-    			else
-    			{
-    				digest = result.digest();
-    				isDigestQuery = true;
-    			}
-            }
-            catch( IOException ex )
-            {
-                logger_.info(LogUtil.throwableToString(ex));
-            }
-		}
-		// If there was a digest query compare it with all the data digests 
-		// If there is a mismatch then throw an exception so that read repair can happen.
-		if(isDigestQuery)
-		{
-			for(Row row: rowList)
-			{
-				if( !Arrays.equals(row.digest(), digest) )
-				{
-                    /* Wrap the key as the context in this exception */
-					throw new DigestMismatchException(row.key());
-				}
-			}
-		}
-		
-        /* If the rowList is empty then we had some exception above. */
-        if ( rowList.size() == 0 )
-        {
-            return retRow;
-        }
-        
-        /* Now calculate the resolved row */
-		retRow = new Row(table, key);
-		for (int i = 0 ; i < rowList.size(); i++)
-		{
-			retRow.repair(rowList.get(i));			
-		}
-
-        // At  this point  we have the return row .
-		// Now we need to calculate the difference 
-		// so that we can schedule read repairs 
-		for (int i = 0 ; i < rowList.size(); i++)
-		{
-			// since retRow is the resolved row it can be used as the super set
-			Row diffRow = rowList.get(i).diff(retRow);
-			if(diffRow == null) // no repair needs to happen
-				continue;
-			// create the row mutation message based on the diff and schedule a read repair 
-			RowMutation rowMutation = new RowMutation(table, key);            			
-	        for (ColumnFamily cf : diffRow.getColumnFamilies())
-	        {
-	            rowMutation.add(cf);
-	        }
-            RowMutationMessage rowMutationMessage = new RowMutationMessage(rowMutation);
-	        ReadRepairManager.instance().schedule(endPoints.get(i),rowMutationMessage);
-		}
-        logger_.info("resolve: " + (System.currentTimeMillis() - startTime) + " ms.");
-		return retRow;
-	}
-
-	public boolean isDataPresent(List<Message> responses)
-	{
-		boolean isDataPresent = false;
-		for (Message response : responses)
-		{
-            byte[] body = response.getMessageBody();
-			DataInputBuffer bufIn = new DataInputBuffer();
-            bufIn.reset(body, body.length);
-            try
-            {
-    			ReadResponse result = ReadResponse.serializer().deserialize(bufIn);
-    			if(!result.isDigestQuery())
-    			{
-    				isDataPresent = true;
-    			}
-                bufIn.close();
-            }
-            catch(IOException ex)
-            {
-                logger_.info(LogUtil.throwableToString(ex));
-            }                        
-		}
-		return isDataPresent;
-	}
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.ReadResponse;
+import org.apache.cassandra.db.Row;
+import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.db.RowMutationMessage;
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+
+/**
+ * This class is used by all read functions and is called by the Quorum 
+ * when at least a few of the servers (few is specified in Quorum)
+ * have sent the response . The resolve function then schedules read repair 
+ * and resolution of read data from the various servers.
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+public class ReadResponseResolver implements IResponseResolver<Row>
+{
+	private static Logger logger_ = Logger.getLogger(WriteResponseResolver.class);
+
+	/*
+	 * This method for resolving read data should look at the timestamps of each
+	 * of the columns that are read and should pick up columns with the latest
+	 * timestamp. For those columns where the timestamp is not the latest a
+	 * repair request should be scheduled.
+	 * 
+	 */
+	public Row resolve(List<Message> responses) throws DigestMismatchException
+	{
+        long startTime = System.currentTimeMillis();
+		Row retRow = null;
+		List<Row> rowList = new ArrayList<Row>();
+		List<EndPoint> endPoints = new ArrayList<EndPoint>();
+		String key = null;
+		String table = null;
+		byte[] digest = new byte[0];
+		boolean isDigestQuery = false;
+        
+        /*
+		 * Populate the list of rows from each of the messages
+		 * Check to see if there is a digest query. If a digest 
+         * query exists then we need to compare the digest with 
+         * the digest of the data that is received.
+        */
+        DataInputBuffer bufIn = new DataInputBuffer();
+		for (Message response : responses)
+		{					            
+            byte[] body = response.getMessageBody();
+            bufIn.reset(body, body.length);
+            try
+            {
+                long start = System.currentTimeMillis();
+                ReadResponse result = ReadResponse.serializer().deserialize(bufIn);
+                if (logger_.isDebugEnabled())
+                  logger_.debug( "Response deserialization time : " + (System.currentTimeMillis() - start) + " ms.");
+    			if(!result.isDigestQuery())
+    			{
+    				rowList.add(result.row());
+    				endPoints.add(response.getFrom());
+    				key = result.row().key();
+    				table = result.row().getTable();
+    			}
+    			else
+    			{
+    				digest = result.digest();
+    				isDigestQuery = true;
+    			}
+            }
+            catch( IOException ex )
+            {
+                logger_.info(LogUtil.throwableToString(ex));
+            }
+		}
+		// If there was a digest query compare it with all the data digests 
+		// If there is a mismatch then throw an exception so that read repair can happen.
+		if(isDigestQuery)
+		{
+			for(Row row: rowList)
+			{
+				if( !Arrays.equals(row.digest(), digest) )
+				{
+                    /* Wrap the key as the context in this exception */
+					throw new DigestMismatchException(row.key());
+				}
+			}
+		}
+		
+        /* If the rowList is empty then we had some exception above. */
+        if ( rowList.size() == 0 )
+        {
+            return retRow;
+        }
+        
+        /* Now calculate the resolved row */
+		retRow = new Row(table, key);
+		for (int i = 0 ; i < rowList.size(); i++)
+		{
+			retRow.repair(rowList.get(i));			
+		}
+
+        // At  this point  we have the return row .
+		// Now we need to calculate the difference 
+		// so that we can schedule read repairs 
+		for (int i = 0 ; i < rowList.size(); i++)
+		{
+			// since retRow is the resolved row it can be used as the super set
+			Row diffRow = rowList.get(i).diff(retRow);
+			if(diffRow == null) // no repair needs to happen
+				continue;
+			// create the row mutation message based on the diff and schedule a read repair 
+			RowMutation rowMutation = new RowMutation(table, key);            			
+	        for (ColumnFamily cf : diffRow.getColumnFamilies())
+	        {
+	            rowMutation.add(cf);
+	        }
+            RowMutationMessage rowMutationMessage = new RowMutationMessage(rowMutation);
+	        ReadRepairManager.instance().schedule(endPoints.get(i),rowMutationMessage);
+		}
+        logger_.info("resolve: " + (System.currentTimeMillis() - startTime) + " ms.");
+		return retRow;
+	}
+
+	public boolean isDataPresent(List<Message> responses)
+	{
+		boolean isDataPresent = false;
+		for (Message response : responses)
+		{
+            byte[] body = response.getMessageBody();
+			DataInputBuffer bufIn = new DataInputBuffer();
+            bufIn.reset(body, body.length);
+            try
+            {
+    			ReadResponse result = ReadResponse.serializer().deserialize(bufIn);
+    			if(!result.isDigestQuery())
+    			{
+    				isDataPresent = true;
+    			}
+                bufIn.close();
+            }
+            catch(IOException ex)
+            {
+                logger_.info(LogUtil.throwableToString(ex));
+            }                        
+		}
+		return isDataPresent;
+	}
+}

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java Thu Jul 30 15:30:21 2009
@@ -1,403 +1,403 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.service;
-
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.log4j.Logger;
-
-import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
-import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
-import org.apache.cassandra.concurrent.SingleThreadedStage;
-import org.apache.cassandra.concurrent.StageManager;
-import org.apache.cassandra.concurrent.ThreadFactoryImpl;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.gms.ApplicationState;
-import org.apache.cassandra.gms.EndPointState;
-import org.apache.cassandra.gms.Gossiper;
-import org.apache.cassandra.gms.IEndPointStateChangeSubscriber;
-import org.apache.cassandra.net.EndPoint;
-import org.apache.cassandra.net.IVerbHandler;
-import org.apache.cassandra.net.Message;
-import org.apache.cassandra.net.MessagingService;
-
-/*
- * The load balancing algorithm here is an implementation of
- * the algorithm as described in the paper "Scalable range query
- * processing for large-scale distributed database applications".
- * This class keeps track of load information across the system.
- * It registers itself with the Gossiper for ApplicationState namely
- * load information i.e number of requests processed w.r.t distinct
- * keys at an Endpoint. Monitor load information for a 5 minute
- * interval and then do load balancing operations if necessary.
- * 
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-final class StorageLoadBalancer implements IEndPointStateChangeSubscriber
-{
-    class LoadBalancer implements Runnable
-    {
-        LoadBalancer()
-        {
-            /* Copy the entries in loadInfo_ into loadInfo2_ and use it for all calculations */
-            loadInfo2_.putAll(loadInfo_);
-        }
-
-        /**
-         * Obtain a node which is a potential target. Start with
-         * the neighbours i.e either successor or predecessor.
-         * Send the target a MoveMessage. If the node cannot be
-         * relocated on the ring then we pick another candidate for
-         * relocation.
-        */        
-        public void run()
-        {
-            /*
-            int threshold = (int)(StorageLoadBalancer.ratio_ * averageSystemLoad());
-            int myLoad = localLoad();            
-            EndPoint predecessor = storageService_.getPredecessor(StorageService.getLocalStorageEndPoint());
-            if (logger_.isDebugEnabled())
-              logger_.debug("Trying to relocate the predecessor " + predecessor);
-            boolean value = tryThisNode(myLoad, threshold, predecessor);
-            if ( !value )
-            {
-                loadInfo2_.remove(predecessor);
-                EndPoint successor = storageService_.getSuccessor(StorageService.getLocalStorageEndPoint());
-                if (logger_.isDebugEnabled())
-                  logger_.debug("Trying to relocate the successor " + successor);
-                value = tryThisNode(myLoad, threshold, successor);
-                if ( !value )
-                {
-                    loadInfo2_.remove(successor);
-                    while ( !loadInfo2_.isEmpty() )
-                    {
-                        EndPoint target = findARandomLightNode();
-                        if ( target != null )
-                        {
-                            if (logger_.isDebugEnabled())
-                              logger_.debug("Trying to relocate the random node " + target);
-                            value = tryThisNode(myLoad, threshold, target);
-                            if ( !value )
-                            {
-                                loadInfo2_.remove(target);
-                            }
-                            else
-                            {
-                                break;
-                            }
-                        }
-                        else
-                        {
-                            // No light nodes available - this is NOT good.
-                            logger_.warn("Not even a single lightly loaded node is available ...");
-                            break;
-                        }
-                    }
-
-                    loadInfo2_.clear();                    
-                     // If we are here and no node was available to
-                     // perform load balance with we need to report and bail.                    
-                    if ( !value )
-                    {
-                        logger_.warn("Load Balancing operations weren't performed for this node");
-                    }
-                }                
-            }
-            */        
-        }
-
-        /*
-        private boolean tryThisNode(int myLoad, int threshold, EndPoint target)
-        {
-            boolean value = false;
-            LoadInfo li = loadInfo2_.get(target);
-            int pLoad = li.count();
-            if ( ((myLoad + pLoad) >> 1) <= threshold )
-            {
-                //calculate the number of keys to be transferred
-                int keyCount = ( (myLoad - pLoad) >> 1 );
-                if (logger_.isDebugEnabled())
-                  logger_.debug("Number of keys we attempt to transfer to " + target + " " + keyCount);
-                // Determine the token that the target should join at.         
-                BigInteger targetToken = BootstrapAndLbHelper.getTokenBasedOnPrimaryCount(keyCount);
-                // Send a MoveMessage and see if this node is relocateable
-                MoveMessage moveMessage = new MoveMessage(targetToken);
-                Message message = new Message(StorageService.getLocalStorageEndPoint(), StorageLoadBalancer.lbStage_, StorageLoadBalancer.moveMessageVerbHandler_, new Object[]{moveMessage});
-                if (logger_.isDebugEnabled())
-                  logger_.debug("Sending a move message to " + target);
-                IAsyncResult result = MessagingService.getMessagingInstance().sendRR(message, target);
-                value = (Boolean)result.get()[0];
-                if (logger_.isDebugEnabled())
-                  logger_.debug("Response for query to relocate " + target + " is " + value);
-            }
-            return value;
-        }
-        */
-    }
-
-    class MoveMessageVerbHandler implements IVerbHandler
-    {
-        public void doVerb(Message message)
-        {
-            Message reply = message.getReply(StorageService.getLocalStorageEndPoint(), new byte[] {(byte)(isMoveable_.get() ? 1 : 0)});
-            MessagingService.getMessagingInstance().sendOneWay(reply, message.getFrom());
-            if ( isMoveable_.get() )
-            {
-                // MoveMessage moveMessage = (MoveMessage)message.getMessageBody()[0];
-                /* Start the leave operation and join the ring at the position specified */
-                isMoveable_.set(false);
-            }
-        }
-    }
-
-    private static final Logger logger_ = Logger.getLogger(StorageLoadBalancer.class);
-    private static final String lbStage_ = "LOAD-BALANCER-STAGE";
-    private static final String moveMessageVerbHandler_ = "MOVE-MESSAGE-VERB-HANDLER";
-    /* time to delay in minutes the actual load balance procedure if heavily loaded */
-    private static final int delay_ = 5;
-    /* Ratio of highest loaded node and the average load. */
-    private static final double ratio_ = 1.5;
-
-    private StorageService storageService_;
-    /* this indicates whether this node is already helping someone else */
-    private AtomicBoolean isMoveable_ = new AtomicBoolean(false);
-    private Map<EndPoint, LoadInfo> loadInfo_ = new HashMap<EndPoint, LoadInfo>();
-    /* This map is a clone of the one above and is used for various calculations during LB operation */
-    private Map<EndPoint, LoadInfo> loadInfo2_ = new HashMap<EndPoint, LoadInfo>();
-    /* This thread pool is used for initiating load balancing operations */
-    private ScheduledThreadPoolExecutor lb_ = new DebuggableScheduledThreadPoolExecutor(
-            1,
-            new ThreadFactoryImpl("LB-OPERATIONS")
-            );
-    /* This thread pool is used by target node to leave the ring. */
-    private ExecutorService lbOperations_ = new DebuggableThreadPoolExecutor("LB-TARGET");
-
-    /*
-     * Creates the balancer for the given storage service, registers a dedicated
-     * single-threaded stage for load balancer messages, and installs the handler
-     * for MOVE requests.
-     */
-    StorageLoadBalancer(StorageService storageService)
-    {
-        this.storageService_ = storageService;
-        String stageName = StorageLoadBalancer.lbStage_;
-        StageManager.registerStage(stageName, new SingleThreadedStage(stageName));
-        MessagingService messaging = MessagingService.getMessagingInstance();
-        messaging.registerVerbHandlers(StorageLoadBalancer.moveMessageVerbHandler_, new MoveMessageVerbHandler());
-    }
-
-    /* Subscribe to the Gossiper so that onChange() receives endpoint state updates. */
-    public void start()
-    {
-        Gossiper gossiper = Gossiper.instance();
-        gossiper.register(this);
-    }
-
-    /*
-     * Gossip callback (this object registers itself in start()). Records the
-     * load information carried by the endpoint's state, if any, replacing any
-     * value previously stored for that endpoint. States without load
-     * information are ignored.
-     */
-    public void onChange(EndPoint endpoint, EndPointState epState)
-    {
-        ApplicationState loadInfoState = epState.getApplicationState(LoadDisseminator.loadInfo_);
-        if ( loadInfoState == null )
-            return;
-
-        loadInfo_.put(endpoint, new LoadInfo(loadInfoState.getState()));
-
-        /*
-        int currentLoad = Integer.parseInt(loadInfoState.getState());
-        // update load information for this endpoint
-        loadInfo_.put(endpoint, currentLoad);
-
-        // clone load information to perform calculations
-        loadInfo2_.putAll(loadInfo_);
-        // Perform the analysis for load balance operations
-        if ( isHeavyNode() )
-        {
-            if (logger_.isDebugEnabled())
-              logger_.debug(StorageService.getLocalStorageEndPoint() + " is a heavy node with load " + localLoad());
-            // lb_.schedule( new LoadBalancer(), StorageLoadBalancer.delay_, TimeUnit.MINUTES );
-        }
-        */
-    }
-
-    /*
-     * Returns the most recently gossiped load information for the given
-     * endpoint, or null if that endpoint has not gossiped any yet.
-     */
-    LoadInfo getLoad(EndPoint ep)
-    {
-        return loadInfo_.get(ep);
-    }
-
-    /*
-    private boolean isMoveable()
-    {
-        if ( !isMoveable_.get() )
-            return false;
-        int myload = localLoad();
-        EndPoint successor = storageService_.getSuccessor(StorageService.getLocalStorageEndPoint());
-        LoadInfo li = loadInfo2_.get(successor);
-        // "load" is NULL means that the successor node has not
-        // yet gossiped its load information. We should return
-        // false in this case since we want to err on the side
-        // of caution.
-        if ( li == null )
-            return false;
-        else
-        {            
-            if ( ( myload + li.count() ) > StorageLoadBalancer.ratio_*averageSystemLoad() )
-                return false;
-            else
-                return true;
-        }
-    }
-    */
-
-    /*
-    private int localLoad()
-    {
-        LoadInfo value = loadInfo2_.get(StorageService.getLocalStorageEndPoint());
-        return (value == null) ? 0 : value.count();
-    }
-    */
-
-    /*
-    private int averageSystemLoad()
-    {
-        int nodeCount = loadInfo2_.size();
-        Set<EndPoint> nodes = loadInfo2_.keySet();
-
-        int systemLoad = 0;
-        for ( EndPoint node : nodes )
-        {
-            LoadInfo load = loadInfo2_.get(node);
-            if ( load != null )
-                systemLoad += load.count();
-        }
-        int averageLoad = (nodeCount > 0) ? (systemLoad / nodeCount) : 0;
-        if (logger_.isDebugEnabled())
-          logger_.debug("Average system load should be " + averageLoad);
-        return averageLoad;
-    }
-    */
-    
-    /*
-    private boolean isHeavyNode()
-    {
-        return ( localLoad() > ( StorageLoadBalancer.ratio_ * averageSystemLoad() ) );
-    }
-    */
-    
-    /*
-    private boolean isMoveable(EndPoint target)
-    {
-        int threshold = (int)(StorageLoadBalancer.ratio_ * averageSystemLoad());
-        if ( isANeighbour(target) )
-        {
-            // If the target is a neighbour then it is
-            // moveable if its
-            LoadInfo load = loadInfo2_.get(target);
-            if ( load == null )
-                return false;
-            else
-            {
-                int myload = localLoad();
-                int avgLoad = (load.count() + myload) >> 1;
-                if ( avgLoad <= threshold )
-                    return true;
-                else
-                    return false;
-            }
-        }
-        else
-        {
-            EndPoint successor = storageService_.getSuccessor(target);
-            LoadInfo sLoad = loadInfo2_.get(successor);
-            LoadInfo targetLoad = loadInfo2_.get(target);
-            if ( (sLoad.count() + targetLoad.count()) > threshold )
-                return false;
-            else
-                return true;
-        }
-    }
-    */
-
-    /*
-     * Returns true if the given endpoint is this node's immediate predecessor
-     * or immediate successor, as reported by the storage service.
-     */
-    private boolean isANeighbour(EndPoint neighbour)
-    {
-        EndPoint before = storageService_.getPredecessor(StorageService.getLocalStorageEndPoint());
-        return before.equals(neighbour)
-            || storageService_.getSuccessor(StorageService.getLocalStorageEndPoint()).equals(neighbour);
-    }
-
-    /*
-     * Determine the nodes that are lightly loaded. Choose one of
-     * the lightly loaded nodes at random and use it as
-     * a potential target for load balancing.
-    */
-    /*
-    private EndPoint findARandomLightNode()
-    {
-        List<EndPoint> potentialCandidates = new ArrayList<EndPoint>();
-        Set<EndPoint> allTargets = loadInfo2_.keySet();
-        int avgLoad =  averageSystemLoad();
-
-        for( EndPoint target : allTargets )
-        {
-            LoadInfo load = loadInfo2_.get(target);
-            if ( load.count() < avgLoad )
-                potentialCandidates.add(target);
-        }
-
-        if ( potentialCandidates.size() > 0 )
-        {
-            Random random = new Random();
-            int index = random.nextInt(potentialCandidates.size());
-            return potentialCandidates.get(index);
-        }
-        return null;
-    }
-    */
-}
-
-/*
- * Request asking a node to move on the ring; carries the token at which
- * the receiving node should rejoin.
- */
-class MoveMessage implements Serializable
-{
-    private Token targetToken_;
-
-    MoveMessage(Token targetToken)
-    {
-        this.targetToken_ = targetToken;
-    }
-
-    /* Parameterless constructor; leaves the target token unset (null). */
-    private MoveMessage()
-    {
-    }
-
-    /* Returns the token supplied at construction (null if none was given). */
-    Token getTargetToken()
-    {
-        return this.targetToken_;
-    }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.log4j.Logger;
+
+import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
+import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
+import org.apache.cassandra.concurrent.SingleThreadedStage;
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.concurrent.ThreadFactoryImpl;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.gms.ApplicationState;
+import org.apache.cassandra.gms.EndPointState;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.gms.IEndPointStateChangeSubscriber;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+
+/*
+ * The load balancing algorithm here is an implementation of
+ * the algorithm as described in the paper "Scalable range query
+ * processing for large-scale distributed database applications".
+ * This class keeps track of load information across the system.
+ * It registers itself with the Gossiper for ApplicationState namely
+ * load information i.e number of requests processed w.r.t distinct
+ * keys at an Endpoint. Monitor load information for a 5 minute
+ * interval and then do load balancing operations if necessary.
+ * 
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+final class StorageLoadBalancer implements IEndPointStateChangeSubscriber
+{
+    class LoadBalancer implements Runnable
+    {
+        LoadBalancer()
+        {
+            /* Copy the entries in loadInfo_ into loadInfo2_ and use it for all calculations */
+            loadInfo2_.putAll(loadInfo_);
+        }
+
+        /**
+         * Obtain a node which is a potential target. Start with
+         * the neighbours i.e either successor or predecessor.
+         * Send the target a MoveMessage. If the node cannot be
+         * relocated on the ring then we pick another candidate for
+         * relocation.
+        */        
+        public void run()
+        {
+            /*
+            int threshold = (int)(StorageLoadBalancer.ratio_ * averageSystemLoad());
+            int myLoad = localLoad();            
+            EndPoint predecessor = storageService_.getPredecessor(StorageService.getLocalStorageEndPoint());
+            if (logger_.isDebugEnabled())
+              logger_.debug("Trying to relocate the predecessor " + predecessor);
+            boolean value = tryThisNode(myLoad, threshold, predecessor);
+            if ( !value )
+            {
+                loadInfo2_.remove(predecessor);
+                EndPoint successor = storageService_.getSuccessor(StorageService.getLocalStorageEndPoint());
+                if (logger_.isDebugEnabled())
+                  logger_.debug("Trying to relocate the successor " + successor);
+                value = tryThisNode(myLoad, threshold, successor);
+                if ( !value )
+                {
+                    loadInfo2_.remove(successor);
+                    while ( !loadInfo2_.isEmpty() )
+                    {
+                        EndPoint target = findARandomLightNode();
+                        if ( target != null )
+                        {
+                            if (logger_.isDebugEnabled())
+                              logger_.debug("Trying to relocate the random node " + target);
+                            value = tryThisNode(myLoad, threshold, target);
+                            if ( !value )
+                            {
+                                loadInfo2_.remove(target);
+                            }
+                            else
+                            {
+                                break;
+                            }
+                        }
+                        else
+                        {
+                            // No light nodes available - this is NOT good.
+                            logger_.warn("Not even a single lightly loaded node is available ...");
+                            break;
+                        }
+                    }
+
+                    loadInfo2_.clear();                    
+                     // If we are here and no node was available to
+                     // perform load balance with we need to report and bail.                    
+                    if ( !value )
+                    {
+                        logger_.warn("Load Balancing operations weren't performed for this node");
+                    }
+                }                
+            }
+            */        
+        }
+
+        /*
+        private boolean tryThisNode(int myLoad, int threshold, EndPoint target)
+        {
+            boolean value = false;
+            LoadInfo li = loadInfo2_.get(target);
+            int pLoad = li.count();
+            if ( ((myLoad + pLoad) >> 1) <= threshold )
+            {
+                //calculate the number of keys to be transferred
+                int keyCount = ( (myLoad - pLoad) >> 1 );
+                if (logger_.isDebugEnabled())
+                  logger_.debug("Number of keys we attempt to transfer to " + target + " " + keyCount);
+                // Determine the token that the target should join at.         
+                BigInteger targetToken = BootstrapAndLbHelper.getTokenBasedOnPrimaryCount(keyCount);
+                // Send a MoveMessage and see if this node is relocateable
+                MoveMessage moveMessage = new MoveMessage(targetToken);
+                Message message = new Message(StorageService.getLocalStorageEndPoint(), StorageLoadBalancer.lbStage_, StorageLoadBalancer.moveMessageVerbHandler_, new Object[]{moveMessage});
+                if (logger_.isDebugEnabled())
+                  logger_.debug("Sending a move message to " + target);
+                IAsyncResult result = MessagingService.getMessagingInstance().sendRR(message, target);
+                value = (Boolean)result.get()[0];
+                if (logger_.isDebugEnabled())
+                  logger_.debug("Response for query to relocate " + target + " is " + value);
+            }
+            return value;
+        }
+        */
+    }
+
+    class MoveMessageVerbHandler implements IVerbHandler
+    {
+        public void doVerb(Message message)
+        {
+            Message reply = message.getReply(StorageService.getLocalStorageEndPoint(), new byte[] {(byte)(isMoveable_.get() ? 1 : 0)});
+            MessagingService.getMessagingInstance().sendOneWay(reply, message.getFrom());
+            if ( isMoveable_.get() )
+            {
+                // MoveMessage moveMessage = (MoveMessage)message.getMessageBody()[0];
+                /* Start the leave operation and join the ring at the position specified */
+                isMoveable_.set(false);
+            }
+        }
+    }
+
+    private static final Logger logger_ = Logger.getLogger(StorageLoadBalancer.class);
+    private static final String lbStage_ = "LOAD-BALANCER-STAGE";
+    private static final String moveMessageVerbHandler_ = "MOVE-MESSAGE-VERB-HANDLER";
+    /* time to delay in minutes the actual load balance procedure if heavily loaded */
+    private static final int delay_ = 5;
+    /* Ratio of highest loaded node and the average load. */
+    private static final double ratio_ = 1.5;
+
+    private StorageService storageService_;
+    /* this indicates whether this node is already helping someone else */
+    private AtomicBoolean isMoveable_ = new AtomicBoolean(false);
+    private Map<EndPoint, LoadInfo> loadInfo_ = new HashMap<EndPoint, LoadInfo>();
+    /* This map is a clone of the one above and is used for various calculations during LB operation */
+    private Map<EndPoint, LoadInfo> loadInfo2_ = new HashMap<EndPoint, LoadInfo>();
+    /* This thread pool is used for initiating load balancing operations */
+    private ScheduledThreadPoolExecutor lb_ = new DebuggableScheduledThreadPoolExecutor(
+            1,
+            new ThreadFactoryImpl("LB-OPERATIONS")
+            );
+    /* This thread pool is used by target node to leave the ring. */
+    private ExecutorService lbOperations_ = new DebuggableThreadPoolExecutor("LB-TARGET");
+
+    StorageLoadBalancer(StorageService storageService)
+    {
+        storageService_ = storageService;
+        /* register the load balancer stage */
+        StageManager.registerStage(StorageLoadBalancer.lbStage_, new SingleThreadedStage(StorageLoadBalancer.lbStage_));
+        /* register the load balancer verb handler */
+        MessagingService.getMessagingInstance().registerVerbHandlers(StorageLoadBalancer.moveMessageVerbHandler_, new MoveMessageVerbHandler());
+    }
+
+    public void start()
+    {
+        /* Register with the Gossiper for EndPointState notifications */
+        Gossiper.instance().register(this);
+    }
+
+    public void onChange(EndPoint endpoint, EndPointState epState)
+    {
+        // load information for this specified endpoint for load balancing 
+        ApplicationState loadInfoState = epState.getApplicationState(LoadDisseminator.loadInfo_);
+        if ( loadInfoState != null )
+        {
+            String lInfoState = loadInfoState.getState();
+            LoadInfo lInfo = new LoadInfo(lInfoState);
+            loadInfo_.put(endpoint, lInfo);
+            
+            /*
+            int currentLoad = Integer.parseInt(loadInfoState.getState());
+            // update load information for this endpoint
+            loadInfo_.put(endpoint, currentLoad);
+
+            // clone load information to perform calculations
+            loadInfo2_.putAll(loadInfo_);
+            // Perform the analysis for load balance operations
+            if ( isHeavyNode() )
+            {
+                if (logger_.isDebugEnabled())
+                  logger_.debug(StorageService.getLocalStorageEndPoint() + " is a heavy node with load " + localLoad());
+                // lb_.schedule( new LoadBalancer(), StorageLoadBalancer.delay_, TimeUnit.MINUTES );
+            }
+            */
+        }       
+    }
+
+    /*
+     * Load information associated with a given endpoint.
+    */
+    LoadInfo getLoad(EndPoint ep)
+    {
+        LoadInfo li = loadInfo_.get(ep);        
+        return li;        
+    }
+
+    /*
+    private boolean isMoveable()
+    {
+        if ( !isMoveable_.get() )
+            return false;
+        int myload = localLoad();
+        EndPoint successor = storageService_.getSuccessor(StorageService.getLocalStorageEndPoint());
+        LoadInfo li = loadInfo2_.get(successor);
+        // "load" is NULL means that the successor node has not
+        // yet gossiped its load information. We should return
+        // false in this case since we want to err on the side
+        // of caution.
+        if ( li == null )
+            return false;
+        else
+        {            
+            if ( ( myload + li.count() ) > StorageLoadBalancer.ratio_*averageSystemLoad() )
+                return false;
+            else
+                return true;
+        }
+    }
+    */
+
+    /*
+    private int localLoad()
+    {
+        LoadInfo value = loadInfo2_.get(StorageService.getLocalStorageEndPoint());
+        return (value == null) ? 0 : value.count();
+    }
+    */
+
+    /*
+    private int averageSystemLoad()
+    {
+        int nodeCount = loadInfo2_.size();
+        Set<EndPoint> nodes = loadInfo2_.keySet();
+
+        int systemLoad = 0;
+        for ( EndPoint node : nodes )
+        {
+            LoadInfo load = loadInfo2_.get(node);
+            if ( load != null )
+                systemLoad += load.count();
+        }
+        int averageLoad = (nodeCount > 0) ? (systemLoad / nodeCount) : 0;
+        if (logger_.isDebugEnabled())
+          logger_.debug("Average system load should be " + averageLoad);
+        return averageLoad;
+    }
+    */
+    
+    /*
+    private boolean isHeavyNode()
+    {
+        return ( localLoad() > ( StorageLoadBalancer.ratio_ * averageSystemLoad() ) );
+    }
+    */
+    
+    /*
+    private boolean isMoveable(EndPoint target)
+    {
+        int threshold = (int)(StorageLoadBalancer.ratio_ * averageSystemLoad());
+        if ( isANeighbour(target) )
+        {
+            // If the target is a neighbour then it is
+            // moveable if its
+            LoadInfo load = loadInfo2_.get(target);
+            if ( load == null )
+                return false;
+            else
+            {
+                int myload = localLoad();
+                int avgLoad = (load.count() + myload) >> 1;
+                if ( avgLoad <= threshold )
+                    return true;
+                else
+                    return false;
+            }
+        }
+        else
+        {
+            EndPoint successor = storageService_.getSuccessor(target);
+            LoadInfo sLoad = loadInfo2_.get(successor);
+            LoadInfo targetLoad = loadInfo2_.get(target);
+            if ( (sLoad.count() + targetLoad.count()) > threshold )
+                return false;
+            else
+                return true;
+        }
+    }
+    */
+
+    private boolean isANeighbour(EndPoint neighbour)
+    {
+        EndPoint predecessor = storageService_.getPredecessor(StorageService.getLocalStorageEndPoint());
+        if ( predecessor.equals(neighbour) )
+            return true;
+
+        EndPoint successor = storageService_.getSuccessor(StorageService.getLocalStorageEndPoint());
+        if ( successor.equals(neighbour) )
+            return true;
+
+        return false;
+    }
+
+    /*
+     * Determine the nodes that are lightly loaded. Choose at
+     * random one of the lightly loaded nodes and use them as
+     * a potential target for load balance.
+    */
+    /*
+    private EndPoint findARandomLightNode()
+    {
+        List<EndPoint> potentialCandidates = new ArrayList<EndPoint>();
+        Set<EndPoint> allTargets = loadInfo2_.keySet();
+        int avgLoad =  averageSystemLoad();
+
+        for( EndPoint target : allTargets )
+        {
+            LoadInfo load = loadInfo2_.get(target);
+            if ( load.count() < avgLoad )
+                potentialCandidates.add(target);
+        }
+
+        if ( potentialCandidates.size() > 0 )
+        {
+            Random random = new Random();
+            int index = random.nextInt(potentialCandidates.size());
+            return potentialCandidates.get(index);
+        }
+        return null;
+    }
+    */
+}
+
+/*
+ * Request asking a node to move on the ring; carries the token at which
+ * the receiving node should rejoin.
+ */
+class MoveMessage implements Serializable
+{
+    private Token targetToken_;
+
+    MoveMessage(Token targetToken)
+    {
+        this.targetToken_ = targetToken;
+    }
+
+    /* Parameterless constructor; leaves the target token unset (null). */
+    private MoveMessage()
+    {
+    }
+
+    /* Returns the token supplied at construction (null if none was given). */
+    Token getTargetToken()
+    {
+        return this.targetToken_;
+    }
+}