You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/07/24 06:05:46 UTC
svn commit: r978791 - in /cassandra/branches/cassandra-0.6: ./
interface/thrift/gen-java/org/apache/cassandra/thrift/
src/java/org/apache/cassandra/db/commitlog/
src/java/org/apache/cassandra/net/ src/java/org/apache/cassandra/utils/
Author: jbellis
Date: Sat Jul 24 04:05:46 2010
New Revision: 978791
URL: http://svn.apache.org/viewvc?rev=978791&view=rev
Log:
backport CASSANDRA-685 fix from r963941 in trunk
Modified:
cassandra/branches/cassandra-0.6/ (props changed)
cassandra/branches/cassandra-0.6/CHANGES.txt
cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java (props changed)
cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java (props changed)
cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java (props changed)
cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java (props changed)
cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java (props changed)
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessageDeserializationTask.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/utils/FBUtilities.java
Propchange: cassandra/branches/cassandra-0.6/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Jul 24 04:05:46 2010
@@ -1,4 +1,4 @@
-/cassandra/trunk:931148
+/cassandra/trunk:931148,963941
/incubator/cassandra/branches/cassandra-0.3:774578-796573
/incubator/cassandra/branches/cassandra-0.4:810145-834239,834349-834350
/incubator/cassandra/branches/cassandra-0.5:888872-915439
Modified: cassandra/branches/cassandra-0.6/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/CHANGES.txt?rev=978791&r1=978790&r2=978791&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.6/CHANGES.txt Sat Jul 24 04:05:46 2010
@@ -15,6 +15,8 @@
* fix duplicate rows being read during mapreduce (CASSANDRA-1042)
* failure detection wasn't closing command sockets (CASSANDRA-1221)
* cassandra-cli.bat works on windows (CASSANDRA-1236)
+ * pre-emptively drop requests that cannot be processed within RPCTimeout
+ (CASSANDRA-685)
0.6.3
Propchange: cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Jul 24 04:05:46 2010
@@ -1,4 +1,4 @@
-/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:931148
+/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:931148,963941
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/Cassandra.java:810145-834239,834349-834350
/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/Cassandra.java:888872-903502
Propchange: cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Jul 24 04:05:46 2010
@@ -1,4 +1,4 @@
-/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:931148
+/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:931148,963941
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/Column.java:810145-834239,834349-834350
/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/Column.java:888872-903502
Propchange: cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Jul 24 04:05:46 2010
@@ -1,4 +1,4 @@
-/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:931148
+/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:931148,963941
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:810145-834239,834349-834350
/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:888872-903502
Propchange: cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Jul 24 04:05:46 2010
@@ -1,4 +1,4 @@
-/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:931148
+/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:931148,963941
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:810145-834239,834349-834350
/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:888872-903502
Propchange: cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Jul 24 04:05:46 2010
@@ -1,4 +1,4 @@
-/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:931148
+/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:931148,963941
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/SuperColumn.java:810145-834239,834349-834350
/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/SuperColumn.java:888872-903502
Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/commitlog/CommitLog.java?rev=978791&r1=978790&r2=978791&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/commitlog/CommitLog.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/commitlog/CommitLog.java Sat Jul 24 04:05:46 2010
@@ -26,6 +26,7 @@ import org.apache.cassandra.db.Table;
import org.apache.cassandra.io.DeletionService;
import org.apache.cassandra.io.util.BufferedRandomAccessFile;
import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.WrappedRunnable;
import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;
@@ -69,6 +70,7 @@ import java.util.zip.Checksum;
*/
public class CommitLog
{
+ private static final int MAX_OUTSTANDING_REPLAY_COUNT = 1024;
private static volatile int SEGMENT_SIZE = 128*1024*1024; // roll after log gets this big
private static final Logger logger = Logger.getLogger(CommitLog.class);
@@ -177,7 +179,7 @@ public class CommitLog
public static void recover(File[] clogs) throws IOException
{
Set<Table> tablesRecovered = new HashSet<Table>();
- final AtomicInteger counter = new AtomicInteger(0);
+ List<Future<?>> futures = new ArrayList<Future<?>>();
for (File file : clogs)
{
int bufferSize = (int)Math.min(file.length(), 32 * 1024 * 1024);
@@ -261,47 +263,29 @@ public class CommitLog
{
Table.open(rm.getTable()).apply(rm, null, false);
}
- counter.decrementAndGet();
}
};
- counter.incrementAndGet();
- StageManager.getStage(StageManager.MUTATION_STAGE).submit(runnable);
+ futures.add(StageManager.getStage(StageManager.MUTATION_STAGE).submit(runnable));
+ if (futures.size() > MAX_OUTSTANDING_REPLAY_COUNT)
+ {
+ FBUtilities.waitOnFutures(futures);
+ futures.clear();
+ }
}
reader.close();
logger.info("Finished reading " + file);
}
// wait for all the writes to finish on the mutation stage
- while (counter.get() > 0)
- {
- try
- {
- Thread.sleep(10);
- }
- catch (InterruptedException e)
- {
- throw new AssertionError(e);
- }
- }
+ FBUtilities.waitOnFutures(futures);
+ logger.debug("Finished waiting on mutations from recovery");
// flush replayed tables
- List<Future<?>> futures = new ArrayList<Future<?>>();
+ futures.clear();
for (Table table : tablesRecovered)
- {
futures.addAll(table.flush());
- }
- // wait for flushes to finish
- for (Future<?> future : futures)
- {
- try
- {
- future.get();
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
- }
+ FBUtilities.waitOnFutures(futures);
+ logger.info("Recovery complete");
}
private CommitLogSegment currentSegment()
Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessageDeserializationTask.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessageDeserializationTask.java?rev=978791&r1=978790&r2=978791&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessageDeserializationTask.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessageDeserializationTask.java Sat Jul 24 04:05:46 2010
@@ -22,38 +22,35 @@ import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
-import org.apache.cassandra.net.sink.SinkManager;
-
import org.apache.log4j.Logger;
-class MessageDeserializationTask implements Runnable
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.net.sink.SinkManager;
+import org.apache.cassandra.utils.WrappedRunnable;
+
+class MessageDeserializationTask extends WrappedRunnable
{
- private static Logger logger_ = Logger.getLogger(MessageDeserializationTask.class);
+ private static Logger logger = Logger.getLogger(MessageDeserializationTask.class);
- private ByteArrayInputStream bytes;
+ private final ByteArrayInputStream bytes;
+ private final long constructionTime = System.currentTimeMillis();
MessageDeserializationTask(ByteArrayInputStream bytes)
{
this.bytes = bytes;
}
-
- public void run()
+
+ public void runMayThrow() throws IOException
{
- Message message = null;
- try
+ if (System.currentTimeMillis() > constructionTime + DatabaseDescriptor.getRpcTimeout())
{
- message = Message.serializer().deserialize(new DataInputStream(bytes));
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
+ logger.warn(String.format("dropping message (%,dms past timeout)",
+ System.currentTimeMillis() - (constructionTime + DatabaseDescriptor.getRpcTimeout())));
+ return;
}
- if ( message != null )
- {
- message = SinkManager.processServerMessageSink(message);
- MessagingService.receive(message);
- }
+ Message message = Message.serializer().deserialize(new DataInputStream(bytes));
+ message = SinkManager.processServerMessageSink(message);
+ MessagingService.receive(message);
}
-
}
Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/utils/FBUtilities.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/utils/FBUtilities.java?rev=978791&r1=978790&r2=978791&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/utils/FBUtilities.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/utils/FBUtilities.java Sat Jul 24 04:05:46 2010
@@ -25,6 +25,8 @@ import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.util.*;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.zip.DataFormatException;
@@ -487,4 +489,23 @@ public class FBUtilities
ByteBuffer.wrap(bytes).putLong(n);
return bytes;
}
+
+ public static void waitOnFutures(Collection<Future<?>> futures)
+ {
+ for (Future f : futures)
+ {
+ try
+ {
+ f.get();
+ }
+ catch (ExecutionException ee)
+ {
+ throw new RuntimeException(ee);
+ }
+ catch (InterruptedException ie)
+ {
+ throw new AssertionError(ie);
+ }
+ }
+ }
}