You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/05/06 03:55:51 UTC

svn commit: r772026 - in /incubator/cassandra/trunk: conf/ src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/db/

Author: jbellis
Date: Wed May  6 01:55:51 2009
New Revision: 772026

URL: http://svn.apache.org/viewvc?rev=772026&view=rev
Log:
Allow periodic flushing on a per-CF basis for infrequently-updated CFs (i.e.,
CFs unlikely to reach the memtable size or object-count limits often). This
allows old commitlog segments to be purged in a more timely fashion.
patch by Jun Rao; reviewed by jbellis for CASSANDRA-134

Added:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/PeriodicFlushManager.java
Modified:
    incubator/cassandra/trunk/conf/storage-conf.xml
    incubator/cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java

Modified: incubator/cassandra/trunk/conf/storage-conf.xml
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/conf/storage-conf.xml?rev=772026&r1=772025&r2=772026&view=diff
==============================================================================
--- incubator/cassandra/trunk/conf/storage-conf.xml (original)
+++ incubator/cassandra/trunk/conf/storage-conf.xml Wed May  6 01:55:51 2009
@@ -7,7 +7,11 @@
     <!-- Tables and ColumnFamilies                                            -->
     <Tables>
         <Table Name="Table1">
-            <ColumnFamily ColumnSort="Name" Name="Standard1"/>
+            <!-- If FlushPeriodInMinutes is configured and positive, this column
+                 family's memtable is force-flushed to disk at that interval, even
+                 if it has not reached its size or object-count limits. This keeps
+                 lightly-used column families from preventing commitlog purging. -->
+            <ColumnFamily ColumnSort="Name" Name="Standard1" FlushPeriodInMinutes="60"/>
             <ColumnFamily ColumnSort="Name" Name="Standard2"/>
             <ColumnFamily ColumnSort="Time" Name="StandardByTime1"/>
             <ColumnFamily ColumnSort="Time" Name="StandardByTime2"/>

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java?rev=772026&r1=772025&r2=772026&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java Wed May  6 01:55:51 2009
@@ -35,7 +35,8 @@
     public String n_columnKey;
     public String n_columnValue;
     public String n_columnTimestamp;
- 
+    public int    flushPeriodInMinutes = 0; // flush interval, if <=0, no periodic flusher is scheduled
+    
     // a quick and dirty pretty printer for describing the column family...
     public String pretty()
     {
@@ -49,6 +50,7 @@
         
         desc += "Column Family Type: " + columnType + "\n" +
                 "Columns Sorted By: " + indexProperty_ + "\n";
+        desc += "flush period: " + flushPeriodInMinutes + " minutes\n";
         return desc;
     }
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=772026&r1=772025&r2=772026&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Wed May  6 01:55:51 2009
@@ -119,7 +119,7 @@
 
     // the path qualified config file (storage-conf.xml) name
     private static String configFileName_;
-
+    
     static
     {
         try
@@ -240,7 +240,6 @@
             if ( doConsistencyCheck != null )
                 doConsistencyCheck_ = Boolean.parseBoolean(doConsistencyCheck);
 
-
             /* read the size at which we should do column indexes */
             String columnIndexSizeInKB = xmlUtils.getNodeValue("/Storage/ColumnIndexSizeInKB");
             if(columnIndexSizeInKB == null)
@@ -367,6 +366,13 @@
                         throw new ConfigurationException("invalid column sort value " + rawColumnIndexType);
                     }
 
+                    // see if flush period is set
+                    String flushPeriodInMinutes = XMLUtils.getAttributeValue(columnFamily, "FlushPeriodInMinutes");
+                    int flushPeriod=0;
+                    if ( flushPeriodInMinutes != null )
+                        flushPeriod = Integer.parseInt(flushPeriodInMinutes);
+
+                    
                     // Parse out user-specified logical names for the various dimensions
                     // of a the column family from the config.
                     String n_superColumnMap = xmlUtils.getNodeValue(xqlCF + "SuperColumnMap");
@@ -413,7 +419,8 @@
                         cfMetaData.n_superColumnKey = n_superColumnKey;
                         cfMetaData.n_superColumnMap = n_superColumnMap;
                     }
-
+                    cfMetaData.flushPeriodInMinutes = flushPeriod;
+                    
                     tableToCFMetaDataMap_.get(tName).put(cName, cfMetaData);
                 }
             }
@@ -609,6 +616,15 @@
         return cfMetaData.columnType;
     }
 
+    /** Flush period in minutes (<= 0: disabled), or 0 if the CF has no metadata. */
+    public static int getFlushPeriod(String tableName, String columnFamilyName)
+    {
+        CFMetaData metaData = getCFMetaData(tableName, columnFamilyName);
+        if (metaData != null)
+            return metaData.flushPeriodInMinutes;
+        return 0;
+    }
+
     public static boolean isNameSortingEnabled(String cfName)
     {
         String table = getTables().get(0);

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=772026&r1=772025&r2=772026&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Wed May  6 01:55:51 2009
@@ -190,6 +190,11 @@
         }
         // TODO this seems unnecessary -- each memtable flush checks to see if it needs to compact, too
         MinorCompactionManager.instance().submitPeriodicCompaction(this);
+        
+        /* submit periodic flusher if required */
+        int flushPeriod = DatabaseDescriptor.getFlushPeriod(table_, columnFamily_);
+        if (flushPeriod > 0)
+            PeriodicFlushManager.instance().submitPeriodicFlusher(this, flushPeriod);
     }
 
     List<String> getAllSSTablesOnDisk()

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java?rev=772026&r1=772025&r2=772026&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java Wed May  6 01:55:51 2009
@@ -207,6 +207,9 @@
     */
     public void forceflush()
     {
+        if (columnFamilies_.isEmpty())
+            return;
+
         try
         {
             enqueueFlush(CommitLog.open(table_).getContext());
@@ -320,14 +323,6 @@
     void flush(CommitLog.CommitLogContext cLogCtx) throws IOException
     {
         ColumnFamilyStore cfStore = Table.open(table_).getColumnFamilyStore(cfName_);
-        if ( columnFamilies_.size() == 0 )
-        {
-        	// This should be called even if size is 0
-        	// This is because we should try to delete the useless commitlogs
-        	// even though there is nothing to flush in memtables for a given family like Hints etc.
-            cfStore.onMemtableFlush(cLogCtx);
-            return;
-        }
 
         String directory = DatabaseDescriptor.getDataFileLocation();
         String filename = cfStore.getTempFileName();

Added: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/PeriodicFlushManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/PeriodicFlushManager.java?rev=772026&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/PeriodicFlushManager.java (added)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/PeriodicFlushManager.java Wed May  6 01:55:51 2009
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.log4j.Logger;
+
+import org.apache.cassandra.concurrent.*;
+import org.apache.cassandra.service.IComponentShutdown;
+import org.apache.cassandra.service.StorageService;
+
+/**
+ *  Background flusher that force-flushes a column family periodically.
+ *
+ *  Each column family registered via submitPeriodicFlusher gets its own
+ *  fixed-delay task on one shared scheduled executor, so that lightly-used
+ *  column families do not prevent commitlog segments from being purged.
+ */
+class PeriodicFlushManager implements IComponentShutdown
+{
+    private static final Logger logger_ = Logger.getLogger(PeriodicFlushManager.class);
+    // volatile is required for the double-checked locking in instance():
+    // without it a thread may observe a non-null, partially constructed instance.
+    private static volatile PeriodicFlushManager instance_;
+    private static final Lock lock_ = new ReentrantLock();
+    private final ScheduledExecutorService flusher_ = new DebuggableScheduledThreadPoolExecutor(1, new ThreadFactoryImpl("PERIODIC-FLUSHER-POOL"));
+
+    /**
+     * Returns the process-wide manager, creating it on first use.
+     */
+    public static PeriodicFlushManager instance()
+    {
+        if ( instance_ == null )
+        {
+            lock_.lock();
+            try
+            {
+                if ( instance_ == null )
+                    instance_ = new PeriodicFlushManager();
+            }
+            finally
+            {
+                lock_.unlock();
+            }
+        }
+        return instance_;
+    }
+
+    /**
+     * Registers this manager with StorageService so its executor is stopped
+     * on shutdown. Obtain the shared manager via {@link #instance()}.
+     */
+    public PeriodicFlushManager()
+    {
+        StorageService.instance().registerComponentForShutdown(this);
+    }
+
+    /**
+     * Stops the executor with shutdownNow(): queued periodic flushes are
+     * cancelled and a flush in progress is interrupted, if possible.
+     */
+    public void shutdown()
+    {
+        flusher_.shutdownNow();
+    }
+
+    /**
+     * Schedules a force-flush of the given column family, first after one
+     * period and then with a fixed delay of one period between flushes.
+     *
+     * @param columnFamilyStore    the column family to flush
+     * @param flushPeriodInMinutes the flush period in minutes; must be positive
+     * @throws IllegalArgumentException if flushPeriodInMinutes is not positive
+     */
+    public void submitPeriodicFlusher(final ColumnFamilyStore columnFamilyStore, int flushPeriodInMinutes)
+    {
+        if (flushPeriodInMinutes <= 0)
+            throw new IllegalArgumentException("flush period must be positive, got " + flushPeriodInMinutes);
+
+        Runnable runnable = new Runnable()
+        {
+            public void run()
+            {
+                // scheduleWithFixedDelay suppresses all later runs once a run
+                // throws, so log the failure instead of letting it escape.
+                try
+                {
+                    columnFamilyStore.forceFlush();
+                }
+                catch (Exception e)
+                {
+                    logger_.error("periodic flush failed for " + columnFamilyStore.columnFamily_, e);
+                }
+            }
+        };
+        logger_.info("start periodic flush daemon every " + flushPeriodInMinutes + " minutes for " + columnFamilyStore.columnFamily_);
+        flusher_.scheduleWithFixedDelay(runnable, flushPeriodInMinutes, flushPeriodInMinutes, TimeUnit.MINUTES);
+    }
+}