You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/07/14 06:13:23 UTC

svn commit: r963941 - in /cassandra/trunk: CHANGES.txt src/java/org/apache/cassandra/db/commitlog/CommitLog.java src/java/org/apache/cassandra/net/MessageDeserializationTask.java src/java/org/apache/cassandra/utils/FBUtilities.java

Author: jbellis
Date: Wed Jul 14 04:13:23 2010
New Revision: 963941

URL: http://svn.apache.org/viewvc?rev=963941&view=rev
Log:
pre-emptively drop requests that cannot be processed within RPCTimeout.  patch by mdennis; reviewed by jbellis for CASSANDRA-685

Modified:
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
    cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeserializationTask.java
    cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java

Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=963941&r1=963940&r2=963941&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Wed Jul 14 04:13:23 2010
@@ -39,6 +39,8 @@ dev
  * allow multiple repair sessions per node (CASSANDRA-1190)
  * add dynamic endpoint snitch (CASSANDRA-981)
  * optimize away MessagingService for local range queries (CASSANDRA-1261)
+ * pre-emptively drop requests that cannot be processed within RPCTimeout
+   (CASSANDRA-685)
 
 
 0.6.4

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java?rev=963941&r1=963940&r2=963941&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java Wed Jul 14 04:13:23 2010
@@ -28,6 +28,7 @@ import org.apache.cassandra.db.Table;
 import org.apache.cassandra.io.DeletionService;
 import org.apache.cassandra.io.util.BufferedRandomAccessFile;
 import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.WrappedRunnable;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
@@ -72,6 +73,7 @@ import java.util.zip.Checksum;
  */
 public class CommitLog
 {
+    private static final int MAX_OUTSTANDING_REPLAY_COUNT = 1024;
     private static volatile int SEGMENT_SIZE = 128*1024*1024; // roll after log gets this big
 
     static final Logger logger = LoggerFactory.getLogger(CommitLog.class);
@@ -180,7 +182,7 @@ public class CommitLog
     public static void recover(File[] clogs) throws IOException
     {
         Set<Table> tablesRecovered = new HashSet<Table>();
-        final AtomicInteger counter = new AtomicInteger(0);
+        List<Future<?>> futures = new ArrayList<Future<?>>();
         byte[] bytes = new byte[4096];
 
         for (File file : clogs)
@@ -278,47 +280,29 @@ public class CommitLog
                         {
                             Table.open(newRm.getTable()).apply(newRm, null, false);
                         }
-                        counter.decrementAndGet();
                     }
                 };
-                counter.incrementAndGet();
-                StageManager.getStage(StageManager.MUTATION_STAGE).submit(runnable);
+                futures.add(StageManager.getStage(StageManager.MUTATION_STAGE).submit(runnable));
+                if (futures.size() > MAX_OUTSTANDING_REPLAY_COUNT)
+                {
+                    FBUtilities.waitOnFutures(futures);
+                    futures.clear();
+                }
             }
             reader.close();
-            logger.info("Finished reading " + file);
+            logger.info("Finished reading " + file + " for recovery");
         }
 
         // wait for all the writes to finish on the mutation stage
-        while (counter.get() > 0)
-        {
-            try
-            {
-                Thread.sleep(10);
-            }
-            catch (InterruptedException e)
-            {
-                throw new AssertionError(e);
-            }
-        }
+        FBUtilities.waitOnFutures(futures);
+        logger.debug("Finished waiting on mutations from recovery");
 
         // flush replayed tables
-        List<Future<?>> futures = new ArrayList<Future<?>>();
+        futures.clear();
         for (Table table : tablesRecovered)
-        {
             futures.addAll(table.flush());
-        }
-        // wait for flushes to finish
-        for (Future<?> future : futures)
-        {
-            try
-            {
-                future.get();
-            }
-            catch (Exception e)
-            {
-                throw new RuntimeException(e);
-            }
-        }
+        FBUtilities.waitOnFutures(futures);
+        logger.info("Recovery complete");
     }
 
     private CommitLogSegment currentSegment()

Modified: cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeserializationTask.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeserializationTask.java?rev=963941&r1=963940&r2=963941&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeserializationTask.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeserializationTask.java Wed Jul 14 04:13:23 2010
@@ -22,39 +22,36 @@ import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.IOException;
 
-import org.apache.cassandra.net.sink.SinkManager;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-class MessageDeserializationTask implements Runnable
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.net.sink.SinkManager;
+import org.apache.cassandra.utils.WrappedRunnable;
+
+class MessageDeserializationTask extends WrappedRunnable
 {
-    private static Logger logger_ = LoggerFactory.getLogger(MessageDeserializationTask.class);
+    private static Logger logger = LoggerFactory.getLogger(MessageDeserializationTask.class);
     
-    private ByteArrayInputStream bytes;
+    private final ByteArrayInputStream bytes;
+    private final long constructionTime = System.currentTimeMillis();
     
     MessageDeserializationTask(ByteArrayInputStream bytes)
     {
         this.bytes = bytes;
     }
-    
-    public void run()
+
+    public void runMayThrow() throws IOException
     {
-        Message message = null;
-        try
+        if (System.currentTimeMillis() >  constructionTime + DatabaseDescriptor.getRpcTimeout())
         {
-            message = Message.serializer().deserialize(new DataInputStream(bytes));
-        }
-        catch (IOException e)
-        {
-            throw new RuntimeException(e);
+            logger.warn(String.format("dropping message (%,dms past timeout)",
+                                      System.currentTimeMillis() - (constructionTime + DatabaseDescriptor.getRpcTimeout())));
+            return;
         }
 
-        if ( message != null )
-        {
-            message = SinkManager.processServerMessageSink(message);
-            MessagingService.receive(message);
-        }
+        Message message = Message.serializer().deserialize(new DataInputStream(bytes));
+        message = SinkManager.processServerMessageSink(message);
+        MessagingService.receive(message);
     }
-
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java?rev=963941&r1=963940&r2=963941&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java Wed Jul 14 04:13:23 2010
@@ -28,6 +28,8 @@ import java.nio.charset.CharacterCodingE
 import java.nio.charset.CharsetDecoder;
 import java.security.MessageDigest;
 import java.util.*;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
@@ -527,4 +529,23 @@ public class FBUtilities
         // we can't actually get microsecond precision.
         return System.currentTimeMillis() * 1000;
     }
+
+    public static void waitOnFutures(Collection<Future<?>> futures)
+    {
+        for (Future f : futures)
+        {
+            try
+            {
+                f.get();
+            }
+            catch (ExecutionException ee)
+            {
+                throw new RuntimeException(ee);
+            }
+            catch (InterruptedException ie)
+            {
+                throw new AssertionError(ie);
+            }
+        }
+    }
 }