You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/09/21 15:14:47 UTC
svn commit: r1173615 - in /cassandra/branches/cassandra-1.0.0: ./
src/java/org/apache/cassandra/concurrent/ src/java/org/apache/cassandra/db/
src/java/org/apache/cassandra/io/sstable/
Author: jbellis
Date: Wed Sep 21 13:14:47 2011
New Revision: 1173615
URL: http://svn.apache.org/viewvc?rev=1173615&view=rev
Log:
parallelize sstable open at server startup
patch by Melvin Wang and jbellis; reviewed by pyaskevich for CASSANDRA-2988
Modified:
cassandra/branches/cassandra-1.0.0/CHANGES.txt
cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
Modified: cassandra/branches/cassandra-1.0.0/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0.0/CHANGES.txt?rev=1173615&r1=1173614&r2=1173615&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0.0/CHANGES.txt (original)
+++ cassandra/branches/cassandra-1.0.0/CHANGES.txt Wed Sep 21 13:14:47 2011
@@ -12,6 +12,7 @@
schema definitions were found (CASSANDRA-3219)
* Fixes for LeveledCompactionStrategy score computation, prioritization,
and scheduling (CASSANDRA-3224)
+ * parallelize sstable open at server startup (CASSANDRA-2988)
1.0.0-beta1
Modified: cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java?rev=1173615&r1=1173614&r2=1173615&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java (original)
+++ cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java Wed Sep 21 13:14:47 2011
@@ -98,6 +98,11 @@ public class DebuggableThreadPoolExecuto
this.setRejectedExecutionHandler(blockingExecutionHandler);
}
+ /**
+  * Factory for an executor whose threads are named after {@code threadPoolName}.
+  * Tasks wait in a {@link LinkedBlockingQueue} created without a capacity argument,
+  * so submissions are queued rather than bounded by this method.
+  *
+  * @param threadPoolName name handed to the {@link NamedThreadFactory}
+  * @param size           forwarded as the first constructor argument
+  *                       (presumably the pool size, per the method name -- confirm against the constructor)
+  * @return a newly constructed executor
+  */
+ public static DebuggableThreadPoolExecutor createWithPoolSize(String threadPoolName, int size)
+ {
+     LinkedBlockingQueue<Runnable> pendingTasks = new LinkedBlockingQueue<Runnable>();
+     NamedThreadFactory threadFactory = new NamedThreadFactory(threadPoolName);
+     // Integer.MAX_VALUE seconds: NOTE(review) looks like an effectively infinite keep-alive -- confirm.
+     return new DebuggableThreadPoolExecutor(size, Integer.MAX_VALUE, TimeUnit.SECONDS, pendingTasks, threadFactory);
+ }
+
// Extension hooks with empty default bodies; subclasses may override them.
// NOTE(review): judging by the names, these are invoked around the
// rejected-execution path -- the call sites are not shown here, confirm there.
protected void onInitialRejection(Runnable task) {}
protected void onFinalAccept(Runnable task) {}
protected void onFinalRejection(Runnable task) {}
Modified: cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1173615&r1=1173614&r2=1173615&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Wed Sep 21 13:14:47 2011
@@ -212,15 +212,8 @@ public class ColumnFamilyStore implement
// scan for sstables corresponding to this cf and load them
data = new DataTracker(this);
Set<DecoratedKey> savedKeys = keyCache.readSaved();
- List<SSTableReader> sstables = new ArrayList<SSTableReader>();
- for (Map.Entry<Descriptor,Set<Component>> sstableFiles : files(table.name, columnFamilyName, false, false).entrySet())
- {
- SSTableReader reader = openSSTableReader(sstableFiles, savedKeys, data, metadata, partitioner);
-
- if (reader != null) // if == null, logger errors where already fired
- sstables.add(reader);
- }
- data.addSSTables(sstables);
+ Set<Map.Entry<Descriptor, Set<Component>>> entries = files(table.name, columnFamilyName, false, false).entrySet();
+ data.addSSTables(SSTableReader.batchOpen(entries, savedKeys, data, metadata, this.partitioner));
// compaction strategy should be created after the CFS has been prepared
this.compactionStrategy = metadata.createCompactionStrategyInstance(this);
@@ -541,10 +534,15 @@ public class ColumnFamilyStore implement
descriptor));
logger.info("Initializing new SSTable {}", rawSSTable);
- reader = openSSTableReader(rawSSTable, savedKeys, data, metadata, partitioner);
-
- if (reader == null)
- continue; // something wrong with SSTable, skipping
+ try
+ {
+ reader = SSTableReader.open(rawSSTable.getKey(), rawSSTable.getValue(), savedKeys, data, metadata, partitioner);
+ }
+ catch (IOException e)
+ {
+ SSTableReader.logOpenException(rawSSTable.getKey(), e);
+ continue;
+ }
sstables.add(reader);
@@ -1892,30 +1890,6 @@ public class ColumnFamilyStore implement
return indexManager.getBuiltIndexes();
}
- private static SSTableReader openSSTableReader(Map.Entry<Descriptor, Set<Component>> rawSSTable,
- Set<DecoratedKey> savedKeys,
- DataTracker tracker,
- CFMetaData metadata,
- IPartitioner partitioner)
- {
- SSTableReader reader = null;
-
- try
- {
- reader = SSTableReader.open(rawSSTable.getKey(), rawSSTable.getValue(), savedKeys, tracker, metadata, partitioner);
- }
- catch (FileNotFoundException ex)
- {
- logger.error("Missing sstable component in " + rawSSTable + "; skipped because of " + ex.getMessage());
- }
- catch (IOException ex)
- {
- logger.error("Corrupt sstable " + rawSSTable + "; skipped", ex);
- }
-
- return reader;
- }
-
public int getUnleveledSSTables()
{
return this.compactionStrategy instanceof LeveledCompactionStrategy
Modified: cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/io/sstable/SSTableReader.java?rev=1173615&r1=1173614&r2=1173615&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/io/sstable/SSTableReader.java (original)
+++ cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/io/sstable/SSTableReader.java Wed Sep 21 13:14:47 2011
@@ -24,9 +24,12 @@ import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.*;
import com.google.common.base.Function;
import com.google.common.collect.Collections2;
+
+import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.io.compress.CompressedRandomAccessReader;
import org.slf4j.Logger;
@@ -164,6 +167,59 @@ public class SSTableReader extends SSTab
return sstable;
}
+ public static void logOpenException(Descriptor descriptor, IOException e)
+ {
+ if (e instanceof FileNotFoundException)
+ logger.error("Missing sstable component in " + descriptor + "; skipped because of " + e.getMessage());
+ else
+ logger.error("Corrupt sstable " + descriptor + "; skipped", e);
+ }
+
+ /**
+  * Opens a set of sstables concurrently and returns the readers that opened successfully.
+  * Each entry is opened on a dedicated "SSTableBatchOpen" executor created with
+  * {@code Runtime.getRuntime().availableProcessors()} as its size. An entry whose open
+  * fails with an {@link IOException} is logged via {@link #logOpenException} and left
+  * out of the result, so a missing component is reported as such instead of as corruption.
+  *
+  * @param entries     descriptor/component pairs, one per sstable to open
+  * @param savedKeys   passed through to {@code open}
+  * @param tracker     passed through to {@code open}
+  * @param metadata    passed through to {@code open}
+  * @param partitioner passed through to {@code open}
+  * @return the successfully opened readers, in the order the workers added them
+  *         (not necessarily the iteration order of {@code entries})
+  * @throws AssertionError if the calling thread is interrupted while waiting for the opens to finish
+  */
+ public static Collection<SSTableReader> batchOpen(Set<Map.Entry<Descriptor, Set<Component>>> entries,
+                                                   final Set<DecoratedKey> savedKeys,
+                                                   final DataTracker tracker,
+                                                   final CFMetaData metadata,
+                                                   final IPartitioner partitioner)
+ {
+     // Worker threads add to this concurrently, hence a thread-safe collection.
+     final Collection<SSTableReader> sstables = new LinkedBlockingQueue<SSTableReader>();
+
+     ExecutorService executor = DebuggableThreadPoolExecutor.createWithPoolSize("SSTableBatchOpen", Runtime.getRuntime().availableProcessors());
+     for (final Map.Entry<Descriptor, Set<Component>> entry : entries)
+     {
+         Runnable runnable = new Runnable()
+         {
+             public void run()
+             {
+                 SSTableReader sstable;
+                 try
+                 {
+                     sstable = open(entry.getKey(), entry.getValue(), savedKeys, tracker, metadata, partitioner);
+                 }
+                 catch (IOException ex)
+                 {
+                     // Same reporting as the single-open path: distinguishes a missing
+                     // component (FileNotFoundException) from a corrupt sstable.
+                     logOpenException(entry.getKey(), ex);
+                     return;
+                 }
+                 sstables.add(sstable);
+             }
+         };
+         executor.submit(runnable);
+     }
+
+     // No further tasks will be submitted; block until every open has finished.
+     executor.shutdown();
+     try
+     {
+         executor.awaitTermination(7, TimeUnit.DAYS);
+     }
+     catch (InterruptedException e)
+     {
+         // Keep the interrupt visible to code further up the stack before failing.
+         Thread.currentThread().interrupt();
+         throw new AssertionError(e);
+     }
+
+     return sstables;
+ }
+
/**
* Open a RowIndexedReader which already has its state initialized (by SSTableWriter).
*/