You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/01/12 00:36:38 UTC

svn commit: r1057930 - in /cassandra/branches/cassandra-0.7: src/java/org/apache/cassandra/net/ src/java/org/apache/cassandra/service/ src/java/org/apache/cassandra/streaming/ test/unit/org/apache/cassandra/service/

Author: jbellis
Date: Tue Jan 11 23:36:37 2011
New Revision: 1057930

URL: http://svn.apache.org/viewvc?rev=1057930&view=rev
Log:
convert MessagingService into a true singleton
patch by Folke Behrens; reviewed by jbellis for CASSANDRA-1959

Modified:
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/AsyncResult.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadCallback.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/streaming/FileStreamTask.java
    cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/RemoveTest.java

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/AsyncResult.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/AsyncResult.java?rev=1057930&r1=1057929&r2=1057930&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/AsyncResult.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/AsyncResult.java Tue Jan 11 23:36:37 2011
@@ -95,7 +95,7 @@ class AsyncResult implements IAsyncResul
             lock.unlock();
         }        
 
-        MessagingService.removeRegisteredCallback(response.getMessageId());
+        MessagingService.instance().removeRegisteredCallback(response.getMessageId());
     }
 
     public InetAddress getFrom()

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/IncomingTcpConnection.java?rev=1057930&r1=1057929&r2=1057930&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/IncomingTcpConnection.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/IncomingTcpConnection.java Tue Jan 11 23:36:37 2011
@@ -88,7 +88,7 @@ public class IncomingTcpConnection exten
                     input.readFully(contentBytes);
                     
                     Message message = Message.serializer().deserialize(new DataInputStream(new ByteArrayInputStream(contentBytes)));
-                    MessagingService.receive(message);
+                    MessagingService.instance().receive(message);
                 }
                 // prepare to read the next message
                 MessagingService.validateMagic(input.readInt());

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessageDeliveryTask.java?rev=1057930&r1=1057929&r2=1057930&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessageDeliveryTask.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessageDeliveryTask.java Tue Jan 11 23:36:37 2011
@@ -50,7 +50,7 @@ public class MessageDeliveryTask impleme
             case REQUEST_RESPONSE:
                 if (System.currentTimeMillis() > constructionTime_ + DatabaseDescriptor.getRpcTimeout())
                 {
-                    MessagingService.incrementDroppedMessages(verb);
+                    MessagingService.instance().incrementDroppedMessages(verb);
                     return;
                 }
                 break;

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java?rev=1057930&r1=1057929&r2=1057930&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java Tue Jan 11 23:36:37 2011
@@ -61,36 +61,35 @@ import org.apache.cassandra.utils.Simple
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
 import org.cliffc.high_scale_lib.NonBlockingHashSet;
 
-public class MessagingService implements MessagingServiceMBean, ILatencyPublisher
+public final class MessagingService implements MessagingServiceMBean, ILatencyPublisher
 {
-    private static int version_ = 1;
+    private static final int version_ = 1;
     //TODO: make this parameter dynamic somehow.  Not sure if config is appropriate.
-    private static SerializerType serializerType_ = SerializerType.BINARY;
+    private SerializerType serializerType_ = SerializerType.BINARY;
 
     /** we preface every message with this number so the recipient can validate the sender is sane */
-    public static final int PROTOCOL_MAGIC = 0xCA552DFA;
+    private static final int PROTOCOL_MAGIC = 0xCA552DFA;
 
     /* This records all the results mapped by message Id */
-    private static ExpiringMap<String, IMessageCallback> callbacks;
-    private static ConcurrentMap<String, Collection<InetAddress>> targets = new NonBlockingHashMap<String, Collection<InetAddress>>();
+    private final ExpiringMap<String, IMessageCallback> callbacks;
+    private final ConcurrentMap<String, Collection<InetAddress>> targets = new NonBlockingHashMap<String, Collection<InetAddress>>();
 
     /* Lookup table for registering message handlers based on the verb. */
-    private static Map<StorageService.Verb, IVerbHandler> verbHandlers_;
+    private final Map<StorageService.Verb, IVerbHandler> verbHandlers_;
 
     /* Thread pool to handle messaging write activities */
-    private static ExecutorService streamExecutor_;
+    private final ExecutorService streamExecutor_;
     
-    private static NonBlockingHashMap<InetAddress, OutboundTcpConnectionPool> connectionManagers_ = new NonBlockingHashMap<InetAddress, OutboundTcpConnectionPool>();
+    private final NonBlockingHashMap<InetAddress, OutboundTcpConnectionPool> connectionManagers_ = new NonBlockingHashMap<InetAddress, OutboundTcpConnectionPool>();
     
-    private static Logger logger_ = LoggerFactory.getLogger(MessagingService.class);
-    private static int LOG_DROPPED_INTERVAL_IN_MS = 5000;
+    private static final Logger logger_ = LoggerFactory.getLogger(MessagingService.class);
+    private static final int LOG_DROPPED_INTERVAL_IN_MS = 5000;
 
     private SocketThread socketThread;
-    private SimpleCondition listenGate;
-    private static final Map<StorageService.Verb, AtomicInteger> droppedMessages = new EnumMap<StorageService.Verb, AtomicInteger>(StorageService.Verb.class);
+    private final SimpleCondition listenGate;
+    private final Map<StorageService.Verb, AtomicInteger> droppedMessages = new EnumMap<StorageService.Verb, AtomicInteger>(StorageService.Verb.class);
     private final List<ILatencySubscriber> subscribers = new ArrayList<ILatencySubscriber>();
 
-    static
     {
         for (StorageService.Verb verb : StorageService.Verb.values())
             droppedMessages.put(verb, new AtomicInteger());
@@ -105,13 +104,7 @@ public class MessagingService implements
         return MSHandle.instance;
     }
 
-    public Object clone() throws CloneNotSupportedException
-    {
-        //Prevents the singleton from being cloned
-        throw new CloneNotSupportedException();
-    }
-
-    protected MessagingService()
+    private MessagingService()
     {
         listenGate = new SimpleCondition();
         verbHandlers_ = new EnumMap<StorageService.Verb, IVerbHandler>(StorageService.Verb.class);
@@ -155,7 +148,7 @@ public class MessagingService implements
         }
     }
 
-    public byte[] hash(String type, byte data[])
+    public static byte[] hash(String type, byte data[])
     {
         byte result[];
         try
@@ -204,7 +197,7 @@ public class MessagingService implements
         }
     }
 
-    public static OutboundTcpConnectionPool getConnectionPool(InetAddress to)
+    public OutboundTcpConnectionPool getConnectionPool(InetAddress to)
     {
         OutboundTcpConnectionPool cp = connectionManagers_.get(to);
         if (cp == null)
@@ -215,7 +208,7 @@ public class MessagingService implements
         return cp;
     }
 
-    public static OutboundTcpConnection getConnection(InetAddress to, Message msg)
+    public OutboundTcpConnection getConnection(InetAddress to, Message msg)
     {
         return getConnectionPool(to).getConnection(msg);
     }
@@ -275,7 +268,7 @@ public class MessagingService implements
         addresses.add(endpoint);
     }
 
-    private static void removeTarget(String messageId, InetAddress from)
+    private void removeTarget(String messageId, InetAddress from)
     {
         Collection<InetAddress> addresses = targets.get(messageId);
         // null is expected if we removed the callback or we got a reply after its timeout expired
@@ -346,7 +339,7 @@ public class MessagingService implements
         // do local deliveries
         if ( message.getFrom().equals(to) )
         {
-            MessagingService.receive(message);
+            receive(message);
             return;
         }
 
@@ -407,19 +400,19 @@ public class MessagingService implements
     }
 
     /** blocks until the processing pools are empty and done. */
-    public static void waitFor() throws InterruptedException
+    public void waitFor() throws InterruptedException
     {
         while (!streamExecutor_.isTerminated())
             streamExecutor_.awaitTermination(5, TimeUnit.SECONDS);
     }
 
-    public static void shutdown()
+    public void shutdown()
     {
         logger_.info("Shutting down MessageService...");
 
         try
         {
-            instance().socketThread.close();
+            socketThread.close();
         }
         catch (IOException e)
         {
@@ -432,7 +425,7 @@ public class MessagingService implements
         logger_.info("Shutdown complete (no further commands will be processed)");
     }
 
-    public static void receive(Message message)
+    public void receive(Message message)
     {
         message = SinkManager.processServerMessage(message);
         if (message == null)
@@ -444,23 +437,23 @@ public class MessagingService implements
         stage.execute(runnable);
     }
 
-    public static IMessageCallback getRegisteredCallback(String messageId)
+    public IMessageCallback getRegisteredCallback(String messageId)
     {
         return callbacks.get(messageId);
     }
     
-    public static IMessageCallback removeRegisteredCallback(String messageId)
+    public IMessageCallback removeRegisteredCallback(String messageId)
     {
         targets.remove(messageId); // TODO fix this when we clean up quorum reads to do proper RR
         return callbacks.remove(messageId);
     }
 
-    public static long getRegisteredCallbackAge(String messageId)
+    public long getRegisteredCallbackAge(String messageId)
     {
         return callbacks.getAge(messageId);
     }
 
-    public static void responseReceivedFrom(String messageId, InetAddress from)
+    public void responseReceivedFrom(String messageId, InetAddress from)
     {
         removeTarget(messageId, from);
     }
@@ -476,7 +469,7 @@ public class MessagingService implements
         return x >>> (p + 1) - n & ~(-1 << n);
     }
         
-    public static ByteBuffer packIt(byte[] bytes, boolean compress)
+    public ByteBuffer packIt(byte[] bytes, boolean compress)
     {
         /*
              Setting up the protocol header. This is 4 bytes long
@@ -506,7 +499,7 @@ public class MessagingService implements
         return buffer;
     }
         
-    public static ByteBuffer constructStreamHeader(StreamHeader streamHeader, boolean compress)
+    public ByteBuffer constructStreamHeader(StreamHeader streamHeader, boolean compress)
     {
         /* 
         Setting up the protocol header. This is 4 bytes long
@@ -557,12 +550,12 @@ public class MessagingService implements
         return buffer;
     }
 
-    public static int incrementDroppedMessages(StorageService.Verb verb)
+    public int incrementDroppedMessages(StorageService.Verb verb)
     {
         return droppedMessages.get(verb).incrementAndGet();
     }
                
-    private static void logDroppedMessages()
+    private void logDroppedMessages()
     {
         boolean logTpstats = false;
         for (Map.Entry<StorageService.Verb, AtomicInteger> entry : droppedMessages.entrySet())

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/ResponseVerbHandler.java?rev=1057930&r1=1057929&r2=1057930&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/ResponseVerbHandler.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/ResponseVerbHandler.java Tue Jan 11 23:36:37 2011
@@ -37,9 +37,9 @@ public class ResponseVerbHandler impleme
     public void doVerb(Message message)
     {     
         String messageId = message.getMessageId();
-        MessagingService.responseReceivedFrom(messageId, message.getFrom());
-        double age = System.currentTimeMillis() - MessagingService.getRegisteredCallbackAge(messageId);
-        IMessageCallback cb = MessagingService.getRegisteredCallback(messageId);
+        MessagingService.instance().responseReceivedFrom(messageId, message.getFrom());
+        double age = System.currentTimeMillis() - MessagingService.instance().getRegisteredCallbackAge(messageId);
+        IMessageCallback cb = MessagingService.instance().getRegisteredCallback(messageId);
         if (cb == null)
             return;
 

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadCallback.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadCallback.java?rev=1057930&r1=1057929&r2=1057930&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadCallback.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadCallback.java Tue Jan 11 23:36:37 2011
@@ -83,9 +83,10 @@ public class ReadCallback<T> implements 
 
     public void close()
     {
+        MessagingService ms = MessagingService.instance();
         for (Message response : resolver.getMessages())
         {
-            MessagingService.removeRegisteredCallback(response.getMessageId());
+            ms.removeRegisteredCallback(response.getMessageId());
         }
     }
     

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java?rev=1057930&r1=1057929&r2=1057930&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java Tue Jan 11 23:36:37 2011
@@ -273,7 +273,7 @@ public class StorageService implements I
         Gossiper.instance.unregister(migrationManager);
         Gossiper.instance.unregister(this);
         Gossiper.instance.stop();
-        MessagingService.shutdown();
+        MessagingService.instance().shutdown();
         StageManager.shutdownNow();
     }
     
@@ -1500,7 +1500,7 @@ public class StorageService implements I
             public void run()
             {
                 Gossiper.instance.stop();
-                MessagingService.shutdown();
+                MessagingService.instance().shutdown();
                 StageManager.shutdownNow();
                 setMode("Decommissioned", true);
                 // let op be responsible for killing the process
@@ -1797,9 +1797,9 @@ public class StorageService implements I
         setMode("Starting drain process", true);
         Gossiper.instance.stop();
         setMode("Draining: shutting down MessageService", false);
-        MessagingService.shutdown();
+        MessagingService.instance().shutdown();
         setMode("Draining: emptying MessageService pools", false);
-        MessagingService.waitFor();
+        MessagingService.instance().waitFor();
 
         setMode("Draining: clearing mutation stage", false);
         mutationStage.shutdown();

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/streaming/FileStreamTask.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/streaming/FileStreamTask.java?rev=1057930&r1=1057929&r2=1057930&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/streaming/FileStreamTask.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/streaming/FileStreamTask.java Tue Jan 11 23:36:37 2011
@@ -84,7 +84,7 @@ public class FileStreamTask extends Wrap
 
     private void stream(SocketChannel channel) throws IOException
     {
-        ByteBuffer buffer = MessagingService.constructStreamHeader(header, false);
+        ByteBuffer buffer = MessagingService.instance().constructStreamHeader(header, false);
         channel.write(buffer);
         assert buffer.remaining() == 0;
         if (header.file == null)

Modified: cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/RemoveTest.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/RemoveTest.java?rev=1057930&r1=1057929&r2=1057930&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/RemoveTest.java (original)
+++ cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/RemoveTest.java Tue Jan 11 23:36:37 2011
@@ -84,7 +84,7 @@ public class RemoveTest extends CleanupH
     public void tearDown()
     {
         SinkManager.clear();
-        MessagingService.shutdown();
+        MessagingService.instance().shutdown();
         ss.setPartitionerUnsafe(oldPartitioner);
     }