You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/03/27 17:39:18 UTC
svn commit: r759223 - in
/incubator/cassandra/trunk/src/org/apache/cassandra: db/ service/
Author: jbellis
Date: Fri Mar 27 16:39:17 2009
New Revision: 759223
URL: http://svn.apache.org/viewvc?rev=759223&view=rev
Log:
finish remove support. Split CFS.resolve() into resolve(), which combines ColumnFamilies, and removeDeleted(), which takes a single ColumnFamily and returns a new one with deleted IColumns removed. Keep deletion information around until removeDeleted is called so that deletion information can properly suppress older IColumns.
Modified:
incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java
incubator/cassandra/trunk/src/org/apache/cassandra/db/HintedHandOffManager.java
incubator/cassandra/trunk/src/org/apache/cassandra/db/Memtable.java
incubator/cassandra/trunk/src/org/apache/cassandra/db/RowMutation.java
incubator/cassandra/trunk/src/org/apache/cassandra/db/RowMutationVerbHandler.java
incubator/cassandra/trunk/src/org/apache/cassandra/db/Table.java
incubator/cassandra/trunk/src/org/apache/cassandra/service/CassandraServer.java
incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageProxy.java
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java?rev=759223&r1=759222&r2=759223&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java Fri Mar 27 16:39:17 2009
@@ -18,15 +18,27 @@
package org.apache.cassandra.db;
-import java.io.*;
+import java.io.File;
+import java.io.IOException;
import java.math.BigInteger;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.StringTokenizer;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.log4j.Logger;
+
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.io.DataInputBuffer;
@@ -40,7 +52,6 @@
import org.apache.cassandra.utils.BloomFilter;
import org.apache.cassandra.utils.FileUtils;
import org.apache.cassandra.utils.LogUtil;
-import org.apache.log4j.Logger;
/**
* Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
@@ -431,66 +442,46 @@
binaryMemtable_.get().put(key, buffer);
}
+ public ColumnFamily getColumnFamily(String key, String columnFamilyColumn, IFilter filter) throws IOException
+ {
+ List<ColumnFamily> columnFamilies = getColumnFamilies(key, columnFamilyColumn, filter);
+ return resolveAndRemoveDeleted(columnFamilies);
+ }
+
/**
*
* Get the column family in the most efficient order.
* 1. Memtable
* 2. Sorted list of files
*/
- public ColumnFamily getColumnFamily(String key, String cf, IFilter filter) throws IOException
+ List<ColumnFamily> getColumnFamilies(String key, String columnFamilyColumn, IFilter filter) throws IOException
{
- List<ColumnFamily> columnFamilies = new ArrayList<ColumnFamily>();
- ColumnFamily columnFamily = null;
- long start = System.currentTimeMillis();
- /* Get the ColumnFamily from Memtable */
- getColumnFamilyFromCurrentMemtable(key, cf, filter, columnFamilies);
- if(columnFamilies.size() != 0)
+ List<ColumnFamily> columnFamilies = getMemoryColumnFamilies(key, columnFamilyColumn, filter);
+ if (columnFamilies.size() == 0 || !filter.isDone())
{
- if(filter.isDone())
- return columnFamilies.get(0);
+ long start = System.currentTimeMillis();
+ getColumnFamilyFromDisk(key, columnFamilyColumn, columnFamilies, filter);
+ logger_.debug("DISK TIME: " + (System.currentTimeMillis() - start) + " ms.");
}
- /* Check if MemtableManager has any historical information */
- MemtableManager.instance().getColumnFamily(key, columnFamily_, cf, filter, columnFamilies);
- if(columnFamilies.size() != 0)
- {
- columnFamily = resolve(columnFamilies);
- if(filter.isDone())
- return columnFamily;
- columnFamilies.clear();
- columnFamilies.add(columnFamily);
- }
- getColumnFamilyFromDisk(key, cf, columnFamilies, filter);
- logger_.debug("DISK TIME: " + (System.currentTimeMillis() - start)
- + " ms.");
- columnFamily = resolve(columnFamilies);
-
- return columnFamily;
+ return columnFamilies;
}
-
- public ColumnFamily getColumnFamilyFromMemory(String key, String cf, IFilter filter)
+
+ private List<ColumnFamily> getMemoryColumnFamilies(String key, String columnFamilyColumn, IFilter filter)
{
List<ColumnFamily> columnFamilies = new ArrayList<ColumnFamily>();
- ColumnFamily columnFamily = null;
- long start = System.currentTimeMillis();
/* Get the ColumnFamily from Memtable */
- getColumnFamilyFromCurrentMemtable(key, cf, filter, columnFamilies);
- if(columnFamilies.size() != 0)
+ getColumnFamilyFromCurrentMemtable(key, columnFamilyColumn, filter, columnFamilies);
+ if (columnFamilies.size() == 0 || !filter.isDone())
{
- if(filter.isDone())
- return columnFamilies.get(0);
- }
- /* Check if MemtableManager has any historical information */
- MemtableManager.instance().getColumnFamily(key, columnFamily_, cf, filter, columnFamilies);
- if(columnFamilies.size() != 0)
- {
- columnFamily = resolve(columnFamilies);
- if(filter.isDone())
- return columnFamily;
- columnFamilies.clear();
- columnFamilies.add(columnFamily);
+ /* Check if MemtableManager has any historical information */
+ MemtableManager.instance().getColumnFamily(key, columnFamily_, columnFamilyColumn, filter, columnFamilies);
}
- columnFamily = resolve(columnFamilies);
- return columnFamily;
+ return columnFamilies;
+ }
+
+ public ColumnFamily getColumnFamilyFromMemory(String key, String columnFamilyColumn, IFilter filter)
+ {
+ return resolveAndRemoveDeleted(getMemoryColumnFamilies(key, columnFamilyColumn, filter));
}
/**
@@ -530,33 +521,14 @@
long start = System.currentTimeMillis();
if (columnFamily != null)
{
- /*
- * TODO
- * By using the filter before removing deleted columns
- * we have a efficient implementation of timefilter
- * but for count filter this can return wrong results
- * we need to take care of that later.
- */
- /* suppress columns marked for delete */
- Map<String, IColumn> columns = columnFamily.getColumns();
- Set<String> cNames = columns.keySet();
-
- for (String cName : cNames)
- {
- IColumn column = columns.get(cName);
- if (column.isMarkedForDelete())
- columns.remove(cName);
- }
columnFamilies.add(columnFamily);
if(filter.isDone())
{
break;
}
}
- logger_.debug("DISK Data structure population TIME: " + (System.currentTimeMillis() - start)
- + " ms.");
+ logger_.debug("DISK Data structure population TIME: " + (System.currentTimeMillis() - start) + " ms.");
}
- files.clear();
}
@@ -570,12 +542,11 @@
if (bufIn.getLength() == 0)
return null;
start = System.currentTimeMillis();
- ColumnFamily columnFamily = null;
- columnFamily = ColumnFamily.serializer().deserialize(bufIn, cf, filter);
+ ColumnFamily columnFamily = ColumnFamily.serializer().deserialize(bufIn, cf, filter);
logger_.debug("DISK Deserialize TIME: " + (System.currentTimeMillis() - start) + " ms.");
if (columnFamily == null)
- return columnFamily;
- return (!columnFamily.isMarkedForDelete()) ? columnFamily : null;
+ return null;
+ return columnFamily;
}
private void getColumnFamilyFromCurrentMemtable(String key, String cf, IFilter filter, List<ColumnFamily> columnFamilies)
@@ -584,20 +555,66 @@
ColumnFamily columnFamily = memtable_.get().get(key, cf, filter);
if (columnFamily != null)
{
- if (!columnFamily.isMarkedForDelete())
- columnFamilies.add(columnFamily);
+ columnFamilies.add(columnFamily);
}
}
- private ColumnFamily resolve(List<ColumnFamily> columnFamilies)
+ /** merge all columnFamilies into a single instance, with only the newest versions of columns preserved. */
+ static ColumnFamily resolve(List<ColumnFamily> columnFamilies)
{
int size = columnFamilies.size();
if (size == 0)
- return null;
- ColumnFamily cf = columnFamilies.get(0);
- for ( int i = 1; i < size ; ++i )
- {
- cf.addColumns(columnFamilies.get(i));
+ return null;
+
+ // start from nothing so that we don't include potential deleted columns from the first instance
+ String cfname = columnFamilies.get(0).name();
+ ColumnFamily cf = new ColumnFamily(cfname);
+
+ // merge
+ for (ColumnFamily cf2 : columnFamilies)
+ {
+ assert cf.name().equals(cf2.name());
+ cf.addColumns(cf2);
+ cf.delete(Math.max(cf.getMarkedForDeleteAt(), cf2.getMarkedForDeleteAt()));
+ }
+ return cf;
+ }
+
+ /** like resolve, but leaves the resolved CF as the only item in the list */
+ private static void merge(List<ColumnFamily> columnFamilies)
+ {
+ ColumnFamily cf = resolve(columnFamilies);
+ columnFamilies.clear();
+ columnFamilies.add(cf);
+ }
+
+ private static ColumnFamily resolveAndRemoveDeleted(List<ColumnFamily> columnFamilies) {
+ ColumnFamily cf = resolve(columnFamilies);
+ return removeDeleted(cf);
+ }
+
+ static ColumnFamily removeDeleted(ColumnFamily cf) {
+ if (cf == null) {
+ return null;
+ }
+ for (String cname : new ArrayList<String>(cf.getColumns().keySet())) {
+ IColumn c = cf.getColumns().get(cname);
+ if (c instanceof SuperColumn) {
+ long min_timestamp = Math.max(c.getMarkedForDeleteAt(), cf.getMarkedForDeleteAt());
+ // don't operate directly on the supercolumn, it could be the one in the memtable
+ cf.remove(cname);
+ IColumn sc = new SuperColumn(cname);
+ for (IColumn subColumn : c.getSubColumns()) {
+ if (!subColumn.isMarkedForDelete() && subColumn.timestamp() >= min_timestamp) {
+ sc.addColumn(subColumn.name(), subColumn);
+ }
+ }
+ if (sc.getSubColumns().size() > 0) {
+ cf.addColumn(sc);
+ }
+ } else if (c.isMarkedForDelete() || c.timestamp() < cf.getMarkedForDeleteAt()) {
+ cf.remove(cname);
+ }
}
return cf;
}
@@ -610,8 +627,7 @@
*/
void applyNow(String key, ColumnFamily columnFamily) throws IOException
{
- if (!columnFamily.isMarkedForDelete())
- memtable_.get().putOnRecovery(key, columnFamily);
+ memtable_.get().putOnRecovery(key, columnFamily);
}
/*
@@ -1125,14 +1141,7 @@
if(columnFamilies.size() > 1)
{
// Now merge the 2 column families
- columnFamily = resolve(columnFamilies);
- columnFamilies.clear();
- if( columnFamily != null)
- {
- // add the merged columnfamily back to the list
- columnFamilies.add(columnFamily);
- }
-
+ merge(columnFamilies);
}
// deserialize into column families
columnFamilies.add(ColumnFamily.serializer().deserialize(filestruct.bufIn_));
@@ -1144,7 +1153,7 @@
}
}
// Now after merging all crap append to the sstable
- columnFamily = resolve(columnFamilies);
+ columnFamily = resolveAndRemoveDeleted(columnFamilies);
columnFamilies.clear();
if( columnFamily != null )
{
@@ -1375,15 +1384,7 @@
// We want to add only 2 and resolve them right there in order to save on memory footprint
if(columnFamilies.size() > 1)
{
- // Now merge the 2 column families
- columnFamily = resolve(columnFamilies);
- columnFamilies.clear();
- if( columnFamily != null)
- {
- // add the merged columnfamily back to the list
- columnFamilies.add(columnFamily);
- }
-
+ merge(columnFamilies);
}
// deserialize into column families
columnFamilies.add(ColumnFamily.serializer().deserialize(filestruct.bufIn_));
@@ -1394,7 +1395,7 @@
}
}
// Now after merging all crap append to the sstable
- columnFamily = resolve(columnFamilies);
+ columnFamily = resolveAndRemoveDeleted(columnFamilies);
columnFamilies.clear();
if( columnFamily != null )
{
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/HintedHandOffManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/HintedHandOffManager.java?rev=759223&r1=759222&r2=759223&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/HintedHandOffManager.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/HintedHandOffManager.java Fri Mar 27 16:39:17 2009
@@ -24,6 +24,8 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import org.apache.log4j.Logger;
+
import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
import org.apache.cassandra.concurrent.ThreadFactoryImpl;
import org.apache.cassandra.config.DatabaseDescriptor;
@@ -32,11 +34,7 @@
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.IComponentShutdown;
-import org.apache.cassandra.service.IResponseResolver;
-import org.apache.cassandra.service.QuorumResponseHandler;
import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.service.WriteResponseResolver;
-import org.apache.log4j.Logger;
/**
@@ -110,14 +108,14 @@
private void deleteEndPoint(String endpointAddress, String key) throws Exception
{
RowMutation rm = new RowMutation(DatabaseDescriptor.getTables().get(0), key_);
- rm.delete(Table.hints_ + ":" + key + ":" + endpointAddress);
+ rm.delete(Table.hints_ + ":" + key + ":" + endpointAddress, System.currentTimeMillis());
rm.apply();
}
private void deleteKey(String key) throws Exception
{
RowMutation rm = new RowMutation(DatabaseDescriptor.getTables().get(0), key_);
- rm.delete(Table.hints_ + ":" + key);
+ rm.delete(Table.hints_ + ":" + key, System.currentTimeMillis());
rm.apply();
}
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/Memtable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/Memtable.java?rev=759223&r1=759222&r2=759223&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/Memtable.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/Memtable.java Fri Mar 27 16:39:17 2009
@@ -307,6 +307,7 @@
int newObjectCount = oldCf.getColumnCount();
resolveSize(oldSize, newSize);
resolveCount(oldObjectCount, newObjectCount);
+ oldCf.delete(Math.max(oldCf.getMarkedForDeleteAt(), columnFamily.getMarkedForDeleteAt()));
}
else
{
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/RowMutation.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/RowMutation.java?rev=759223&r1=759222&r2=759223&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/RowMutation.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/RowMutation.java Fri Mar 27 16:39:17 2009
@@ -23,10 +23,15 @@
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.Serializable;
+import java.io.ByteArrayOutputStream;
import org.apache.commons.lang.ArrayUtils;
import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
/**
@@ -34,9 +39,10 @@
*/
public class RowMutation implements Serializable
-{
- private static ICompactSerializer<RowMutation> serializer_;
-
+{
+ private static ICompactSerializer<RowMutation> serializer_;
+ public static final String HINT = "HINT";
+
static
{
serializer_ = new RowMutationSerializer();
@@ -46,23 +52,23 @@
{
return serializer_;
}
-
+
private String table_;
- private String key_;
- protected Map<String, ColumnFamily> modifications_ = new HashMap<String, ColumnFamily>();
+ private String key_;
+ protected Map<String, ColumnFamily> modifications_ = new HashMap<String, ColumnFamily>();
protected Map<String, ColumnFamily> deletions_ = new HashMap<String, ColumnFamily>();
-
+
/* Ctor for JAXB */
private RowMutation()
{
}
-
+
public RowMutation(String table, String key)
{
table_ = table;
key_ = key;
}
-
+
public RowMutation(String table, Row row)
{
table_ = table;
@@ -82,46 +88,46 @@
modifications_ = modifications;
deletions_ = deletions;
}
-
+
public static String[] getColumnAndColumnFamily(String cf)
{
return cf.split(":");
}
-
+
String table()
{
return table_;
}
-
+
public String key()
{
return key_;
}
-
+
void addHints(String hint) throws IOException, ColumnFamilyNotDefinedException
- {
+ {
String cfName = Table.hints_ + ":" + hint;
add(cfName, new byte[0]);
}
-
+
/*
* Specify a column family name and the corresponding column
- * family object.
+ * family object.
* param @ cf - column family name
* param @ columnFamily - the column family.
*/
public void add(String cf, ColumnFamily columnFamily)
- {
+ {
modifications_.put(cf, columnFamily);
}
-
+
/*
* Specify a column name and a corresponding value for
* the column. Column name is specified as <column family>:column.
* This will result in a ColumnFamily associated with
* <column family> as name and a Column with <column>
* as name.
- *
+ *
* param @ cf - column name as <column family>:<column>
* param @ value - value associated with the column
*/
@@ -129,26 +135,26 @@
{
add(cf, value, 0);
}
-
+
/*
* Specify a column name and a corresponding value for
* the column. Column name is specified as <column family>:column.
* This will result in a ColumnFamily associated with
* <column family> as name and a Column with <column>
- * as name. The columan can be further broken up
+ * as name. The columan can be further broken up
* as super column name : columnname in case of super columns
- *
+ *
* param @ cf - column name as <column family>:<column>
* param @ value - value associated with the column
* param @ timestamp - ts associated with this data.
*/
public void add(String cf, byte[] value, long timestamp)
- {
+ {
String[] values = RowMutation.getColumnAndColumnFamily(cf);
-
+
if ( values.length == 0 || values.length == 1 || values.length > 3 )
throw new IllegalArgumentException("Column Family " + cf + " in invalid format. Must be in <column family>:<column> format.");
-
+
ColumnFamily columnFamily = modifications_.get(values[0]);
if( values.length == 2 )
{
@@ -169,72 +175,95 @@
modifications_.put(values[0], columnFamily);
}
- /*
- * Specify a column name to be deleted. Column name is
- * specified as <column family>:column. This will result
- * in a ColumnFamily associated with <column family> as
- * name and perhaps Column with <column> as name being
- * marked as deleted.
- * TODO : Delete is NOT correct as we do not know
- * the CF type so we need to fix that.
- * param @ cf - column name as <column family>:<column>
- */
- public void delete(String columnFamilyColumn)
+ public void delete(String columnFamilyColumn, long timestamp)
{
- throw new UnsupportedOperationException();
+ String[] values = RowMutation.getColumnAndColumnFamily(columnFamilyColumn);
+ String cfName = values[0];
+ if (modifications_.containsKey(cfName))
+ {
+ throw new IllegalArgumentException("ColumnFamily " + cfName + " is already being modified");
+ }
+
+ if (values.length == 0 || values.length > 3)
+ throw new IllegalArgumentException("Column Family " + columnFamilyColumn + " in invalid format. Must be in <column family>:<column> format.");
+
+ ColumnFamily columnFamily = modifications_.get(cfName);
+ if (columnFamily == null)
+ columnFamily = new ColumnFamily(cfName);
+ if (values.length == 2)
+ {
+ columnFamily.addColumn(values[1], ArrayUtils.EMPTY_BYTE_ARRAY, timestamp, true);
+ }
+ else if (values.length == 3)
+ {
+ columnFamily.addColumn(values[1] + ":" + values[2], ArrayUtils.EMPTY_BYTE_ARRAY, timestamp, true);
+ }
+ else
+ {
+ assert values.length == 1;
+ columnFamily.delete(timestamp);
+ }
+ modifications_.put(cfName, columnFamily);
}
+
/*
* This is equivalent to calling commit. Applies the changes to
* to the table that is obtained by calling Table.open().
*/
public void apply() throws IOException, ColumnFamilyNotDefinedException
- {
+ {
Row row = new Row(key_);
- Table table = Table.open(table_);
- Set<String> cfNames = modifications_.keySet();
- for (String cfName : cfNames )
- {
- if ( !table.isValidColumnFamily(cfName) )
- throw new ColumnFamilyNotDefinedException("Column Family " + cfName + " has not been defined.");
- row.addColumnFamily( modifications_.get(cfName) );
- }
- table.apply(row);
+ apply(row);
}
-
- /*
- * This is equivalent to calling commit. Applies the changes to
- * to the table that is obtained by calling Table.open().
+
+ /*
+ * Allows RowMutationVerbHandler to optimize by re-using a single Row object.
*/
- void apply(Row row) throws IOException, ColumnFamilyNotDefinedException
- {
- Table table = Table.open(table_);
- Set<String> cfNames = modifications_.keySet();
- for (String cfName : cfNames )
- {
- if ( !table.isValidColumnFamily(cfName) )
+ void apply(Row emptyRow) throws IOException, ColumnFamilyNotDefinedException
+ {
+ assert emptyRow.getColumnFamilyMap().size() == 0;
+ Table table = Table.open(table_);
+ for (String cfName : modifications_.keySet())
+ {
+ if (!table.isValidColumnFamily(cfName))
throw new ColumnFamilyNotDefinedException("Column Family " + cfName + " has not been defined.");
- row.addColumnFamily( modifications_.get(cfName) );
+ emptyRow.addColumnFamily(modifications_.get(cfName));
}
- table.apply(row);
+ table.apply(emptyRow);
}
-
- /*
+
+ /*
* This is equivalent to calling commit. Applies the changes to
* to the table that is obtained by calling Table.open().
*/
void load(Row row) throws IOException, ColumnFamilyNotDefinedException
- {
- Table table = Table.open(table_);
+ {
+ Table table = Table.open(table_);
Set<String> cfNames = modifications_.keySet();
for (String cfName : cfNames )
- {
+ {
if ( !table.isValidColumnFamily(cfName) )
throw new ColumnFamilyNotDefinedException("Column Family " + cfName + " has not been defined.");
- row.addColumnFamily( modifications_.get(cfName) );
+ row.addColumnFamily( modifications_.get(cfName) );
}
table.load(row);
- }
+ }
+
+ public Message makeRowMutationMessage() throws IOException
+ {
+ return makeRowMutationMessage(StorageService.mutationVerbHandler_);
+ }
+
+ public Message makeRowMutationMessage(String verbHandlerName) throws IOException
+ {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(bos);
+ serializer().serialize(this, dos);
+ EndPoint local = StorageService.getLocalStorageEndPoint();
+ EndPoint from = (local != null) ? local : new EndPoint(FBUtilities.getHostName(), 7000);
+ return new Message(from, StorageService.mutationStage_, verbHandlerName, bos.toByteArray());
+ }
}
class RowMutationSerializer implements ICompactSerializer<RowMutation>
@@ -244,56 +273,56 @@
int size = map.size();
dos.writeInt(size);
if ( size > 0 )
- {
+ {
Set<String> keys = map.keySet();
for( String key : keys )
- {
+ {
dos.writeUTF(key);
ColumnFamily cf = map.get(key);
if ( cf != null )
{
ColumnFamily.serializer().serialize(cf, dos);
- }
+ }
}
}
}
-
+
public void serialize(RowMutation rm, DataOutputStream dos) throws IOException
{
dos.writeUTF(rm.table());
dos.writeUTF(rm.key());
-
+
/* serialize the modifications_ in the mutation */
freezeTheMaps(rm.modifications_, dos);
-
+
/* serialize the deletions_ in the mutation */
freezeTheMaps(rm.deletions_, dos);
}
-
+
private Map<String, ColumnFamily> defreezeTheMaps(DataInputStream dis) throws IOException
{
Map<String, ColumnFamily> map = new HashMap<String, ColumnFamily>();
int size = dis.readInt();
for ( int i = 0; i < size; ++i )
{
- String key = dis.readUTF();
+ String key = dis.readUTF();
ColumnFamily cf = ColumnFamily.serializer().deserialize(dis);
- map.put(key, cf);
+ map.put(key, cf);
}
return map;
}
-
+
public RowMutation deserialize(DataInputStream dis) throws IOException
{
String table = dis.readUTF();
String key = dis.readUTF();
-
+
/* Defreeze the modifications_ map */
Map<String, ColumnFamily> modifications = defreezeTheMaps(dis);
-
+
/* Defreeze the deletions_ map */
Map<String, ColumnFamily> deletions = defreezeTheMaps(dis);
-
+
return new RowMutation(table, key, modifications, deletions);
}
}
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/RowMutationVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/RowMutationVerbHandler.java?rev=759223&r1=759222&r2=759223&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/RowMutationVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/RowMutationVerbHandler.java Fri Mar 27 16:39:17 2009
@@ -53,7 +53,7 @@
public void doVerb(Message message)
{
- byte[] bytes = (byte[])message.getMessageBody()[0];
+ byte[] bytes = (byte[]) message.getMessageBody()[0];
/* Obtain a Row Mutation Context from TLS */
RowMutationContext rowMutationCtx = tls_.get();
if ( rowMutationCtx == null )
@@ -70,7 +70,7 @@
logger_.debug("Applying " + rm);
/* Check if there were any hints in this message */
- byte[] hintedBytes = message.getHeader(RowMutationMessage.hint_);
+ byte[] hintedBytes = message.getHeader(RowMutation.HINT);
if ( hintedBytes != null && hintedBytes.length > 0 )
{
EndPoint hint = EndPoint.fromBytes(hintedBytes);
@@ -83,6 +83,7 @@
long start = System.currentTimeMillis();
+ rowMutationCtx.row_.clear();
rowMutationCtx.row_.key(rm.key());
rm.apply(rowMutationCtx.row_);
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/Table.java?rev=759223&r1=759222&r2=759223&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/Table.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/Table.java Fri Mar 27 16:39:17 2009
@@ -808,22 +808,18 @@
* Once this happens the data associated with the individual column families
* is also written to the column family store's memtable.
*/
- public void apply(Row row) throws IOException
- {
- String key = row.key();
+ void apply(Row row) throws IOException
+ {
/* Add row to the commit log. */
long start = System.currentTimeMillis();
-
CommitLog.CommitLogContext cLogCtx = CommitLog.open(table_).add(row);
- Map<String, ColumnFamily> columnFamilies = row.getColumnFamilyMap();
- Set<String> cNames = columnFamilies.keySet();
- for ( String cName : cNames )
+
+ for (ColumnFamily columnFamily : row.getColumnFamilies())
{
- ColumnFamily columnFamily = columnFamilies.get(cName);
ColumnFamilyStore cfStore = columnFamilyStores_.get(columnFamily.name());
- cfStore.apply( key, columnFamily, cLogCtx);
+ cfStore.apply(row.key(), columnFamily, cLogCtx);
}
- row.clear();
+
long timeTaken = System.currentTimeMillis() - start;
dbAnalyticsSource_.updateWriteStatistics(timeTaken);
}
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/service/CassandraServer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/CassandraServer.java?rev=759223&r1=759222&r2=759223&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/CassandraServer.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/CassandraServer.java Fri Mar 27 16:39:17 2009
@@ -576,6 +576,19 @@
throw new UnsupportedOperationException();
}
+ public boolean remove(String tablename, String key, String columnFamily_column, long timestamp, int block_for)
+ {
+ logger_.debug("remove");
+ RowMutation rm = new RowMutation(tablename, key.trim());
+ rm.delete(columnFamily_column, timestamp);
+ if (block_for > 0) {
+ return StorageProxy.insertBlocking(rm);
+ } else {
+ StorageProxy.insert(rm);
+ return true;
+ }
+ }
+
public List<superColumn_t> get_slice_super_by_names(String tablename, String key, String columnFamily, List<String> superColumnNames) throws CassandraException, TException
{
ArrayList<superColumn_t> retlist = new ArrayList<superColumn_t>();
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageProxy.java?rev=759223&r1=759222&r2=759223&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageProxy.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageProxy.java Fri Mar 27 16:39:17 2009
@@ -26,6 +26,9 @@
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+
+import org.apache.commons.lang.StringUtils;
+
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ReadMessage;
import org.apache.cassandra.db.ReadResponseMessage;
@@ -53,28 +56,28 @@
* sent over the wire to N replicas where some of the replicas
* may be hints.
*/
- private static Map<EndPoint, Message> createWriteMessages(RowMutationMessage rmMessage, Map<EndPoint, EndPoint> endpointMap) throws IOException
+ private static Map<EndPoint, Message> createWriteMessages(RowMutation rm, Map<EndPoint, EndPoint> endpointMap) throws IOException
{
- Map<EndPoint, Message> messageMap = new HashMap<EndPoint, Message>();
- Message message = RowMutationMessage.makeRowMutationMessage(rmMessage);
-
- Set<EndPoint> targets = endpointMap.keySet();
- for( EndPoint target : targets )
- {
- EndPoint hint = endpointMap.get(target);
+ Map<EndPoint, Message> messageMap = new HashMap<EndPoint, Message>();
+ Message message = rm.makeRowMutationMessage();
+
+ for (Map.Entry<EndPoint, EndPoint> entry : endpointMap.entrySet())
+ {
+ EndPoint target = entry.getKey();
+ EndPoint hint = entry.getValue();
if ( !target.equals(hint) )
- {
- Message hintedMessage = RowMutationMessage.makeRowMutationMessage(rmMessage);
- hintedMessage.addHeader(RowMutationMessage.hint_, EndPoint.toBytes(hint) );
- logger_.debug("Sending the hint of " + target.getHost() + " to " + hint.getHost());
- messageMap.put(target, hintedMessage);
- }
- else
- {
- messageMap.put(target, message);
- }
- }
- return messageMap;
+ {
+ Message hintedMessage = rm.makeRowMutationMessage();
+ hintedMessage.addHeader(RowMutation.HINT, EndPoint.toBytes(hint) );
+ logger_.debug("Sending the hint of " + target.getHost() + " to " + hint.getHost());
+ messageMap.put(target, hintedMessage);
+ }
+ else
+ {
+ messageMap.put(target, message);
+ }
+ }
+ return messageMap;
}
/**
@@ -82,38 +85,65 @@
* across all replicas. This method will take care
* of the possibility of a replica being down and hint
* the data across to some other replica.
- * @param RowMutation the mutation to be applied
- * across the replicas
+ * @param rm the mutation to be applied across the replicas
*/
public static void insert(RowMutation rm)
- {
+ {
/*
* Get the N nodes from storage service where the data needs to be
* replicated
* Construct a message for write
* Send them asynchronously to the replicas.
*/
+ assert rm.key() != null;
+
+ try
+ {
+ Map<EndPoint, EndPoint> endpointMap = StorageService.instance().getNStorageEndPointMap(rm.key());
+ // TODO: throw a thrift exception if we do not have N nodes
+ Map<EndPoint, Message> messageMap = createWriteMessages(rm, endpointMap);
+ logger_.debug("insert writing to [" + StringUtils.join(messageMap.keySet(), ", ") + "]");
+ for (Map.Entry<EndPoint, Message> entry : messageMap.entrySet())
+ {
+ MessagingService.getMessagingInstance().sendOneWay(entry.getValue(), entry.getKey());
+ }
+ }
+ catch (Exception e)
+ {
+ logger_.error( LogUtil.throwableToString(e) );
+ }
+ return;
+ }
+
+ public static boolean insertBlocking(RowMutation rm)
+ {
+ assert rm.key() != null;
+
try
{
- logger_.debug(" insert");
- Map<EndPoint, EndPoint> endpointMap = StorageService.instance().getNStorageEndPointMap(rm.key());
+ Message message = rm.makeRowMutationMessage();
+
+ IResponseResolver<Boolean> writeResponseResolver = new WriteResponseResolver();
+ QuorumResponseHandler<Boolean> quorumResponseHandler = new QuorumResponseHandler<Boolean>(
+ DatabaseDescriptor.getReplicationFactor(),
+ writeResponseResolver);
+ EndPoint[] endpoints = StorageService.instance().getNStorageEndPoint(rm.key());
+ logger_.debug("insertBlocking writing to [" + StringUtils.join(endpoints, ", ") + "]");
// TODO: throw a thrift exception if we do not have N nodes
- RowMutationMessage rmMsg = new RowMutationMessage(rm);
- /* Create the write messages to be sent */
- Map<EndPoint, Message> messageMap = createWriteMessages(rmMsg, endpointMap);
- Set<EndPoint> endpoints = messageMap.keySet();
- for(EndPoint endpoint : endpoints)
- {
- MessagingService.getMessagingInstance().sendOneWay(messageMap.get(endpoint), endpoint);
- }
+
+ MessagingService.getMessagingInstance().sendRR(message, endpoints, quorumResponseHandler);
+ return quorumResponseHandler.get();
+
+ // TODO: if the result is false that means the writes to all the
+ // servers failed hence we need to throw an exception or return an
+ // error back to the client so that it can take appropriate action.
}
catch (Exception e)
{
- logger_.info( LogUtil.throwableToString(e) );
+ logger_.error( LogUtil.throwableToString(e) );
+ return false;
}
- return;
}
-
private static Map<String, Message> constructMessages(Map<String, ReadMessage> readMessages) throws IOException
{