You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/01/21 00:36:33 UTC
svn commit: r901432 - in /incubator/cassandra/trunk:
src/java/org/apache/cassandra/concurrent/ src/java/org/apache/cassandra/db/
src/java/org/apache/cassandra/dht/ src/java/org/apache/cassandra/gms/
src/java/org/apache/cassandra/io/ src/java/org/apache/cassandra/net/ src/java/org/apache/cassandra/service/ test/unit/org/apache/cassandra/tools/
Author: jbellis
Date: Wed Jan 20 23:36:32 2010
New Revision: 901432
URL: http://svn.apache.org/viewvc?rev=901432&view=rev
Log:
make shutdown non-reentrant and make MessagingService (MS) a true singleton
patch by jbellis; tested by Brandon Williams for CASSANDRA-705
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Streaming.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadRepairManager.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StreamManager.java
incubator/cassandra/trunk/test/unit/org/apache/cassandra/tools/SSTableExportTest.java
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java?rev=901432&r1=901431&r2=901432&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java Wed Jan 20 23:36:32 2010
@@ -83,12 +83,12 @@
/**
* This method shuts down all registered stages.
*/
- public static void shutdown()
+ public static void shutdownNow()
{
Set<String> stages = StageManager.stages.keySet();
for (String stage : stages)
{
- StageManager.stages.get(stage).shutdown();
+ StageManager.stages.get(stage).shutdownNow();
}
}
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=901432&r1=901431&r2=901432&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java Wed Jan 20 23:36:32 2010
@@ -19,13 +19,9 @@
package org.apache.cassandra.db;
import java.util.Collection;
-import java.util.Timer;
-import java.util.TimerTask;
import java.util.Arrays;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
import java.io.IOException;
import org.apache.log4j.Logger;
@@ -128,7 +124,7 @@
}
Message message = rm.makeRowMutationMessage();
WriteResponseHandler responseHandler = new WriteResponseHandler(1);
- MessagingService.instance().sendRR(message, new InetAddress[] { endPoint }, responseHandler);
+ MessagingService.instance.sendRR(message, new InetAddress[] { endPoint }, responseHandler);
try
{
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java?rev=901432&r1=901431&r2=901432&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java Wed Jan 20 23:36:32 2010
@@ -101,7 +101,7 @@
Message response = message.getReply(FBUtilities.getLocalAddress(), bytes);
if (logger_.isDebugEnabled())
logger_.debug("Read key " + command.key + "; sending response to " + message.getMessageId() + "@" + message.getFrom());
- MessagingService.instance().sendOneWay(response, message.getFrom());
+ MessagingService.instance.sendOneWay(response, message.getFrom());
/* Do read repair if header of the message says so */
if (message.getHeader(ReadCommand.DO_REPAIR) != null)
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java?rev=901432&r1=901431&r2=901432&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java Wed Jan 20 23:36:32 2010
@@ -62,7 +62,7 @@
Message responseMessage = WriteResponse.makeWriteResponseMessage(message, response);
if (logger_.isDebugEnabled())
logger_.debug(rm + " applied. Sending response to " + message.getMessageId() + "@" + message.getFrom());
- MessagingService.instance().sendOneWay(responseMessage, message.getFrom());
+ MessagingService.instance.sendOneWay(responseMessage, message.getFrom());
}
catch (IOException e)
{
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java?rev=901432&r1=901431&r2=901432&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java Wed Jan 20 23:36:32 2010
@@ -182,7 +182,7 @@
{
Message message = new Message(FBUtilities.getLocalAddress(), "", StorageService.bootstrapTokenVerbHandler_, ArrayUtils.EMPTY_BYTE_ARRAY);
BootstrapTokenCallback btc = new BootstrapTokenCallback();
- MessagingService.instance().sendRR(message, maxEndpoint, btc);
+ MessagingService.instance.sendRR(message, maxEndpoint, btc);
return btc.getToken();
}
@@ -230,7 +230,7 @@
{
throw new AssertionError();
}
- MessagingService.instance().sendOneWay(response, message.getFrom());
+ MessagingService.instance.sendOneWay(response, message.getFrom());
}
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java?rev=901432&r1=901431&r2=901432&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java Wed Jan 20 23:36:32 2010
@@ -137,10 +137,10 @@
/* register with the Failure Detector for receiving Failure detector events */
FailureDetector.instance.registerFailureDetectionEventListener(this);
/* register the verbs */
- MessagingService.instance().registerVerbHandlers(JOIN_VERB_HANDLER, new JoinVerbHandler());
- MessagingService.instance().registerVerbHandlers(GOSSIP_DIGEST_SYN_VERB, new GossipDigestSynVerbHandler());
- MessagingService.instance().registerVerbHandlers(GOSSIP_DIGEST_ACK_VERB, new GossipDigestAckVerbHandler());
- MessagingService.instance().registerVerbHandlers(GOSSIP_DIGEST_ACK2_VERB, new GossipDigestAck2VerbHandler());
+ MessagingService.instance.registerVerbHandlers(JOIN_VERB_HANDLER, new JoinVerbHandler());
+ MessagingService.instance.registerVerbHandlers(GOSSIP_DIGEST_SYN_VERB, new GossipDigestSynVerbHandler());
+ MessagingService.instance.registerVerbHandlers(GOSSIP_DIGEST_ACK_VERB, new GossipDigestAckVerbHandler());
+ MessagingService.instance.registerVerbHandlers(GOSSIP_DIGEST_ACK2_VERB, new GossipDigestAck2VerbHandler());
}
/** Register with the Gossiper for EndPointState notifications */
@@ -308,7 +308,7 @@
InetAddress to = liveEndPoints.get(index);
if (logger_.isTraceEnabled())
logger_.trace("Sending a GossipDigestSynMessage to " + to + " ...");
- MessagingService.instance().sendUdpOneWay(message, to);
+ MessagingService.instance.sendUdpOneWay(message, to);
return seeds_.contains(to);
}
@@ -887,7 +887,7 @@
Message gDigestAckMessage = Gossiper.instance.makeGossipDigestAckMessage(gDigestAck);
if (logger_.isTraceEnabled())
logger_.trace("Sending a GossipDigestAckMessage to " + from);
- MessagingService.instance().sendUdpOneWay(gDigestAckMessage, from);
+ MessagingService.instance.sendUdpOneWay(gDigestAckMessage, from);
}
catch (IOException e)
{
@@ -979,7 +979,7 @@
Message gDigestAck2Message = Gossiper.instance.makeGossipDigestAck2Message(gDigestAck2);
if (logger_.isTraceEnabled())
logger_.trace("Sending a GossipDigestAck2Message to " + from);
- MessagingService.instance().sendUdpOneWay(gDigestAck2Message, from);
+ MessagingService.instance.sendUdpOneWay(gDigestAck2Message, from);
}
catch ( IOException e )
{
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Streaming.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Streaming.java?rev=901432&r1=901431&r2=901432&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Streaming.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Streaming.java Wed Jan 20 23:36:32 2010
@@ -131,7 +131,7 @@
message.addHeader(Streaming.TABLE_NAME, table.getBytes());
if (logger.isDebugEnabled())
logger.debug("Sending a stream initiate message to " + target + " ...");
- MessagingService.instance().sendOneWay(message, target);
+ MessagingService.instance.sendOneWay(message, target);
if (streamContexts.length > 0)
{
@@ -149,7 +149,7 @@
{
StreamRequestMetadata streamRequestMetadata = new StreamRequestMetadata(FBUtilities.getLocalAddress(), ranges);
Message message = StreamRequestMessage.makeStreamRequestMessage(new StreamRequestMessage(streamRequestMetadata));
- MessagingService.instance().sendOneWay(message, source);
+ MessagingService.instance.sendOneWay(message, source);
}
public static class StreamInitiateVerbHandler implements IVerbHandler
@@ -202,7 +202,7 @@
if (logger.isDebugEnabled())
logger.debug("Sending a stream initiate done message ...");
Message doneMessage = new Message(FBUtilities.getLocalAddress(), "", StorageService.streamInitiateDoneVerbHandler_, new byte[0] );
- MessagingService.instance().sendOneWay(doneMessage, message.getFrom());
+ MessagingService.instance.sendOneWay(doneMessage, message.getFrom());
}
catch (IOException ex)
{
@@ -314,7 +314,7 @@
/* Send a StreamStatusMessage object which may require the source node to re-stream certain files. */
StreamContextManager.StreamStatusMessage streamStatusMessage = new StreamContextManager.StreamStatusMessage(streamStatus);
Message message = StreamContextManager.StreamStatusMessage.makeStreamStatusMessage(streamStatusMessage);
- MessagingService.instance().sendOneWay(message, host);
+ MessagingService.instance.sendOneWay(message, host);
/* If we're done with everything for this host, remove from bootstrap sources */
if (StreamContextManager.isDone(host) && StorageService.instance.isBootstrapMode())
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeliveryTask.java?rev=901432&r1=901431&r2=901432&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeliveryTask.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeliveryTask.java Wed Jan 20 23:36:32 2010
@@ -33,7 +33,7 @@
public void run()
{
String verb = message_.getVerb();
- IVerbHandler verbHandler = MessagingService.instance().getVerbHandler(verb);
+ IVerbHandler verbHandler = MessagingService.instance.getVerbHandler(verb);
assert verbHandler != null : "unknown verb " + verb;
verbHandler.doVerb(message_);
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=901432&r1=901431&r2=901432&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Wed Jan 20 23:36:32 2010
@@ -58,12 +58,6 @@
private static ICachetable<String, IAsyncCallback> callbackMap_;
private static ICachetable<String, IAsyncResult> taskCompletionMap_;
- /* List of sockets we are listening on */
- private static Map<InetAddress, SelectionKey> listenSockets_ = new HashMap<InetAddress, SelectionKey>();
-
- /* List of UdpConnections we are listening on */
- private static Map<InetAddress, UdpConnection> udpConnections_ = new HashMap<InetAddress, UdpConnection>();
-
/* Lookup table for registering message handlers based on the verb. */
private static Map<String, IVerbHandler> verbHandlers_;
@@ -78,33 +72,15 @@
private static NonBlockingHashMap<String, TcpConnectionManager> connectionManagers_ = new NonBlockingHashMap<String, TcpConnectionManager>();
- private static volatile boolean bShutdown_ = false;
-
private static Logger logger_ = Logger.getLogger(MessagingService.class);
- private static volatile MessagingService messagingService_ = new MessagingService();
+ public static final MessagingService instance = new MessagingService();
public static int getVersion()
{
return version_;
}
- public static MessagingService instance()
- {
- if ( bShutdown_ )
- {
- synchronized (MessagingService.class)
- {
- if ( bShutdown_ )
- {
- messagingService_ = new MessagingService();
- bShutdown_ = false;
- }
- }
- }
- return messagingService_;
- }
-
public Object clone() throws CloneNotSupportedException
{
//Prevents the singleton from being cloned
@@ -179,7 +155,6 @@
SelectionKeyHandler handler = new TcpConnectionHandler(localEp);
SelectionKey key = SelectorManager.getSelectorManager().register(serverChannel, handler, SelectionKey.OP_ACCEPT);
- listenSockets_.put(localEp, key);
FailureDetector.instance.registerFailureDetectionEventListener(this);
}
@@ -195,7 +170,6 @@
try
{
connection.init(localEp);
- udpConnections_.put(localEp, connection);
}
catch (IOException e)
{
@@ -421,47 +395,17 @@
public static void shutdown()
{
logger_.info("Shutting down ...");
- synchronized (MessagingService.class)
- {
- FailureDetector.instance.unregisterFailureDetectionEventListener(MessagingService.instance());
- /* Stop listening on any TCP socket */
- for (SelectionKey skey : listenSockets_.values())
- {
- skey.cancel();
- try
- {
- skey.channel().close();
- }
- catch (IOException e) {}
- }
- listenSockets_.clear();
- /* Stop listening on any UDP ports. */
- for (UdpConnection con : udpConnections_.values())
- {
- con.close();
- }
- udpConnections_.clear();
+ messageReadExecutor_.shutdownNow();
+ messageDeserializerExecutor_.shutdownNow();
+ streamExecutor_.shutdownNow();
+ StageManager.shutdownNow();
+
+ /* shut down the cachetables */
+ taskCompletionMap_.shutdown();
+ callbackMap_.shutdown();
- /* Shutdown the threads in the EventQueue's */
- messageReadExecutor_.shutdownNow();
- messageDeserializerExecutor_.shutdownNow();
- streamExecutor_.shutdownNow();
-
- StageManager.shutdown();
-
- /* shut down the cachetables */
- taskCompletionMap_.shutdown();
- callbackMap_.shutdown();
-
- /* Interrupt the selector manager thread */
- SelectorManager.getSelectorManager().interrupt();
-
- connectionManagers_.clear();
- verbHandlers_.clear();
- bShutdown_ = true;
- }
- logger_.info("Shutdown invocation complete.");
+ logger_.info("Shutdown complete (no further commands will be processed)");
}
public static void receive(Message message)
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=901432&r1=901431&r2=901432&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java Wed Jan 20 23:36:32 2010
@@ -120,8 +120,8 @@
*/
protected AntiEntropyService()
{
- MessagingService.instance().registerVerbHandlers(TREE_REQUEST_VERB, new TreeRequestVerbHandler());
- MessagingService.instance().registerVerbHandlers(TREE_RESPONSE_VERB, new TreeResponseVerbHandler());
+ MessagingService.instance.registerVerbHandlers(TREE_REQUEST_VERB, new TreeRequestVerbHandler());
+ MessagingService.instance.registerVerbHandlers(TREE_RESPONSE_VERB, new TreeResponseVerbHandler());
naturalRepairs = new ConcurrentHashMap<CFPair, Long>();
trees = new HashMap<CFPair, Cachetable<InetAddress, TreePair>>();
}
@@ -225,7 +225,7 @@
*/
void notifyNeighbors(Validator validator, InetAddress local, Collection<InetAddress> neighbors)
{
- MessagingService ms = MessagingService.instance();
+ MessagingService ms = MessagingService.instance;
try
{
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java?rev=901432&r1=901431&r2=901432&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java Wed Jan 20 23:36:32 2010
@@ -89,7 +89,7 @@
Message message = readCommand.makeReadMessage();
if (logger_.isDebugEnabled())
logger_.debug("Performing read repair for " + readCommand_.key + " to " + message.getMessageId() + "@[" + StringUtils.join(replicas_, ", ") + "]");
- MessagingService.instance().sendRR(message, replicas_.toArray(new InetAddress[replicas_.size()]), responseHandler);
+ MessagingService.instance.sendRR(message, replicas_.toArray(new InetAddress[replicas_.size()]), responseHandler);
}
}
@@ -153,7 +153,7 @@
Message message = readCommandDigestOnly.makeReadMessage();
if (logger_.isDebugEnabled())
logger_.debug("Reading consistency digest for " + readCommand_.key + " from " + message.getMessageId() + "@[" + StringUtils.join(replicas_, ", ") + "]");
- MessagingService.instance().sendRR(message, replicas_.toArray(new InetAddress[replicas_.size()]), new DigestResponseHandler());
+ MessagingService.instance.sendRR(message, replicas_.toArray(new InetAddress[replicas_.size()]), new DigestResponseHandler());
}
catch (IOException ex)
{
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java?rev=901432&r1=901431&r2=901432&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java Wed Jan 20 23:36:32 2010
@@ -46,7 +46,7 @@
Message response = reply.getReply(message);
if (logger.isDebugEnabled())
logger.debug("Sending " + reply+ " to " + message.getMessageId() + "@" + message.getFrom());
- MessagingService.instance().sendOneWay(response, message.getFrom());
+ MessagingService.instance.sendOneWay(response, message.getFrom());
}
catch (Exception ex)
{
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadRepairManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadRepairManager.java?rev=901432&r1=901431&r2=901432&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadRepairManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadRepairManager.java Wed Jan 20 23:36:32 2010
@@ -20,7 +20,6 @@
import java.io.IOException;
import java.io.IOError;
-import java.util.concurrent.locks.*;
import org.apache.cassandra.db.RowMutationMessage;
import java.net.InetAddress;
@@ -74,7 +73,7 @@
{
throw new RuntimeException(e);
}
- MessagingService.instance().sendOneWay(message, to);
+ MessagingService.instance.sendOneWay(message, to);
}
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java?rev=901432&r1=901431&r2=901432&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java Wed Jan 20 23:36:32 2010
@@ -154,7 +154,7 @@
public void doVerb(Message message)
{
Message reply = message.getReply(FBUtilities.getLocalAddress(), new byte[] {(byte)(isMoveable_.get() ? 1 : 0)});
- MessagingService.instance().sendOneWay(reply, message.getFrom());
+ MessagingService.instance.sendOneWay(reply, message.getFrom());
if ( isMoveable_.get() )
{
// MoveMessage moveMessage = (MoveMessage)message.getMessageBody()[0];
@@ -190,7 +190,7 @@
private StorageLoadBalancer()
{
- MessagingService.instance().registerVerbHandlers(StorageLoadBalancer.moveMessageVerbHandler_, new MoveMessageVerbHandler());
+ MessagingService.instance.registerVerbHandlers(StorageLoadBalancer.moveMessageVerbHandler_, new MoveMessageVerbHandler());
Gossiper.instance.register(this);
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=901432&r1=901431&r2=901432&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Wed Jan 20 23:36:32 2010
@@ -137,7 +137,7 @@
unhintedMessage = rm.makeRowMutationMessage();
if (logger.isDebugEnabled())
logger.debug("insert writing key " + rm.key() + " to " + unhintedMessage.getMessageId() + "@" + target);
- MessagingService.instance().sendOneWay(unhintedMessage, target);
+ MessagingService.instance.sendOneWay(unhintedMessage, target);
}
}
else
@@ -146,7 +146,7 @@
hintedMessage.addHeader(RowMutation.HINT, target.getAddress());
if (logger.isDebugEnabled())
logger.debug("insert writing key " + rm.key() + " to " + hintedMessage.getMessageId() + "@" + hintedTarget + " for " + target);
- MessagingService.instance().sendOneWay(hintedMessage, hintedTarget);
+ MessagingService.instance.sendOneWay(hintedMessage, hintedTarget);
}
}
}
@@ -202,11 +202,11 @@
if (unhintedMessage == null)
{
unhintedMessage = rm.makeRowMutationMessage();
- MessagingService.instance().addCallback(responseHandler, unhintedMessage.getMessageId());
+ MessagingService.instance.addCallback(responseHandler, unhintedMessage.getMessageId());
}
if (logger.isDebugEnabled())
logger.debug("insert writing key " + rm.key() + " to " + unhintedMessage.getMessageId() + "@" + naturalTarget);
- MessagingService.instance().sendOneWay(unhintedMessage, naturalTarget);
+ MessagingService.instance.sendOneWay(unhintedMessage, naturalTarget);
}
}
else
@@ -216,7 +216,7 @@
hintedMessage.addHeader(RowMutation.HINT, naturalTarget.getAddress());
if (logger.isDebugEnabled())
logger.debug("insert writing key " + rm.key() + " to " + hintedMessage.getMessageId() + "@" + maybeHintedTarget + " for " + naturalTarget);
- MessagingService.instance().sendOneWay(hintedMessage, maybeHintedTarget);
+ MessagingService.instance.sendOneWay(hintedMessage, maybeHintedTarget);
}
}
}
@@ -326,7 +326,7 @@
if (logger.isDebugEnabled())
logger.debug("weakreadremote reading " + command + " from " + message.getMessageId() + "@" + endPoint);
message.addHeader(ReadCommand.DO_REPAIR, ReadCommand.DO_REPAIR.getBytes());
- iars.add(MessagingService.instance().sendRR(message, endPoint));
+ iars.add(MessagingService.instance.sendRR(message, endPoint));
}
for (IAsyncResult iar: iars)
@@ -436,7 +436,7 @@
logger.debug("strongread reading " + (m == message ? "data" : "digest") + " for " + command + " from " + m.getMessageId() + "@" + endpoint);
}
QuorumResponseHandler<Row> quorumResponseHandler = new QuorumResponseHandler<Row>(DatabaseDescriptor.getQuorum(), new ReadResponseResolver(command.table, responseCount));
- MessagingService.instance().sendRR(messages, endPoints, quorumResponseHandler);
+ MessagingService.instance.sendRR(messages, endPoints, quorumResponseHandler);
quorumResponseHandlers.add(quorumResponseHandler);
commandEndPoints.add(endPoints);
}
@@ -465,7 +465,7 @@
readResponseResolverRepair);
logger.info("DigestMismatchException: " + ex.getMessage());
Message messageRepair = command.makeReadMessage();
- MessagingService.instance().sendRR(messageRepair, commandEndPoints.get(commandIndex), quorumResponseHandlerRepair);
+ MessagingService.instance.sendRR(messageRepair, commandEndPoints.get(commandIndex), quorumResponseHandlerRepair);
try
{
row = quorumResponseHandlerRepair.get();
@@ -556,7 +556,7 @@
logger.debug("reading " + command + " for " + primaryRange + " from " + message.getMessageId() + "@" + endPoint);
for (InetAddress replicaEndpoint : endpoints)
{
- MessagingService.instance().sendRR(message, replicaEndpoint, handler);
+ MessagingService.instance.sendRR(message, replicaEndpoint, handler);
}
// if we're done, great, otherwise, move to the next range
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=901432&r1=901431&r2=901432&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Wed Jan 20 23:36:32 2010
@@ -188,17 +188,17 @@
endPointSnitch_ = DatabaseDescriptor.getEndPointSnitch();
/* register the verb handlers */
- MessagingService.instance().registerVerbHandlers(binaryVerbHandler_, new BinaryVerbHandler());
- MessagingService.instance().registerVerbHandlers(mutationVerbHandler_, new RowMutationVerbHandler());
- MessagingService.instance().registerVerbHandlers(readRepairVerbHandler_, new ReadRepairVerbHandler());
- MessagingService.instance().registerVerbHandlers(readVerbHandler_, new ReadVerbHandler());
- MessagingService.instance().registerVerbHandlers(rangeSliceVerbHandler_, new RangeSliceVerbHandler());
+ MessagingService.instance.registerVerbHandlers(binaryVerbHandler_, new BinaryVerbHandler());
+ MessagingService.instance.registerVerbHandlers(mutationVerbHandler_, new RowMutationVerbHandler());
+ MessagingService.instance.registerVerbHandlers(readRepairVerbHandler_, new ReadRepairVerbHandler());
+ MessagingService.instance.registerVerbHandlers(readVerbHandler_, new ReadVerbHandler());
+ MessagingService.instance.registerVerbHandlers(rangeSliceVerbHandler_, new RangeSliceVerbHandler());
// see BootStrapper for a summary of how the bootstrap verbs interact
- MessagingService.instance().registerVerbHandlers(bootstrapTokenVerbHandler_, new BootStrapper.BootstrapTokenVerbHandler());
- MessagingService.instance().registerVerbHandlers(streamRequestVerbHandler_, new StreamRequestVerbHandler() );
- MessagingService.instance().registerVerbHandlers(streamInitiateVerbHandler_, new Streaming.StreamInitiateVerbHandler());
- MessagingService.instance().registerVerbHandlers(streamInitiateDoneVerbHandler_, new Streaming.StreamInitiateDoneVerbHandler());
- MessagingService.instance().registerVerbHandlers(streamFinishedVerbHandler_, new Streaming.StreamFinishedVerbHandler());
+ MessagingService.instance.registerVerbHandlers(bootstrapTokenVerbHandler_, new BootStrapper.BootstrapTokenVerbHandler());
+ MessagingService.instance.registerVerbHandlers(streamRequestVerbHandler_, new StreamRequestVerbHandler() );
+ MessagingService.instance.registerVerbHandlers(streamInitiateVerbHandler_, new Streaming.StreamInitiateVerbHandler());
+ MessagingService.instance.registerVerbHandlers(streamInitiateDoneVerbHandler_, new Streaming.StreamInitiateDoneVerbHandler());
+ MessagingService.instance.registerVerbHandlers(streamFinishedVerbHandler_, new Streaming.StreamFinishedVerbHandler());
replicationStrategy_ = getReplicationStrategy(tokenMetadata_);
}
@@ -231,8 +231,8 @@
{
isClientMode = true;
logger_.info("Starting up client gossip");
- MessagingService.instance().listen(FBUtilities.getLocalAddress());
- MessagingService.instance().listenUDP(FBUtilities.getLocalAddress());
+ MessagingService.instance.listen(FBUtilities.getLocalAddress());
+ MessagingService.instance.listenUDP(FBUtilities.getLocalAddress());
SelectorManager.getSelectorManager().start();
SelectorManager.getUdpSelectorManager().start();
@@ -249,9 +249,9 @@
logger_.info("Starting up server gossip");
/* Listen for application messages */
- MessagingService.instance().listen(FBUtilities.getLocalAddress());
+ MessagingService.instance.listen(FBUtilities.getLocalAddress());
/* Listen for control messages */
- MessagingService.instance().listenUDP(FBUtilities.getLocalAddress());
+ MessagingService.instance.listenUDP(FBUtilities.getLocalAddress());
SelectorManager.getSelectorManager().start();
SelectorManager.getUdpSelectorManager().start();
@@ -961,7 +961,7 @@
public void forceTableRepair(final String tableName, final String... columnFamilies) throws IOException
{
// request that all relevant endpoints generate trees
- final MessagingService ms = MessagingService.instance();
+ final MessagingService ms = MessagingService.instance;
final List<InetAddress> endpoints = getNaturalEndpoints(getLocalToken());
for (ColumnFamilyStore cfStore : getValidColumnFamilies(tableName, columnFamilies))
{
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StreamManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StreamManager.java?rev=901432&r1=901431&r2=901432&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StreamManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StreamManager.java Wed Jan 20 23:36:32 2010
@@ -83,7 +83,7 @@
File file = filesToStream_.get(0);
if (logger_.isDebugEnabled())
logger_.debug("Streaming " + file.length() + " length file " + file + " ...");
- MessagingService.instance().stream(file.getAbsolutePath(), 0L, file.length(), FBUtilities.getLocalAddress(), to_);
+ MessagingService.instance.stream(file.getAbsolutePath(), 0L, file.length(), FBUtilities.getLocalAddress(), to_);
}
}
Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/tools/SSTableExportTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/tools/SSTableExportTest.java?rev=901432&r1=901431&r2=901432&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/tools/SSTableExportTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/tools/SSTableExportTest.java Wed Jan 20 23:36:32 2010
@@ -83,8 +83,7 @@
}
@Test
- public void testExportSimpleCf() throws IOException
- {
+ public void testExportSimpleCf() throws IOException {
File tempSS = createTemporarySSTable("Keyspace1", "Standard1");
ColumnFamily cfamily = ColumnFamily.create("Keyspace1", "Standard1");
IPartitioner<?> partitioner = DatabaseDescriptor.getPartitioner();