You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/08/28 18:27:59 UTC
svn commit: r808942 - in /incubator/cassandra/trunk: conf/
src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/db/
src/java/org/apache/cassandra/net/ src/java/org/apache/cassandra/service/
src/java/org/apache/cassandra/tools/
Author: jbellis
Date: Fri Aug 28 16:27:58 2009
New Revision: 808942
URL: http://svn.apache.org/viewvc?rev=808942&view=rev
Log:
Fixes to make BinaryMemtable useful. Highlights are configurable threads for [binary]memtable flushing and flushAndShutdown JMX/nodeprobe directive.
patch by Chris Goffinet; reviewed by jbellis for CASSANDRA-337
Modified:
incubator/cassandra/trunk/conf/storage-conf.xml
incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnectionManager.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java
Modified: incubator/cassandra/trunk/conf/storage-conf.xml
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/conf/storage-conf.xml?rev=808942&r1=808941&r2=808942&view=diff
==============================================================================
--- incubator/cassandra/trunk/conf/storage-conf.xml (original)
+++ incubator/cassandra/trunk/conf/storage-conf.xml Fri Aug 28 16:27:58 2009
@@ -308,4 +308,19 @@
~ ten days.
-->
<GCGraceSeconds>864000</GCGraceSeconds>
+
+ <!--
+ ~ Number of threads to run when flushing memtables to disk. Set this to
+ ~ twice the number of physical disks in your machine allocated for DataDirectory.
+ ~ If you are planning to use the Binary Memtable, it's recommended to increase the max threads
+ ~ to maintain a higher quality of service under load while normal memtables are flushing to disk.
+ -->
+ <FlushMinThreads>1</FlushMinThreads>
+ <FlushMaxThreads>1</FlushMaxThreads>
+
+ <!--
+ ~ The size, in megabytes, that a binary memtable must reach before it is submitted for flushing to disk.
+ -->
+ <BinaryMemtableSizeInMB>256</BinaryMemtableSizeInMB>
+
</Storage>
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=808942&r1=808941&r2=808942&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Fri Aug 28 16:27:58 2009
@@ -74,6 +74,9 @@
private static int slicedReadBufferSizeInKB_ = 64;
private static List<String> tables_ = new ArrayList<String>();
private static Set<String> applicationColumnFamilies_ = new HashSet<String>();
+ private static int flushMinThreads_ = 1;
+ private static int flushMaxThreads_ = 1;
+ private static int bmtThreshold_ = 256;
// Default descriptive names for introspection. The user can override
// these choices in the config file. These are not case sensitive.
@@ -271,6 +274,24 @@
slicedReadBufferSizeInKB_ = Integer.parseInt(rawSlicedBuffer);
}
+ String rawflushMinThreads = xmlUtils.getNodeValue("/Storage/FlushMinThreads");
+ if (rawflushMinThreads != null)
+ {
+ flushMinThreads_ = Integer.parseInt(rawflushMinThreads);
+ }
+
+ String rawflushMaxThreads = xmlUtils.getNodeValue("/Storage/FlushMaxThreads");
+ if (rawflushMaxThreads != null)
+ {
+ flushMaxThreads_ = Integer.parseInt(rawflushMaxThreads);
+ }
+
+ String bmtThreshold = xmlUtils.getNodeValue("/Storage/BinaryMemtableSizeInMB");
+ if (bmtThreshold != null)
+ {
+ bmtThreshold_ = Integer.parseInt(bmtThreshold);
+ }
+
/* TCP port on which the storage system listens */
String port = xmlUtils.getNodeValue("/Storage/StoragePort");
if ( port != null )
@@ -999,4 +1020,19 @@
{
return slicedReadBufferSizeInKB_;
}
+
+ /**
+ * Minimum number of memtable flush threads, read from
+ * /Storage/FlushMinThreads (defaults to 1).
+ */
+ public static int getFlushMinThreads()
+ {
+ return DatabaseDescriptor.flushMinThreads_;
+ }
+
+ /**
+ * Maximum number of memtable flush threads, read from
+ * /Storage/FlushMaxThreads (defaults to 1).
+ */
+ public static int getFlushMaxThreads()
+ {
+ return DatabaseDescriptor.flushMaxThreads_;
+ }
+
+ /**
+ * Binary memtable flush threshold in megabytes, read from
+ * /Storage/BinaryMemtableSizeInMB (defaults to 256).
+ */
+ public static int getBMTThreshold()
+ {
+ return DatabaseDescriptor.bmtThreshold_;
+ }
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java?rev=808942&r1=808941&r2=808942&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java Fri Aug 28 16:27:58 2009
@@ -34,11 +34,13 @@
import org.apache.log4j.Logger;
import org.cliffc.high_scale_lib.NonBlockingHashMap;
+import java.util.*;
+import org.apache.cassandra.dht.IPartitioner;
public class BinaryMemtable
{
private static Logger logger_ = Logger.getLogger( Memtable.class );
- private int threshold_ = 512*1024*1024;
+ private int threshold_ = DatabaseDescriptor.getBMTThreshold()*1024*1024;
private AtomicInteger currentSize_ = new AtomicInteger(0);
/* Table and ColumnFamily name are used to determine the ColumnFamilyStore */
@@ -138,10 +140,31 @@
* Use the SSTable to write the contents of the TreeMap
* to disk.
*/
+
+ String path;
+ SSTableWriter writer;
ColumnFamilyStore cfStore = Table.open(table_).getColumnFamilyStore(cfName_);
List<String> keys = new ArrayList<String>( columnFamilies_.keySet() );
- SSTableWriter writer = new SSTableWriter(cfStore.getTempSSTablePath(), keys.size(), StorageService.getPartitioner());
- Collections.sort(keys);
+ /*
Hold a lock while choosing the temp SSTable path so data directories are used evenly: currentIndex
is incremented as a plain int rather than an AtomicInteger. TODO: make currentIndex atomic and drop this lock.
+ */
+ lock_.lock();
+ try
+ {
+ path = cfStore.getTempSSTablePath();
+ writer = new SSTableWriter(path, keys.size(), StorageService.getPartitioner());
+ }
+ finally
+ {
+ lock_.unlock();
+ }
+
+ final IPartitioner partitioner = StorageService.getPartitioner();
+ final Comparator<String> dc = partitioner.getDecoratedKeyComparator();
+ Collections.sort(keys, dc);
+
+
/* Use this BloomFilter to decide if a key exists in a SSTable */
for ( String key : keys )
{
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=808942&r1=808941&r2=808942&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Fri Aug 28 16:27:58 2009
@@ -38,6 +38,7 @@
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.*;
import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
+import org.apache.cassandra.concurrent.ThreadFactoryImpl;
import org.apache.cassandra.db.filter.*;
import org.apache.cassandra.db.marshal.AbstractType;
@@ -55,8 +56,8 @@
private static final int BUFSIZE = 128 * 1024 * 1024;
private static NonBlockingHashMap<String, Set<Memtable>> memtablesPendingFlush = new NonBlockingHashMap<String, Set<Memtable>>();
- private static ExecutorService flusher_ = new DebuggableThreadPoolExecutor("MEMTABLE-FLUSHER-POOL");
-
+ /*
+ * The work queue is an unbounded LinkedBlockingQueue. With such a queue a
+ * ThreadPoolExecutor never starts more than corePoolSize threads, so a core size
+ * of FlushMinThreads would make FlushMaxThreads a no-op. A min > max config would
+ * also throw during class init. Assuming DebuggableThreadPoolExecutor keeps
+ * ThreadPoolExecutor's queuing policy, use the larger value for both sizes so up
+ * to FlushMaxThreads flushes can really run concurrently.
+ */
+ private static ExecutorService flusher_ = new DebuggableThreadPoolExecutor(Math.max(DatabaseDescriptor.getFlushMinThreads(), DatabaseDescriptor.getFlushMaxThreads()),
+ Math.max(DatabaseDescriptor.getFlushMinThreads(), DatabaseDescriptor.getFlushMaxThreads()),
+ Integer.MAX_VALUE,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue<Runnable>(),
+ new ThreadFactoryImpl("MEMTABLE-FLUSHER-POOL"));
+
private final String table_;
public final String columnFamily_;
private final boolean isSuper_;
@@ -457,7 +458,7 @@
assert oldMemtable.isFlushed() || oldMemtable.isClean();
}
- void forceFlushBinary()
+ public void forceFlushBinary()
{
submitFlush(binaryMemtable_.get());
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=808942&r1=808941&r2=808942&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Fri Aug 28 16:27:58 2009
@@ -642,11 +642,11 @@
for (ColumnFamily columnFamily : row.getColumnFamilies())
{
Collection<IColumn> columns = columnFamily.getSortedColumns();
- for(IColumn column : columns)
+ for (IColumn column : columns)
{
- ColumnFamilyStore cfStore = columnFamilyStores_.get(column.name());
+ ColumnFamilyStore cfStore = columnFamilyStores_.get(new String(column.name(), "UTF-8"));
cfStore.applyBinary(key, column.value());
- }
+ }
}
row.clear();
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=808942&r1=808941&r2=808942&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Fri Aug 28 16:27:58 2009
@@ -478,14 +478,26 @@
{
isStreaming_.set(bVal);
}
+ /**
+ * Calls doPendingWrites() on every connection of every pooled
+ * TcpConnectionManager, then delegates to {@link #shutdown()}.
+ */
+ public static void flushAndshutdown()
+ {
+ // safely shutdown and send all writes
+ for (TcpConnectionManager manager : poolTable_.values())
+ {
+ for (TcpConnection conn : manager.getConnections())
+ {
+ conn.doPendingWrites();
+ }
+ }
+ shutdown();
+ }
public static void shutdown()
{
logger_.info("Shutting down ...");
- synchronized ( MessagingService.class )
- {
- /* Stop listening on any socket */
- for( SelectionKey skey : listenSockets_.values() )
+ synchronized (MessagingService.class)
+ {
+ /* Stop listening on any socket */
+ for (SelectionKey skey : listenSockets_.values())
{
skey.cancel();
try
@@ -495,26 +507,25 @@
catch (IOException e) {}
}
listenSockets_.clear();
-
- /* Shutdown the threads in the EventQueue's */
- messageDeserializationExecutor_.shutdownNow();
+
+ /* Shutdown the threads in the EventQueue's */
+ messageDeserializationExecutor_.shutdownNow();
messageSerializerExecutor_.shutdownNow();
messageDeserializerExecutor_.shutdownNow();
streamExecutor_.shutdownNow();
-
+
/* shut down the cachetables */
taskCompletionMap_.shutdown();
- callbackMap_.shutdown();
-
+ callbackMap_.shutdown();
+
/* Interrupt the selector manager thread */
SelectorManager.getSelectorManager().interrupt();
-
- poolTable_.clear();
- verbHandlers_.clear();
+
+ poolTable_.clear();
+ verbHandlers_.clear();
bShutdown_ = true;
}
- if (logger_.isDebugEnabled())
- logger_.debug("Shutdown invocation complete.");
+ logger_.info("Shutdown invocation complete.");
}
public static void receive(Message message)
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java?rev=808942&r1=808941&r2=808942&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java Fri Aug 28 16:27:58 2009
@@ -387,7 +387,7 @@
resumeStreaming();
}
- void doPendingWrites()
+ public void doPendingWrites()
{
synchronized(this)
{
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnectionManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnectionManager.java?rev=808942&r1=808941&r2=808942&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnectionManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnectionManager.java Fri Aug 28 16:27:58 2009
@@ -211,4 +211,8 @@
{
return allConnections_.contains(connection);
}
+ /**
+ * Returns a snapshot copy of the connections managed by this pool.
+ * Copying keeps callers that iterate the result (such as
+ * MessagingService.flushAndshutdown) from mutating the internal list.
+ */
+ List<TcpConnection> getConnections()
+ {
+ return new java.util.ArrayList<TcpConnection>(allConnections_);
+ }
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=808942&r1=808941&r2=808942&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Fri Aug 28 16:27:58 2009
@@ -778,6 +778,24 @@
logger_.debug("Cleared out all snapshot directories");
}
+ /**
+ * Forces a flush of the binary memtable of every column family in the given table.
+ *
+ * @param tableName name of the table (keyspace) to flush
+ * @throws IOException if no table with that name is configured
+ */
+ public void forceTableFlushBinary(String tableName) throws IOException
+ {
+ if (DatabaseDescriptor.getTable(tableName) == null)
+ {
+ throw new IOException("Table " + tableName + " does not exist");
+ }
+
+ Table table = Table.open(tableName);
+ for (String columnFamily : table.getColumnFamilies())
+ {
+ // avoid building the log string when debug logging is off
+ if (logger_.isDebugEnabled())
+ logger_.debug("Forcing flush on keyspace " + tableName + " on CF " + columnFamily);
+ table.getColumnFamilyStore(columnFamily).forceFlushBinary();
+ }
+ }
+
+
/* End of MBean interface methods */
/**
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java?rev=808942&r1=808941&r2=808942&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java Fri Aug 28 16:27:58 2009
@@ -84,4 +84,11 @@
* Remove all the existing snapshots.
*/
public void clearSnapshot() throws IOException;
+
+ /**
+ * Forces a flush of the binary memtable of every column family in a table.
+ *
+ * @param tableName the name of the table (keyspace) to flush
+ * @throws IOException if the table does not exist
+ */
+ public void forceTableFlushBinary(String tableName) throws IOException;
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java?rev=808942&r1=808941&r2=808942&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java Fri Aug 28 16:27:58 2009
@@ -257,7 +257,16 @@
{
ssProxy.forceTableCompaction();
}
-
+
+ /**
+ * Triggers a binary memtable flush on every column family of a table by
+ * delegating to ssProxy.forceTableFlushBinary.
+ *
+ * @param tableName the table (keyspace) whose column families should be flushed
+ * @throws IOException if the remote call reports an error, e.g. an unknown table
+ */
+ public void forceTableFlushBinary(String tableName) throws IOException
+ {
+ ssProxy.forceTableFlushBinary(tableName);
+ }
+
+
/**
* Write a textual representation of the Cassandra ring.
*
@@ -517,7 +526,7 @@
{
HelpFormatter hf = new HelpFormatter();
String header = String.format(
- "%nAvailable commands: ring, cluster, info, cleanup, compact, cfstats, snapshot [name], clearsnapshot, bootstrap, tpstats");
+ "%nAvailable commands: ring, cluster, info, cleanup, compact, cfstats, snapshot [name], clearsnapshot, bootstrap, tpstats, flush_binary");
String usage = String.format("java %s -host <arg> <command>%n", NodeProbe.class.getName());
hf.printHelp(usage, "", options, header);
}
@@ -609,6 +618,16 @@
{
probe.printThreadPoolStats(System.out);
}
+ else if (cmdName.equals("flush_binary"))
+ {
+ if (probe.getArgs().length < 2)
+ {
+ System.err.println("Missing keyspace argument.");
+ NodeProbe.printUsage();
+ System.exit(1);
+ }
+ probe.forceTableFlushBinary(probe.getArgs()[1]);
+ }
else
{
System.err.println("Unrecognized command: " + cmdName + ".");