You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/03/27 17:35:34 UTC
svn commit: r759209 - in
/incubator/cassandra/trunk/src/org/apache/cassandra/db:
ColumnFamilyStore.java HintedHandOffManager.java Memtable.java
MemtableManager.java Table.java
Author: jbellis
Date: Fri Mar 27 16:35:33 2009
New Revision: 759209
URL: http://svn.apache.org/viewvc?rev=759209&view=rev
Log:
Add SuperColumn support to forceFlush. Split out recovery flushing into flushOnRecovery.
Modified:
incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java
incubator/cassandra/trunk/src/org/apache/cassandra/db/HintedHandOffManager.java
incubator/cassandra/trunk/src/org/apache/cassandra/db/Memtable.java
incubator/cassandra/trunk/src/org/apache/cassandra/db/MemtableManager.java
incubator/cassandra/trunk/src/org/apache/cassandra/db/Table.java
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java?rev=759209&r1=759208&r2=759209&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java Fri Mar 27 16:35:33 2009
@@ -395,11 +395,11 @@
binaryMemtable_.get().put(key, buffer);
}
- void forceFlush(boolean fRecovery) throws IOException
+ void forceFlush() throws IOException
{
//MemtableManager.instance().submit(getColumnFamilyName(), memtable_.get() , CommitLog.CommitLogContext.NULL);
//memtable_.get().flush(true, CommitLog.CommitLogContext.NULL);
- memtable_.get().forceflush(this, fRecovery);
+ memtable_.get().forceflush(this);
}
void forceFlushBinary() throws IOException
@@ -1512,4 +1512,14 @@
+ totalBytesWritten + " Total keys read ..." + totalkeysRead);
return;
}
+
+ public boolean isSuper()
+ {
+ return DatabaseDescriptor.getColumnType(getColumnFamilyName()).equals("Super");
+ }
+
+ public void flushMemtableOnRecovery() throws IOException
+ {
+ memtable_.get().flushOnRecovery();
+ }
}
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/HintedHandOffManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/HintedHandOffManager.java?rev=759209&r1=759208&r2=759209&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/HintedHandOffManager.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/HintedHandOffManager.java Fri Mar 27 16:35:33 2009
@@ -142,7 +142,7 @@
if(hintedColumnFamily == null)
{
// Force flush now
- columnFamilyStore_.forceFlush(false);
+ columnFamilyStore_.forceFlush();
return;
}
Collection<IColumn> keys = hintedColumnFamily.getAllColumns();
@@ -177,7 +177,7 @@
}
}
// Force flush now
- columnFamilyStore_.forceFlush(false);
+ columnFamilyStore_.forceFlush();
// Now do a major compaction
columnFamilyStore_.forceCompaction(null, null, 0, null);
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/Memtable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/Memtable.java?rev=759209&r1=759208&r2=759209&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/Memtable.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/Memtable.java Fri Mar 27 16:35:33 2009
@@ -20,7 +20,12 @@
import java.io.FileOutputStream;
import java.io.IOException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -124,7 +129,7 @@
key_ = key;
columnFamilyName_ = cfName;
}
-
+
Getter(String key, String cfName, IFilter filter)
{
this(key, cfName);
@@ -133,7 +138,7 @@
public ColumnFamily call()
{
- ColumnFamily cf = getLocalCopy(key_, columnFamilyName_, filter_);
+ ColumnFamily cf = getLocalCopy(key_, columnFamilyName_, filter_);
return cf;
}
}
@@ -179,7 +184,7 @@
}
/**
- * Compares two Memtable based on creation time.
+ * Compares two Memtable based on creation time.
* @param rhs
* @return
*/
@@ -283,27 +288,30 @@
/*
* This version is used to switch memtable and force flush.
*/
- void forceflush(ColumnFamilyStore cfStore, boolean fRecovery) throws IOException
+ public void forceflush(ColumnFamilyStore cfStore) throws IOException
{
- if(!fRecovery)
+ RowMutation rm = new RowMutation(DatabaseDescriptor.getTables().get(0), flushKey_);
+
+ try
{
- RowMutation rm = new RowMutation(DatabaseDescriptor.getTables().get(0), flushKey_);
- try
- {
- rm.add(cfStore.columnFamily_ + ":Column","0".getBytes());
- rm.apply();
- }
- catch(ColumnFamilyNotDefinedException ex)
- {
- logger_.debug(LogUtil.throwableToString(ex));
- }
+ if (cfStore.isSuper())
+ {
+ rm.add(cfStore.getColumnFamilyName() + ":SC1:Column", "0".getBytes(), 0);
+ } else {
+ rm.add(cfStore.getColumnFamilyName() + ":Column", "0".getBytes(), 0);
+ }
+ rm.apply();
}
- else
+ catch(ColumnFamilyNotDefinedException ex)
{
- flush(CommitLog.CommitLogContext.NULL);
+ logger_.debug(LogUtil.throwableToString(ex));
}
}
+ void flushOnRecovery() throws IOException {
+ flush(CommitLog.CommitLogContext.NULL);
+ }
+
private void resolve(String key, ColumnFamily columnFamily)
{
ColumnFamily oldCf = columnFamilies_.get(key);
@@ -397,7 +405,7 @@
}
return cf;
}
-
+
ColumnFamily get(String key, String cfName, IFilter filter)
{
printExecutorStats();
@@ -431,10 +439,6 @@
apartments_.get(cfName_).submit(deleter);
}
- /*
- * param recoveryMode - indicates if this was invoked during
- * recovery.
- */
void flush(CommitLog.CommitLogContext cLogCtx) throws IOException
{
ColumnFamilyStore cfStore = Table.open(table_).getColumnFamilyStore(cfName_);
@@ -451,18 +455,18 @@
String directory = DatabaseDescriptor.getDataFileLocation();
String filename = cfStore.getNextFileName();
SSTable ssTable = new SSTable(directory, filename, pType);
- switch (pType)
+ switch (pType)
{
case OPHF:
flushForOrderPreservingPartitioner(ssTable, cfStore, cLogCtx);
break;
-
+
default:
flushForRandomPartitioner(ssTable, cfStore, cLogCtx);
break;
- }
+ }
}
-
+
private void flushForRandomPartitioner(SSTable ssTable, ColumnFamilyStore cfStore, CommitLog.CommitLogContext cLogCtx) throws IOException
{
/* List of primary keys in sorted order */
@@ -489,7 +493,7 @@
cfStore.storeLocation( ssTable.getDataFileLocation(), bf );
buffer.close();
}
-
+
private void flushForOrderPreservingPartitioner(SSTable ssTable, ColumnFamilyStore cfStore, CommitLog.CommitLogContext cLogCtx) throws IOException
{
List<String> keys = new ArrayList<String>( columnFamilies_.keySet() );
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/MemtableManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/MemtableManager.java?rev=759209&r1=759208&r2=759209&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/MemtableManager.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/MemtableManager.java Fri Mar 27 16:35:33 2009
@@ -27,10 +27,8 @@
import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
import org.apache.cassandra.concurrent.ThreadFactoryImpl;
-import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.utils.LogUtil;
import org.apache.log4j.Logger;
-import org.apache.cassandra.utils.*;
/**
* Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/Table.java?rev=759209&r1=759208&r2=759209&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/Table.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/Table.java Fri Mar 27 16:35:33 2009
@@ -847,10 +847,15 @@
Set<String> cfNames = columnFamilyStores_.keySet();
for ( String cfName : cfNames )
{
- columnFamilyStores_.get(cfName).forceFlush(fRecovery);
+ if (fRecovery) {
+ columnFamilyStores_.get(cfName).flushMemtableOnRecovery();
+ } else {
+ columnFamilyStores_.get(cfName).forceFlush();
+ }
}
}
+
void delete(Row row) throws IOException
{
String key = row.key();
@@ -895,7 +900,7 @@
}
else if(column.timestamp() == 3)
{
- cfStore.forceFlush(false);
+ cfStore.forceFlush();
}
else if(column.timestamp() == 4)
{