You are viewing a plain-text version of this content. The canonical link for it was a hyperlink ("here") that is not preserved in this plain-text rendering.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/11/16 19:22:15 UTC

svn commit: r880889 - in /incubator/cassandra/trunk: ./ src/java/org/apache/cassandra/concurrent/ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/dht/ src/java/org/apache/cassandra/service/

Author: jbellis
Date: Mon Nov 16 18:22:14 2009
New Revision: 880889

URL: http://svn.apache.org/viewvc?rev=880889&view=rev
Log:
multithread recovery

Modified:
    incubator/cassandra/trunk/CHANGES.txt
    incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/IStage.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/MultiThreadedStage.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/SingleThreadedStage.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeCommand.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationMessage.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataMessage.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java

Modified: incubator/cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/CHANGES.txt?rev=880889&r1=880888&r2=880889&view=diff
==============================================================================
--- incubator/cassandra/trunk/CHANGES.txt (original)
+++ incubator/cassandra/trunk/CHANGES.txt Mon Nov 16 18:22:14 2009
@@ -41,6 +41,7 @@
    interfaces (CASSANDRA-546)
  * stress.py benchmarking tool improvements (several tickets)
  * optimized replica placement code (CASSANDRA-525)
+ * faster log replay on restart (CASSANDRA-539, -540)
  
 
 0.4.2

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/IStage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/IStage.java?rev=880889&r1=880888&r2=880889&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/IStage.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/IStage.java Mon Nov 16 18:22:14 2009
@@ -116,4 +116,6 @@
      * @return task count.
      */
     public long getPendingTasks();
+
+    public long getCompletedTasks();
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/MultiThreadedStage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/MultiThreadedStage.java?rev=880889&r1=880888&r2=880889&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/MultiThreadedStage.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/MultiThreadedStage.java Mon Nov 16 18:22:14 2009
@@ -87,4 +87,9 @@
     public long getPendingTasks(){
         return executorService_.getPendingTasks();
     }
+
+    public long getCompletedTasks()
+    {
+        return executorService_.getCompletedTasks();
+    }
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/SingleThreadedStage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/SingleThreadedStage.java?rev=880889&r1=880888&r2=880889&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/SingleThreadedStage.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/SingleThreadedStage.java Mon Nov 16 18:22:14 2009
@@ -96,4 +96,9 @@
     public long getPendingTasks(){
         return executorService_.getPendingTasks();
     }
+
+    public long getCompletedTasks()
+    {
+        return executorService_.getCompletedTasks();
+    }
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java?rev=880889&r1=880888&r2=880889&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java Mon Nov 16 18:22:14 2009
@@ -23,6 +23,9 @@
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 
+import static org.apache.cassandra.config.DatabaseDescriptor.getConcurrentWriters;
+import static org.apache.cassandra.config.DatabaseDescriptor.getConcurrentReaders;
+
 
 /**
  * This class manages all stages that exist within a process. The application registers
@@ -33,6 +36,17 @@
 public class StageManager
 {
     private static Map<String, IStage > stageQueues_ = new HashMap<String, IStage>();
+
+    public final static String readStage_ = "ROW-READ-STAGE";
+    public final static String mutationStage_ = "ROW-MUTATION-STAGE";
+    public final static String streamStage_ = "STREAM-STAGE";
+
+    static
+    {
+        StageManager.registerStage(mutationStage_, new MultiThreadedStage(mutationStage_, getConcurrentWriters()));
+        StageManager.registerStage(readStage_, new MultiThreadedStage(readStage_, getConcurrentReaders()));
+        StageManager.registerStage(streamStage_, new SingleThreadedStage(streamStage_));
+    }
     
     /**
      * Register a stage with the StageManager

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java?rev=880889&r1=880888&r2=880889&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java Mon Nov 16 18:22:14 2009
@@ -24,6 +24,8 @@
 import org.apache.cassandra.io.DataOutputBuffer;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.FileUtils;
+import org.apache.cassandra.concurrent.StageManager;
+
 import org.apache.commons.lang.StringUtils;
 import org.apache.log4j.Logger;
 
@@ -285,13 +287,15 @@
     void recover(File[] clogs) throws IOException
     {
         Set<Table> tablesRecovered = new HashSet<Table>();
+        assert StageManager.getStage(StageManager.mutationStage_).getCompletedTasks() == 0;
+        int rows = 0;
 
         DataInputBuffer bufIn = new DataInputBuffer();
         for (File file : clogs)
         {
             int bufferSize = (int)Math.min(file.length(), 32 * 1024 * 1024);
             BufferedRandomAccessFile reader = new BufferedRandomAccessFile(file.getAbsolutePath(), "r", bufferSize);
-            CommitLogHeader clHeader = readCommitLogHeader(reader);
+            final CommitLogHeader clHeader = readCommitLogHeader(reader);
             /* seek to the lowest position where any CF has non-flushed data */
             int lowPos = CommitLogHeader.getLowestPosition(clHeader);
             if (lowPos == 0)
@@ -324,32 +328,61 @@
                 bufIn.reset(bytes, bytes.length);
 
                 /* read the commit log entry */
-                RowMutation rm = RowMutation.serializer().deserialize(bufIn);
+                final RowMutation rm = RowMutation.serializer().deserialize(bufIn);
                 if (logger_.isDebugEnabled())
                     logger_.debug(String.format("replaying mutation for %s.%s: %s",
                                                 rm.getTable(),
                                                 rm.key(),
                                                 "{" + StringUtils.join(rm.getColumnFamilies(), ", ") + "}"));
-                Table table = Table.open(rm.getTable());
+                final Table table = Table.open(rm.getTable());
                 tablesRecovered.add(table);
-                Collection<ColumnFamily> columnFamilies = new ArrayList<ColumnFamily>(rm.getColumnFamilies());
-                /* remove column families that have already been flushed */
-                for (ColumnFamily columnFamily : columnFamilies)
+                final Collection<ColumnFamily> columnFamilies = new ArrayList<ColumnFamily>(rm.getColumnFamilies());
+                final long entryLocation = reader.getFilePointer();
+                Runnable runnable = new Runnable()
                 {
-                    int id = table.getColumnFamilyId(columnFamily.name());
-                    if (!clHeader.isDirty(id) || reader.getFilePointer() < clHeader.getPosition(id))
+                    public void run()
                     {
-                        rm.removeColumnFamily(columnFamily);
+                        /* remove column families that have already been flushed before applying the rest */
+                        for (ColumnFamily columnFamily : columnFamilies)
+                        {
+                            int id = table.getColumnFamilyId(columnFamily.name());
+                            if (!clHeader.isDirty(id) || entryLocation < clHeader.getPosition(id))
+                            {
+                                rm.removeColumnFamily(columnFamily);
+                            }
+                        }
+                        if (!rm.isEmpty())
+                        {
+                            try
+                            {
+                                table.applyNow(rm);
+                            }
+                            catch (IOException e)
+                            {
+                                throw new RuntimeException(e);
+                            }
+                        }
                     }
-                }
-                if (!rm.isEmpty())
-                {
-                    table.applyNow(rm);
-                }
+                };
+                StageManager.getStage(StageManager.mutationStage_).execute(runnable);
+                rows++;
             }
             reader.close();
         }
 
+        // wait for all the writes to finish on the mutation stage
+        while (StageManager.getStage(StageManager.mutationStage_).getCompletedTasks() < rows)
+        {
+            try
+            {
+                Thread.sleep(10);
+            }
+            catch (InterruptedException e)
+            {
+                throw new AssertionError(e);
+            }
+        }
+
         // flush replayed tables, allowing commitlog segments to be removed
         List<Future<?>> futures = new ArrayList<Future<?>>();
         for (Table table : tablesRecovered)

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeCommand.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeCommand.java?rev=880889&r1=880888&r2=880889&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeCommand.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeCommand.java Mon Nov 16 18:22:14 2009
@@ -29,6 +29,7 @@
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.concurrent.StageManager;
 
 public class RangeCommand
 {
@@ -54,7 +55,7 @@
         DataOutputBuffer dob = new DataOutputBuffer();
         serializer.serialize(this, dob);
         return new Message(FBUtilities.getLocalAddress(),
-                           StorageService.readStage_,
+                           StageManager.readStage_,
                            StorageService.rangeVerbHandler_,
                            Arrays.copyOf(dob.getData(), dob.getLength()));
     }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java?rev=880889&r1=880888&r2=880889&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java Mon Nov 16 18:22:14 2009
@@ -31,6 +31,7 @@
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.concurrent.StageManager;
 
 
 public abstract class ReadCommand
@@ -53,7 +54,7 @@
         ByteArrayOutputStream bos = new ByteArrayOutputStream();
         DataOutputStream dos = new DataOutputStream(bos);
         ReadCommand.serializer().serialize(this, dos);
-        return new Message(FBUtilities.getLocalAddress(), StorageService.readStage_, StorageService.readVerbHandler_, bos.toByteArray());
+        return new Message(FBUtilities.getLocalAddress(), StageManager.readStage_, StorageService.readVerbHandler_, bos.toByteArray());
     }
 
     public final QueryPath queryPath;

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java?rev=880889&r1=880888&r2=880889&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java Mon Nov 16 18:22:14 2009
@@ -36,12 +36,12 @@
 
 import org.apache.cassandra.io.DataOutputBuffer;
 import org.apache.cassandra.io.ICompactSerializer;
-import java.net.InetAddress;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.service.*;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.concurrent.StageManager;
 
 public class RowMutation implements Serializable
 {
@@ -224,7 +224,7 @@
         ByteArrayOutputStream bos = new ByteArrayOutputStream();
         DataOutputStream dos = new DataOutputStream(bos);
         serializer().serialize(this, dos);
-        return new Message(FBUtilities.getLocalAddress(), StorageService.mutationStage_, verbHandlerName, bos.toByteArray());
+        return new Message(FBUtilities.getLocalAddress(), StageManager.mutationStage_, verbHandlerName, bos.toByteArray());
     }
 
     public static RowMutation getRowMutation(String table, String key, Map<String, List<ColumnOrSuperColumn>> cfmap)

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationMessage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationMessage.java?rev=880889&r1=880888&r2=880889&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationMessage.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationMessage.java Mon Nov 16 18:22:14 2009
@@ -30,6 +30,7 @@
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.concurrent.StageManager;
 
 public class RowMutationMessage implements Serializable
 {   
@@ -51,7 +52,7 @@
         ByteArrayOutputStream bos = new ByteArrayOutputStream();
         DataOutputStream dos = new DataOutputStream( bos );
         RowMutationMessage.serializer().serialize(this, dos);
-        return new Message(FBUtilities.getLocalAddress(), StorageService.mutationStage_, verbHandlerName, bos.toByteArray());
+        return new Message(FBUtilities.getLocalAddress(), StageManager.mutationStage_, verbHandlerName, bos.toByteArray());
     }
     
     @XmlElement(name="RowMutation")

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataMessage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataMessage.java?rev=880889&r1=880888&r2=880889&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataMessage.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataMessage.java Mon Nov 16 18:22:14 2009
@@ -24,6 +24,7 @@
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.concurrent.StageManager;
 
 
  /**
@@ -57,7 +58,7 @@
         {
             throw new IOError(e);
         }
-        return new Message(FBUtilities.getLocalAddress(), StorageService.streamStage_, StorageService.bootstrapMetadataVerbHandler_, bos.toByteArray() );
+        return new Message(FBUtilities.getLocalAddress(), StageManager.streamStage_, StorageService.bootstrapMetadataVerbHandler_, bos.toByteArray() );
     }        
     
     protected BootstrapMetadata[] bsMetadata_ = new BootstrapMetadata[0];

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=880889&r1=880888&r2=880889&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Mon Nov 16 18:22:14 2009
@@ -465,7 +465,7 @@
         for (ReadCommand command: commands)
         {
             Callable<Object> callable = new weakReadLocalCallable(command);
-            futures.add(StageManager.getStage(StorageService.readStage_).execute(callable));
+            futures.add(StageManager.getStage(StageManager.readStage_).execute(callable));
         }
         for (Future<Object> future : futures)
         {

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=880889&r1=880888&r2=880889&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Mon Nov 16 18:22:14 2009
@@ -65,11 +65,6 @@
     public final static String STATE_LEAVING = "LEAVING";
     public final static String STATE_LEFT = "LEFT";
 
-    /* All stage identifiers */
-    public final static String mutationStage_ = "ROW-MUTATION-STAGE";
-    public final static String readStage_ = "ROW-READ-STAGE";
-    public final static String streamStage_ = "STREAM-STAGE";
-
     /* All verb handler identifiers */
     public final static String mutationVerbHandler_ = "ROW-MUTATION-VERB-HANDLER";
     public final static String binaryVerbHandler_ = "BINARY-VERB-HANDLER";
@@ -235,12 +230,6 @@
         MessagingService.instance().registerVerbHandlers(streamInitiateDoneVerbHandler_, new Streaming.StreamInitiateDoneVerbHandler());
         MessagingService.instance().registerVerbHandlers(streamFinishedVerbHandler_, new Streaming.StreamFinishedVerbHandler());
 
-        StageManager.registerStage(StorageService.mutationStage_,
-                                   new MultiThreadedStage(StorageService.mutationStage_, DatabaseDescriptor.getConcurrentWriters()));
-        StageManager.registerStage(StorageService.readStage_,
-                                   new MultiThreadedStage(StorageService.readStage_, DatabaseDescriptor.getConcurrentReaders()));
-        StageManager.registerStage(StorageService.streamStage_, new SingleThreadedStage(StorageService.streamStage_));
-
         Class<AbstractReplicationStrategy> cls = DatabaseDescriptor.getReplicaPlacementStrategyClass();
         Class [] parameterTypes = new Class[] { TokenMetadata.class, IPartitioner.class, int.class};
         try
@@ -998,7 +987,7 @@
                     }
                 }
             };
-            StageManager.getStage(streamStage_).execute(new Runnable()
+            StageManager.getStage(StageManager.streamStage_).execute(new Runnable()
             {
                 public void run()
                 {