You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/09/23 03:57:22 UTC

svn commit: r817926 - in /incubator/cassandra/trunk/src/java/org/apache/cassandra: concurrent/ThreadLocalContext.java net/MessageSerializationTask.java net/MessagingService.java service/MultiQuorumResponseHandler.java

Author: jbellis
Date: Wed Sep 23 01:57:21 2009
New Revision: 817926

URL: http://svn.apache.org/viewvc?rev=817926&view=rev
Log:
Clean out unused code from MessagingService. Inline sink processing into sendOneWay instead of running it on a separate executor.
This sets the stage for applying backpressure to the client, should we choose to do that.
patch by jbellis; reviewed by goffinet for CASSANDRA-401

Removed:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/ThreadLocalContext.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessageSerializationTask.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/MultiQuorumResponseHandler.java
Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=817926&r1=817925&r2=817926&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Wed Sep 23 01:57:21 2009
@@ -21,12 +21,14 @@
 import org.apache.cassandra.concurrent.*;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.net.io.SerializerType;
+import org.apache.cassandra.net.sink.SinkManager;
 import org.apache.cassandra.utils.*;
 import org.apache.log4j.Logger;
 
 import java.io.IOException;
 import java.net.MulticastSocket;
 import java.net.ServerSocket;
+import java.net.SocketException;
 import java.nio.ByteBuffer;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.ServerSocketChannel;
@@ -40,8 +42,6 @@
 
 public class MessagingService implements IMessagingService
 {
-    private static boolean debugOn_ = false;   
-    
     private static int version_ = 1;
     //TODO: make this parameter dynamic somehow.  Not sure if config is appropriate.
     private static SerializerType serializerType_ = SerializerType.BINARY;
@@ -51,7 +51,8 @@
     public static final String responseVerbHandler_ = "RESPONSE";
     /* Stage for responses. */
     public static final String responseStage_ = "RESPONSE-STAGE";
-    private enum ReservedVerbs_ {RESPONSE};
+    private enum ReservedVerbs_ {
+    };
     
     private static Map<String, String> reservedVerbs_ = new Hashtable<String, String>();
     /* Indicate if we are currently streaming data to another node or receiving streaming data */
@@ -69,15 +70,10 @@
     
     /* Lookup table for registering message handlers based on the verb. */
     private static Map<String, IVerbHandler> verbHandlers_;
-    
-    private static Map<String, MulticastSocket> mCastMembership_ = new HashMap<String, MulticastSocket>();
-    
+
     /* Thread pool to handle messaging read activities of Socket and default stage */
     private static ExecutorService messageDeserializationExecutor_;
     
-    /* Thread pool to handle messaging write activities */
-    private static ExecutorService messageSerializerExecutor_;
-    
     /* Thread pool to handle deserialization of messages read from the socket. */
     private static ExecutorService messageDeserializerExecutor_;
     
@@ -92,48 +88,12 @@
     private static Logger logger_ = Logger.getLogger(MessagingService.class);
     
     private static IMessagingService messagingService_ = new MessagingService();
-    
-    public static boolean isDebugOn()
-    {
-        return debugOn_;
-    }
-    
-    public static void debugOn(boolean on)
-    {
-        debugOn_ = on;
-    }
-    
-    public static SerializerType getSerializerType()
-    {
-        return serializerType_;
-    }
-    
-    public synchronized static void serializerType(String type)
-    { 
-        if ( type.equalsIgnoreCase("binary") )
-        {
-            serializerType_ = SerializerType.BINARY;
-        }
-        else if ( type.equalsIgnoreCase("java") )
-        {
-            serializerType_ = SerializerType.JAVA;
-        }
-        else if ( type.equalsIgnoreCase("xml") )
-        {
-            serializerType_ = SerializerType.XML;
-        }
-    }
-    
+
     public static int getVersion()
     {
         return version_;
     }
-    
-    public static void setVersion(int version)
-    {
-        version_ = version;
-    }
-    
+
     public static IMessagingService getMessagingInstance()
     {   
     	if ( bShutdown_ )
@@ -186,15 +146,7 @@
                 new LinkedBlockingQueue<Runnable>(),
                 new ThreadFactoryImpl("MESSAGING-SERVICE-POOL")
                 );
-                
-        messageSerializerExecutor_ = new DebuggableThreadPoolExecutor( maxSize,
-                maxSize,
-                Integer.MAX_VALUE,
-                TimeUnit.SECONDS,
-                new LinkedBlockingQueue<Runnable>(),
-                new ThreadFactoryImpl("MESSAGE-SERIALIZER-POOL")
-                ); 
-        
+
         messageDeserializerExecutor_ = new DebuggableThreadPoolExecutor( maxSize,
                 maxSize,
                 Integer.MAX_VALUE,
@@ -284,19 +236,6 @@
         return cp;
     }
 
-    public static ConnectionStatistics[] getPoolStatistics()
-    {
-        Set<ConnectionStatistics> stats = new HashSet<ConnectionStatistics>();        
-        Iterator<TcpConnectionManager> it = poolTable_.values().iterator();
-        while ( it.hasNext() )
-        {
-            TcpConnectionManager cp = it.next();
-            ConnectionStatistics cs = new ConnectionStatistics(cp.getLocalEndPoint(), cp.getRemoteEndPoint(), cp.getPoolSize(), cp.getConnectionsInUse());
-            stats.add( cs );
-        }
-        return stats.toArray(new ConnectionStatistics[0]);
-    }
-    
     public static TcpConnection getConnection(EndPoint from, EndPoint to) throws IOException
     {
         return getConnectionPool(from, to).getConnection();
@@ -419,8 +358,38 @@
             return;
         }
         
-        Runnable tcpWriteEvent = new MessageSerializationTask(message, to);
-        messageSerializerExecutor_.execute(tcpWriteEvent);    
+        TcpConnection connection = null;
+        try
+        {
+            Message processedMessage = SinkManager.processClientMessageSink(message);
+            if (processedMessage == null)
+            {
+                return;
+            }
+            connection = MessagingService.getConnection(processedMessage.getFrom(), to);
+            connection.write(message);
+        }
+        catch (SocketException se)
+        {
+            // Shutting down the entire pool. May be too conservative an approach.
+            MessagingService.getConnectionPool(message.getFrom(), to).shutdown();
+            logger_.error("socket error writing to " + to, se);
+        }
+        catch (IOException e)
+        {
+            if (connection != null)
+            {
+                connection.errorClose();
+            }
+            logger_.error("unexpected error writing " + message, e);
+        }
+        finally
+        {
+            if (connection != null)
+            {
+                connection.close();
+            }
+        }
     }
     
     public IAsyncResult sendRR(Message message, EndPoint to)
@@ -463,34 +432,12 @@
         Runnable streamingTask = new FileStreamTask(file, startPosition, total, from, to);
         streamExecutor_.execute(streamingTask);
     }
-    
-    /*
-     * Does the application determine if we are currently streaming data.
-     * This would imply either streaming to a receiver, receiving streamed
-     * data or both. 
-    */
-    public static boolean isStreaming()
-    {
-        return isStreaming_.get();
-    }
-    
+
     public static void setStreamingMode(boolean bVal)
     {
         isStreaming_.set(bVal);
     }
-    public static void flushAndshutdown()
-    {
-        // safely shutdown and send all writes
-        for(Map.Entry<String, TcpConnectionManager> entry : poolTable_.entrySet() )
-        {
-            for(TcpConnection connection: entry.getValue().getConnections())
-            {
-                connection.doPendingWrites();
-            }
-        }
-        shutdown();
-    }
-    
+
     public static void shutdown()
     {
         logger_.info("Shutting down ...");
@@ -510,7 +457,6 @@
 
             /* Shutdown the threads in the EventQueue's */
             messageDeserializationExecutor_.shutdownNow();
-            messageSerializerExecutor_.shutdownNow();
             messageDeserializerExecutor_.shutdownNow();
             streamExecutor_.shutdownNow();
 
@@ -534,12 +480,7 @@
     {        
         enqueueRunnable(message.getMessageType(), new MessageDeliveryTask(message));
     }
-    
-    public static boolean isLocalEndPoint(EndPoint ep)
-    {
-        return ( endPoints_.contains(ep) );
-    }
-        
+
     private static void enqueueRunnable(String stageName, Runnable runnable){
         
         IStage stage = StageManager.getStage(stageName);   
@@ -550,8 +491,8 @@
         } 
         else
         {
-            logger_.info("Running on default stage - beware");
-            messageSerializerExecutor_.execute(runnable);
+            logger_.warn("Running on default stage - beware");
+            messageDeserializerExecutor_.execute(runnable);
         }
     }    
     
@@ -569,27 +510,12 @@
     {
         return taskCompletionMap_.remove(key);
     }
-    
-    public static void removeAsyncResult(String key)
-    {
-        taskCompletionMap_.remove(key);
-    }
 
-    public static byte[] getProtocol()
-    {
-        return protocol_;
-    }
-    
     public static ExecutorService getReadExecutor()
     {
         return messageDeserializationExecutor_;
     }
-    
-    public static ExecutorService getWriteExecutor()
-    {
-        return messageSerializerExecutor_;
-    }
-    
+
     public static ExecutorService getDeserializationExecutor()
     {
         return messageDeserializerExecutor_;