You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/07/14 06:13:23 UTC
svn commit: r963941 - in /cassandra/trunk: CHANGES.txt
src/java/org/apache/cassandra/db/commitlog/CommitLog.java
src/java/org/apache/cassandra/net/MessageDeserializationTask.java
src/java/org/apache/cassandra/utils/FBUtilities.java
Author: jbellis
Date: Wed Jul 14 04:13:23 2010
New Revision: 963941
URL: http://svn.apache.org/viewvc?rev=963941&view=rev
Log:
pre-emptively droprequests that cannot be processedwithin RPCTimeout. patch by mdennis; reviewed by jbellis for CASSANDRA-685
Modified:
cassandra/trunk/CHANGES.txt
cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeserializationTask.java
cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java
Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=963941&r1=963940&r2=963941&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Wed Jul 14 04:13:23 2010
@@ -39,6 +39,8 @@ dev
* allow multiple repair sessions per node (CASSANDRA-1190)
* add dynamic endpoint snitch (CASSANDRA-981)
* optimize away MessagingService for local range queries (CASSANDRA-1261)
+ * pre-emptively drop requests that cannot be processed within RPCTimeout
+ (CASSANDRA-685)
0.6.4
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java?rev=963941&r1=963940&r2=963941&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java Wed Jul 14 04:13:23 2010
@@ -28,6 +28,7 @@ import org.apache.cassandra.db.Table;
import org.apache.cassandra.io.DeletionService;
import org.apache.cassandra.io.util.BufferedRandomAccessFile;
import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.WrappedRunnable;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
@@ -72,6 +73,7 @@ import java.util.zip.Checksum;
*/
public class CommitLog
{
+ private static final int MAX_OUTSTANDING_REPLAY_COUNT = 1024;
private static volatile int SEGMENT_SIZE = 128*1024*1024; // roll after log gets this big
static final Logger logger = LoggerFactory.getLogger(CommitLog.class);
@@ -180,7 +182,7 @@ public class CommitLog
public static void recover(File[] clogs) throws IOException
{
Set<Table> tablesRecovered = new HashSet<Table>();
- final AtomicInteger counter = new AtomicInteger(0);
+ List<Future<?>> futures = new ArrayList<Future<?>>();
byte[] bytes = new byte[4096];
for (File file : clogs)
@@ -278,47 +280,29 @@ public class CommitLog
{
Table.open(newRm.getTable()).apply(newRm, null, false);
}
- counter.decrementAndGet();
}
};
- counter.incrementAndGet();
- StageManager.getStage(StageManager.MUTATION_STAGE).submit(runnable);
+ futures.add(StageManager.getStage(StageManager.MUTATION_STAGE).submit(runnable));
+ if (futures.size() > MAX_OUTSTANDING_REPLAY_COUNT)
+ {
+ FBUtilities.waitOnFutures(futures);
+ futures.clear();
+ }
}
reader.close();
- logger.info("Finished reading " + file);
+ logger.info("Finished reading " + file + " for recovery");
}
// wait for all the writes to finish on the mutation stage
- while (counter.get() > 0)
- {
- try
- {
- Thread.sleep(10);
- }
- catch (InterruptedException e)
- {
- throw new AssertionError(e);
- }
- }
+ FBUtilities.waitOnFutures(futures);
+ logger.debug("Finished waiting on mutations from recovery");
// flush replayed tables
- List<Future<?>> futures = new ArrayList<Future<?>>();
+ futures.clear();
for (Table table : tablesRecovered)
- {
futures.addAll(table.flush());
- }
- // wait for flushes to finish
- for (Future<?> future : futures)
- {
- try
- {
- future.get();
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
- }
+ FBUtilities.waitOnFutures(futures);
+ logger.info("Recovery complete");
}
private CommitLogSegment currentSegment()
Modified: cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeserializationTask.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeserializationTask.java?rev=963941&r1=963940&r2=963941&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeserializationTask.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeserializationTask.java Wed Jul 14 04:13:23 2010
@@ -22,39 +22,36 @@ import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
-import org.apache.cassandra.net.sink.SinkManager;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-class MessageDeserializationTask implements Runnable
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.net.sink.SinkManager;
+import org.apache.cassandra.utils.WrappedRunnable;
+
+class MessageDeserializationTask extends WrappedRunnable
{
- private static Logger logger_ = LoggerFactory.getLogger(MessageDeserializationTask.class);
+ private static Logger logger = LoggerFactory.getLogger(MessageDeserializationTask.class);
- private ByteArrayInputStream bytes;
+ private final ByteArrayInputStream bytes;
+ private final long constructionTime = System.currentTimeMillis();
MessageDeserializationTask(ByteArrayInputStream bytes)
{
this.bytes = bytes;
}
-
- public void run()
+
+ public void runMayThrow() throws IOException
{
- Message message = null;
- try
+ if (System.currentTimeMillis() > constructionTime + DatabaseDescriptor.getRpcTimeout())
{
- message = Message.serializer().deserialize(new DataInputStream(bytes));
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
+ logger.warn(String.format("dropping message (%,dms past timeout)",
+ System.currentTimeMillis() - (constructionTime + DatabaseDescriptor.getRpcTimeout())));
+ return;
}
- if ( message != null )
- {
- message = SinkManager.processServerMessageSink(message);
- MessagingService.receive(message);
- }
+ Message message = Message.serializer().deserialize(new DataInputStream(bytes));
+ message = SinkManager.processServerMessageSink(message);
+ MessagingService.receive(message);
}
-
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java?rev=963941&r1=963940&r2=963941&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java Wed Jul 14 04:13:23 2010
@@ -28,6 +28,8 @@ import java.nio.charset.CharacterCodingE
import java.nio.charset.CharsetDecoder;
import java.security.MessageDigest;
import java.util.*;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@@ -527,4 +529,23 @@ public class FBUtilities
// we can't actually get microsecond precision.
return System.currentTimeMillis() * 1000;
}
+
+ public static void waitOnFutures(Collection<Future<?>> futures)
+ {
+ for (Future f : futures)
+ {
+ try
+ {
+ f.get();
+ }
+ catch (ExecutionException ee)
+ {
+ throw new RuntimeException(ee);
+ }
+ catch (InterruptedException ie)
+ {
+ throw new AssertionError(ie);
+ }
+ }
+ }
}