You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/01/21 00:36:33 UTC

svn commit: r901432 - in /incubator/cassandra/trunk: src/java/org/apache/cassandra/concurrent/ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/dht/ src/java/org/apache/cassandra/gms/ src/java/org/apache/cassandra/io/ src/java/org/apache...

Author: jbellis
Date: Wed Jan 20 23:36:32 2010
New Revision: 901432

URL: http://svn.apache.org/viewvc?rev=901432&view=rev
Log:
make shutdown non-reentrant and make MessagingService (MS) a true singleton
patch by jbellis; tested by Brandon Williams for CASSANDRA-705

Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Streaming.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadRepairManager.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StreamManager.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/tools/SSTableExportTest.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java?rev=901432&r1=901431&r2=901432&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java Wed Jan 20 23:36:32 2010
@@ -83,12 +83,12 @@
     /**
      * This method shuts down all registered stages.
      */
-    public static void shutdown()
+    public static void shutdownNow()
     {
         Set<String> stages = StageManager.stages.keySet();
         for (String stage : stages)
         {
-            StageManager.stages.get(stage).shutdown();
+            StageManager.stages.get(stage).shutdownNow();
         }
     }
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=901432&r1=901431&r2=901432&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java Wed Jan 20 23:36:32 2010
@@ -19,13 +19,9 @@
 package org.apache.cassandra.db;
 
 import java.util.Collection;
-import java.util.Timer;
-import java.util.TimerTask;
 import java.util.Arrays;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
 import java.io.IOException;
 
 import org.apache.log4j.Logger;
@@ -128,7 +124,7 @@
         }
         Message message = rm.makeRowMutationMessage();
         WriteResponseHandler responseHandler = new WriteResponseHandler(1);
-        MessagingService.instance().sendRR(message, new InetAddress[] { endPoint }, responseHandler);
+        MessagingService.instance.sendRR(message, new InetAddress[] { endPoint }, responseHandler);
 
         try
         {

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java?rev=901432&r1=901431&r2=901432&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java Wed Jan 20 23:36:32 2010
@@ -101,7 +101,7 @@
             Message response = message.getReply(FBUtilities.getLocalAddress(), bytes);
             if (logger_.isDebugEnabled())
               logger_.debug("Read key " + command.key + "; sending response to " + message.getMessageId() + "@" + message.getFrom());
-            MessagingService.instance().sendOneWay(response, message.getFrom());
+            MessagingService.instance.sendOneWay(response, message.getFrom());
 
             /* Do read repair if header of the message says so */
             if (message.getHeader(ReadCommand.DO_REPAIR) != null)

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java?rev=901432&r1=901431&r2=901432&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java Wed Jan 20 23:36:32 2010
@@ -62,7 +62,7 @@
             Message responseMessage = WriteResponse.makeWriteResponseMessage(message, response);
             if (logger_.isDebugEnabled())
               logger_.debug(rm + " applied.  Sending response to " + message.getMessageId() + "@" + message.getFrom());
-            MessagingService.instance().sendOneWay(responseMessage, message.getFrom());
+            MessagingService.instance.sendOneWay(responseMessage, message.getFrom());
         }
         catch (IOException e)
         {

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java?rev=901432&r1=901431&r2=901432&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java Wed Jan 20 23:36:32 2010
@@ -182,7 +182,7 @@
     {
         Message message = new Message(FBUtilities.getLocalAddress(), "", StorageService.bootstrapTokenVerbHandler_, ArrayUtils.EMPTY_BYTE_ARRAY);
         BootstrapTokenCallback btc = new BootstrapTokenCallback();
-        MessagingService.instance().sendRR(message, maxEndpoint, btc);
+        MessagingService.instance.sendRR(message, maxEndpoint, btc);
         return btc.getToken();
     }
 
@@ -230,7 +230,7 @@
             {
                 throw new AssertionError();
             }
-            MessagingService.instance().sendOneWay(response, message.getFrom());
+            MessagingService.instance.sendOneWay(response, message.getFrom());
         }
     }
 

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java?rev=901432&r1=901431&r2=901432&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java Wed Jan 20 23:36:32 2010
@@ -137,10 +137,10 @@
         /* register with the Failure Detector for receiving Failure detector events */
         FailureDetector.instance.registerFailureDetectionEventListener(this);
         /* register the verbs */
-        MessagingService.instance().registerVerbHandlers(JOIN_VERB_HANDLER, new JoinVerbHandler());
-        MessagingService.instance().registerVerbHandlers(GOSSIP_DIGEST_SYN_VERB, new GossipDigestSynVerbHandler());
-        MessagingService.instance().registerVerbHandlers(GOSSIP_DIGEST_ACK_VERB, new GossipDigestAckVerbHandler());
-        MessagingService.instance().registerVerbHandlers(GOSSIP_DIGEST_ACK2_VERB, new GossipDigestAck2VerbHandler());
+        MessagingService.instance.registerVerbHandlers(JOIN_VERB_HANDLER, new JoinVerbHandler());
+        MessagingService.instance.registerVerbHandlers(GOSSIP_DIGEST_SYN_VERB, new GossipDigestSynVerbHandler());
+        MessagingService.instance.registerVerbHandlers(GOSSIP_DIGEST_ACK_VERB, new GossipDigestAckVerbHandler());
+        MessagingService.instance.registerVerbHandlers(GOSSIP_DIGEST_ACK2_VERB, new GossipDigestAck2VerbHandler());
     }
 
     /** Register with the Gossiper for EndPointState notifications */
@@ -308,7 +308,7 @@
         InetAddress to = liveEndPoints.get(index);
         if (logger_.isTraceEnabled())
             logger_.trace("Sending a GossipDigestSynMessage to " + to + " ...");
-        MessagingService.instance().sendUdpOneWay(message, to);
+        MessagingService.instance.sendUdpOneWay(message, to);
         return seeds_.contains(to);
     }
 
@@ -887,7 +887,7 @@
             Message gDigestAckMessage = Gossiper.instance.makeGossipDigestAckMessage(gDigestAck);
             if (logger_.isTraceEnabled())
                 logger_.trace("Sending a GossipDigestAckMessage to " + from);
-            MessagingService.instance().sendUdpOneWay(gDigestAckMessage, from);
+            MessagingService.instance.sendUdpOneWay(gDigestAckMessage, from);
         }
         catch (IOException e)
         {
@@ -979,7 +979,7 @@
             Message gDigestAck2Message = Gossiper.instance.makeGossipDigestAck2Message(gDigestAck2);
             if (logger_.isTraceEnabled())
                 logger_.trace("Sending a GossipDigestAck2Message to " + from);
-            MessagingService.instance().sendUdpOneWay(gDigestAck2Message, from);
+            MessagingService.instance.sendUdpOneWay(gDigestAck2Message, from);
         }
         catch ( IOException e )
         {

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Streaming.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Streaming.java?rev=901432&r1=901431&r2=901432&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Streaming.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Streaming.java Wed Jan 20 23:36:32 2010
@@ -131,7 +131,7 @@
         message.addHeader(Streaming.TABLE_NAME, table.getBytes());
         if (logger.isDebugEnabled())
           logger.debug("Sending a stream initiate message to " + target + " ...");
-        MessagingService.instance().sendOneWay(message, target);
+        MessagingService.instance.sendOneWay(message, target);
 
         if (streamContexts.length > 0)
         {
@@ -149,7 +149,7 @@
     {
         StreamRequestMetadata streamRequestMetadata = new StreamRequestMetadata(FBUtilities.getLocalAddress(), ranges);
         Message message = StreamRequestMessage.makeStreamRequestMessage(new StreamRequestMessage(streamRequestMetadata));
-        MessagingService.instance().sendOneWay(message, source);
+        MessagingService.instance.sendOneWay(message, source);
     }
 
     public static class StreamInitiateVerbHandler implements IVerbHandler
@@ -202,7 +202,7 @@
                 if (logger.isDebugEnabled())
                   logger.debug("Sending a stream initiate done message ...");
                 Message doneMessage = new Message(FBUtilities.getLocalAddress(), "", StorageService.streamInitiateDoneVerbHandler_, new byte[0] );
-                MessagingService.instance().sendOneWay(doneMessage, message.getFrom());
+                MessagingService.instance.sendOneWay(doneMessage, message.getFrom());
             }
             catch (IOException ex)
             {
@@ -314,7 +314,7 @@
             /* Send a StreamStatusMessage object which may require the source node to re-stream certain files. */
             StreamContextManager.StreamStatusMessage streamStatusMessage = new StreamContextManager.StreamStatusMessage(streamStatus);
             Message message = StreamContextManager.StreamStatusMessage.makeStreamStatusMessage(streamStatusMessage);
-            MessagingService.instance().sendOneWay(message, host);
+            MessagingService.instance.sendOneWay(message, host);
 
             /* If we're done with everything for this host, remove from bootstrap sources */
             if (StreamContextManager.isDone(host) && StorageService.instance.isBootstrapMode())

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeliveryTask.java?rev=901432&r1=901431&r2=901432&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeliveryTask.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeliveryTask.java Wed Jan 20 23:36:32 2010
@@ -33,7 +33,7 @@
     public void run()
     { 
         String verb = message_.getVerb();
-        IVerbHandler verbHandler = MessagingService.instance().getVerbHandler(verb);
+        IVerbHandler verbHandler = MessagingService.instance.getVerbHandler(verb);
         assert verbHandler != null : "unknown verb " + verb;
         verbHandler.doVerb(message_);
     }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=901432&r1=901431&r2=901432&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Wed Jan 20 23:36:32 2010
@@ -58,12 +58,6 @@
     private static ICachetable<String, IAsyncCallback> callbackMap_;
     private static ICachetable<String, IAsyncResult> taskCompletionMap_;
     
-    /* List of sockets we are listening on */
-    private static Map<InetAddress, SelectionKey> listenSockets_ = new HashMap<InetAddress, SelectionKey>();
-
-    /* List of UdpConnections we are listening on */
-    private static Map<InetAddress, UdpConnection> udpConnections_ = new HashMap<InetAddress, UdpConnection>();
-    
     /* Lookup table for registering message handlers based on the verb. */
     private static Map<String, IVerbHandler> verbHandlers_;
 
@@ -78,33 +72,15 @@
     
     private static NonBlockingHashMap<String, TcpConnectionManager> connectionManagers_ = new NonBlockingHashMap<String, TcpConnectionManager>();
     
-    private static volatile boolean bShutdown_ = false;
-    
     private static Logger logger_ = Logger.getLogger(MessagingService.class);
     
-    private static volatile MessagingService messagingService_ = new MessagingService();
+    public static final MessagingService instance = new MessagingService();
 
     public static int getVersion()
     {
         return version_;
     }
 
-    public static MessagingService instance()
-    {   
-    	if ( bShutdown_ )
-    	{
-            synchronized (MessagingService.class)
-            {
-                if ( bShutdown_ )
-                {
-            		messagingService_ = new MessagingService();
-            		bShutdown_ = false;
-                }
-            }
-    	}
-        return messagingService_;
-    }
-    
     public Object clone() throws CloneNotSupportedException
     {
         //Prevents the singleton from being cloned
@@ -179,7 +155,6 @@
         SelectionKeyHandler handler = new TcpConnectionHandler(localEp);
 
         SelectionKey key = SelectorManager.getSelectorManager().register(serverChannel, handler, SelectionKey.OP_ACCEPT);          
-        listenSockets_.put(localEp, key);
         FailureDetector.instance.registerFailureDetectionEventListener(this);
     }
     
@@ -195,7 +170,6 @@
         try
         {
             connection.init(localEp);
-            udpConnections_.put(localEp, connection);
         }
         catch (IOException e)
         {
@@ -421,47 +395,17 @@
     public static void shutdown()
     {
         logger_.info("Shutting down ...");
-        synchronized (MessagingService.class)
-        {
-            FailureDetector.instance.unregisterFailureDetectionEventListener(MessagingService.instance());
-            /* Stop listening on any TCP socket */
-            for (SelectionKey skey : listenSockets_.values())
-            {
-                skey.cancel();
-                try
-                {
-                    skey.channel().close();
-                }
-                catch (IOException e) {}
-            }
-            listenSockets_.clear();
 
-            /* Stop listening on any UDP ports. */
-            for (UdpConnection con : udpConnections_.values())
-            {
-                con.close();
-            }
-            udpConnections_.clear();
+        messageReadExecutor_.shutdownNow();
+        messageDeserializerExecutor_.shutdownNow();
+        streamExecutor_.shutdownNow();
+        StageManager.shutdownNow();
+
+        /* shut down the cachetables */
+        taskCompletionMap_.shutdown();
+        callbackMap_.shutdown();
 
-            /* Shutdown the threads in the EventQueue's */
-            messageReadExecutor_.shutdownNow();
-            messageDeserializerExecutor_.shutdownNow();
-            streamExecutor_.shutdownNow();
-
-            StageManager.shutdown();
-            
-            /* shut down the cachetables */
-            taskCompletionMap_.shutdown();
-            callbackMap_.shutdown();
-
-            /* Interrupt the selector manager thread */
-            SelectorManager.getSelectorManager().interrupt();
-
-            connectionManagers_.clear();
-            verbHandlers_.clear();
-            bShutdown_ = true;
-        }
-        logger_.info("Shutdown invocation complete.");
+        logger_.info("Shutdown complete (no further commands will be processed)");
     }
 
     public static void receive(Message message)

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=901432&r1=901431&r2=901432&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java Wed Jan 20 23:36:32 2010
@@ -120,8 +120,8 @@
      */
     protected AntiEntropyService()
     {
-        MessagingService.instance().registerVerbHandlers(TREE_REQUEST_VERB, new TreeRequestVerbHandler());
-        MessagingService.instance().registerVerbHandlers(TREE_RESPONSE_VERB, new TreeResponseVerbHandler());
+        MessagingService.instance.registerVerbHandlers(TREE_REQUEST_VERB, new TreeRequestVerbHandler());
+        MessagingService.instance.registerVerbHandlers(TREE_RESPONSE_VERB, new TreeResponseVerbHandler());
         naturalRepairs = new ConcurrentHashMap<CFPair, Long>();
         trees = new HashMap<CFPair, Cachetable<InetAddress, TreePair>>();
     }
@@ -225,7 +225,7 @@
      */
     void notifyNeighbors(Validator validator, InetAddress local, Collection<InetAddress> neighbors)
     {
-        MessagingService ms = MessagingService.instance();
+        MessagingService ms = MessagingService.instance;
 
         try
         {

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java?rev=901432&r1=901431&r2=901432&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java Wed Jan 20 23:36:32 2010
@@ -89,7 +89,7 @@
             Message message = readCommand.makeReadMessage();
             if (logger_.isDebugEnabled())
               logger_.debug("Performing read repair for " + readCommand_.key + " to " + message.getMessageId() + "@[" + StringUtils.join(replicas_, ", ") + "]");
-			MessagingService.instance().sendRR(message, replicas_.toArray(new InetAddress[replicas_.size()]), responseHandler);
+            MessagingService.instance.sendRR(message, replicas_.toArray(new InetAddress[replicas_.size()]), responseHandler);
 		}
 	}
 	
@@ -153,7 +153,7 @@
 			Message message = readCommandDigestOnly.makeReadMessage();
             if (logger_.isDebugEnabled())
               logger_.debug("Reading consistency digest for " + readCommand_.key + " from " + message.getMessageId() + "@[" + StringUtils.join(replicas_, ", ") + "]");
-            MessagingService.instance().sendRR(message, replicas_.toArray(new InetAddress[replicas_.size()]), new DigestResponseHandler());
+            MessagingService.instance.sendRR(message, replicas_.toArray(new InetAddress[replicas_.size()]), new DigestResponseHandler());
 		}
 		catch (IOException ex)
 		{

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java?rev=901432&r1=901431&r2=901432&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java Wed Jan 20 23:36:32 2010
@@ -46,7 +46,7 @@
             Message response = reply.getReply(message);
             if (logger.isDebugEnabled())
                 logger.debug("Sending " + reply+ " to " + message.getMessageId() + "@" + message.getFrom());
-            MessagingService.instance().sendOneWay(response, message.getFrom());
+            MessagingService.instance.sendOneWay(response, message.getFrom());
         }
         catch (Exception ex)
         {

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadRepairManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadRepairManager.java?rev=901432&r1=901431&r2=901432&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadRepairManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadRepairManager.java Wed Jan 20 23:36:32 2010
@@ -20,7 +20,6 @@
 
 import java.io.IOException;
 import java.io.IOError;
-import java.util.concurrent.locks.*;
 
 import org.apache.cassandra.db.RowMutationMessage;
 import java.net.InetAddress;
@@ -74,7 +73,7 @@
             {
                 throw new RuntimeException(e);
             }
-            MessagingService.instance().sendOneWay(message, to);
+            MessagingService.instance.sendOneWay(message, to);
 		}
 
 	}

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java?rev=901432&r1=901431&r2=901432&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java Wed Jan 20 23:36:32 2010
@@ -154,7 +154,7 @@
         public void doVerb(Message message)
         {
             Message reply = message.getReply(FBUtilities.getLocalAddress(), new byte[] {(byte)(isMoveable_.get() ? 1 : 0)});
-            MessagingService.instance().sendOneWay(reply, message.getFrom());
+            MessagingService.instance.sendOneWay(reply, message.getFrom());
             if ( isMoveable_.get() )
             {
                 // MoveMessage moveMessage = (MoveMessage)message.getMessageBody()[0];
@@ -190,7 +190,7 @@
 
     private StorageLoadBalancer()
     {
-        MessagingService.instance().registerVerbHandlers(StorageLoadBalancer.moveMessageVerbHandler_, new MoveMessageVerbHandler());
+        MessagingService.instance.registerVerbHandlers(StorageLoadBalancer.moveMessageVerbHandler_, new MoveMessageVerbHandler());
         Gossiper.instance.register(this);
     }
 

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=901432&r1=901431&r2=901432&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Wed Jan 20 23:36:32 2010
@@ -137,7 +137,7 @@
                                     unhintedMessage = rm.makeRowMutationMessage();
                                 if (logger.isDebugEnabled())
                                     logger.debug("insert writing key " + rm.key() + " to " + unhintedMessage.getMessageId() + "@" + target);
-                                MessagingService.instance().sendOneWay(unhintedMessage, target);
+                                MessagingService.instance.sendOneWay(unhintedMessage, target);
                             }
                         }
                         else
@@ -146,7 +146,7 @@
                             hintedMessage.addHeader(RowMutation.HINT, target.getAddress());
                             if (logger.isDebugEnabled())
                                 logger.debug("insert writing key " + rm.key() + " to " + hintedMessage.getMessageId() + "@" + hintedTarget + " for " + target);
-                            MessagingService.instance().sendOneWay(hintedMessage, hintedTarget);
+                            MessagingService.instance.sendOneWay(hintedMessage, hintedTarget);
                         }
                     }
                 }
@@ -202,11 +202,11 @@
                             if (unhintedMessage == null)
                             {
                                 unhintedMessage = rm.makeRowMutationMessage();
-                                MessagingService.instance().addCallback(responseHandler, unhintedMessage.getMessageId());
+                                MessagingService.instance.addCallback(responseHandler, unhintedMessage.getMessageId());
                             }
                             if (logger.isDebugEnabled())
                                 logger.debug("insert writing key " + rm.key() + " to " + unhintedMessage.getMessageId() + "@" + naturalTarget);
-                            MessagingService.instance().sendOneWay(unhintedMessage, naturalTarget);
+                            MessagingService.instance.sendOneWay(unhintedMessage, naturalTarget);
                         }
                     }
                     else
@@ -216,7 +216,7 @@
                         hintedMessage.addHeader(RowMutation.HINT, naturalTarget.getAddress());
                         if (logger.isDebugEnabled())
                             logger.debug("insert writing key " + rm.key() + " to " + hintedMessage.getMessageId() + "@" + maybeHintedTarget + " for " + naturalTarget);
-                        MessagingService.instance().sendOneWay(hintedMessage, maybeHintedTarget);
+                        MessagingService.instance.sendOneWay(hintedMessage, maybeHintedTarget);
                     }
                 }
             }
@@ -326,7 +326,7 @@
             if (logger.isDebugEnabled())
                 logger.debug("weakreadremote reading " + command + " from " + message.getMessageId() + "@" + endPoint);
             message.addHeader(ReadCommand.DO_REPAIR, ReadCommand.DO_REPAIR.getBytes());
-            iars.add(MessagingService.instance().sendRR(message, endPoint));
+            iars.add(MessagingService.instance.sendRR(message, endPoint));
         }
 
         for (IAsyncResult iar: iars)
@@ -436,7 +436,7 @@
                     logger.debug("strongread reading " + (m == message ? "data" : "digest") + " for " + command + " from " + m.getMessageId() + "@" + endpoint);
             }
             QuorumResponseHandler<Row> quorumResponseHandler = new QuorumResponseHandler<Row>(DatabaseDescriptor.getQuorum(), new ReadResponseResolver(command.table, responseCount));
-            MessagingService.instance().sendRR(messages, endPoints, quorumResponseHandler);
+            MessagingService.instance.sendRR(messages, endPoints, quorumResponseHandler);
             quorumResponseHandlers.add(quorumResponseHandler);
             commandEndPoints.add(endPoints);
         }
@@ -465,7 +465,7 @@
                             readResponseResolverRepair);
                     logger.info("DigestMismatchException: " + ex.getMessage());
                     Message messageRepair = command.makeReadMessage();
-                    MessagingService.instance().sendRR(messageRepair, commandEndPoints.get(commandIndex), quorumResponseHandlerRepair);
+                    MessagingService.instance.sendRR(messageRepair, commandEndPoints.get(commandIndex), quorumResponseHandlerRepair);
                     try
                     {
                         row = quorumResponseHandlerRepair.get();
@@ -556,7 +556,7 @@
                 logger.debug("reading " + command + " for " + primaryRange + " from " + message.getMessageId() + "@" + endPoint);
             for (InetAddress replicaEndpoint : endpoints)
             {
-                MessagingService.instance().sendRR(message, replicaEndpoint, handler);
+                MessagingService.instance.sendRR(message, replicaEndpoint, handler);
             }
 
             // if we're done, great, otherwise, move to the next range

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=901432&r1=901431&r2=901432&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Wed Jan 20 23:36:32 2010
@@ -188,17 +188,17 @@
         endPointSnitch_ = DatabaseDescriptor.getEndPointSnitch();
 
         /* register the verb handlers */
-        MessagingService.instance().registerVerbHandlers(binaryVerbHandler_, new BinaryVerbHandler());
-        MessagingService.instance().registerVerbHandlers(mutationVerbHandler_, new RowMutationVerbHandler());
-        MessagingService.instance().registerVerbHandlers(readRepairVerbHandler_, new ReadRepairVerbHandler());
-        MessagingService.instance().registerVerbHandlers(readVerbHandler_, new ReadVerbHandler());
-        MessagingService.instance().registerVerbHandlers(rangeSliceVerbHandler_, new RangeSliceVerbHandler());
+        MessagingService.instance.registerVerbHandlers(binaryVerbHandler_, new BinaryVerbHandler());
+        MessagingService.instance.registerVerbHandlers(mutationVerbHandler_, new RowMutationVerbHandler());
+        MessagingService.instance.registerVerbHandlers(readRepairVerbHandler_, new ReadRepairVerbHandler());
+        MessagingService.instance.registerVerbHandlers(readVerbHandler_, new ReadVerbHandler());
+        MessagingService.instance.registerVerbHandlers(rangeSliceVerbHandler_, new RangeSliceVerbHandler());
         // see BootStrapper for a summary of how the bootstrap verbs interact
-        MessagingService.instance().registerVerbHandlers(bootstrapTokenVerbHandler_, new BootStrapper.BootstrapTokenVerbHandler());
-        MessagingService.instance().registerVerbHandlers(streamRequestVerbHandler_, new StreamRequestVerbHandler() );
-        MessagingService.instance().registerVerbHandlers(streamInitiateVerbHandler_, new Streaming.StreamInitiateVerbHandler());
-        MessagingService.instance().registerVerbHandlers(streamInitiateDoneVerbHandler_, new Streaming.StreamInitiateDoneVerbHandler());
-        MessagingService.instance().registerVerbHandlers(streamFinishedVerbHandler_, new Streaming.StreamFinishedVerbHandler());
+        MessagingService.instance.registerVerbHandlers(bootstrapTokenVerbHandler_, new BootStrapper.BootstrapTokenVerbHandler());
+        MessagingService.instance.registerVerbHandlers(streamRequestVerbHandler_, new StreamRequestVerbHandler() );
+        MessagingService.instance.registerVerbHandlers(streamInitiateVerbHandler_, new Streaming.StreamInitiateVerbHandler());
+        MessagingService.instance.registerVerbHandlers(streamInitiateDoneVerbHandler_, new Streaming.StreamInitiateDoneVerbHandler());
+        MessagingService.instance.registerVerbHandlers(streamFinishedVerbHandler_, new Streaming.StreamFinishedVerbHandler());
 
         replicationStrategy_ = getReplicationStrategy(tokenMetadata_);
     }
@@ -231,8 +231,8 @@
     {
         isClientMode = true;
         logger_.info("Starting up client gossip");
-        MessagingService.instance().listen(FBUtilities.getLocalAddress());
-        MessagingService.instance().listenUDP(FBUtilities.getLocalAddress());
+        MessagingService.instance.listen(FBUtilities.getLocalAddress());
+        MessagingService.instance.listenUDP(FBUtilities.getLocalAddress());
 
         SelectorManager.getSelectorManager().start();
         SelectorManager.getUdpSelectorManager().start();
@@ -249,9 +249,9 @@
         logger_.info("Starting up server gossip");
 
         /* Listen for application messages */
-        MessagingService.instance().listen(FBUtilities.getLocalAddress());
+        MessagingService.instance.listen(FBUtilities.getLocalAddress());
         /* Listen for control messages */
-        MessagingService.instance().listenUDP(FBUtilities.getLocalAddress());
+        MessagingService.instance.listenUDP(FBUtilities.getLocalAddress());
 
         SelectorManager.getSelectorManager().start();
         SelectorManager.getUdpSelectorManager().start();
@@ -961,7 +961,7 @@
     public void forceTableRepair(final String tableName, final String... columnFamilies) throws IOException
     {
         // request that all relevant endpoints generate trees
-        final MessagingService ms = MessagingService.instance();
+        final MessagingService ms = MessagingService.instance;
         final List<InetAddress> endpoints = getNaturalEndpoints(getLocalToken());
         for (ColumnFamilyStore cfStore : getValidColumnFamilies(tableName, columnFamilies))
         {

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StreamManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StreamManager.java?rev=901432&r1=901431&r2=901432&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StreamManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StreamManager.java Wed Jan 20 23:36:32 2010
@@ -83,7 +83,7 @@
             File file = filesToStream_.get(0);
             if (logger_.isDebugEnabled())
               logger_.debug("Streaming " + file.length() + " length file " + file + " ...");
-            MessagingService.instance().stream(file.getAbsolutePath(), 0L, file.length(), FBUtilities.getLocalAddress(), to_);
+            MessagingService.instance.stream(file.getAbsolutePath(), 0L, file.length(), FBUtilities.getLocalAddress(), to_);
         }
     }
     

Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/tools/SSTableExportTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/tools/SSTableExportTest.java?rev=901432&r1=901431&r2=901432&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/tools/SSTableExportTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/tools/SSTableExportTest.java Wed Jan 20 23:36:32 2010
@@ -83,8 +83,7 @@
     }
 
     @Test
-    public void testExportSimpleCf() throws IOException
-    {
+    public void testExportSimpleCf() throws IOException    {
         File tempSS = createTemporarySSTable("Keyspace1", "Standard1");
         ColumnFamily cfamily = ColumnFamily.create("Keyspace1", "Standard1");
         IPartitioner<?> partitioner = DatabaseDescriptor.getPartitioner();