You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/09/24 21:55:14 UTC
svn commit: r818607 - in
/incubator/cassandra/trunk/src/java/org/apache/cassandra/db: CommitLog.java
RowMutation.java RowMutationVerbHandler.java Table.java
Author: jbellis
Date: Thu Sep 24 19:55:10 2009
New Revision: 818607
URL: http://svn.apache.org/viewvc?rev=818607&view=rev
Log:
CommitLog and Table.apply/applyNow/load paths now use RowMutations (RMs) directly instead of converting them to Rows first. Patch by Sandeep Tata; reviewed by jbellis for CASSANDRA-456
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java?rev=818607&r1=818606&r2=818607&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java Thu Sep 24 19:55:10 2009
@@ -317,27 +317,27 @@
bufIn.reset(bytes, bytes.length);
/* read the commit log entry */
- Row row = Row.serializer().deserialize(bufIn);
+ RowMutation rm = RowMutation.serializer().deserialize(bufIn);
if (logger_.isDebugEnabled())
logger_.debug(String.format("replaying mutation for %s.%s: %s",
- row.getTable(),
- row.key(),
- "{" + StringUtils.join(row.getColumnFamilies(), ", ") + "}"));
- Table table = Table.open(row.getTable());
+ rm.getTable(),
+ rm.key(),
+ "{" + StringUtils.join(rm.getColumnFamilies(), ", ") + "}"));
+ Table table = Table.open(rm.getTable());
tablesRecovered.add(table);
- Collection<ColumnFamily> columnFamilies = new ArrayList<ColumnFamily>(row.getColumnFamilies());
+ Collection<ColumnFamily> columnFamilies = new ArrayList<ColumnFamily>(rm.getColumnFamilies());
/* remove column families that have already been flushed */
for (ColumnFamily columnFamily : columnFamilies)
{
int id = table.getColumnFamilyId(columnFamily.name());
if (!clHeader.isDirty(id) || reader.getFilePointer() < clHeader.getPosition(id))
{
- row.removeColumnFamily(columnFamily);
+ rm.removeColumnFamily(columnFamily);
}
}
- if (!row.isEmpty())
+ if (!rm.isEmpty())
{
- table.applyNow(row);
+ table.applyNow(rm);
}
}
reader.close();
@@ -353,10 +353,10 @@
* Update the header of the commit log if a new column family
* is encountered for the first time.
*/
- private void maybeUpdateHeader(Row row) throws IOException
+ private void maybeUpdateHeader(RowMutation rm) throws IOException
{
- Table table = Table.open(row.getTable());
- for (ColumnFamily columnFamily : row.getColumnFamilies())
+ Table table = Table.open(rm.getTable());
+ for (ColumnFamily columnFamily : rm.getColumnFamilies())
{
int id = table.getColumnFamilyId(columnFamily.name());
if (!clHeader_.isDirty(id) || (clHeader_.isDirty(id) && clHeader_.getPosition(id) == 0))
@@ -396,9 +396,9 @@
* of any problems. This way we can assume that the subsequent commit log
* entry will override the garbage left over by the previous write.
*/
- void add(Row row, DataOutputBuffer serializedRow) throws IOException
+ void add(RowMutation rowMutation, DataOutputBuffer serializedRow) throws IOException
{
- Callable<CommitLogContext> task = new LogRecordAdder(row, serializedRow);
+ Callable<CommitLogContext> task = new LogRecordAdder(rowMutation, serializedRow);
try
{
@@ -558,12 +558,12 @@
class LogRecordAdder implements Callable<CommitLog.CommitLogContext>
{
- final Row row;
+ final RowMutation rowMutation;
final Object serializedRow;
- LogRecordAdder(Row row, DataOutputBuffer serializedRow)
+ LogRecordAdder(RowMutation rm, DataOutputBuffer serializedRow)
{
- this.row = row;
+ this.rowMutation = rm;
this.serializedRow = serializedRow;
}
@@ -576,7 +576,7 @@
currentPosition = logWriter_.getFilePointer();
CommitLogContext cLogCtx = new CommitLogContext(logFile_, currentPosition);
/* Update the header */
- maybeUpdateHeader(row);
+ maybeUpdateHeader(rowMutation);
if (serializedRow instanceof DataOutputBuffer)
{
DataOutputBuffer buffer = (DataOutputBuffer) serializedRow;
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java?rev=818607&r1=818606&r2=818607&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java Thu Sep 24 19:55:10 2009
@@ -23,6 +23,7 @@
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.Serializable;
+import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -33,6 +34,7 @@
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.StringUtils;
+import org.apache.cassandra.io.DataOutputBuffer;
import org.apache.cassandra.io.ICompactSerializer;
import org.apache.cassandra.net.EndPoint;
import org.apache.cassandra.net.Message;
@@ -83,7 +85,7 @@
modifications_ = modifications;
}
- public String table()
+ public String getTable()
{
return table_;
}
@@ -97,6 +99,11 @@
{
return modifications_.keySet();
}
+
+ public Collection<ColumnFamily> getColumnFamilies()
+ {
+ return modifications_.values();
+ }
void addHints(String key, String host) throws IOException
{
@@ -119,6 +126,17 @@
modifications_.put(columnFamily.name(), columnFamily);
}
+ /** should only be called by commitlog replay code */
+ public void removeColumnFamily(ColumnFamily columnFamily)
+ {
+ modifications_.remove(columnFamily.name());
+ }
+
+ public boolean isEmpty()
+ {
+ return modifications_.isEmpty();
+ }
+
/*
* Specify a column name and a corresponding value for
* the column. Column name is specified as <column family>:column.
@@ -183,19 +201,8 @@
* to the table that is obtained by calling Table.open().
*/
public void apply() throws IOException
- {
- Row row = createRow();
- Table.open(table_).apply(row, row.getSerializedBuffer());
- }
-
- private Row createRow()
- {
- Row row = new Row(table_, key_);
- for (String cfName : modifications_.keySet())
- {
- row.addColumnFamily(modifications_.get(cfName));
- }
- return row;
+ {
+ Table.open(table_).apply(this, this.getSerializedBuffer());
}
/*
@@ -204,7 +211,7 @@
*/
void applyBinary() throws IOException, ExecutionException, InterruptedException
{
- Table.open(table_).load(createRow());
+ Table.open(table_).load(this);
}
public Message makeRowMutationMessage() throws IOException
@@ -247,6 +254,13 @@
}
return rm;
}
+
+ public DataOutputBuffer getSerializedBuffer() throws IOException
+ {
+ DataOutputBuffer buffer = new DataOutputBuffer();
+ RowMutation.serializer().serialize(this, buffer);
+ return buffer;
+ }
public String toString()
{
@@ -281,7 +295,7 @@
public void serialize(RowMutation rm, DataOutputStream dos) throws IOException
{
- dos.writeUTF(rm.table());
+ dos.writeUTF(rm.getTable());
dos.writeUTF(rm.key());
/* serialize the modifications_ in the mutation */
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java?rev=818607&r1=818606&r2=818607&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java Thu Sep 24 19:55:10 2009
@@ -53,14 +53,14 @@
if (logger_.isDebugEnabled())
logger_.debug("Adding hint for " + hint);
/* add necessary hints to this mutation */
- RowMutation hintedMutation = new RowMutation(Table.SYSTEM_TABLE, rm.table());
+ RowMutation hintedMutation = new RowMutation(Table.SYSTEM_TABLE, rm.getTable());
hintedMutation.addHints(rm.key(), hint.getHost());
hintedMutation.apply();
}
rm.apply();
- WriteResponse response = new WriteResponse(rm.table(), rm.key(), true);
+ WriteResponse response = new WriteResponse(rm.getTable(), rm.key(), true);
Message responseMessage = WriteResponse.makeWriteResponseMessage(message, response);
if (logger_.isDebugEnabled())
logger_.debug(rm + " applied. Sending response to " + message.getMessageId() + "@" + message.getFrom());
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=818607&r1=818606&r2=818607&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Thu Sep 24 19:55:10 2009
@@ -594,20 +594,20 @@
* Once this happens the data associated with the individual column families
* is also written to the column family store's memtable.
*/
- void apply(Row row, DataOutputBuffer serializedRow) throws IOException
+ void apply(RowMutation mutation, DataOutputBuffer serializedMutation) throws IOException
{
HashMap<ColumnFamilyStore,Memtable> memtablesToFlush = new HashMap<ColumnFamilyStore, Memtable>(2);
flusherLock_.readLock().lock();
try
{
- CommitLog.open().add(row, serializedRow);
+ CommitLog.open().add(mutation, serializedMutation);
- for (ColumnFamily columnFamily : row.getColumnFamilies())
+ for (ColumnFamily columnFamily : mutation.getColumnFamilies())
{
Memtable memtableToFlush;
ColumnFamilyStore cfStore = columnFamilyStores_.get(columnFamily.name());
- if ((memtableToFlush=cfStore.apply(row.key(), columnFamily)) != null)
+ if ((memtableToFlush=cfStore.apply(mutation.key(), columnFamily)) != null)
memtablesToFlush.put(cfStore, memtableToFlush);
}
}
@@ -621,7 +621,7 @@
entry.getKey().switchMemtable(entry.getValue());
}
- void applyNow(Row row) throws IOException
+ void applyNow(RowMutation row) throws IOException
{
String key = row.key();
for (ColumnFamily columnFamily : row.getColumnFamilies())
@@ -647,11 +647,11 @@
}
// for binary load path. skips commitlog.
- void load(Row row) throws IOException
+ void load(RowMutation rowMutation) throws IOException
{
- String key = row.key();
+ String key = rowMutation.key();
- for (ColumnFamily columnFamily : row.getColumnFamilies())
+ for (ColumnFamily columnFamily : rowMutation.getColumnFamilies())
{
Collection<IColumn> columns = columnFamily.getSortedColumns();
for (IColumn column : columns)
@@ -660,7 +660,6 @@
cfStore.applyBinary(key, column.value());
}
}
- row.clear();
}
public SortedSet<String> getApplicationColumnFamilies()