You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/05/06 03:55:51 UTC
svn commit: r772026 - in /incubator/cassandra/trunk: conf/
src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/db/
Author: jbellis
Date: Wed May 6 01:55:51 2009
New Revision: 772026
URL: http://svn.apache.org/viewvc?rev=772026&view=rev
Log:
allow periodic flushing on a per-CF basis for infrequently-updated CFs (i.e.,
unlikely to reach the memtable size or object count limits often). this will allow
purging old commitlog segments in a more timely fashion.
patch by Jun Rao; reviewed by jbellis for CASSANDRA-134
Added:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/PeriodicFlushManager.java
Modified:
incubator/cassandra/trunk/conf/storage-conf.xml
incubator/cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
Modified: incubator/cassandra/trunk/conf/storage-conf.xml
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/conf/storage-conf.xml?rev=772026&r1=772025&r2=772026&view=diff
==============================================================================
--- incubator/cassandra/trunk/conf/storage-conf.xml (original)
+++ incubator/cassandra/trunk/conf/storage-conf.xml Wed May 6 01:55:51 2009
@@ -7,7 +7,11 @@
<!-- Tables and ColumnFamilies -->
<Tables>
<Table Name="Table1">
- <ColumnFamily ColumnSort="Name" Name="Standard1"/>
+ <!-- if FlushPeriodInMinutes is configured and positive, it will be
+ flushed to disk with that period whether it is dirty or not.
+ This is intended for lightly-used columnfamilies so that they
+ do not prevent commitlog segments from being purged. -->
+ <ColumnFamily ColumnSort="Name" Name="Standard1" FlushPeriodInMinutes="60"/>
<ColumnFamily ColumnSort="Name" Name="Standard2"/>
<ColumnFamily ColumnSort="Time" Name="StandardByTime1"/>
<ColumnFamily ColumnSort="Time" Name="StandardByTime2"/>
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java?rev=772026&r1=772025&r2=772026&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java Wed May 6 01:55:51 2009
@@ -35,7 +35,8 @@
public String n_columnKey;
public String n_columnValue;
public String n_columnTimestamp;
-
+ public int flushPeriodInMinutes = 0; // flush interval, if <=0, no periodic flusher is scheduled
+
// a quick and dirty pretty printer for describing the column family...
public String pretty()
{
@@ -49,6 +50,7 @@
desc += "Column Family Type: " + columnType + "\n" +
"Columns Sorted By: " + indexProperty_ + "\n";
+ desc += "flush period: " + flushPeriodInMinutes + " minutes\n";
return desc;
}
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=772026&r1=772025&r2=772026&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Wed May 6 01:55:51 2009
@@ -119,7 +119,7 @@
// the path qualified config file (storage-conf.xml) name
private static String configFileName_;
-
+
static
{
try
@@ -240,7 +240,6 @@
if ( doConsistencyCheck != null )
doConsistencyCheck_ = Boolean.parseBoolean(doConsistencyCheck);
-
/* read the size at which we should do column indexes */
String columnIndexSizeInKB = xmlUtils.getNodeValue("/Storage/ColumnIndexSizeInKB");
if(columnIndexSizeInKB == null)
@@ -367,6 +366,13 @@
throw new ConfigurationException("invalid column sort value " + rawColumnIndexType);
}
+ // see if flush period is set
+ String flushPeriodInMinutes = XMLUtils.getAttributeValue(columnFamily, "FlushPeriodInMinutes");
+ int flushPeriod=0;
+ if ( flushPeriodInMinutes != null )
+ flushPeriod = Integer.parseInt(flushPeriodInMinutes);
+
+
// Parse out user-specified logical names for the various dimensions
// of a the column family from the config.
String n_superColumnMap = xmlUtils.getNodeValue(xqlCF + "SuperColumnMap");
@@ -413,7 +419,8 @@
cfMetaData.n_superColumnKey = n_superColumnKey;
cfMetaData.n_superColumnMap = n_superColumnMap;
}
-
+ cfMetaData.flushPeriodInMinutes = flushPeriod;
+
tableToCFMetaDataMap_.get(tName).put(cName, cfMetaData);
}
}
@@ -609,6 +616,15 @@
return cfMetaData.columnType;
}
+ public static int getFlushPeriod(String tableName, String columnFamilyName)
+ {
+ CFMetaData cfMetaData = getCFMetaData(tableName, columnFamilyName);
+
+ if (cfMetaData == null)
+ return 0;
+ return cfMetaData.flushPeriodInMinutes;
+ }
+
public static boolean isNameSortingEnabled(String cfName)
{
String table = getTables().get(0);
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=772026&r1=772025&r2=772026&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Wed May 6 01:55:51 2009
@@ -190,6 +190,11 @@
}
// TODO this seems unnecessary -- each memtable flush checks to see if it needs to compact, too
MinorCompactionManager.instance().submitPeriodicCompaction(this);
+
+ /* submit periodic flusher if required */
+ int flushPeriod = DatabaseDescriptor.getFlushPeriod(table_, columnFamily_);
+ if (flushPeriod > 0)
+ PeriodicFlushManager.instance().submitPeriodicFlusher(this, flushPeriod);
}
List<String> getAllSSTablesOnDisk()
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java?rev=772026&r1=772025&r2=772026&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java Wed May 6 01:55:51 2009
@@ -207,6 +207,9 @@
*/
public void forceflush()
{
+ if (columnFamilies_.isEmpty())
+ return;
+
try
{
enqueueFlush(CommitLog.open(table_).getContext());
@@ -320,14 +323,6 @@
void flush(CommitLog.CommitLogContext cLogCtx) throws IOException
{
ColumnFamilyStore cfStore = Table.open(table_).getColumnFamilyStore(cfName_);
- if ( columnFamilies_.size() == 0 )
- {
- // This should be called even if size is 0
- // This is because we should try to delete the useless commitlogs
- // even though there is nothing to flush in memtables for a given family like Hints etc.
- cfStore.onMemtableFlush(cLogCtx);
- return;
- }
String directory = DatabaseDescriptor.getDataFileLocation();
String filename = cfStore.getTempFileName();
Added: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/PeriodicFlushManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/PeriodicFlushManager.java?rev=772026&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/PeriodicFlushManager.java (added)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/PeriodicFlushManager.java Wed May 6 01:55:51 2009
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.log4j.Logger;
+
+import org.apache.cassandra.concurrent.*;
+import org.apache.cassandra.service.IComponentShutdown;
+import org.apache.cassandra.service.StorageService;
+
+/**
+ * Background flusher that force-flushes a column family periodically.
+ */
+class PeriodicFlushManager implements IComponentShutdown
+{
+ private static Logger logger_ = Logger.getLogger(PeriodicFlushManager.class);
+ private static PeriodicFlushManager instance_;
+ private static Lock lock_ = new ReentrantLock();
+ private ScheduledExecutorService flusher_ = new DebuggableScheduledThreadPoolExecutor(1, new ThreadFactoryImpl("PERIODIC-FLUSHER-POOL"));
+
+ public static PeriodicFlushManager instance()
+ {
+ if ( instance_ == null )
+ {
+ lock_.lock();
+ try
+ {
+ if ( instance_ == null )
+ instance_ = new PeriodicFlushManager();
+ }
+ finally
+ {
+ lock_.unlock();
+ }
+ }
+ return instance_;
+ }
+
+ public PeriodicFlushManager()
+ {
+ StorageService.instance().registerComponentForShutdown(this);
+ }
+
+ public void shutdown()
+ {
+ flusher_.shutdownNow();
+ }
+
+ public void submitPeriodicFlusher(final ColumnFamilyStore columnFamilyStore, int flushPeriodInMinutes)
+ {
+ Runnable runnable= new Runnable()
+ {
+ public void run()
+ {
+ columnFamilyStore.forceFlush();
+ }
+ };
+ logger_.info("start periodic flush daemon every " + flushPeriodInMinutes + " minutes for " + columnFamilyStore.columnFamily_);
+ flusher_.scheduleWithFixedDelay(runnable, flushPeriodInMinutes, flushPeriodInMinutes, TimeUnit.MINUTES);
+ }
+}