You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/08/28 18:27:59 UTC

svn commit: r808942 - in /incubator/cassandra/trunk: conf/ src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/net/ src/java/org/apache/cassandra/service/ src/java/org/apache/cassandra/tools/

Author: jbellis
Date: Fri Aug 28 16:27:58 2009
New Revision: 808942

URL: http://svn.apache.org/viewvc?rev=808942&view=rev
Log:
Fixes to make BinaryMemtable useful.  Highlights are configurable threads for [binary]memtable flushing and flushAndShutdown JMX/nodeprobe directive.
patch by Chris Goffinet; reviewed by jbellis for CASSANDRA-337

Modified:
    incubator/cassandra/trunk/conf/storage-conf.xml
    incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnectionManager.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java

Modified: incubator/cassandra/trunk/conf/storage-conf.xml
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/conf/storage-conf.xml?rev=808942&r1=808941&r2=808942&view=diff
==============================================================================
--- incubator/cassandra/trunk/conf/storage-conf.xml (original)
+++ incubator/cassandra/trunk/conf/storage-conf.xml Fri Aug 28 16:27:58 2009
@@ -308,4 +308,19 @@
    ~ ten days.
   -->
   <GCGraceSeconds>864000</GCGraceSeconds>
+
+  <!--
+   ~ Number of threads to run when flushing memtables to disk.  Set this to
+   ~ twice the number of physical disks in your machine allocated for DataDirectory.
+   ~ If you are planning to use the Binary Memtable, it's recommended to increase the max threads
+   ~ to maintain a higher quality of service under load while normal memtables are flushing to disk.
+  -->
+  <FlushMinThreads>1</FlushMinThreads>
+  <FlushMaxThreads>1</FlushMaxThreads>
+
+  <!--
+   ~ The size threshold, in megabytes, that the binary memtable must reach before it is submitted for flushing to disk.
+  -->
+  <BinaryMemtableSizeInMB>256</BinaryMemtableSizeInMB>
+
 </Storage>

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=808942&r1=808941&r2=808942&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Fri Aug 28 16:27:58 2009
@@ -74,6 +74,9 @@
     private static int slicedReadBufferSizeInKB_ = 64;
     private static List<String> tables_ = new ArrayList<String>();
     private static Set<String> applicationColumnFamilies_ = new HashSet<String>();
+    private static int flushMinThreads_ = 1;
+    private static int flushMaxThreads_ = 1;
+    private static int bmtThreshold_ = 256;
 
     // Default descriptive names for introspection. The user can override
     // these choices in the config file. These are not case sensitive.
@@ -271,6 +274,24 @@
                 slicedReadBufferSizeInKB_ = Integer.parseInt(rawSlicedBuffer);
             }
 
+            String rawflushMinThreads = xmlUtils.getNodeValue("/Storage/FlushMinThreads");
+            if (rawflushMinThreads != null)
+            {
+                flushMinThreads_ = Integer.parseInt(rawflushMinThreads);
+            }
+
+            String rawflushMaxThreads = xmlUtils.getNodeValue("/Storage/FlushMaxThreads");
+            if (rawflushMaxThreads != null)
+            {
+                flushMaxThreads_ = Integer.parseInt(rawflushMaxThreads);
+            }
+
+            String bmtThreshold = xmlUtils.getNodeValue("/Storage/BinaryMemtableSizeInMB");
+            if (bmtThreshold != null)
+            {
+                bmtThreshold_ = Integer.parseInt(bmtThreshold);
+            }
+
             /* TCP port on which the storage system listens */
             String port = xmlUtils.getNodeValue("/Storage/StoragePort");
             if ( port != null )
@@ -999,4 +1020,19 @@
     {
         return slicedReadBufferSizeInKB_;
     }
+
+    public static int getFlushMinThreads()
+    {
+        return flushMinThreads_;
+    }
+
+    public static int getFlushMaxThreads()
+    {
+        return flushMaxThreads_;
+    }
+
+    public static int getBMTThreshold()
+    {
+        return bmtThreshold_;
+    }
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java?rev=808942&r1=808941&r2=808942&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java Fri Aug 28 16:27:58 2009
@@ -34,11 +34,13 @@
 
 import org.apache.log4j.Logger;
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
+import java.util.*;
+import org.apache.cassandra.dht.IPartitioner;
 
 public class BinaryMemtable
 {
     private static Logger logger_ = Logger.getLogger( Memtable.class );
-    private int threshold_ = 512*1024*1024;
+    private int threshold_ = DatabaseDescriptor.getBMTThreshold()*1024*1024;
     private AtomicInteger currentSize_ = new AtomicInteger(0);
 
     /* Table and ColumnFamily name are used to determine the ColumnFamilyStore */
@@ -138,10 +140,31 @@
          * Use the SSTable to write the contents of the TreeMap
          * to disk.
         */
+
+        String path;
+        SSTableWriter writer;
         ColumnFamilyStore cfStore = Table.open(table_).getColumnFamilyStore(cfName_);
         List<String> keys = new ArrayList<String>( columnFamilies_.keySet() );
-        SSTableWriter writer = new SSTableWriter(cfStore.getTempSSTablePath(), keys.size(), StorageService.getPartitioner());
-        Collections.sort(keys);
+        /*
+            Adding a lock here so data directories are evenly used. By default currentIndex
+            is incremented, not an AtomicInteger. Let's fix this!
+         */
+        lock_.lock();
+        try
+        {
+            path = cfStore.getTempSSTablePath();
+            writer = new SSTableWriter(path, keys.size(), StorageService.getPartitioner());
+        }
+        finally
+        {
+            lock_.unlock();
+        }
+
+        final IPartitioner partitioner = StorageService.getPartitioner();
+        final Comparator<String> dc = partitioner.getDecoratedKeyComparator();
+        Collections.sort(keys, dc);
+
+
         /* Use this BloomFilter to decide if a key exists in a SSTable */
         for ( String key : keys )
         {           

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=808942&r1=808941&r2=808942&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Fri Aug 28 16:27:58 2009
@@ -38,6 +38,7 @@
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.*;
 import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
+import org.apache.cassandra.concurrent.ThreadFactoryImpl;
 import org.apache.cassandra.db.filter.*;
 import org.apache.cassandra.db.marshal.AbstractType;
 
@@ -55,8 +56,8 @@
     private static final int BUFSIZE = 128 * 1024 * 1024;
 
     private static NonBlockingHashMap<String, Set<Memtable>> memtablesPendingFlush = new NonBlockingHashMap<String, Set<Memtable>>();
-    private static ExecutorService flusher_ = new DebuggableThreadPoolExecutor("MEMTABLE-FLUSHER-POOL");
-
+    private static ExecutorService flusher_ = new DebuggableThreadPoolExecutor(DatabaseDescriptor.getFlushMinThreads(), DatabaseDescriptor.getFlushMaxThreads(), Integer.MAX_VALUE, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactoryImpl("MEMTABLE-FLUSHER-POOL"));
+    
     private final String table_;
     public final String columnFamily_;
     private final boolean isSuper_;
@@ -457,7 +458,7 @@
         assert oldMemtable.isFlushed() || oldMemtable.isClean(); 
     }
 
-    void forceFlushBinary()
+    public void forceFlushBinary()
     {
         submitFlush(binaryMemtable_.get());
     }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=808942&r1=808941&r2=808942&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Fri Aug 28 16:27:58 2009
@@ -642,11 +642,11 @@
         for (ColumnFamily columnFamily : row.getColumnFamilies())
         {
             Collection<IColumn> columns = columnFamily.getSortedColumns();
-            for(IColumn column : columns)
+            for (IColumn column : columns)
             {
-                ColumnFamilyStore cfStore = columnFamilyStores_.get(column.name());
+                ColumnFamilyStore cfStore = columnFamilyStores_.get(new String(column.name(), "UTF-8"));
                 cfStore.applyBinary(key, column.value());
-        	}
+            }
         }
         row.clear();
     }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=808942&r1=808941&r2=808942&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Fri Aug 28 16:27:58 2009
@@ -478,14 +478,26 @@
     {
         isStreaming_.set(bVal);
     }
+    public static void flushAndshutdown()
+    {
+        // send any pending writes on every connection in poolTable_, then shut down
+        for(Map.Entry<String, TcpConnectionManager> entry : poolTable_.entrySet() )
+        {
+            for(TcpConnection connection: entry.getValue().getConnections())
+            {
+                connection.doPendingWrites();
+            }
+        }
+        shutdown();
+    }
     
     public static void shutdown()
     {
         logger_.info("Shutting down ...");
-        synchronized ( MessagingService.class )
-        {          
-            /* Stop listening on any socket */            
-            for( SelectionKey skey : listenSockets_.values() )
+        synchronized (MessagingService.class)
+        {
+            /* Stop listening on any socket */
+            for (SelectionKey skey : listenSockets_.values())
             {
                 skey.cancel();
                 try
@@ -495,26 +507,25 @@
                 catch (IOException e) {}
             }
             listenSockets_.clear();
-            
-            /* Shutdown the threads in the EventQueue's */            
-            messageDeserializationExecutor_.shutdownNow();            
+
+            /* Shutdown the threads in the EventQueue's */
+            messageDeserializationExecutor_.shutdownNow();
             messageSerializerExecutor_.shutdownNow();
             messageDeserializerExecutor_.shutdownNow();
             streamExecutor_.shutdownNow();
-            
+
             /* shut down the cachetables */
             taskCompletionMap_.shutdown();
-            callbackMap_.shutdown();                        
-                        
+            callbackMap_.shutdown();
+
             /* Interrupt the selector manager thread */
             SelectorManager.getSelectorManager().interrupt();
-            
-            poolTable_.clear();            
-            verbHandlers_.clear();                                    
+
+            poolTable_.clear();
+            verbHandlers_.clear();
             bShutdown_ = true;
         }
-        if (logger_.isDebugEnabled())
-          logger_.debug("Shutdown invocation complete.");
+        logger_.info("Shutdown invocation complete.");
     }
 
     public static void receive(Message message)

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java?rev=808942&r1=808941&r2=808942&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java Fri Aug 28 16:27:58 2009
@@ -387,7 +387,7 @@
          resumeStreaming();        
     }
     
-    void doPendingWrites()
+    public void doPendingWrites()
     {
         synchronized(this)
         {

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnectionManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnectionManager.java?rev=808942&r1=808941&r2=808942&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnectionManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnectionManager.java Fri Aug 28 16:27:58 2009
@@ -211,4 +211,8 @@
     {
         return allConnections_.contains(connection);
     }
+    List<TcpConnection> getConnections()
+    {
+        return allConnections_;
+    }
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=808942&r1=808941&r2=808942&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Fri Aug 28 16:27:58 2009
@@ -778,6 +778,24 @@
             logger_.debug("Cleared out all snapshot directories");
     }
 
+    public void forceTableFlushBinary(String tableName) throws IOException
+    {
+        if (DatabaseDescriptor.getTable(tableName) == null)
+        {
+            throw new IOException("Table " + tableName + " does not exist");
+        }
+        // The table is known: force a binary-memtable flush on each of its column families.
+        Table table = Table.open(tableName);
+        Set<String> columnFamilies = table.getColumnFamilies();
+        for (String columnFamily : columnFamilies)
+        {
+            ColumnFamilyStore cfStore = table.getColumnFamilyStore(columnFamily);
+            logger_.debug("Forcing flush on keyspace " + tableName + " on CF " + columnFamily);
+            cfStore.forceFlushBinary();
+        }
+    }
+
+
     /* End of MBean interface methods */
     
     /**

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java?rev=808942&r1=808941&r2=808942&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java Fri Aug 28 16:27:58 2009
@@ -84,4 +84,11 @@
      * Remove all the existing snapshots.
      */
     public void clearSnapshot() throws IOException;
+
+    /**
+     * Flush the binary memtable of every column family in a table.
+     * @param tableName name of the table whose binary memtables are flushed
+     * @throws IOException if the table does not exist
+     */
+    public void forceTableFlushBinary(String tableName) throws IOException;
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java?rev=808942&r1=808941&r2=808942&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java Fri Aug 28 16:27:58 2009
@@ -257,7 +257,16 @@
     {
         ssProxy.forceTableCompaction();
     }
-    
+
+    /**
+     * Trigger a binary flush on the CFs of the given table by delegating to ssProxy.
+     */
+    public void forceTableFlushBinary(String tableName) throws IOException
+    {
+        ssProxy.forceTableFlushBinary(tableName);
+    }
+
+
     /**
      * Write a textual representation of the Cassandra ring.
      * 
@@ -517,7 +526,7 @@
     {
         HelpFormatter hf = new HelpFormatter();
         String header = String.format(
-                "%nAvailable commands: ring, cluster, info, cleanup, compact, cfstats, snapshot [name], clearsnapshot, bootstrap, tpstats");
+                "%nAvailable commands: ring, cluster, info, cleanup, compact, cfstats, snapshot [name], clearsnapshot, bootstrap, tpstats, flush_binary");
         String usage = String.format("java %s -host <arg> <command>%n", NodeProbe.class.getName());
         hf.printHelp(usage, "", options, header);
     }
@@ -609,6 +618,16 @@
         {
             probe.printThreadPoolStats(System.out);
         }
+        else if (cmdName.equals("flush_binary"))
+        {
+            if (probe.getArgs().length < 2)
+            {
+                System.err.println("Missing keyspace argument.");
+                NodeProbe.printUsage();
+                System.exit(1);
+            }
+            probe.forceTableFlushBinary(probe.getArgs()[1]);
+        }
         else
         {
             System.err.println("Unrecognized command: " + cmdName + ".");