You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/11/26 00:16:35 UTC
svn commit: r884331 - in /incubator/cassandra/trunk:
src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/service/
test/unit/org/apache/cassandra/db/
Author: jbellis
Date: Wed Nov 25 23:16:34 2009
New Revision: 884331
URL: http://svn.apache.org/viewvc?rev=884331&view=rev
Log:
Remove unused Row code, and move the table variable into callers rather than serializing it redundantly
patch by jbellis; tested by Dan Di Spaltro for CASSANDRA-578
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Row.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RowTest.java
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=884331&r1=884330&r2=884331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Wed Nov 25 23:16:34 2009
@@ -1464,7 +1464,7 @@
columnNameSet.addAll(columnNames);
for (String key : rr.keys)
{
- Row row = new Row(table_, key);
+ Row row = new Row(key);
QueryFilter filter = sliceRange == null ? new NamesQueryFilter(key, queryPath, columnNameSet) : new SliceQueryFilter(key, queryPath, sliceRange.start, sliceRange.finish, sliceRange.reversed, sliceRange.count);
row.addColumnFamily(getColumnFamily(filter));
rows.add(row);
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java?rev=884331&r1=884330&r2=884331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java Wed Nov 25 23:16:34 2009
@@ -74,11 +74,11 @@
/* Don't service reads! */
throw new RuntimeException("Cannot service reads while bootstrapping!");
}
- ReadCommand readCommand = ReadCommand.serializer().deserialize(readCtx.bufIn_);
- Table table = Table.open(readCommand.table);
- Row row = readCommand.getRow(table);
+ ReadCommand command = ReadCommand.serializer().deserialize(readCtx.bufIn_);
+ Table table = Table.open(command.table);
+ Row row = command.getRow(table);
ReadResponse readResponse;
- if (readCommand.isDigestQuery())
+ if (command.isDigestQuery())
{
if (logger_.isDebugEnabled())
logger_.debug("digest is " + FBUtilities.bytesToHex(row.digest()));
@@ -88,7 +88,7 @@
{
readResponse = new ReadResponse(row);
}
- readResponse.setIsDigestQuery(readCommand.isDigestQuery());
+ readResponse.setIsDigestQuery(command.isDigestQuery());
/* serialize the ReadResponseMessage. */
readCtx.bufOut_.reset();
@@ -99,13 +99,17 @@
Message response = message.getReply(FBUtilities.getLocalAddress(), bytes);
if (logger_.isDebugEnabled())
- logger_.debug("Read key " + readCommand.key + "; sending response to " + message.getMessageId() + "@" + message.getFrom());
+ logger_.debug("Read key " + command.key + "; sending response to " + message.getMessageId() + "@" + message.getFrom());
MessagingService.instance().sendOneWay(response, message.getFrom());
/* Do read repair if header of the message says so */
if (message.getHeader(ReadCommand.DO_REPAIR) != null)
{
- doReadRepair(row, readCommand);
+ List<InetAddress> endpoints = StorageService.instance().getLiveNaturalEndpoints(command.key);
+ /* Remove the local storage endpoint from the list. */
+ endpoints.remove(FBUtilities.getLocalAddress());
+ if (endpoints.size() > 0 && DatabaseDescriptor.getConsistencyCheck())
+ StorageService.instance().doConsistencyCheck(row, endpoints, command);
}
}
catch (IOException ex)
@@ -113,14 +117,4 @@
throw new RuntimeException(ex);
}
}
-
- private void doReadRepair(Row row, ReadCommand readCommand)
- {
- List<InetAddress> endpoints = StorageService.instance().getLiveNaturalEndpoints(readCommand.key);
- /* Remove the local storage endpoint from the list. */
- endpoints.remove(FBUtilities.getLocalAddress());
-
- if (endpoints.size() > 0 && DatabaseDescriptor.getConsistencyCheck())
- StorageService.instance().doConsistencyCheck(row, endpoints, readCommand);
- }
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Row.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Row.java?rev=884331&r1=884330&r2=884331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Row.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Row.java Wed Nov 25 23:16:34 2009
@@ -38,7 +38,6 @@
public class Row
{
private static Logger logger_ = Logger.getLogger(Row.class);
- private String table_;
private static RowSerializer serializer = new RowSerializer();
static RowSerializer serializer()
@@ -46,17 +45,11 @@
return serializer;
}
- public Row(String table, String key)
+ public Row(String key)
{
- assert table != null;
- this.table_ = table;
this.key_ = key;
}
- public String getTable() {
- return table_;
- }
-
private String key_;
private Map<String, ColumnFamily> columnFamilies_ = new HashMap<String, ColumnFamily>();
@@ -66,21 +59,6 @@
return key_;
}
- void setKey(String key)
- {
- key_ = key;
- }
-
- public void setTable(String table)
- {
- table_ = table;
- }
-
- public Set<String> getColumnFamilyNames()
- {
- return columnFamilies_.keySet();
- }
-
public Collection<ColumnFamily> getColumnFamilies()
{
return columnFamilies_.values();
@@ -96,11 +74,6 @@
columnFamilies_.put(columnFamily.name(), columnFamily);
}
- void removeColumnFamily(ColumnFamily columnFamily)
- {
- columnFamilies_.remove(columnFamily.name());
- }
-
public boolean isEmpty()
{
return (columnFamilies_.size() == 0);
@@ -138,7 +111,7 @@
*/
public Row diff(Row rowComposite)
{
- Row rowDiff = new Row(table_, key_);
+ Row rowDiff = new Row(key_);
for (ColumnFamily cfComposite : rowComposite.getColumnFamilies())
{
@@ -160,7 +133,7 @@
public Row cloneMe()
{
- Row row = new Row(table_, key_);
+ Row row = new Row(key_);
row.columnFamilies_ = new HashMap<String, ColumnFamily>(columnFamilies_);
return row;
}
@@ -185,18 +158,6 @@
return digest.digest();
}
- void clear()
- {
- columnFamilies_.clear();
- }
-
- public DataOutputBuffer getSerializedBuffer() throws IOException
- {
- DataOutputBuffer buffer = new DataOutputBuffer();
- Row.serializer().serialize(this, buffer);
- return buffer;
- }
-
public String toString()
{
return "Row(" + key_ + " [" + StringUtils.join(columnFamilies_.values(), ", ") + "])";
@@ -207,7 +168,6 @@
{
public void serialize(Row row, DataOutputStream dos) throws IOException
{
- dos.writeUTF(row.getTable());
dos.writeUTF(row.key());
Collection<ColumnFamily> columnFamilies = row.getColumnFamilies();
int size = columnFamilies.size();
@@ -221,9 +181,8 @@
public Row deserialize(DataInputStream dis) throws IOException
{
- String table = dis.readUTF();
String key = dis.readUTF();
- Row row = new Row(table, key);
+ Row row = new Row(key);
int size = dis.readInt();
for (int i = 0; i < size; ++i)
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=884331&r1=884330&r2=884331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Wed Nov 25 23:16:34 2009
@@ -399,7 +399,7 @@
@Deprecated // CF should be our atom of work, not Row
public Row get(String key) throws IOException
{
- Row row = new Row(table_, key);
+ Row row = new Row(key);
for (String columnFamily : getColumnFamilies())
{
ColumnFamily cf = get(key, columnFamily);
@@ -429,7 +429,7 @@
@Deprecated
public Row getRow(String key, String cfName) throws IOException
{
- Row row = new Row(table_, key);
+ Row row = new Row(key);
ColumnFamily columnFamily = get(key, cfName);
if ( columnFamily != null )
row.addColumnFamily(columnFamily);
@@ -439,7 +439,7 @@
public Row getRow(QueryFilter filter) throws IOException
{
ColumnFamilyStore cfStore = columnFamilyStores_.get(filter.getColumnFamilyName());
- Row row = new Row(table_, filter.key);
+ Row row = new Row(filter.key);
ColumnFamily columnFamily = cfStore.getColumnFamily(filter);
if (columnFamily != null)
row.addColumnFamily(columnFamily);
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java?rev=884331&r1=884330&r2=884331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java Wed Nov 25 23:16:34 2009
@@ -40,8 +40,9 @@
class ConsistencyManager implements Runnable
{
private static Logger logger_ = Logger.getLogger(ConsistencyManager.class);
-
- class DigestResponseHandler implements IAsyncCallback
+ private final String table_;
+
+ class DigestResponseHandler implements IAsyncCallback
{
List<Message> responses_ = new ArrayList<Message>();
@@ -78,7 +79,7 @@
private void doReadRepair() throws IOException
{
- IResponseResolver<Row> readResponseResolver = new ReadResponseResolver();
+ IResponseResolver<Row> readResponseResolver = new ReadResponseResolver(table_);
/* Add the local storage endpoint to the replicas_ list */
replicas_.add(FBUtilities.getLocalAddress());
IAsyncCallback responseHandler = new DataRepairHandler(ConsistencyManager.this.replicas_.size(), readResponseResolver);
@@ -134,8 +135,9 @@
protected final List<InetAddress> replicas_;
private final ReadCommand readCommand_;
- public ConsistencyManager(Row row, List<InetAddress> replicas, ReadCommand readCommand)
+ public ConsistencyManager(String table, Row row, List<InetAddress> replicas, ReadCommand readCommand)
{
+ table_ = table;
row_ = row;
replicas_ = replicas;
readCommand_ = readCommand;
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java?rev=884331&r1=884330&r2=884331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java Wed Nov 25 23:16:34 2009
@@ -48,14 +48,20 @@
public class ReadResponseResolver implements IResponseResolver<Row>
{
private static Logger logger_ = Logger.getLogger(ReadResponseResolver.class);
+ private final String table;
- /*
- * This method for resolving read data should look at the timestamps of each
- * of the columns that are read and should pick up columns with the latest
- * timestamp. For those columns where the timestamp is not the latest a
- * repair request should be scheduled.
- *
- */
+ public ReadResponseResolver(String table)
+ {
+ this.table = table;
+ }
+
+ /*
+ * This method for resolving read data should look at the timestamps of each
+ * of the columns that are read and should pick up columns with the latest
+ * timestamp. For those columns where the timestamp is not the latest a
+ * repair request should be scheduled.
+ *
+ */
public Row resolve(List<Message> responses) throws DigestMismatchException, IOException
{
long startTime = System.currentTimeMillis();
@@ -63,7 +69,6 @@
List<Row> rowList = new ArrayList<Row>();
List<InetAddress> endPoints = new ArrayList<InetAddress>();
String key = null;
- String table = null;
byte[] digest = new byte[0];
boolean isDigestQuery = false;
@@ -89,7 +94,6 @@
rowList.add(result.row());
endPoints.add(response.getFrom());
key = result.row().key();
- table = result.row().getTable();
}
}
// If there was a digest query compare it with all the data digests
@@ -114,7 +118,7 @@
}
/* Now calculate the resolved row */
- retRow = new Row(table, key);
+ retRow = new Row(key);
for (int i = 0; i < rowList.size(); i++)
{
retRow.repair(rowList.get(i));
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=884331&r1=884330&r2=884331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Wed Nov 25 23:16:34 2009
@@ -437,7 +437,7 @@
}
if (n < DatabaseDescriptor.getQuorum())
throw new UnavailableException();
- QuorumResponseHandler<Row> quorumResponseHandler = new QuorumResponseHandler<Row>(DatabaseDescriptor.getQuorum(), new ReadResponseResolver());
+ QuorumResponseHandler<Row> quorumResponseHandler = new QuorumResponseHandler<Row>(DatabaseDescriptor.getQuorum(), new ReadResponseResolver(command.table));
MessagingService.instance().sendRR(messages, endPoints, quorumResponseHandler);
quorumResponseHandlers.add(quorumResponseHandler);
commandEndPoints.add(endPoints);
@@ -465,7 +465,7 @@
{
if (DatabaseDescriptor.getConsistencyCheck())
{
- IResponseResolver<Row> readResponseResolverRepair = new ReadResponseResolver();
+ IResponseResolver<Row> readResponseResolverRepair = new ReadResponseResolver(command.table);
QuorumResponseHandler<Row> quorumResponseHandlerRepair = new QuorumResponseHandler<Row>(
DatabaseDescriptor.getQuorum(),
readResponseResolverRepair);
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=884331&r1=884330&r2=884331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Wed Nov 25 23:16:34 2009
@@ -346,14 +346,14 @@
return endPointSnitch_.isInSameDataCenter(FBUtilities.getLocalAddress(), endpoint);
}
- /*
+ /**
* This method performs the requisite operations to make
* sure that the N replicas are in sync. We do this in the
* background when we do not care much about consistency.
*/
public void doConsistencyCheck(Row row, List<InetAddress> endpoints, ReadCommand command)
{
- Runnable consistencySentinel = new ConsistencyManager(row.cloneMe(), endpoints, command);
+ Runnable consistencySentinel = new ConsistencyManager(command.table, row.cloneMe(), endpoints, command);
consistencyManager_.submit(consistencySentinel);
}
Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RowTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RowTest.java?rev=884331&r1=884330&r2=884331&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RowTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RowTest.java Wed Nov 25 23:16:34 2009
@@ -60,12 +60,12 @@
@Test
public void testRepair()
{
- Row row1 = new Row("", "");
+ Row row1 = new Row("");
ColumnFamily cf1 = ColumnFamily.create("Keyspace1", "Standard1");
cf1.addColumn(column("one", "A", 0));
row1.addColumnFamily(cf1);
- Row row2 = new Row("", "");
+ Row row2 = new Row("");
ColumnFamily cf2 = ColumnFamily.create("Keyspace1", "Standard1");
cf2.addColumn(column("one", "B", 1));
cf2.addColumn(column("two", "C", 1));