You are viewing a plain-text version of this content. (The canonical link to the original was a hyperlink labelled "here"; the link itself is not preserved in this plain-text copy.)
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/11/16 19:22:15 UTC
svn commit: r880889 - in /incubator/cassandra/trunk: ./
src/java/org/apache/cassandra/concurrent/ src/java/org/apache/cassandra/db/
src/java/org/apache/cassandra/dht/ src/java/org/apache/cassandra/service/
Author: jbellis
Date: Mon Nov 16 18:22:14 2009
New Revision: 880889
URL: http://svn.apache.org/viewvc?rev=880889&view=rev
Log:
multithread recovery
Modified:
incubator/cassandra/trunk/CHANGES.txt
incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/IStage.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/MultiThreadedStage.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/SingleThreadedStage.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeCommand.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationMessage.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataMessage.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
Modified: incubator/cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/CHANGES.txt?rev=880889&r1=880888&r2=880889&view=diff
==============================================================================
--- incubator/cassandra/trunk/CHANGES.txt (original)
+++ incubator/cassandra/trunk/CHANGES.txt Mon Nov 16 18:22:14 2009
@@ -41,6 +41,7 @@
interfaces (CASSANDRA-546)
* stress.py benchmarking tool improvements (several tickets)
* optimized replica placement code (CASSANDRA-525)
+ * faster log replay on restart (CASSANDRA-539, -540)
0.4.2
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/IStage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/IStage.java?rev=880889&r1=880888&r2=880889&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/IStage.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/IStage.java Mon Nov 16 18:22:14 2009
@@ -116,4 +116,6 @@
* @return task count.
*/
public long getPendingTasks();
+
+ public long getCompletedTasks();
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/MultiThreadedStage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/MultiThreadedStage.java?rev=880889&r1=880888&r2=880889&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/MultiThreadedStage.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/MultiThreadedStage.java Mon Nov 16 18:22:14 2009
@@ -87,4 +87,9 @@
public long getPendingTasks(){
return executorService_.getPendingTasks();
}
+
+ public long getCompletedTasks()
+ {
+ return executorService_.getCompletedTasks();
+ }
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/SingleThreadedStage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/SingleThreadedStage.java?rev=880889&r1=880888&r2=880889&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/SingleThreadedStage.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/SingleThreadedStage.java Mon Nov 16 18:22:14 2009
@@ -96,4 +96,9 @@
public long getPendingTasks(){
return executorService_.getPendingTasks();
}
+
+ public long getCompletedTasks()
+ {
+ return executorService_.getCompletedTasks();
+ }
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java?rev=880889&r1=880888&r2=880889&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java Mon Nov 16 18:22:14 2009
@@ -23,6 +23,9 @@
import java.util.Set;
import java.util.concurrent.ExecutorService;
+import static org.apache.cassandra.config.DatabaseDescriptor.getConcurrentWriters;
+import static org.apache.cassandra.config.DatabaseDescriptor.getConcurrentReaders;
+
/**
* This class manages all stages that exist within a process. The application registers
@@ -33,6 +36,17 @@
public class StageManager
{
private static Map<String, IStage > stageQueues_ = new HashMap<String, IStage>();
+
+ public final static String readStage_ = "ROW-READ-STAGE";
+ public final static String mutationStage_ = "ROW-MUTATION-STAGE";
+ public final static String streamStage_ = "STREAM-STAGE";
+
+ static
+ {
+ StageManager.registerStage(mutationStage_, new MultiThreadedStage(mutationStage_, getConcurrentWriters()));
+ StageManager.registerStage(readStage_, new MultiThreadedStage(readStage_, getConcurrentReaders()));
+ StageManager.registerStage(streamStage_, new SingleThreadedStage(streamStage_));
+ }
/**
* Register a stage with the StageManager
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java?rev=880889&r1=880888&r2=880889&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java Mon Nov 16 18:22:14 2009
@@ -24,6 +24,8 @@
import org.apache.cassandra.io.DataOutputBuffer;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.FileUtils;
+import org.apache.cassandra.concurrent.StageManager;
+
import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;
@@ -285,13 +287,15 @@
void recover(File[] clogs) throws IOException
{
Set<Table> tablesRecovered = new HashSet<Table>();
+ assert StageManager.getStage(StageManager.mutationStage_).getCompletedTasks() == 0;
+ int rows = 0;
DataInputBuffer bufIn = new DataInputBuffer();
for (File file : clogs)
{
int bufferSize = (int)Math.min(file.length(), 32 * 1024 * 1024);
BufferedRandomAccessFile reader = new BufferedRandomAccessFile(file.getAbsolutePath(), "r", bufferSize);
- CommitLogHeader clHeader = readCommitLogHeader(reader);
+ final CommitLogHeader clHeader = readCommitLogHeader(reader);
/* seek to the lowest position where any CF has non-flushed data */
int lowPos = CommitLogHeader.getLowestPosition(clHeader);
if (lowPos == 0)
@@ -324,32 +328,61 @@
bufIn.reset(bytes, bytes.length);
/* read the commit log entry */
- RowMutation rm = RowMutation.serializer().deserialize(bufIn);
+ final RowMutation rm = RowMutation.serializer().deserialize(bufIn);
if (logger_.isDebugEnabled())
logger_.debug(String.format("replaying mutation for %s.%s: %s",
rm.getTable(),
rm.key(),
"{" + StringUtils.join(rm.getColumnFamilies(), ", ") + "}"));
- Table table = Table.open(rm.getTable());
+ final Table table = Table.open(rm.getTable());
tablesRecovered.add(table);
- Collection<ColumnFamily> columnFamilies = new ArrayList<ColumnFamily>(rm.getColumnFamilies());
- /* remove column families that have already been flushed */
- for (ColumnFamily columnFamily : columnFamilies)
+ final Collection<ColumnFamily> columnFamilies = new ArrayList<ColumnFamily>(rm.getColumnFamilies());
+ final long entryLocation = reader.getFilePointer();
+ Runnable runnable = new Runnable()
{
- int id = table.getColumnFamilyId(columnFamily.name());
- if (!clHeader.isDirty(id) || reader.getFilePointer() < clHeader.getPosition(id))
+ public void run()
{
- rm.removeColumnFamily(columnFamily);
+ /* remove column families that have already been flushed before applying the rest */
+ for (ColumnFamily columnFamily : columnFamilies)
+ {
+ int id = table.getColumnFamilyId(columnFamily.name());
+ if (!clHeader.isDirty(id) || entryLocation < clHeader.getPosition(id))
+ {
+ rm.removeColumnFamily(columnFamily);
+ }
+ }
+ if (!rm.isEmpty())
+ {
+ try
+ {
+ table.applyNow(rm);
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
}
- }
- if (!rm.isEmpty())
- {
- table.applyNow(rm);
- }
+ };
+ StageManager.getStage(StageManager.mutationStage_).execute(runnable);
+ rows++;
}
reader.close();
}
+ // wait for all the writes to finish on the mutation stage
+ while (StageManager.getStage(StageManager.mutationStage_).getCompletedTasks() < rows)
+ {
+ try
+ {
+ Thread.sleep(10);
+ }
+ catch (InterruptedException e)
+ {
+ throw new AssertionError(e);
+ }
+ }
+
// flush replayed tables, allowing commitlog segments to be removed
List<Future<?>> futures = new ArrayList<Future<?>>();
for (Table table : tablesRecovered)
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeCommand.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeCommand.java?rev=880889&r1=880888&r2=880889&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeCommand.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeCommand.java Mon Nov 16 18:22:14 2009
@@ -29,6 +29,7 @@
import org.apache.cassandra.net.Message;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.concurrent.StageManager;
public class RangeCommand
{
@@ -54,7 +55,7 @@
DataOutputBuffer dob = new DataOutputBuffer();
serializer.serialize(this, dob);
return new Message(FBUtilities.getLocalAddress(),
- StorageService.readStage_,
+ StageManager.readStage_,
StorageService.rangeVerbHandler_,
Arrays.copyOf(dob.getData(), dob.getLength()));
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java?rev=880889&r1=880888&r2=880889&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java Mon Nov 16 18:22:14 2009
@@ -31,6 +31,7 @@
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.concurrent.StageManager;
public abstract class ReadCommand
@@ -53,7 +54,7 @@
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos);
ReadCommand.serializer().serialize(this, dos);
- return new Message(FBUtilities.getLocalAddress(), StorageService.readStage_, StorageService.readVerbHandler_, bos.toByteArray());
+ return new Message(FBUtilities.getLocalAddress(), StageManager.readStage_, StorageService.readVerbHandler_, bos.toByteArray());
}
public final QueryPath queryPath;
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java?rev=880889&r1=880888&r2=880889&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java Mon Nov 16 18:22:14 2009
@@ -36,12 +36,12 @@
import org.apache.cassandra.io.DataOutputBuffer;
import org.apache.cassandra.io.ICompactSerializer;
-import java.net.InetAddress;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.service.*;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.concurrent.StageManager;
public class RowMutation implements Serializable
{
@@ -224,7 +224,7 @@
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos);
serializer().serialize(this, dos);
- return new Message(FBUtilities.getLocalAddress(), StorageService.mutationStage_, verbHandlerName, bos.toByteArray());
+ return new Message(FBUtilities.getLocalAddress(), StageManager.mutationStage_, verbHandlerName, bos.toByteArray());
}
public static RowMutation getRowMutation(String table, String key, Map<String, List<ColumnOrSuperColumn>> cfmap)
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationMessage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationMessage.java?rev=880889&r1=880888&r2=880889&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationMessage.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationMessage.java Mon Nov 16 18:22:14 2009
@@ -30,6 +30,7 @@
import org.apache.cassandra.net.Message;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.concurrent.StageManager;
public class RowMutationMessage implements Serializable
{
@@ -51,7 +52,7 @@
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream( bos );
RowMutationMessage.serializer().serialize(this, dos);
- return new Message(FBUtilities.getLocalAddress(), StorageService.mutationStage_, verbHandlerName, bos.toByteArray());
+ return new Message(FBUtilities.getLocalAddress(), StageManager.mutationStage_, verbHandlerName, bos.toByteArray());
}
@XmlElement(name="RowMutation")
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataMessage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataMessage.java?rev=880889&r1=880888&r2=880889&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataMessage.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataMessage.java Mon Nov 16 18:22:14 2009
@@ -24,6 +24,7 @@
import org.apache.cassandra.net.Message;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.concurrent.StageManager;
/**
@@ -57,7 +58,7 @@
{
throw new IOError(e);
}
- return new Message(FBUtilities.getLocalAddress(), StorageService.streamStage_, StorageService.bootstrapMetadataVerbHandler_, bos.toByteArray() );
+ return new Message(FBUtilities.getLocalAddress(), StageManager.streamStage_, StorageService.bootstrapMetadataVerbHandler_, bos.toByteArray() );
}
protected BootstrapMetadata[] bsMetadata_ = new BootstrapMetadata[0];
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=880889&r1=880888&r2=880889&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Mon Nov 16 18:22:14 2009
@@ -465,7 +465,7 @@
for (ReadCommand command: commands)
{
Callable<Object> callable = new weakReadLocalCallable(command);
- futures.add(StageManager.getStage(StorageService.readStage_).execute(callable));
+ futures.add(StageManager.getStage(StageManager.readStage_).execute(callable));
}
for (Future<Object> future : futures)
{
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=880889&r1=880888&r2=880889&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Mon Nov 16 18:22:14 2009
@@ -65,11 +65,6 @@
public final static String STATE_LEAVING = "LEAVING";
public final static String STATE_LEFT = "LEFT";
- /* All stage identifiers */
- public final static String mutationStage_ = "ROW-MUTATION-STAGE";
- public final static String readStage_ = "ROW-READ-STAGE";
- public final static String streamStage_ = "STREAM-STAGE";
-
/* All verb handler identifiers */
public final static String mutationVerbHandler_ = "ROW-MUTATION-VERB-HANDLER";
public final static String binaryVerbHandler_ = "BINARY-VERB-HANDLER";
@@ -235,12 +230,6 @@
MessagingService.instance().registerVerbHandlers(streamInitiateDoneVerbHandler_, new Streaming.StreamInitiateDoneVerbHandler());
MessagingService.instance().registerVerbHandlers(streamFinishedVerbHandler_, new Streaming.StreamFinishedVerbHandler());
- StageManager.registerStage(StorageService.mutationStage_,
- new MultiThreadedStage(StorageService.mutationStage_, DatabaseDescriptor.getConcurrentWriters()));
- StageManager.registerStage(StorageService.readStage_,
- new MultiThreadedStage(StorageService.readStage_, DatabaseDescriptor.getConcurrentReaders()));
- StageManager.registerStage(StorageService.streamStage_, new SingleThreadedStage(StorageService.streamStage_));
-
Class<AbstractReplicationStrategy> cls = DatabaseDescriptor.getReplicaPlacementStrategyClass();
Class [] parameterTypes = new Class[] { TokenMetadata.class, IPartitioner.class, int.class};
try
@@ -998,7 +987,7 @@
}
}
};
- StageManager.getStage(streamStage_).execute(new Runnable()
+ StageManager.getStage(StageManager.streamStage_).execute(new Runnable()
{
public void run()
{