You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/07/24 06:05:46 UTC

svn commit: r978791 - in /cassandra/branches/cassandra-0.6: ./ interface/thrift/gen-java/org/apache/cassandra/thrift/ src/java/org/apache/cassandra/db/commitlog/ src/java/org/apache/cassandra/net/ src/java/org/apache/cassandra/utils/

Author: jbellis
Date: Sat Jul 24 04:05:46 2010
New Revision: 978791

URL: http://svn.apache.org/viewvc?rev=978791&view=rev
Log:
backport CASSANDRA-685 fix from r963941 in trunk

Modified:
    cassandra/branches/cassandra-0.6/   (props changed)
    cassandra/branches/cassandra-0.6/CHANGES.txt
    cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java   (props changed)
    cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java   (props changed)
    cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java   (props changed)
    cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java   (props changed)
    cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java   (props changed)
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessageDeserializationTask.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/utils/FBUtilities.java

Propchange: cassandra/branches/cassandra-0.6/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Jul 24 04:05:46 2010
@@ -1,4 +1,4 @@
-/cassandra/trunk:931148
+/cassandra/trunk:931148,963941
 /incubator/cassandra/branches/cassandra-0.3:774578-796573
 /incubator/cassandra/branches/cassandra-0.4:810145-834239,834349-834350
 /incubator/cassandra/branches/cassandra-0.5:888872-915439

Modified: cassandra/branches/cassandra-0.6/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/CHANGES.txt?rev=978791&r1=978790&r2=978791&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.6/CHANGES.txt Sat Jul 24 04:05:46 2010
@@ -15,6 +15,8 @@
  * fix duplicate rows being read during mapreduce (CASSANDRA-1042)
  * failure detection wasn't closing command sockets (CASSANDRA-1221)
  * cassandra-cli.bat works on windows (CASSANDRA-1236)
+ * pre-emptively drop requests that cannot be processed within RPCTimeout
+   (CASSANDRA-685)
 
 
 0.6.3

Propchange: cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Jul 24 04:05:46 2010
@@ -1,4 +1,4 @@
-/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:931148
+/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:931148,963941
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573
 /incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/Cassandra.java:810145-834239,834349-834350
 /incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/Cassandra.java:888872-903502

Propchange: cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Jul 24 04:05:46 2010
@@ -1,4 +1,4 @@
-/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:931148
+/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:931148,963941
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198
 /incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/Column.java:810145-834239,834349-834350
 /incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/Column.java:888872-903502

Propchange: cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Jul 24 04:05:46 2010
@@ -1,4 +1,4 @@
-/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:931148
+/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:931148,963941
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573
 /incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:810145-834239,834349-834350
 /incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:888872-903502

Propchange: cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Jul 24 04:05:46 2010
@@ -1,4 +1,4 @@
-/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:931148
+/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:931148,963941
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573
 /incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:810145-834239,834349-834350
 /incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:888872-903502

Propchange: cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Jul 24 04:05:46 2010
@@ -1,4 +1,4 @@
-/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:931148
+/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:931148,963941
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198
 /incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/SuperColumn.java:810145-834239,834349-834350
 /incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/SuperColumn.java:888872-903502

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/commitlog/CommitLog.java?rev=978791&r1=978790&r2=978791&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/commitlog/CommitLog.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/commitlog/CommitLog.java Sat Jul 24 04:05:46 2010
@@ -26,6 +26,7 @@ import org.apache.cassandra.db.Table;
 import org.apache.cassandra.io.DeletionService;
 import org.apache.cassandra.io.util.BufferedRandomAccessFile;
 import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.WrappedRunnable;
 import org.apache.commons.lang.StringUtils;
 import org.apache.log4j.Logger;
@@ -69,6 +70,7 @@ import java.util.zip.Checksum;
  */
 public class CommitLog
 {
+    private static final int MAX_OUTSTANDING_REPLAY_COUNT = 1024;
     private static volatile int SEGMENT_SIZE = 128*1024*1024; // roll after log gets this big
 
     private static final Logger logger = Logger.getLogger(CommitLog.class);
@@ -177,7 +179,7 @@ public class CommitLog
     public static void recover(File[] clogs) throws IOException
     {
         Set<Table> tablesRecovered = new HashSet<Table>();
-        final AtomicInteger counter = new AtomicInteger(0);
+        List<Future<?>> futures = new ArrayList<Future<?>>();
         for (File file : clogs)
         {
             int bufferSize = (int)Math.min(file.length(), 32 * 1024 * 1024);
@@ -261,47 +263,29 @@ public class CommitLog
                         {
                             Table.open(rm.getTable()).apply(rm, null, false);
                         }
-                        counter.decrementAndGet();
                     }
                 };
-                counter.incrementAndGet();
-                StageManager.getStage(StageManager.MUTATION_STAGE).submit(runnable);
+                futures.add(StageManager.getStage(StageManager.MUTATION_STAGE).submit(runnable));
+                if (futures.size() > MAX_OUTSTANDING_REPLAY_COUNT)
+                {
+                    FBUtilities.waitOnFutures(futures);
+                    futures.clear();
+                }
             }
             reader.close();
             logger.info("Finished reading " + file);
         }
 
         // wait for all the writes to finish on the mutation stage
-        while (counter.get() > 0)
-        {
-            try
-            {
-                Thread.sleep(10);
-            }
-            catch (InterruptedException e)
-            {
-                throw new AssertionError(e);
-            }
-        }
+        FBUtilities.waitOnFutures(futures);
+        logger.debug("Finished waiting on mutations from recovery");
 
         // flush replayed tables
-        List<Future<?>> futures = new ArrayList<Future<?>>();
+        futures.clear();
         for (Table table : tablesRecovered)
-        {
             futures.addAll(table.flush());
-        }
-        // wait for flushes to finish
-        for (Future<?> future : futures)
-        {
-            try
-            {
-                future.get();
-            }
-            catch (Exception e)
-            {
-                throw new RuntimeException(e);
-            }
-        }
+        FBUtilities.waitOnFutures(futures);
+        logger.info("Recovery complete");
     }
 
     private CommitLogSegment currentSegment()

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessageDeserializationTask.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessageDeserializationTask.java?rev=978791&r1=978790&r2=978791&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessageDeserializationTask.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessageDeserializationTask.java Sat Jul 24 04:05:46 2010
@@ -22,38 +22,35 @@ import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.IOException;
 
-import org.apache.cassandra.net.sink.SinkManager;
-
 import org.apache.log4j.Logger;
 
-class MessageDeserializationTask implements Runnable
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.net.sink.SinkManager;
+import org.apache.cassandra.utils.WrappedRunnable;
+
+class MessageDeserializationTask extends WrappedRunnable
 {
-    private static Logger logger_ = Logger.getLogger(MessageDeserializationTask.class);
+    private static Logger logger = Logger.getLogger(MessageDeserializationTask.class);
     
-    private ByteArrayInputStream bytes;
+    private final ByteArrayInputStream bytes;
+    private final long constructionTime = System.currentTimeMillis();
     
     MessageDeserializationTask(ByteArrayInputStream bytes)
     {
         this.bytes = bytes;
     }
-    
-    public void run()
+
+    public void runMayThrow() throws IOException
     {
-        Message message = null;
-        try
+        if (System.currentTimeMillis() >  constructionTime + DatabaseDescriptor.getRpcTimeout())
         {
-            message = Message.serializer().deserialize(new DataInputStream(bytes));
-        }
-        catch (IOException e)
-        {
-            throw new RuntimeException(e);
+            logger.warn(String.format("dropping message (%,dms past timeout)",
+                                      System.currentTimeMillis() - (constructionTime + DatabaseDescriptor.getRpcTimeout())));
+            return;
         }
 
-        if ( message != null )
-        {
-            message = SinkManager.processServerMessageSink(message);
-            MessagingService.receive(message);
-        }
+        Message message = Message.serializer().deserialize(new DataInputStream(bytes));
+        message = SinkManager.processServerMessageSink(message);
+        MessagingService.receive(message);
     }
-
 }

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/utils/FBUtilities.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/utils/FBUtilities.java?rev=978791&r1=978790&r2=978791&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/utils/FBUtilities.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/utils/FBUtilities.java Sat Jul 24 04:05:46 2010
@@ -25,6 +25,8 @@ import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 import java.security.MessageDigest;
 import java.util.*;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.zip.DataFormatException;
@@ -487,4 +489,23 @@ public class FBUtilities
         ByteBuffer.wrap(bytes).putLong(n);
         return bytes;
     }
+
+    public static void waitOnFutures(Collection<Future<?>> futures)
+    {
+        for (Future f : futures)
+        {
+            try
+            {
+                f.get();
+            }
+            catch (ExecutionException ee)
+            {
+                throw new RuntimeException(ee);
+            }
+            catch (InterruptedException ie)
+            {
+                throw new AssertionError(ie);
+            }
+        }
+    }
 }