You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/08/13 18:58:49 UTC

svn commit: r985283 - in /cassandra/branches/cassandra-0.6: ./ src/java/org/apache/cassandra/concurrent/ src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/net/

Author: jbellis
Date: Fri Aug 13 16:58:49 2010
New Revision: 985283

URL: http://svn.apache.org/viewvc?rev=985283&view=rev
Log:
remove message deserialization stage, and uncap read/write stages.  patch by jbellis; reviewed by Stu Hood for CASSANDRA-1358

Modified:
    cassandra/branches/cassandra-0.6/CHANGES.txt
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/concurrent/StageManager.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessageDeserializationTask.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java

Modified: cassandra/branches/cassandra-0.6/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/CHANGES.txt?rev=985283&r1=985282&r2=985283&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.6/CHANGES.txt Fri Aug 13 16:58:49 2010
@@ -11,6 +11,8 @@
    initialization (CASSANDRA-1377)
  * fix errors in hard-coded bloom filter optKPerBucket by computing it
    algorithmically (CASSANDRA-1220
+ * remove message deserialization stage, and uncap read/write stages
+   so slow reads/writes don't block gossip processing (CASSANDRA-1358)
 
 
 0.6.4

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/concurrent/StageManager.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/concurrent/StageManager.java?rev=985283&r1=985282&r2=985283&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/concurrent/StageManager.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/concurrent/StageManager.java Fri Aug 13 16:58:49 2010
@@ -70,7 +70,7 @@ public class StageManager
                                                 numThreads,
                                                 Integer.MAX_VALUE,
                                                 TimeUnit.SECONDS,
-                                                new LinkedBlockingQueue<Runnable>(DatabaseDescriptor.getStageQueueSize()),
+                                                new LinkedBlockingQueue<Runnable>(),
                                                 new NamedThreadFactory(name));
     }
     
@@ -82,7 +82,7 @@ public class StageManager
                                                      numThreads,
                                                      Integer.MAX_VALUE,
                                                      TimeUnit.SECONDS,
-                                                     new LinkedBlockingQueue<Runnable>(DatabaseDescriptor.getStageQueueSize()),
+                                                     new LinkedBlockingQueue<Runnable>(),
                                                      new NamedThreadFactory(name));
     }
 

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=985283&r1=985282&r2=985283&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Fri Aug 13 16:58:49 2010
@@ -156,8 +156,6 @@ public class DatabaseDescriptor
         throw new RuntimeException("Cannot locate " + STORAGE_CONF_FILE + " via storage-config system property or classpath lookup.");
     }
 
-    private static int stageQueueSize_ = 4096;
-
     static
     {
         try
@@ -1114,11 +1112,6 @@ public class DatabaseDescriptor
         return getCFMetaData(tableName, cfName).subcolumnComparator;
     }
 
-    public static int getStageQueueSize()
-    {
-        return stageQueueSize_;
-    }
-
     /**
      * @return The absolute number of keys that should be cached per table.
      */

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/IncomingTcpConnection.java?rev=985283&r1=985282&r2=985283&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/IncomingTcpConnection.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/IncomingTcpConnection.java Fri Aug 13 16:58:49 2010
@@ -26,6 +26,7 @@ import java.net.Socket;
 
 import org.apache.log4j.Logger;
 
+import org.apache.cassandra.net.sink.SinkManager;
 import org.apache.cassandra.streaming.IncomingStreamReader;
 
 public class IncomingTcpConnection extends Thread
@@ -70,7 +71,9 @@ public class IncomingTcpConnection exten
                     int size = input.readInt();
                     byte[] contentBytes = new byte[size];
                     input.readFully(contentBytes);
-                    MessagingService.getDeserializationExecutor().submit(new MessageDeserializationTask(new ByteArrayInputStream(contentBytes)));
+                    
+                    Message message = Message.serializer().deserialize(new DataInputStream(new ByteArrayInputStream(contentBytes)));
+                    MessagingService.receive(message);
                 }
             }
             catch (EOFException e)

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessageDeliveryTask.java?rev=985283&r1=985282&r2=985283&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessageDeliveryTask.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessageDeliveryTask.java Fri Aug 13 16:58:49 2010
@@ -20,13 +20,16 @@ package org.apache.cassandra.net;
 
 import org.apache.log4j.Logger;
 
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.service.StorageService;
 
 public class MessageDeliveryTask implements Runnable
 {
+    private static final Logger logger_ = Logger.getLogger(MessageDeliveryTask.class);    
+
     private Message message_;
-    private static Logger logger_ = Logger.getLogger(MessageDeliveryTask.class);    
-    
+    private final long constructionTime_ = System.currentTimeMillis();
+
     public MessageDeliveryTask(Message message)
     {
         message_ = message;    
@@ -34,6 +37,12 @@ public class MessageDeliveryTask impleme
     
     public void run()
     { 
+        if (System.currentTimeMillis() >  constructionTime_ + DatabaseDescriptor.getRpcTimeout())
+        {
+            MessagingService.incrementDroppedMessages();
+            return;
+        }
+
         StorageService.Verb verb = message_.getVerb();
         IVerbHandler verbHandler = MessagingService.instance.getVerbHandler(verb);
         assert verbHandler != null : "unknown verb " + verb;

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessageDeserializationTask.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessageDeserializationTask.java?rev=985283&r1=985282&r2=985283&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessageDeserializationTask.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessageDeserializationTask.java Fri Aug 13 16:58:49 2010
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.net;
-
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.IOException;
-
-import org.apache.log4j.Logger;
-
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.net.sink.SinkManager;
-import org.apache.cassandra.utils.WrappedRunnable;
-
-class MessageDeserializationTask extends WrappedRunnable
-{
-    private static Logger logger = Logger.getLogger(MessageDeserializationTask.class);
-    
-    private final ByteArrayInputStream bytes;
-    private final long constructionTime = System.currentTimeMillis();
-    
-    MessageDeserializationTask(ByteArrayInputStream bytes)
-    {
-        this.bytes = bytes;
-    }
-
-    public void runMayThrow() throws IOException
-    {
-        if (System.currentTimeMillis() >  constructionTime + DatabaseDescriptor.getRpcTimeout())
-        {
-            MessagingService.incrementDroppedMessages();
-            return;
-        }
-
-        Message message = Message.serializer().deserialize(new DataInputStream(bytes));
-        message = SinkManager.processServerMessageSink(message);
-        MessagingService.receive(message);
-    }
-}

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java?rev=985283&r1=985282&r2=985283&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java Fri Aug 13 16:58:49 2010
@@ -18,21 +18,6 @@
 
 package org.apache.cassandra.net;
 
-import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
-import org.apache.cassandra.concurrent.NamedThreadFactory;
-import org.apache.cassandra.concurrent.StageManager;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.gms.IFailureDetectionEventListener;
-import org.apache.cassandra.io.util.DataOutputBuffer;
-import org.apache.cassandra.net.io.SerializerType;
-import org.apache.cassandra.net.sink.SinkManager;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.ExpiringMap;
-import org.apache.cassandra.utils.GuidGenerator;
-import org.apache.cassandra.utils.SimpleCondition;
-import org.apache.log4j.Logger;
-import org.cliffc.high_scale_lib.NonBlockingHashMap;
-
 import java.io.IOError;
 import java.io.IOException;
 import java.net.InetAddress;
@@ -47,12 +32,24 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Timer;
 import java.util.TimerTask;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.log4j.Logger;
+
+import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.net.io.SerializerType;
+import org.apache.cassandra.net.sink.SinkManager;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.ExpiringMap;
+import org.apache.cassandra.utils.GuidGenerator;
+import org.apache.cassandra.utils.SimpleCondition;
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
+
 public class MessagingService
 {
     private static int version_ = 1;
@@ -69,8 +66,8 @@ public class MessagingService
     /* Lookup table for registering message handlers based on the verb. */
     private static Map<StorageService.Verb, IVerbHandler> verbHandlers_;
 
-    /* Thread pool to handle deserialization of messages read from the socket. */
-    private static ExecutorService messageDeserializerExecutor_;
+    /* Thread pool to handle messages without a specialized stage */
+    private static ExecutorService defaultExecutor_;
     
     /* Thread pool to handle messaging write activities */
     private static ExecutorService streamExecutor_;
@@ -105,13 +102,7 @@ public class MessagingService
         callbackMap_ = new ExpiringMap<String, IAsyncCallback>( 2 * DatabaseDescriptor.getRpcTimeout() );
         taskCompletionMap_ = new ExpiringMap<String, IAsyncResult>( 2 * DatabaseDescriptor.getRpcTimeout() );
 
-        // read executor puts messages to deserialize on this.
-        messageDeserializerExecutor_ = new JMXEnabledThreadPoolExecutor(1,
-                                                                        Runtime.getRuntime().availableProcessors(),
-                                                                        Integer.MAX_VALUE,
-                                                                        TimeUnit.SECONDS,
-                                                                        new LinkedBlockingQueue<Runnable>(),
-                                                                        new NamedThreadFactory("MESSAGE-DESERIALIZER-POOL"));
+        defaultExecutor_ = new JMXEnabledThreadPoolExecutor("MISCELLANEOUS-POOL");
 
         streamExecutor_ = new JMXEnabledThreadPoolExecutor("MESSAGE-STREAMING-POOL");
         TimerTask logDropped = new TimerTask()
@@ -354,8 +345,8 @@ public class MessagingService
     /** blocks until the processing pools are empty and done. */
     public static void waitFor() throws InterruptedException
     {
-        while (!messageDeserializerExecutor_.isTerminated())
-            messageDeserializerExecutor_.awaitTermination(5, TimeUnit.SECONDS);
+        while (!defaultExecutor_.isTerminated())
+            defaultExecutor_.awaitTermination(5, TimeUnit.SECONDS);
         while (!streamExecutor_.isTerminated())
             streamExecutor_.awaitTermination(5, TimeUnit.SECONDS);
     }
@@ -373,7 +364,7 @@ public class MessagingService
             throw new IOError(e);
         }
 
-        messageDeserializerExecutor_.shutdownNow();
+        defaultExecutor_.shutdownNow();
         streamExecutor_.shutdownNow();
 
         /* shut down the cachetables */
@@ -385,14 +376,16 @@ public class MessagingService
 
     public static void receive(Message message)
     {
-        Runnable runnable = new MessageDeliveryTask(message);
+        message = SinkManager.processServerMessageSink(message);
 
+        Runnable runnable = new MessageDeliveryTask(message);
         ExecutorService stage = StageManager.getStage(message.getMessageType());
+
         if (stage == null)
         {
             if (logger_.isDebugEnabled())
                 logger_.debug("Running " + message.getMessageType() + " on default stage");
-            messageDeserializerExecutor_.execute(runnable);
+            defaultExecutor_.execute(runnable);
         }
         else
         {
@@ -425,11 +418,6 @@ public class MessagingService
         return taskCompletionMap_.getAge(key);
     }
 
-    public static ExecutorService getDeserializationExecutor()
-    {
-        return messageDeserializerExecutor_;
-    }
-
     public static void validateMagic(int magic) throws IOException
     {
         if (magic != PROTOCOL_MAGIC)