You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/07/30 17:30:27 UTC
svn commit: r799331 [24/29] - in /incubator/cassandra/trunk: ./
src/java/org/apache/cassandra/concurrent/
src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/db/
src/java/org/apache/cassandra/dht/ src/java/org/apache/cassandra/gms/ src/...
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/MultiQuorumResponseHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/MultiQuorumResponseHandler.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/MultiQuorumResponseHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/MultiQuorumResponseHandler.java Thu Jul 30 15:30:21 2009
@@ -1,252 +1,252 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.service;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.ArrayList;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.*;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.ReadCommand;
-import org.apache.cassandra.db.Row;
-import org.apache.cassandra.net.EndPoint;
-import org.apache.cassandra.net.IAsyncCallback;
-import org.apache.cassandra.net.Message;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.utils.LogUtil;
-import org.apache.log4j.Logger;
-
-/**
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-public class MultiQuorumResponseHandler implements IAsyncCallback
-{
- private static Logger logger_ = Logger.getLogger( QuorumResponseHandler.class );
- private Lock lock_ = new ReentrantLock();
- private Condition condition_;
- /* This maps the keys to the original data read messages */
- private Map<String, ReadCommand> readMessages_ = new HashMap<String, ReadCommand>();
- /* This maps the key to its set of replicas */
- private Map<String, EndPoint[]> endpoints_ = new HashMap<String, EndPoint[]>();
- /* This maps the groupId to the individual callback for the set of messages */
- private Map<String, SingleQuorumResponseHandler> handlers_ = new HashMap<String, SingleQuorumResponseHandler>();
- /* This should hold all the responses for the keys */
- private List<Row> responses_ = new ArrayList<Row>();
- private AtomicBoolean done_ = new AtomicBoolean(false);
-
- /**
- * This is used to handle the responses from the individual messages
- * that are sent out to the replicas.
- * @author alakshman
- *
- */
- private class SingleQuorumResponseHandler implements IAsyncCallback
- {
- private Lock lock_ = new ReentrantLock();
- private IResponseResolver<Row> responseResolver_;
- private List<Message> responses_ = new ArrayList<Message>();
-
- SingleQuorumResponseHandler(IResponseResolver<Row> responseResolver)
- {
- responseResolver_ = responseResolver;
- }
-
- public void attachContext(Object o)
- {
- throw new UnsupportedOperationException("This operation is not supported in this implementation");
- }
-
- public void response(Message response)
- {
- lock_.lock();
- try
- {
- responses_.add(response);
- int majority = (DatabaseDescriptor.getReplicationFactor() >> 1) + 1;
- if ( responses_.size() >= majority && responseResolver_.isDataPresent(responses_))
- {
- onCompletion();
- }
- }
- catch ( IOException ex )
- {
- logger_.info( LogUtil.throwableToString(ex) );
- }
- finally
- {
- lock_.unlock();
- }
- }
-
- private void onCompletion() throws IOException
- {
- try
- {
- Row row = responseResolver_.resolve(responses_);
- MultiQuorumResponseHandler.this.onCompleteResponse(row);
- }
- catch ( DigestMismatchException ex )
- {
- /*
- * The DigestMismatchException has the key for which the mismatch
- * occurred bundled in it as context
- */
- String key = ex.getMessage();
- onDigestMismatch(key);
- }
- }
-
- /**
- * This method is invoked on a digest match. We pass in the key
- * in order to retrieve the appropriate data message that needs
- * to be sent out to the replicas.
- *
- * @param key for which the mismatch occurred.
- */
- private void onDigestMismatch(String key) throws IOException
- {
- if ( DatabaseDescriptor.getConsistencyCheck())
- {
- ReadCommand readCommand = readMessages_.get(key);
- readCommand.setDigestQuery(false);
- Message messageRepair = readCommand.makeReadMessage();
- EndPoint[] endpoints = MultiQuorumResponseHandler.this.endpoints_.get(readCommand.key);
- Message[][] messages = new Message[][]{ {messageRepair, messageRepair, messageRepair} };
- EndPoint[][] epList = new EndPoint[][]{ endpoints };
- MessagingService.getMessagingInstance().sendRR(messages, epList, MultiQuorumResponseHandler.this);
- }
- }
- }
-
- public MultiQuorumResponseHandler(Map<String, ReadCommand> readMessages, Map<String, EndPoint[]> endpoints)
- {
- condition_ = lock_.newCondition();
- readMessages_ = readMessages;
- endpoints_ = endpoints;
- }
-
- public Row[] get() throws TimeoutException
- {
- long startTime = System.currentTimeMillis();
- lock_.lock();
- try
- {
- boolean bVal = true;
- try
- {
- if ( !done_.get() )
- {
- bVal = condition_.await(DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
- }
- }
- catch ( InterruptedException ex )
- {
- if (logger_.isDebugEnabled())
- logger_.debug( LogUtil.throwableToString(ex) );
- }
-
- if ( !bVal && !done_.get() )
- {
- StringBuilder sb = new StringBuilder("");
- for ( Row row : responses_ )
- {
- sb.append(row.key());
- sb.append(":");
- }
- throw new TimeoutException("Operation timed out - received only " + responses_.size() + " responses from " + sb.toString() + " .");
- }
- }
- finally
- {
- lock_.unlock();
- }
-
- logger_.info("MultiQuorumResponseHandler: " + (System.currentTimeMillis() - startTime) + " ms.");
- return responses_.toArray( new Row[0] );
- }
-
- /**
- * Invoked when a complete response has been obtained
- * for one of the sub-groups a.k.a keys for the query
- * has been performed.
- *
- * @param row obtained as a result of the response.
- */
- void onCompleteResponse(Row row)
- {
- if ( !done_.get() )
- {
- responses_.add(row);
- if ( responses_.size() == readMessages_.size() )
- {
- done_.set(true);
- condition_.signal();
- }
- }
- }
-
- /**
- * The handler of the response message that has been
- * sent by one of the replicas for one of the keys.
- *
- * @param message the response message for one of the
- * message that we sent out.
- */
- public void response(Message message)
- {
- lock_.lock();
- try
- {
- SingleQuorumResponseHandler handler = handlers_.get(message.getMessageId());
- handler.response(message);
- }
- finally
- {
- lock_.unlock();
- }
- }
-
- /**
- * The context that is passed in for the query of
- * multiple keys in the system. For each message
- * id in the context register a callback handler
- * for the same. This is done so that all responses
- * for a given key use the same callback handler.
- *
- * @param o the context which is an array of strings
- * corresponding to the message id's for each
- * key.
- */
- public void attachContext(Object o)
- {
- String[] gids = (String[])o;
- for ( String gid : gids )
- {
- IResponseResolver<Row> responseResolver = new ReadResponseResolver();
- SingleQuorumResponseHandler handler = new SingleQuorumResponseHandler(responseResolver);
- handlers_.put(gid, handler);
- }
- }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.*;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ReadCommand;
+import org.apache.cassandra.db.Row;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.IAsyncCallback;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class MultiQuorumResponseHandler implements IAsyncCallback
+{
+ private static Logger logger_ = Logger.getLogger( QuorumResponseHandler.class );
+ private Lock lock_ = new ReentrantLock();
+ private Condition condition_;
+ /* This maps the keys to the original data read messages */
+ private Map<String, ReadCommand> readMessages_ = new HashMap<String, ReadCommand>();
+ /* This maps the key to its set of replicas */
+ private Map<String, EndPoint[]> endpoints_ = new HashMap<String, EndPoint[]>();
+ /* This maps the groupId to the individual callback for the set of messages */
+ private Map<String, SingleQuorumResponseHandler> handlers_ = new HashMap<String, SingleQuorumResponseHandler>();
+ /* This should hold all the responses for the keys */
+ private List<Row> responses_ = new ArrayList<Row>();
+ private AtomicBoolean done_ = new AtomicBoolean(false);
+
+ /**
+ * This is used to handle the responses from the individual messages
+ * that are sent out to the replicas.
+ * @author alakshman
+ *
+ */
+ private class SingleQuorumResponseHandler implements IAsyncCallback
+ {
+ private Lock lock_ = new ReentrantLock();
+ private IResponseResolver<Row> responseResolver_;
+ private List<Message> responses_ = new ArrayList<Message>();
+
+ SingleQuorumResponseHandler(IResponseResolver<Row> responseResolver)
+ {
+ responseResolver_ = responseResolver;
+ }
+
+ public void attachContext(Object o)
+ {
+ throw new UnsupportedOperationException("This operation is not supported in this implementation");
+ }
+
+ public void response(Message response)
+ {
+ lock_.lock();
+ try
+ {
+ responses_.add(response);
+ int majority = (DatabaseDescriptor.getReplicationFactor() >> 1) + 1;
+ if ( responses_.size() >= majority && responseResolver_.isDataPresent(responses_))
+ {
+ onCompletion();
+ }
+ }
+ catch ( IOException ex )
+ {
+ logger_.info( LogUtil.throwableToString(ex) );
+ }
+ finally
+ {
+ lock_.unlock();
+ }
+ }
+
+ private void onCompletion() throws IOException
+ {
+ try
+ {
+ Row row = responseResolver_.resolve(responses_);
+ MultiQuorumResponseHandler.this.onCompleteResponse(row);
+ }
+ catch ( DigestMismatchException ex )
+ {
+ /*
+ * The DigestMismatchException has the key for which the mismatch
+ * occurred bundled in it as context
+ */
+ String key = ex.getMessage();
+ onDigestMismatch(key);
+ }
+ }
+
+ /**
+ * This method is invoked on a digest mismatch. We pass in the key
+ * in order to retrieve the appropriate data message that needs
+ * to be sent out to the replicas.
+ *
+ * @param key for which the mismatch occurred.
+ */
+ private void onDigestMismatch(String key) throws IOException
+ {
+ if ( DatabaseDescriptor.getConsistencyCheck())
+ {
+ ReadCommand readCommand = readMessages_.get(key);
+ readCommand.setDigestQuery(false);
+ Message messageRepair = readCommand.makeReadMessage();
+ EndPoint[] endpoints = MultiQuorumResponseHandler.this.endpoints_.get(readCommand.key);
+ Message[][] messages = new Message[][]{ {messageRepair, messageRepair, messageRepair} };
+ EndPoint[][] epList = new EndPoint[][]{ endpoints };
+ MessagingService.getMessagingInstance().sendRR(messages, epList, MultiQuorumResponseHandler.this);
+ }
+ }
+ }
+
+ public MultiQuorumResponseHandler(Map<String, ReadCommand> readMessages, Map<String, EndPoint[]> endpoints)
+ {
+ condition_ = lock_.newCondition();
+ readMessages_ = readMessages;
+ endpoints_ = endpoints;
+ }
+
+ public Row[] get() throws TimeoutException
+ {
+ long startTime = System.currentTimeMillis();
+ lock_.lock();
+ try
+ {
+ boolean bVal = true;
+ try
+ {
+ if ( !done_.get() )
+ {
+ bVal = condition_.await(DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
+ }
+ }
+ catch ( InterruptedException ex )
+ {
+ if (logger_.isDebugEnabled())
+ logger_.debug( LogUtil.throwableToString(ex) );
+ }
+
+ if ( !bVal && !done_.get() )
+ {
+ StringBuilder sb = new StringBuilder("");
+ for ( Row row : responses_ )
+ {
+ sb.append(row.key());
+ sb.append(":");
+ }
+ throw new TimeoutException("Operation timed out - received only " + responses_.size() + " responses from " + sb.toString() + " .");
+ }
+ }
+ finally
+ {
+ lock_.unlock();
+ }
+
+ logger_.info("MultiQuorumResponseHandler: " + (System.currentTimeMillis() - startTime) + " ms.");
+ return responses_.toArray( new Row[0] );
+ }
+
+ /**
+ * Invoked when a complete response has been obtained
+ * for one of the sub-groups (a.k.a. keys) of the
+ * query.
+ *
+ * @param row obtained as a result of the response.
+ */
+ void onCompleteResponse(Row row)
+ {
+ if ( !done_.get() )
+ {
+ responses_.add(row);
+ if ( responses_.size() == readMessages_.size() )
+ {
+ done_.set(true);
+ condition_.signal();
+ }
+ }
+ }
+
+ /**
+ * The handler of the response message that has been
+ * sent by one of the replicas for one of the keys.
+ *
+ * @param message the response message for one of the
+ * messages that we sent out.
+ */
+ public void response(Message message)
+ {
+ lock_.lock();
+ try
+ {
+ SingleQuorumResponseHandler handler = handlers_.get(message.getMessageId());
+ handler.response(message);
+ }
+ finally
+ {
+ lock_.unlock();
+ }
+ }
+
+ /**
+ * The context that is passed in for the query of
+ * multiple keys in the system. For each message
+ * id in the context register a callback handler
+ * for the same. This is done so that all responses
+ * for a given key use the same callback handler.
+ *
+ * @param o the context which is an array of strings
+ * corresponding to the message id's for each
+ * key.
+ */
+ public void attachContext(Object o)
+ {
+ String[] gids = (String[])o;
+ for ( String gid : gids )
+ {
+ IResponseResolver<Row> responseResolver = new ReadResponseResolver();
+ SingleQuorumResponseHandler handler = new SingleQuorumResponseHandler(responseResolver);
+ handlers_.put(gid, handler);
+ }
+ }
+}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java Thu Jul 30 15:30:21 2009
@@ -1,126 +1,126 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.service;
-
-import java.util.List;
-import java.util.ArrayList;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.*;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.net.IAsyncCallback;
-import org.apache.cassandra.net.Message;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.utils.LogUtil;
-import org.apache.log4j.Logger;
-
-/**
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-public class QuorumResponseHandler<T> implements IAsyncCallback
-{
- private static Logger logger_ = Logger.getLogger( QuorumResponseHandler.class );
- private Lock lock_ = new ReentrantLock();
- private Condition condition_;
- private int responseCount_;
- private List<Message> responses_ = new ArrayList<Message>();
- private IResponseResolver<T> responseResolver_;
- private AtomicBoolean done_ = new AtomicBoolean(false);
-
- public QuorumResponseHandler(int responseCount, IResponseResolver<T> responseResolver) throws InvalidRequestException
- {
- if (responseCount > DatabaseDescriptor.getReplicationFactor())
- throw new InvalidRequestException("Cannot block for more than the replication factor of " + DatabaseDescriptor.getReplicationFactor());
- if (responseCount < 1)
- throw new InvalidRequestException("Cannot block for less than one replica");
- condition_ = lock_.newCondition();
- responseCount_ = responseCount;
- responseResolver_ = responseResolver;
- }
-
- public T get() throws TimeoutException, DigestMismatchException
- {
- lock_.lock();
- try
- {
- boolean bVal = true;
- try
- {
- if ( !done_.get() )
- {
- bVal = condition_.await(DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
- }
- }
- catch ( InterruptedException ex )
- {
- if (logger_.isDebugEnabled())
- logger_.debug( LogUtil.throwableToString(ex) );
- }
-
- if ( !bVal && !done_.get() )
- {
- StringBuilder sb = new StringBuilder("");
- for ( Message message : responses_ )
- {
- sb.append(message.getFrom());
- }
- throw new TimeoutException("Operation timed out - received only " + responses_.size() + " responses from " + sb.toString() + " .");
- }
- }
- finally
- {
- lock_.unlock();
- for(Message response : responses_)
- {
- MessagingService.removeRegisteredCallback( response.getMessageId() );
- }
- }
-
- return responseResolver_.resolve( responses_);
- }
-
- public void response(Message message)
- {
- lock_.lock();
- try
- {
- if ( !done_.get() )
- {
- responses_.add( message );
- if ( responses_.size() >= responseCount_ && responseResolver_.isDataPresent(responses_))
- {
- done_.set(true);
- condition_.signal();
- }
- }
- }
- finally
- {
- lock_.unlock();
- }
- }
-
- public void attachContext(Object o)
- {
- throw new UnsupportedOperationException("This operation is not supported in this version of the callback handler");
- }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.*;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.net.IAsyncCallback;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class QuorumResponseHandler<T> implements IAsyncCallback
+{
+ private static Logger logger_ = Logger.getLogger( QuorumResponseHandler.class );
+ private Lock lock_ = new ReentrantLock();
+ private Condition condition_;
+ private int responseCount_;
+ private List<Message> responses_ = new ArrayList<Message>();
+ private IResponseResolver<T> responseResolver_;
+ private AtomicBoolean done_ = new AtomicBoolean(false);
+
+ public QuorumResponseHandler(int responseCount, IResponseResolver<T> responseResolver) throws InvalidRequestException
+ {
+ if (responseCount > DatabaseDescriptor.getReplicationFactor())
+ throw new InvalidRequestException("Cannot block for more than the replication factor of " + DatabaseDescriptor.getReplicationFactor());
+ if (responseCount < 1)
+ throw new InvalidRequestException("Cannot block for less than one replica");
+ condition_ = lock_.newCondition();
+ responseCount_ = responseCount;
+ responseResolver_ = responseResolver;
+ }
+
+ public T get() throws TimeoutException, DigestMismatchException
+ {
+ lock_.lock();
+ try
+ {
+ boolean bVal = true;
+ try
+ {
+ if ( !done_.get() )
+ {
+ bVal = condition_.await(DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
+ }
+ }
+ catch ( InterruptedException ex )
+ {
+ if (logger_.isDebugEnabled())
+ logger_.debug( LogUtil.throwableToString(ex) );
+ }
+
+ if ( !bVal && !done_.get() )
+ {
+ StringBuilder sb = new StringBuilder("");
+ for ( Message message : responses_ )
+ {
+ sb.append(message.getFrom());
+ }
+ throw new TimeoutException("Operation timed out - received only " + responses_.size() + " responses from " + sb.toString() + " .");
+ }
+ }
+ finally
+ {
+ lock_.unlock();
+ for(Message response : responses_)
+ {
+ MessagingService.removeRegisteredCallback( response.getMessageId() );
+ }
+ }
+
+ return responseResolver_.resolve( responses_);
+ }
+
+ public void response(Message message)
+ {
+ lock_.lock();
+ try
+ {
+ if ( !done_.get() )
+ {
+ responses_.add( message );
+ if ( responses_.size() >= responseCount_ && responseResolver_.isDataPresent(responses_))
+ {
+ done_.set(true);
+ condition_.signal();
+ }
+ }
+ }
+ finally
+ {
+ lock_.unlock();
+ }
+ }
+
+ public void attachContext(Object o)
+ {
+ throw new UnsupportedOperationException("This operation is not supported in this version of the callback handler");
+ }
+}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadRepairManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadRepairManager.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadRepairManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadRepairManager.java Thu Jul 30 15:30:21 2009
@@ -1,125 +1,125 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.service;
-
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.util.concurrent.locks.*;
-
-import org.apache.cassandra.db.Column;
-import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.RowMutation;
-import org.apache.cassandra.db.RowMutationMessage;
-import org.apache.cassandra.db.SuperColumn;
-import org.apache.cassandra.net.EndPoint;
-import org.apache.cassandra.net.Header;
-import org.apache.cassandra.net.Message;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.utils.Cachetable;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.ICacheExpungeHook;
-import org.apache.cassandra.utils.ICachetable;
-import org.apache.cassandra.utils.LogUtil;
-import org.apache.log4j.Logger;
-
-
-
-/*
- * This class manages the read repairs . This is a singleton class
- * it basically uses the cache table construct to schedule writes that have to be
- * made for read repairs.
- * A cachetable is created which wakes up every n milliseconds specified by
- * expirationTimeInMillis and calls a global hook function on pending entries
- * This function basically sends the message to the appropriate servers to update them
- * with the latest changes.
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-class ReadRepairManager
-{
- private static Logger logger_ = Logger.getLogger(ReadRepairManager.class);
- private static final long expirationTimeInMillis = 2000;
- private static Lock lock_ = new ReentrantLock();
- private static ReadRepairManager self_ = null;
-
- /*
- * This is the internal class which actually
- * implements the global hook function called by the read repair manager
- */
- static class ReadRepairPerformer implements
- ICacheExpungeHook<String, Message>
- {
- /*
- * The hook function which takes the end point and the row mutation that
- * needs to be sent to the end point in order
- * to perform read repair.
- */
- public void callMe(String target,
- Message message)
- {
- String[] pieces = FBUtilities.strip(target, ":");
- EndPoint to = new EndPoint(pieces[0], Integer.parseInt(pieces[1]));
- MessagingService.getMessagingInstance().sendOneWay(message, to);
- }
-
- }
-
- private ICachetable<String, Message> readRepairTable_ = new Cachetable<String, Message>(expirationTimeInMillis, new ReadRepairManager.ReadRepairPerformer());
-
- protected ReadRepairManager()
- {
-
- }
-
- public static ReadRepairManager instance()
- {
- if (self_ == null)
- {
- lock_.lock();
- try
- {
- if ( self_ == null )
- self_ = new ReadRepairManager();
- }
- finally
- {
- lock_.unlock();
- }
- }
- return self_;
- }
-
- /*
- * Schedules a read repair.
- * @param target endpoint on which the read repair should happen
- * @param rowMutationMessage the row mutation message that has the repaired row.
- */
- public void schedule(EndPoint target, RowMutationMessage rowMutationMessage)
- {
- try
- {
- Message message = rowMutationMessage.makeRowMutationMessage(StorageService.readRepairVerbHandler_);
- String key = target + ":" + message.getMessageId();
- readRepairTable_.put(key, message);
- }
- catch ( IOException ex )
- {
- logger_.error(LogUtil.throwableToString(ex));
- }
- }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.concurrent.locks.*;
+
+import org.apache.cassandra.db.Column;
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.db.RowMutationMessage;
+import org.apache.cassandra.db.SuperColumn;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.Header;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.Cachetable;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.ICacheExpungeHook;
+import org.apache.cassandra.utils.ICachetable;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+
+
+/*
+ * This class manages read repairs. It is a singleton class
+ * that uses the cache table construct to schedule writes that have to be
+ * made for read repairs.
+ * A cachetable is created which wakes up every n milliseconds, as specified by
+ * expirationTimeInMillis, and calls a global hook function on pending entries.
+ * This function sends the message to the appropriate servers to update them
+ * with the latest changes.
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+class ReadRepairManager
+{
+ private static Logger logger_ = Logger.getLogger(ReadRepairManager.class);
+ private static final long expirationTimeInMillis = 2000;
+ private static Lock lock_ = new ReentrantLock();
+ private static ReadRepairManager self_ = null;
+
+ /*
+ * This is the internal class which actually
+ * implements the global hook function called by the read repair manager
+ */
+ static class ReadRepairPerformer implements
+ ICacheExpungeHook<String, Message>
+ {
+ /*
+ * The hook function which takes the end point and the row mutation that
+ * needs to be sent to the end point in order
+ * to perform read repair.
+ */
+ public void callMe(String target,
+ Message message)
+ {
+ String[] pieces = FBUtilities.strip(target, ":");
+ EndPoint to = new EndPoint(pieces[0], Integer.parseInt(pieces[1]));
+ MessagingService.getMessagingInstance().sendOneWay(message, to);
+ }
+
+ }
+
+ private ICachetable<String, Message> readRepairTable_ = new Cachetable<String, Message>(expirationTimeInMillis, new ReadRepairManager.ReadRepairPerformer());
+
+ protected ReadRepairManager()
+ {
+
+ }
+
+ public static ReadRepairManager instance()
+ {
+ if (self_ == null)
+ {
+ lock_.lock();
+ try
+ {
+ if ( self_ == null )
+ self_ = new ReadRepairManager();
+ }
+ finally
+ {
+ lock_.unlock();
+ }
+ }
+ return self_;
+ }
+
+ /*
+ * Schedules a read repair.
+ * @param target endpoint on which the read repair should happen
+ * @param rowMutationMessage the row mutation message that has the repaired row.
+ */
+ public void schedule(EndPoint target, RowMutationMessage rowMutationMessage)
+ {
+ try
+ {
+ Message message = rowMutationMessage.makeRowMutationMessage(StorageService.readRepairVerbHandler_);
+ String key = target + ":" + message.getMessageId();
+ readRepairTable_.put(key, message);
+ }
+ catch ( IOException ex )
+ {
+ logger_.error(LogUtil.throwableToString(ex));
+ }
+ }
+}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java Thu Jul 30 15:30:21 2009
@@ -1,177 +1,177 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.service;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.ReadResponse;
-import org.apache.cassandra.db.Row;
-import org.apache.cassandra.db.RowMutation;
-import org.apache.cassandra.db.RowMutationMessage;
-import org.apache.cassandra.io.DataInputBuffer;
-import org.apache.cassandra.net.EndPoint;
-import org.apache.cassandra.net.Message;
-import org.apache.cassandra.utils.LogUtil;
-import org.apache.log4j.Logger;
-
-
-/**
- * This class is used by all read functions and is called by the Quorum
- * when at least a few of the servers (few is specified in Quorum)
- * have sent the response . The resolve function then schedules read repair
- * and resolution of read data from the various servers.
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-public class ReadResponseResolver implements IResponseResolver<Row>
-{
- private static Logger logger_ = Logger.getLogger(WriteResponseResolver.class);
-
- /*
- * This method for resolving read data should look at the timestamps of each
- * of the columns that are read and should pick up columns with the latest
- * timestamp. For those columns where the timestamp is not the latest a
- * repair request should be scheduled.
- *
- */
- public Row resolve(List<Message> responses) throws DigestMismatchException
- {
- long startTime = System.currentTimeMillis();
- Row retRow = null;
- List<Row> rowList = new ArrayList<Row>();
- List<EndPoint> endPoints = new ArrayList<EndPoint>();
- String key = null;
- String table = null;
- byte[] digest = new byte[0];
- boolean isDigestQuery = false;
-
- /*
- * Populate the list of rows from each of the messages
- * Check to see if there is a digest query. If a digest
- * query exists then we need to compare the digest with
- * the digest of the data that is received.
- */
- DataInputBuffer bufIn = new DataInputBuffer();
- for (Message response : responses)
- {
- byte[] body = response.getMessageBody();
- bufIn.reset(body, body.length);
- try
- {
- long start = System.currentTimeMillis();
- ReadResponse result = ReadResponse.serializer().deserialize(bufIn);
- if (logger_.isDebugEnabled())
- logger_.debug( "Response deserialization time : " + (System.currentTimeMillis() - start) + " ms.");
- if(!result.isDigestQuery())
- {
- rowList.add(result.row());
- endPoints.add(response.getFrom());
- key = result.row().key();
- table = result.row().getTable();
- }
- else
- {
- digest = result.digest();
- isDigestQuery = true;
- }
- }
- catch( IOException ex )
- {
- logger_.info(LogUtil.throwableToString(ex));
- }
- }
- // If there was a digest query compare it with all the data digests
- // If there is a mismatch then throw an exception so that read repair can happen.
- if(isDigestQuery)
- {
- for(Row row: rowList)
- {
- if( !Arrays.equals(row.digest(), digest) )
- {
- /* Wrap the key as the context in this exception */
- throw new DigestMismatchException(row.key());
- }
- }
- }
-
- /* If the rowList is empty then we had some exception above. */
- if ( rowList.size() == 0 )
- {
- return retRow;
- }
-
- /* Now calculate the resolved row */
- retRow = new Row(table, key);
- for (int i = 0 ; i < rowList.size(); i++)
- {
- retRow.repair(rowList.get(i));
- }
-
- // At this point we have the return row .
- // Now we need to calculate the difference
- // so that we can schedule read repairs
- for (int i = 0 ; i < rowList.size(); i++)
- {
- // since retRow is the resolved row it can be used as the super set
- Row diffRow = rowList.get(i).diff(retRow);
- if(diffRow == null) // no repair needs to happen
- continue;
- // create the row mutation message based on the diff and schedule a read repair
- RowMutation rowMutation = new RowMutation(table, key);
- for (ColumnFamily cf : diffRow.getColumnFamilies())
- {
- rowMutation.add(cf);
- }
- RowMutationMessage rowMutationMessage = new RowMutationMessage(rowMutation);
- ReadRepairManager.instance().schedule(endPoints.get(i),rowMutationMessage);
- }
- logger_.info("resolve: " + (System.currentTimeMillis() - startTime) + " ms.");
- return retRow;
- }
-
- public boolean isDataPresent(List<Message> responses)
- {
- boolean isDataPresent = false;
- for (Message response : responses)
- {
- byte[] body = response.getMessageBody();
- DataInputBuffer bufIn = new DataInputBuffer();
- bufIn.reset(body, body.length);
- try
- {
- ReadResponse result = ReadResponse.serializer().deserialize(bufIn);
- if(!result.isDigestQuery())
- {
- isDataPresent = true;
- }
- bufIn.close();
- }
- catch(IOException ex)
- {
- logger_.info(LogUtil.throwableToString(ex));
- }
- }
- return isDataPresent;
- }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.ReadResponse;
+import org.apache.cassandra.db.Row;
+import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.db.RowMutationMessage;
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+
+/**
+ * This class is used by all read functions and is called by the Quorum
+ * when at least a few of the servers (few is specified in Quorum)
+ * have sent the response. The resolve function then schedules read repair
+ * and resolution of read data from the various servers.
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+public class ReadResponseResolver implements IResponseResolver<Row>
+{
+ /* Log under this class' own category rather than WriteResponseResolver's. */
+ private static Logger logger_ = Logger.getLogger(ReadResponseResolver.class);
+
+ /*
+ * This method for resolving read data should look at the timestamps of each
+ * of the columns that are read and should pick up columns with the latest
+ * timestamp. For those columns where the timestamp is not the latest a
+ * repair request should be scheduled.
+ *
+ * Responses that cannot be deserialized are logged and skipped.
+ *
+ * @param responses the responses received from the replicas
+ * @return the row built by applying Row.repair() with every data row that
+ * was received, or null if no data row could be read
+ * @throws DigestMismatchException carrying the row key, if a digest
+ * response was received and the digest of some data row differs
+ * from it
+ */
+ public Row resolve(List<Message> responses) throws DigestMismatchException
+ {
+ long startTime = System.currentTimeMillis();
+ Row retRow = null;
+ List<Row> rowList = new ArrayList<Row>();
+ List<EndPoint> endPoints = new ArrayList<EndPoint>();
+ String key = null;
+ String table = null;
+ byte[] digest = new byte[0];
+ boolean isDigestQuery = false;
+
+ /*
+ * Populate the list of rows from each of the messages
+ * Check to see if there is a digest query. If a digest
+ * query exists then we need to compare the digest with
+ * the digest of the data that is received.
+ */
+ DataInputBuffer bufIn = new DataInputBuffer();
+ for (Message response : responses)
+ {
+ byte[] body = response.getMessageBody();
+ bufIn.reset(body, body.length);
+ try
+ {
+ long start = System.currentTimeMillis();
+ ReadResponse result = ReadResponse.serializer().deserialize(bufIn);
+ if (logger_.isDebugEnabled())
+ logger_.debug( "Response deserialization time : " + (System.currentTimeMillis() - start) + " ms.");
+ if(!result.isDigestQuery())
+ {
+ /* rowList and endPoints are filled in lock step: entry i of one belongs to entry i of the other */
+ rowList.add(result.row());
+ endPoints.add(response.getFrom());
+ key = result.row().key();
+ table = result.row().getTable();
+ }
+ else
+ {
+ /* if several digest responses arrive, the last one wins */
+ digest = result.digest();
+ isDigestQuery = true;
+ }
+ }
+ catch( IOException ex )
+ {
+ logger_.info(LogUtil.throwableToString(ex));
+ }
+ }
+ // If there was a digest query compare it with all the data digests
+ // If there is a mismatch then throw an exception so that read repair can happen.
+ if(isDigestQuery)
+ {
+ for(Row row: rowList)
+ {
+ if( !Arrays.equals(row.digest(), digest) )
+ {
+ /* Wrap the key as the context in this exception */
+ throw new DigestMismatchException(row.key());
+ }
+ }
+ }
+
+ /* No data row was read: only digests arrived or deserialization failed above. */
+ if ( rowList.isEmpty() )
+ {
+ return retRow;
+ }
+
+ /* Now calculate the resolved row */
+ retRow = new Row(table, key);
+ for (Row row : rowList)
+ {
+ retRow.repair(row);
+ }
+
+ // At this point we have the return row .
+ // Now we need to calculate the difference
+ // so that we can schedule read repairs
+ for (int i = 0 ; i < rowList.size(); i++)
+ {
+ // since retRow is the resolved row it can be used as the super set
+ Row diffRow = rowList.get(i).diff(retRow);
+ if(diffRow == null) // no repair needs to happen
+ continue;
+ // create the row mutation message based on the diff and schedule a read repair
+ RowMutation rowMutation = new RowMutation(table, key);
+ for (ColumnFamily cf : diffRow.getColumnFamilies())
+ {
+ rowMutation.add(cf);
+ }
+ RowMutationMessage rowMutationMessage = new RowMutationMessage(rowMutation);
+ /* the index is needed here to find the endpoint that sent row i */
+ ReadRepairManager.instance().schedule(endPoints.get(i),rowMutationMessage);
+ }
+ logger_.info("resolve: " + (System.currentTimeMillis() - startTime) + " ms.");
+ return retRow;
+ }
+
+ /*
+ * Tells whether at least one of the responses carries data rather than a digest.
+ * Responses that cannot be deserialized are logged and do not count as data.
+ *
+ * @param responses the responses received so far
+ * @return true if some response deserializes to a non-digest ReadResponse
+ */
+ public boolean isDataPresent(List<Message> responses)
+ {
+ boolean isDataPresent = false;
+ for (Message response : responses)
+ {
+ byte[] body = response.getMessageBody();
+ DataInputBuffer bufIn = new DataInputBuffer();
+ bufIn.reset(body, body.length);
+ try
+ {
+ ReadResponse result = ReadResponse.serializer().deserialize(bufIn);
+ if(!result.isDigestQuery())
+ {
+ isDataPresent = true;
+ }
+ bufIn.close();
+ }
+ catch(IOException ex)
+ {
+ logger_.info(LogUtil.throwableToString(ex));
+ }
+ }
+ return isDataPresent;
+ }
+}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java Thu Jul 30 15:30:21 2009
@@ -1,403 +1,403 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.service;
-
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.log4j.Logger;
-
-import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
-import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
-import org.apache.cassandra.concurrent.SingleThreadedStage;
-import org.apache.cassandra.concurrent.StageManager;
-import org.apache.cassandra.concurrent.ThreadFactoryImpl;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.gms.ApplicationState;
-import org.apache.cassandra.gms.EndPointState;
-import org.apache.cassandra.gms.Gossiper;
-import org.apache.cassandra.gms.IEndPointStateChangeSubscriber;
-import org.apache.cassandra.net.EndPoint;
-import org.apache.cassandra.net.IVerbHandler;
-import org.apache.cassandra.net.Message;
-import org.apache.cassandra.net.MessagingService;
-
-/*
- * The load balancing algorithm here is an implementation of
- * the algorithm as described in the paper "Scalable range query
- * processing for large-scale distributed database applications".
- * This class keeps track of load information across the system.
- * It registers itself with the Gossiper for ApplicationState namely
- * load information i.e number of requests processed w.r.t distinct
- * keys at an Endpoint. Monitor load information for a 5 minute
- * interval and then do load balancing operations if necessary.
- *
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-final class StorageLoadBalancer implements IEndPointStateChangeSubscriber
-{
- class LoadBalancer implements Runnable
- {
- LoadBalancer()
- {
- /* Copy the entries in loadInfo_ into loadInfo2_ and use it for all calculations */
- loadInfo2_.putAll(loadInfo_);
- }
-
- /**
- * Obtain a node which is a potential target. Start with
- * the neighbours i.e either successor or predecessor.
- * Send the target a MoveMessage. If the node cannot be
- * relocated on the ring then we pick another candidate for
- * relocation.
- */
- public void run()
- {
- /*
- int threshold = (int)(StorageLoadBalancer.ratio_ * averageSystemLoad());
- int myLoad = localLoad();
- EndPoint predecessor = storageService_.getPredecessor(StorageService.getLocalStorageEndPoint());
- if (logger_.isDebugEnabled())
- logger_.debug("Trying to relocate the predecessor " + predecessor);
- boolean value = tryThisNode(myLoad, threshold, predecessor);
- if ( !value )
- {
- loadInfo2_.remove(predecessor);
- EndPoint successor = storageService_.getSuccessor(StorageService.getLocalStorageEndPoint());
- if (logger_.isDebugEnabled())
- logger_.debug("Trying to relocate the successor " + successor);
- value = tryThisNode(myLoad, threshold, successor);
- if ( !value )
- {
- loadInfo2_.remove(successor);
- while ( !loadInfo2_.isEmpty() )
- {
- EndPoint target = findARandomLightNode();
- if ( target != null )
- {
- if (logger_.isDebugEnabled())
- logger_.debug("Trying to relocate the random node " + target);
- value = tryThisNode(myLoad, threshold, target);
- if ( !value )
- {
- loadInfo2_.remove(target);
- }
- else
- {
- break;
- }
- }
- else
- {
- // No light nodes available - this is NOT good.
- logger_.warn("Not even a single lightly loaded node is available ...");
- break;
- }
- }
-
- loadInfo2_.clear();
- // If we are here and no node was available to
- // perform load balance with we need to report and bail.
- if ( !value )
- {
- logger_.warn("Load Balancing operations weren't performed for this node");
- }
- }
- }
- */
- }
-
- /*
- private boolean tryThisNode(int myLoad, int threshold, EndPoint target)
- {
- boolean value = false;
- LoadInfo li = loadInfo2_.get(target);
- int pLoad = li.count();
- if ( ((myLoad + pLoad) >> 1) <= threshold )
- {
- //calculate the number of keys to be transferred
- int keyCount = ( (myLoad - pLoad) >> 1 );
- if (logger_.isDebugEnabled())
- logger_.debug("Number of keys we attempt to transfer to " + target + " " + keyCount);
- // Determine the token that the target should join at.
- BigInteger targetToken = BootstrapAndLbHelper.getTokenBasedOnPrimaryCount(keyCount);
- // Send a MoveMessage and see if this node is relocateable
- MoveMessage moveMessage = new MoveMessage(targetToken);
- Message message = new Message(StorageService.getLocalStorageEndPoint(), StorageLoadBalancer.lbStage_, StorageLoadBalancer.moveMessageVerbHandler_, new Object[]{moveMessage});
- if (logger_.isDebugEnabled())
- logger_.debug("Sending a move message to " + target);
- IAsyncResult result = MessagingService.getMessagingInstance().sendRR(message, target);
- value = (Boolean)result.get()[0];
- if (logger_.isDebugEnabled())
- logger_.debug("Response for query to relocate " + target + " is " + value);
- }
- return value;
- }
- */
- }
-
- class MoveMessageVerbHandler implements IVerbHandler
- {
- public void doVerb(Message message)
- {
- Message reply = message.getReply(StorageService.getLocalStorageEndPoint(), new byte[] {(byte)(isMoveable_.get() ? 1 : 0)});
- MessagingService.getMessagingInstance().sendOneWay(reply, message.getFrom());
- if ( isMoveable_.get() )
- {
- // MoveMessage moveMessage = (MoveMessage)message.getMessageBody()[0];
- /* Start the leave operation and join the ring at the position specified */
- isMoveable_.set(false);
- }
- }
- }
-
- private static final Logger logger_ = Logger.getLogger(StorageLoadBalancer.class);
- private static final String lbStage_ = "LOAD-BALANCER-STAGE";
- private static final String moveMessageVerbHandler_ = "MOVE-MESSAGE-VERB-HANDLER";
- /* time to delay in minutes the actual load balance procedure if heavily loaded */
- private static final int delay_ = 5;
- /* Ratio of highest loaded node and the average load. */
- private static final double ratio_ = 1.5;
-
- private StorageService storageService_;
- /* this indicates whether this node is already helping someone else */
- private AtomicBoolean isMoveable_ = new AtomicBoolean(false);
- private Map<EndPoint, LoadInfo> loadInfo_ = new HashMap<EndPoint, LoadInfo>();
- /* This map is a clone of the one above and is used for various calculations during LB operation */
- private Map<EndPoint, LoadInfo> loadInfo2_ = new HashMap<EndPoint, LoadInfo>();
- /* This thread pool is used for initiating load balancing operations */
- private ScheduledThreadPoolExecutor lb_ = new DebuggableScheduledThreadPoolExecutor(
- 1,
- new ThreadFactoryImpl("LB-OPERATIONS")
- );
- /* This thread pool is used by target node to leave the ring. */
- private ExecutorService lbOperations_ = new DebuggableThreadPoolExecutor("LB-TARGET");
-
- StorageLoadBalancer(StorageService storageService)
- {
- storageService_ = storageService;
- /* register the load balancer stage */
- StageManager.registerStage(StorageLoadBalancer.lbStage_, new SingleThreadedStage(StorageLoadBalancer.lbStage_));
- /* register the load balancer verb handler */
- MessagingService.getMessagingInstance().registerVerbHandlers(StorageLoadBalancer.moveMessageVerbHandler_, new MoveMessageVerbHandler());
- }
-
- public void start()
- {
- /* Register with the Gossiper for EndPointState notifications */
- Gossiper.instance().register(this);
- }
-
- public void onChange(EndPoint endpoint, EndPointState epState)
- {
- // load information for this specified endpoint for load balancing
- ApplicationState loadInfoState = epState.getApplicationState(LoadDisseminator.loadInfo_);
- if ( loadInfoState != null )
- {
- String lInfoState = loadInfoState.getState();
- LoadInfo lInfo = new LoadInfo(lInfoState);
- loadInfo_.put(endpoint, lInfo);
-
- /*
- int currentLoad = Integer.parseInt(loadInfoState.getState());
- // update load information for this endpoint
- loadInfo_.put(endpoint, currentLoad);
-
- // clone load information to perform calculations
- loadInfo2_.putAll(loadInfo_);
- // Perform the analysis for load balance operations
- if ( isHeavyNode() )
- {
- if (logger_.isDebugEnabled())
- logger_.debug(StorageService.getLocalStorageEndPoint() + " is a heavy node with load " + localLoad());
- // lb_.schedule( new LoadBalancer(), StorageLoadBalancer.delay_, TimeUnit.MINUTES );
- }
- */
- }
- }
-
- /*
- * Load information associated with a given endpoint.
- */
- LoadInfo getLoad(EndPoint ep)
- {
- LoadInfo li = loadInfo_.get(ep);
- return li;
- }
-
- /*
- private boolean isMoveable()
- {
- if ( !isMoveable_.get() )
- return false;
- int myload = localLoad();
- EndPoint successor = storageService_.getSuccessor(StorageService.getLocalStorageEndPoint());
- LoadInfo li = loadInfo2_.get(successor);
- // "load" is NULL means that the successor node has not
- // yet gossiped its load information. We should return
- // false in this case since we want to err on the side
- // of caution.
- if ( li == null )
- return false;
- else
- {
- if ( ( myload + li.count() ) > StorageLoadBalancer.ratio_*averageSystemLoad() )
- return false;
- else
- return true;
- }
- }
- */
-
- /*
- private int localLoad()
- {
- LoadInfo value = loadInfo2_.get(StorageService.getLocalStorageEndPoint());
- return (value == null) ? 0 : value.count();
- }
- */
-
- /*
- private int averageSystemLoad()
- {
- int nodeCount = loadInfo2_.size();
- Set<EndPoint> nodes = loadInfo2_.keySet();
-
- int systemLoad = 0;
- for ( EndPoint node : nodes )
- {
- LoadInfo load = loadInfo2_.get(node);
- if ( load != null )
- systemLoad += load.count();
- }
- int averageLoad = (nodeCount > 0) ? (systemLoad / nodeCount) : 0;
- if (logger_.isDebugEnabled())
- logger_.debug("Average system load should be " + averageLoad);
- return averageLoad;
- }
- */
-
- /*
- private boolean isHeavyNode()
- {
- return ( localLoad() > ( StorageLoadBalancer.ratio_ * averageSystemLoad() ) );
- }
- */
-
- /*
- private boolean isMoveable(EndPoint target)
- {
- int threshold = (int)(StorageLoadBalancer.ratio_ * averageSystemLoad());
- if ( isANeighbour(target) )
- {
- // If the target is a neighbour then it is
- // moveable if its
- LoadInfo load = loadInfo2_.get(target);
- if ( load == null )
- return false;
- else
- {
- int myload = localLoad();
- int avgLoad = (load.count() + myload) >> 1;
- if ( avgLoad <= threshold )
- return true;
- else
- return false;
- }
- }
- else
- {
- EndPoint successor = storageService_.getSuccessor(target);
- LoadInfo sLoad = loadInfo2_.get(successor);
- LoadInfo targetLoad = loadInfo2_.get(target);
- if ( (sLoad.count() + targetLoad.count()) > threshold )
- return false;
- else
- return true;
- }
- }
- */
-
- private boolean isANeighbour(EndPoint neighbour)
- {
- EndPoint predecessor = storageService_.getPredecessor(StorageService.getLocalStorageEndPoint());
- if ( predecessor.equals(neighbour) )
- return true;
-
- EndPoint successor = storageService_.getSuccessor(StorageService.getLocalStorageEndPoint());
- if ( successor.equals(neighbour) )
- return true;
-
- return false;
- }
-
- /*
- * Determine the nodes that are lightly loaded. Choose at
- * random one of the lightly loaded nodes and use them as
- * a potential target for load balance.
- */
- /*
- private EndPoint findARandomLightNode()
- {
- List<EndPoint> potentialCandidates = new ArrayList<EndPoint>();
- Set<EndPoint> allTargets = loadInfo2_.keySet();
- int avgLoad = averageSystemLoad();
-
- for( EndPoint target : allTargets )
- {
- LoadInfo load = loadInfo2_.get(target);
- if ( load.count() < avgLoad )
- potentialCandidates.add(target);
- }
-
- if ( potentialCandidates.size() > 0 )
- {
- Random random = new Random();
- int index = random.nextInt(potentialCandidates.size());
- return potentialCandidates.get(index);
- }
- return null;
- }
- */
-}
-
-class MoveMessage implements Serializable
-{
- private Token targetToken_;
-
- private MoveMessage()
- {
- }
-
- MoveMessage(Token targetToken)
- {
- targetToken_ = targetToken;
- }
-
- Token getTargetToken()
- {
- return targetToken_;
- }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.log4j.Logger;
+
+import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
+import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
+import org.apache.cassandra.concurrent.SingleThreadedStage;
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.concurrent.ThreadFactoryImpl;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.gms.ApplicationState;
+import org.apache.cassandra.gms.EndPointState;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.gms.IEndPointStateChangeSubscriber;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+
+/*
+ * The load balancing algorithm here is an implementation of
+ * the algorithm as described in the paper "Scalable range query
+ * processing for large-scale distributed database applications".
+ * This class keeps track of load information across the system.
+ * It registers itself with the Gossiper for ApplicationState namely
+ * load information i.e number of requests processed w.r.t distinct
+ * keys at an Endpoint. Monitor load information for a 5 minute
+ * interval and then do load balancing operations if necessary.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+final class StorageLoadBalancer implements IEndPointStateChangeSubscriber
+{
+ /*
+ * Runnable holding the load balancing procedure. The procedure is currently
+ * disabled: run() has an empty body and tryThisNode() exists only as
+ * commented-out code.
+ */
+ class LoadBalancer implements Runnable
+ {
+ LoadBalancer()
+ {
+ /*
+ * Copy the entries in loadInfo_ into loadInfo2_ and use it for all calculations.
+ * putAll() only adds or overwrites; entries already in loadInfo2_ are kept.
+ */
+ loadInfo2_.putAll(loadInfo_);
+ }
+
+ /**
+ * Obtain a node which is a potential target. Start with
+ * the neighbours i.e either successor or predecessor.
+ * Send the target a MoveMessage. If the node cannot be
+ * relocated on the ring then we pick another candidate for
+ * relocation.
+ *
+ * NOTE: the whole body below is commented out, so this method
+ * currently does nothing.
+ */
+ public void run()
+ {
+ /*
+ int threshold = (int)(StorageLoadBalancer.ratio_ * averageSystemLoad());
+ int myLoad = localLoad();
+ EndPoint predecessor = storageService_.getPredecessor(StorageService.getLocalStorageEndPoint());
+ if (logger_.isDebugEnabled())
+ logger_.debug("Trying to relocate the predecessor " + predecessor);
+ boolean value = tryThisNode(myLoad, threshold, predecessor);
+ if ( !value )
+ {
+ loadInfo2_.remove(predecessor);
+ EndPoint successor = storageService_.getSuccessor(StorageService.getLocalStorageEndPoint());
+ if (logger_.isDebugEnabled())
+ logger_.debug("Trying to relocate the successor " + successor);
+ value = tryThisNode(myLoad, threshold, successor);
+ if ( !value )
+ {
+ loadInfo2_.remove(successor);
+ while ( !loadInfo2_.isEmpty() )
+ {
+ EndPoint target = findARandomLightNode();
+ if ( target != null )
+ {
+ if (logger_.isDebugEnabled())
+ logger_.debug("Trying to relocate the random node " + target);
+ value = tryThisNode(myLoad, threshold, target);
+ if ( !value )
+ {
+ loadInfo2_.remove(target);
+ }
+ else
+ {
+ break;
+ }
+ }
+ else
+ {
+ // No light nodes available - this is NOT good.
+ logger_.warn("Not even a single lightly loaded node is available ...");
+ break;
+ }
+ }
+
+ loadInfo2_.clear();
+ // If we are here and no node was available to
+ // perform load balance with we need to report and bail.
+ if ( !value )
+ {
+ logger_.warn("Load Balancing operations weren't performed for this node");
+ }
+ }
+ }
+ */
+ }
+
+ /*
+ private boolean tryThisNode(int myLoad, int threshold, EndPoint target)
+ {
+ boolean value = false;
+ LoadInfo li = loadInfo2_.get(target);
+ int pLoad = li.count();
+ if ( ((myLoad + pLoad) >> 1) <= threshold )
+ {
+ //calculate the number of keys to be transferred
+ int keyCount = ( (myLoad - pLoad) >> 1 );
+ if (logger_.isDebugEnabled())
+ logger_.debug("Number of keys we attempt to transfer to " + target + " " + keyCount);
+ // Determine the token that the target should join at.
+ BigInteger targetToken = BootstrapAndLbHelper.getTokenBasedOnPrimaryCount(keyCount);
+ // Send a MoveMessage and see if this node is relocateable
+ MoveMessage moveMessage = new MoveMessage(targetToken);
+ Message message = new Message(StorageService.getLocalStorageEndPoint(), StorageLoadBalancer.lbStage_, StorageLoadBalancer.moveMessageVerbHandler_, new Object[]{moveMessage});
+ if (logger_.isDebugEnabled())
+ logger_.debug("Sending a move message to " + target);
+ IAsyncResult result = MessagingService.getMessagingInstance().sendRR(message, target);
+ value = (Boolean)result.get()[0];
+ if (logger_.isDebugEnabled())
+ logger_.debug("Response for query to relocate " + target + " is " + value);
+ }
+ return value;
+ }
+ */
+ }
+
+ class MoveMessageVerbHandler implements IVerbHandler
+ {
+ /*
+ * Replies to the sender with a one byte body (1 if isMoveable_ is set,
+ * 0 otherwise) and then clears the flag if it is set.
+ */
+ public void doVerb(Message message)
+ {
+ Message reply = message.getReply(
+ StorageService.getLocalStorageEndPoint(),
+ new byte[] { isMoveable_.get() ? (byte)1 : (byte)0 });
+ MessagingService.getMessagingInstance().sendOneWay(reply, message.getFrom());
+ if ( !isMoveable_.get() )
+ return;
+
+ // MoveMessage moveMessage = (MoveMessage)message.getMessageBody()[0];
+ /* The leave/rejoin operation itself is not performed here; only the flag is cleared. */
+ isMoveable_.set(false);
+ }
+ }
+
+ private static final Logger logger_ = Logger.getLogger(StorageLoadBalancer.class);
+ /* name under which the load balancer stage is registered with the StageManager */
+ private static final String lbStage_ = "LOAD-BALANCER-STAGE";
+ /* verb under which MoveMessageVerbHandler is registered with the MessagingService */
+ private static final String moveMessageVerbHandler_ = "MOVE-MESSAGE-VERB-HANDLER";
+ /* time to delay in minutes the actual load balance procedure if heavily loaded */
+ private static final int delay_ = 5;
+ /* Ratio of highest loaded node and the average load. */
+ private static final double ratio_ = 1.5;
+
+ private StorageService storageService_;
+ /*
+ * this indicates whether this node is already helping someone else.
+ * Starts out false; MoveMessageVerbHandler reports it to the requester and
+ * resets it to false when it was set.
+ */
+ private AtomicBoolean isMoveable_ = new AtomicBoolean(false);
+ /* most recent LoadInfo received for each endpoint, updated in onChange() */
+ private Map<EndPoint, LoadInfo> loadInfo_ = new HashMap<EndPoint, LoadInfo>();
+ /* This map is a clone of the one above and is used for various calculations during LB operation */
+ private Map<EndPoint, LoadInfo> loadInfo2_ = new HashMap<EndPoint, LoadInfo>();
+ /* This thread pool is used for initiating load balancing operations */
+ private ScheduledThreadPoolExecutor lb_ = new DebuggableScheduledThreadPoolExecutor(
+ 1,
+ new ThreadFactoryImpl("LB-OPERATIONS")
+ );
+ /* This thread pool is used by target node to leave the ring. */
+ private ExecutorService lbOperations_ = new DebuggableThreadPoolExecutor("LB-TARGET");
+
+ /**
+  * Creates the load balancer for the given storage service, then registers
+  * the load balancer stage and the move-message verb handler.
+  *
+  * @param storageService the storage service this load balancer works with
+  */
+ StorageLoadBalancer(StorageService storageService)
+ {
+     this.storageService_ = storageService;
+
+     /* the stage is registered under the name held in lbStage_ */
+     SingleThreadedStage loadBalancerStage = new SingleThreadedStage(lbStage_);
+     StageManager.registerStage(lbStage_, loadBalancerStage);
+
+     /* the verb handler is registered under the name held in moveMessageVerbHandler_ */
+     MessagingService.getMessagingInstance().registerVerbHandlers(moveMessageVerbHandler_, new MoveMessageVerbHandler());
+ }
+
+ /**
+  * Starts the load balancer by registering it with the Gossiper so that it
+  * receives EndPointState notifications.
+  */
+ public void start()
+ {
+     Gossiper gossiper = Gossiper.instance();
+     gossiper.register(this);
+ }
+
+ /**
+  * Records the load information carried by an endpoint state update.
+  * Updates without a load-info application state are ignored.
+  *
+  * @param endpoint the endpoint the state belongs to
+  * @param epState  the state reported for that endpoint
+  */
+ public void onChange(EndPoint endpoint, EndPointState epState)
+ {
+     ApplicationState advertisedLoad = epState.getApplicationState(LoadDisseminator.loadInfo_);
+     if ( advertisedLoad == null )
+     {
+         // no load information in this update, nothing to record
+         return;
+     }
+
+     // remember the most recent load reported for this endpoint
+     loadInfo_.put(endpoint, new LoadInfo(advertisedLoad.getState()));
+
+     /*
+     int currentLoad = Integer.parseInt(loadInfoState.getState());
+     // update load information for this endpoint
+     loadInfo_.put(endpoint, currentLoad);
+
+     // clone load information to perform calculations
+     loadInfo2_.putAll(loadInfo_);
+     // Perform the analysis for load balance operations
+     if ( isHeavyNode() )
+     {
+         if (logger_.isDebugEnabled())
+             logger_.debug(StorageService.getLocalStorageEndPoint() + " is a heavy node with load " + localLoad());
+         // lb_.schedule( new LoadBalancer(), StorageLoadBalancer.delay_, TimeUnit.MINUTES );
+     }
+     */
+ }
+
+ /**
+  * Looks up the load information recorded for an endpoint.
+  *
+  * @param ep the endpoint of interest
+  * @return the LoadInfo stored for ep, or null if none has been recorded
+  */
+ LoadInfo getLoad(EndPoint ep)
+ {
+     return loadInfo_.get(ep);
+ }
+
+ /*
+ private boolean isMoveable()
+ {
+ if ( !isMoveable_.get() )
+ return false;
+ int myload = localLoad();
+ EndPoint successor = storageService_.getSuccessor(StorageService.getLocalStorageEndPoint());
+ LoadInfo li = loadInfo2_.get(successor);
+ // "load" is NULL means that the successor node has not
+ // yet gossiped its load information. We should return
+ // false in this case since we want to err on the side
+ // of caution.
+ if ( li == null )
+ return false;
+ else
+ {
+ if ( ( myload + li.count() ) > StorageLoadBalancer.ratio_*averageSystemLoad() )
+ return false;
+ else
+ return true;
+ }
+ }
+ */
+
+ /*
+ private int localLoad()
+ {
+ LoadInfo value = loadInfo2_.get(StorageService.getLocalStorageEndPoint());
+ return (value == null) ? 0 : value.count();
+ }
+ */
+
+ /*
+ private int averageSystemLoad()
+ {
+ int nodeCount = loadInfo2_.size();
+ Set<EndPoint> nodes = loadInfo2_.keySet();
+
+ int systemLoad = 0;
+ for ( EndPoint node : nodes )
+ {
+ LoadInfo load = loadInfo2_.get(node);
+ if ( load != null )
+ systemLoad += load.count();
+ }
+ int averageLoad = (nodeCount > 0) ? (systemLoad / nodeCount) : 0;
+ if (logger_.isDebugEnabled())
+ logger_.debug("Average system load should be " + averageLoad);
+ return averageLoad;
+ }
+ */
+
+ /*
+ private boolean isHeavyNode()
+ {
+ return ( localLoad() > ( StorageLoadBalancer.ratio_ * averageSystemLoad() ) );
+ }
+ */
+
+ /*
+ private boolean isMoveable(EndPoint target)
+ {
+ int threshold = (int)(StorageLoadBalancer.ratio_ * averageSystemLoad());
+ if ( isANeighbour(target) )
+ {
+ // If the target is a neighbour then it is moveable if the
+ // average of its load and ours does not exceed the threshold.
+ LoadInfo load = loadInfo2_.get(target);
+ if ( load == null )
+ return false;
+ else
+ {
+ int myload = localLoad();
+ int avgLoad = (load.count() + myload) >> 1;
+ if ( avgLoad <= threshold )
+ return true;
+ else
+ return false;
+ }
+ }
+ else
+ {
+ EndPoint successor = storageService_.getSuccessor(target);
+ LoadInfo sLoad = loadInfo2_.get(successor);
+ LoadInfo targetLoad = loadInfo2_.get(target);
+ if ( (sLoad.count() + targetLoad.count()) > threshold )
+ return false;
+ else
+ return true;
+ }
+ }
+ */
+
+ /**
+  * Tells whether the given endpoint is the predecessor or the successor of
+  * the local storage endpoint, as reported by the storage service.
+  *
+  * @param neighbour the endpoint to test
+  * @return true if neighbour equals the predecessor or the successor
+  */
+ private boolean isANeighbour(EndPoint neighbour)
+ {
+     // the successor is only looked up when the predecessor does not match
+     return storageService_.getPredecessor(StorageService.getLocalStorageEndPoint()).equals(neighbour)
+         || storageService_.getSuccessor(StorageService.getLocalStorageEndPoint()).equals(neighbour);
+ }
+
+ /*
+ * Determine the nodes that are lightly loaded. Choose one
+ * of the lightly loaded nodes at random and use it as
+ * a potential target for load balancing.
+ */
+ /*
+ private EndPoint findARandomLightNode()
+ {
+ List<EndPoint> potentialCandidates = new ArrayList<EndPoint>();
+ Set<EndPoint> allTargets = loadInfo2_.keySet();
+ int avgLoad = averageSystemLoad();
+
+ for( EndPoint target : allTargets )
+ {
+ LoadInfo load = loadInfo2_.get(target);
+ if ( load.count() < avgLoad )
+ potentialCandidates.add(target);
+ }
+
+ if ( potentialCandidates.size() > 0 )
+ {
+ Random random = new Random();
+ int index = random.nextInt(potentialCandidates.size());
+ return potentialCandidates.get(index);
+ }
+ return null;
+ }
+ */
+}
+
+/**
+ * Serializable holder for the token carried by a move message.
+ */
+class MoveMessage implements Serializable
+{
+    /* the token handed to the constructor */
+    private Token targetToken_;
+
+    /* no-argument constructor, not callable from outside this class */
+    private MoveMessage()
+    {
+    }
+
+    /**
+     * @param targetToken the token this message carries
+     */
+    MoveMessage(Token targetToken)
+    {
+        this.targetToken_ = targetToken;
+    }
+
+    /**
+     * @return the token this message was created with
+     */
+    Token getTargetToken()
+    {
+        return this.targetToken_;
+    }
+}