You are viewing a plain-text version of this content; the canonical link was not preserved in this copy.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/09/21 15:14:47 UTC

svn commit: r1173615 - in /cassandra/branches/cassandra-1.0.0: ./ src/java/org/apache/cassandra/concurrent/ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/io/sstable/

Author: jbellis
Date: Wed Sep 21 13:14:47 2011
New Revision: 1173615

URL: http://svn.apache.org/viewvc?rev=1173615&view=rev
Log:
parallelize sstable open at server startup
patch by Melvin Wang and jbellis; reviewed by pyaskevich for CASSANDRA-2988

Modified:
    cassandra/branches/cassandra-1.0.0/CHANGES.txt
    cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
    cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/io/sstable/SSTableReader.java

Modified: cassandra/branches/cassandra-1.0.0/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0.0/CHANGES.txt?rev=1173615&r1=1173614&r2=1173615&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0.0/CHANGES.txt (original)
+++ cassandra/branches/cassandra-1.0.0/CHANGES.txt Wed Sep 21 13:14:47 2011
@@ -12,6 +12,7 @@
    schema definitions were found (CASSANDRA-3219)
  * Fixes for LeveledCompactionStrategy score computation, prioritization,
    and scheduling (CASSANDRA-3224)
+ * parallelize sstable open at server startup (CASSANDRA-2988)
 
 
 1.0.0-beta1

Modified: cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java?rev=1173615&r1=1173614&r2=1173615&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java (original)
+++ cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java Wed Sep 21 13:14:47 2011
@@ -98,6 +98,11 @@ public class DebuggableThreadPoolExecuto
         this.setRejectedExecutionHandler(blockingExecutionHandler);
     }
 
+    public static DebuggableThreadPoolExecutor createWithPoolSize(String threadPoolName, int size)
+    {
+        return new DebuggableThreadPoolExecutor(size, Integer.MAX_VALUE, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory(threadPoolName));
+    }
+
     protected void onInitialRejection(Runnable task) {}
     protected void onFinalAccept(Runnable task) {}
     protected void onFinalRejection(Runnable task) {}

Modified: cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1173615&r1=1173614&r2=1173615&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Wed Sep 21 13:14:47 2011
@@ -212,15 +212,8 @@ public class ColumnFamilyStore implement
         // scan for sstables corresponding to this cf and load them
         data = new DataTracker(this);
         Set<DecoratedKey> savedKeys = keyCache.readSaved();
-        List<SSTableReader> sstables = new ArrayList<SSTableReader>();
-        for (Map.Entry<Descriptor,Set<Component>> sstableFiles : files(table.name, columnFamilyName, false, false).entrySet())
-        {
-            SSTableReader reader = openSSTableReader(sstableFiles, savedKeys, data, metadata, partitioner);
-
-            if (reader != null) // if == null, logger errors where already fired
-                sstables.add(reader);
-        }
-        data.addSSTables(sstables);
+        Set<Map.Entry<Descriptor, Set<Component>>> entries = files(table.name, columnFamilyName, false, false).entrySet();
+        data.addSSTables(SSTableReader.batchOpen(entries, savedKeys, data, metadata, this.partitioner));
 
         // compaction strategy should be created after the CFS has been prepared
         this.compactionStrategy = metadata.createCompactionStrategyInstance(this);
@@ -541,10 +534,15 @@ public class ColumnFamilyStore implement
                                                          descriptor));
 
             logger.info("Initializing new SSTable {}", rawSSTable);
-            reader = openSSTableReader(rawSSTable, savedKeys, data, metadata, partitioner);
-
-            if (reader == null)
-                continue; // something wrong with SSTable, skipping
+            try
+            {
+                reader = SSTableReader.open(rawSSTable.getKey(), rawSSTable.getValue(), savedKeys, data, metadata, partitioner);
+            }
+            catch (IOException e)
+            {
+                SSTableReader.logOpenException(rawSSTable.getKey(), e);
+                continue;
+            }
 
             sstables.add(reader);
 
@@ -1892,30 +1890,6 @@ public class ColumnFamilyStore implement
        return indexManager.getBuiltIndexes();
     }
 
-    private static SSTableReader openSSTableReader(Map.Entry<Descriptor, Set<Component>> rawSSTable,
-                                                   Set<DecoratedKey> savedKeys,
-                                                   DataTracker tracker,
-                                                   CFMetaData metadata,
-                                                   IPartitioner partitioner)
-    {
-        SSTableReader reader = null;
-
-        try
-        {
-            reader = SSTableReader.open(rawSSTable.getKey(), rawSSTable.getValue(), savedKeys, tracker, metadata, partitioner);
-        }
-        catch (FileNotFoundException ex)
-        {
-            logger.error("Missing sstable component in " + rawSSTable + "; skipped because of " + ex.getMessage());
-        }
-        catch (IOException ex)
-        {
-            logger.error("Corrupt sstable " + rawSSTable + "; skipped", ex);
-        }
-
-        return reader;
-    }
-
     public int getUnleveledSSTables()
     {
         return this.compactionStrategy instanceof LeveledCompactionStrategy

Modified: cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/io/sstable/SSTableReader.java?rev=1173615&r1=1173614&r2=1173615&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/io/sstable/SSTableReader.java (original)
+++ cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/io/sstable/SSTableReader.java Wed Sep 21 13:14:47 2011
@@ -24,9 +24,12 @@ import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.*;
 
 import com.google.common.base.Function;
 import com.google.common.collect.Collections2;
+
+import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.io.compress.CompressedRandomAccessReader;
 import org.slf4j.Logger;
@@ -164,6 +167,59 @@ public class SSTableReader extends SSTab
         return sstable;
     }
 
+    public static void logOpenException(Descriptor descriptor, IOException e)
+    {
+        if (e instanceof FileNotFoundException)
+            logger.error("Missing sstable component in " + descriptor + "; skipped because of " + e.getMessage());
+        else
+            logger.error("Corrupt sstable " + descriptor + "; skipped", e);
+    }
+
+    public static Collection<SSTableReader> batchOpen(Set<Map.Entry<Descriptor, Set<Component>>> entries,
+                                                      final Set<DecoratedKey> savedKeys,
+                                                      final DataTracker tracker,
+                                                      final CFMetaData metadata,
+                                                      final IPartitioner partitioner)
+    {
+        final Collection<SSTableReader> sstables = new LinkedBlockingQueue<SSTableReader>();
+
+        ExecutorService executor = DebuggableThreadPoolExecutor.createWithPoolSize("SSTableBatchOpen", Runtime.getRuntime().availableProcessors());
+        for (final Map.Entry<Descriptor, Set<Component>> entry : entries)
+        {
+            Runnable runnable = new Runnable()
+            {
+                public void run()
+                {
+                    SSTableReader sstable;
+                    try
+                    {
+                        sstable = open(entry.getKey(), entry.getValue(), savedKeys, tracker, metadata, partitioner);
+                    }
+                    catch (IOException ex)
+                    {
+                        logger.error("Corrupt sstable " + entry + "; skipped", ex);
+                        return;
+                    }
+                    sstables.add(sstable);
+                }
+            };
+            executor.submit(runnable);
+        }
+
+        executor.shutdown();
+        try
+        {
+            executor.awaitTermination(7, TimeUnit.DAYS);
+        }
+        catch (InterruptedException e)
+        {
+            throw new AssertionError(e);
+        }
+
+        return sstables;
+
+    }
+
     /**
      * Open a RowIndexedReader which already has its state initialized (by SSTableWriter).
      */