You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/03/27 03:43:54 UTC
svn commit: r758993 - in /incubator/cassandra/trunk:
src/org/apache/cassandra/db/ src/org/apache/cassandra/service/
test/org/apache/cassandra/db/
Author: jbellis
Date: Fri Mar 27 02:43:53 2009
New Revision: 758993
URL: http://svn.apache.org/viewvc?rev=758993&view=rev
Log:
move row mutation factory code into RowMutation; change RM.add(name, cf) to RM.add(cf)
Modified:
incubator/cassandra/trunk/src/org/apache/cassandra/db/RowMutation.java
incubator/cassandra/trunk/src/org/apache/cassandra/service/CassandraServer.java
incubator/cassandra/trunk/src/org/apache/cassandra/service/ReadResponseResolver.java
incubator/cassandra/trunk/test/org/apache/cassandra/db/ColumnFamilyStoreTest.java
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/RowMutation.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/RowMutation.java?rev=758993&r1=758992&r2=758993&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/RowMutation.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/RowMutation.java Fri Mar 27 02:43:53 2009
@@ -18,19 +18,29 @@
package org.apache.cassandra.db;
-import java.util.*;
+import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.Serializable;
-import java.io.ByteArrayOutputStream;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
import org.apache.commons.lang.ArrayUtils;
+import org.apache.commons.lang.StringUtils;
import org.apache.cassandra.io.ICompactSerializer;
-import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.Message;
import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.service.batch_mutation_super_t;
+import org.apache.cassandra.service.batch_mutation_t;
+import org.apache.cassandra.service.column_t;
+import org.apache.cassandra.service.superColumn_t;
import org.apache.cassandra.utils.FBUtilities;
@@ -40,13 +50,13 @@
public class RowMutation implements Serializable
{
- private static ICompactSerializer<RowMutation> serializer_;
+ private static ICompactSerializer<RowMutation> serializer_;
public static final String HINT = "HINT";
static
{
serializer_ = new RowMutationSerializer();
- }
+ }
static ICompactSerializer<RowMutation> serializer()
{
@@ -56,7 +66,6 @@
private String table_;
private String key_;
protected Map<String, ColumnFamily> modifications_ = new HashMap<String, ColumnFamily>();
- protected Map<String, ColumnFamily> deletions_ = new HashMap<String, ColumnFamily>();
/* Ctor for JAXB */
private RowMutation()
@@ -73,20 +82,17 @@
{
table_ = table;
key_ = row.key();
- Map<String, ColumnFamily> cfSet = row.getColumnFamilyMap();
- Set<String> keyset = cfSet.keySet();
- for(String cfName : keyset)
+ for (ColumnFamily cf : row.getColumnFamilies())
{
- add(cfName, cfSet.get(cfName));
+ add(cf);
}
}
- protected RowMutation(String table, String key, Map<String, ColumnFamily> modifications, Map<String, ColumnFamily> deletions)
+ protected RowMutation(String table, String key, Map<String, ColumnFamily> modifications)
{
- table_ = table;
- key_ = key;
- modifications_ = modifications;
- deletions_ = deletions;
+ table_ = table;
+ key_ = key;
+ modifications_ = modifications;
}
public static String[] getColumnAndColumnFamily(String cf)
@@ -107,7 +113,7 @@
void addHints(String hint) throws IOException, ColumnFamilyNotDefinedException
{
String cfName = Table.hints_ + ":" + hint;
- add(cfName, new byte[0]);
+ add(cfName, ArrayUtils.EMPTY_BYTE_ARRAY, 0);
}
/*
@@ -116,24 +122,13 @@
* param @ cf - column family name
* param @ columnFamily - the column family.
*/
- public void add(String cf, ColumnFamily columnFamily)
+ public void add(ColumnFamily columnFamily)
{
- modifications_.put(cf, columnFamily);
- }
-
- /*
- * Specify a column name and a corresponding value for
- * the column. Column name is specified as <column family>:column.
- * This will result in a ColumnFamily associated with
- * <column family> as name and a Column with <column>
- * as name.
- *
- * param @ cf - column name as <column family>:<column>
- * param @ value - value associated with the column
- */
- public void add(String cf, byte[] value) throws IOException, ColumnFamilyNotDefinedException
- {
- add(cf, value, 0);
+ if (modifications_.containsKey(columnFamily.name()))
+ {
+ throw new IllegalArgumentException("ColumnFamily " + columnFamily.name() + " is already being modified");
+ }
+ modifications_.put(columnFamily.name(), columnFamily);
}
/*
@@ -206,7 +201,6 @@
modifications_.put(cfName, columnFamily);
}
-
/*
* This is equivalent to calling commit. Applies the changes to
* the table that is obtained by calling Table.open().
@@ -237,15 +231,15 @@
* This is equivalent to calling commit. Applies the changes to
* the table that is obtained by calling Table.open().
*/
- void load(Row row) throws IOException, ColumnFamilyNotDefinedException
+ void load(Row row) throws IOException, ColumnFamilyNotDefinedException, ExecutionException, InterruptedException
{
Table table = Table.open(table_);
Set<String> cfNames = modifications_.keySet();
- for (String cfName : cfNames )
+ for (String cfName : cfNames)
{
- if ( !table.isValidColumnFamily(cfName) )
+ if (!table.isValidColumnFamily(cfName))
throw new ColumnFamilyNotDefinedException("Column Family " + cfName + " has not been defined.");
- row.addColumnFamily( modifications_.get(cfName) );
+ row.addColumnFamily(modifications_.get(cfName));
}
table.load(row);
}
@@ -264,65 +258,110 @@
EndPoint from = (local != null) ? local : new EndPoint(FBUtilities.getHostName(), 7000);
return new Message(from, StorageService.mutationStage_, verbHandlerName, bos.toByteArray());
}
+
+ public static RowMutation getRowMutation(batch_mutation_t batchMutation)
+ {
+ RowMutation rm = new RowMutation(batchMutation.table,
+ batchMutation.key.trim());
+ for (String cfname : batchMutation.cfmap.keySet())
+ {
+ List<column_t> list = batchMutation.cfmap.get(cfname);
+ for (column_t columnData : list)
+ {
+ rm.add(cfname + ":" + columnData.columnName,
+ columnData.value.getBytes(), columnData.timestamp);
+
+ }
+ }
+ return rm;
+ }
+
+ public static RowMutation getRowMutation(batch_mutation_super_t batchMutationSuper)
+ {
+ RowMutation rm = new RowMutation(batchMutationSuper.table,
+ batchMutationSuper.key.trim());
+ Set keys = batchMutationSuper.cfmap.keySet();
+ Iterator keyIter = keys.iterator();
+ while (keyIter.hasNext())
+ {
+ Object key = keyIter.next(); // Get the next key.
+ List<superColumn_t> list = batchMutationSuper.cfmap.get(key);
+ for (superColumn_t superColumnData : list)
+ {
+ if (superColumnData.columns.size() != 0)
+ {
+ for (column_t columnData : superColumnData.columns)
+ {
+ rm.add(key.toString() + ":" + superColumnData.name + ":" + columnData.columnName,
+ columnData.value.getBytes(), columnData.timestamp);
+ }
+ }
+ else
+ {
+ rm.add(key.toString() + ":" + superColumnData.name, ArrayUtils.EMPTY_BYTE_ARRAY, 0);
+ }
+ }
+ }
+ return rm;
+ }
+
+ public String toString()
+ {
+ return "RowMutation(" +
+ "key='" + key_ + '\'' +
+ ", modifications=[" + StringUtils.join(modifications_.values(), ", ") + "]" +
+ ')';
+ }
}
class RowMutationSerializer implements ICompactSerializer<RowMutation>
{
- private void freezeTheMaps(Map<String, ColumnFamily> map, DataOutputStream dos) throws IOException
- {
- int size = map.size();
+ private void freezeTheMaps(Map<String, ColumnFamily> map, DataOutputStream dos) throws IOException
+ {
+ int size = map.size();
dos.writeInt(size);
- if ( size > 0 )
+ if (size > 0)
{
Set<String> keys = map.keySet();
- for( String key : keys )
+ for (String key : keys)
{
- dos.writeUTF(key);
+ dos.writeUTF(key);
ColumnFamily cf = map.get(key);
- if ( cf != null )
+ if (cf != null)
{
ColumnFamily.serializer().serialize(cf, dos);
}
}
}
- }
+ }
- public void serialize(RowMutation rm, DataOutputStream dos) throws IOException
- {
- dos.writeUTF(rm.table());
- dos.writeUTF(rm.key());
+ public void serialize(RowMutation rm, DataOutputStream dos) throws IOException
+ {
+ dos.writeUTF(rm.table());
+ dos.writeUTF(rm.key());
- /* serialize the modifications_ in the mutation */
+ /* serialize the modifications_ in the mutation */
freezeTheMaps(rm.modifications_, dos);
+ }
- /* serialize the deletions_ in the mutation */
- freezeTheMaps(rm.deletions_, dos);
- }
-
- private Map<String, ColumnFamily> defreezeTheMaps(DataInputStream dis) throws IOException
- {
- Map<String, ColumnFamily> map = new HashMap<String, ColumnFamily>();
+ private Map<String, ColumnFamily> defreezeTheMaps(DataInputStream dis) throws IOException
+ {
+ Map<String, ColumnFamily> map = new HashMap<String, ColumnFamily>();
int size = dis.readInt();
- for ( int i = 0; i < size; ++i )
+ for (int i = 0; i < size; ++i)
{
String key = dis.readUTF();
ColumnFamily cf = ColumnFamily.serializer().deserialize(dis);
map.put(key, cf);
}
return map;
- }
+ }
public RowMutation deserialize(DataInputStream dis) throws IOException
{
- String table = dis.readUTF();
- String key = dis.readUTF();
-
- /* Defreeze the modifications_ map */
- Map<String, ColumnFamily> modifications = defreezeTheMaps(dis);
-
- /* Defreeze the deletions_ map */
- Map<String, ColumnFamily> deletions = defreezeTheMaps(dis);
-
- return new RowMutation(table, key, modifications, deletions);
+ String table = dis.readUTF();
+ String key = dis.readUTF();
+ Map<String, ColumnFamily> modifications = defreezeTheMaps(dis);
+ return new RowMutation(table, key, modifications);
}
}
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/service/CassandraServer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/CassandraServer.java?rev=758993&r1=758992&r2=758993&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/CassandraServer.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/CassandraServer.java Fri Mar 27 02:43:53 2009
@@ -18,21 +18,29 @@
package org.apache.cassandra.service;
-import com.facebook.thrift.*;
-import com.facebook.thrift.server.*;
-import com.facebook.thrift.server.TThreadPoolServer.Options;
-import com.facebook.thrift.transport.*;
-import com.facebook.thrift.protocol.*;
-import com.facebook.fb303.FacebookBase;
-import com.facebook.fb303.fb_status;
-import java.io.*;
-import java.util.Collection;
-import java.util.List;
+import java.io.BufferedInputStream;
+import java.io.FileInputStream;
+import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.Arrays;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.TimeUnit;
+import org.apache.log4j.Logger;
+
+import com.facebook.fb303.FacebookBase;
+import com.facebook.fb303.fb_status;
+import com.facebook.thrift.TException;
+import com.facebook.thrift.protocol.TBinaryProtocol;
+import com.facebook.thrift.protocol.TProtocolFactory;
+import com.facebook.thrift.server.TThreadPoolServer;
+import com.facebook.thrift.server.TThreadPoolServer.Options;
+import com.facebook.thrift.transport.TServerSocket;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql.common.CqlResult;
@@ -45,8 +53,11 @@
import org.apache.cassandra.net.EndPoint;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.IAsyncResult;
import org.apache.cassandra.utils.LogUtil;
-import org.apache.log4j.Logger;
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.io.DataOutputBuffer;
+
/**
* Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
*/
@@ -69,7 +80,7 @@
storageService = StorageService.instance();
}
- public CassandraServer() throws Throwable
+ public CassandraServer()
{
super("CassandraServer");
// Create the instance of the storage service
@@ -81,7 +92,7 @@
* specified port.
*/
public void start() throws Throwable
- {
+ {
LogUtil.init();
//LogUtil.setLogLevel("com.facebook", "DEBUG");
// Start the storage service
@@ -474,107 +485,21 @@
public boolean batch_insert_blocking(batch_mutation_t batchMutation)
{
- // 1. Get the N nodes from storage service where the data needs to be
- // replicated
- // 2. Construct a message for read\write
- // 3. SendRR ( to all the nodes above )
- // 4. Wait for a response from atleast X nodes where X <= N
- // 5. return success
- boolean result = false;
- try
- {
- logger_.warn(" batch_insert_blocking");
- validateTable(batchMutation.table);
- IResponseResolver<Boolean> writeResponseResolver = new WriteResponseResolver();
- QuorumResponseHandler<Boolean> quorumResponseHandler = new QuorumResponseHandler<Boolean>(
- DatabaseDescriptor.getReplicationFactor(),
- writeResponseResolver);
- EndPoint[] endpoints = storageService.getNStorageEndPoint(batchMutation.key);
- // TODO: throw a thrift exception if we do not have N nodes
-
- logger_.debug(" Creating the row mutation");
- RowMutation rm = new RowMutation(batchMutation.table,
- batchMutation.key.trim());
- Set keys = batchMutation.cfmap.keySet();
- Iterator keyIter = keys.iterator();
- while (keyIter.hasNext())
- {
- Object key = keyIter.next(); // Get the next key.
- List<column_t> list = batchMutation.cfmap.get(key);
- for (column_t columnData : list)
- {
- rm.add(key.toString() + ":" + columnData.columnName,
- columnData.value.getBytes(), columnData.timestamp);
-
- }
- }
-
- RowMutationMessage rmMsg = new RowMutationMessage(rm);
- Message message = new Message(StorageService.getLocalStorageEndPoint(),
- StorageService.mutationStage_,
- StorageService.mutationVerbHandler_,
- new Object[]{ rmMsg }
- );
- MessagingService.getMessagingInstance().sendRR(message, endpoints,
- quorumResponseHandler);
- logger_.debug(" Calling quorum response handler's get");
- result = quorumResponseHandler.get();
-
- // TODO: if the result is false that means the writes to all the
- // servers failed hence we need to throw an exception or return an
- // error back to the client so that it can take appropriate action.
- }
- catch (Exception e)
- {
- logger_.info( LogUtil.throwableToString(e) );
- }
- return result;
-
+ logger_.debug("batch_insert_blocking");
+ RowMutation rm = RowMutation.getRowMutation(batchMutation);
+ return StorageProxy.insertBlocking(rm);
}
+
public void batch_insert(batch_mutation_t batchMutation)
{
- // 1. Get the N nodes from storage service where the data needs to be
- // replicated
- // 2. Construct a message for read\write
- // 3. SendRR ( to all the nodes above )
- // 4. Wait for a response from atleast X nodes where X <= N
- // 5. return success
-
- try
- {
- logger_.debug(" batch_insert");
- logger_.debug(" Creating the row mutation");
- validateTable(batchMutation.table);
- RowMutation rm = new RowMutation(batchMutation.table,
- batchMutation.key.trim());
- if(batchMutation.cfmap != null)
- {
- Set keys = batchMutation.cfmap.keySet();
- Iterator keyIter = keys.iterator();
- while (keyIter.hasNext())
- {
- Object key = keyIter.next(); // Get the next key.
- List<column_t> list = batchMutation.cfmap.get(key);
- for (column_t columnData : list)
- {
- rm.add(key.toString() + ":" + columnData.columnName,
- columnData.value.getBytes(), columnData.timestamp);
-
- }
- }
- }
- StorageProxy.insert(rm);
- }
- catch (Exception e)
- {
- logger_.info( LogUtil.throwableToString(e) );
- }
- return;
+ logger_.debug("batch_insert");
+ RowMutation rm = RowMutation.getRowMutation(batchMutation);
+ StorageProxy.insert(rm);
}
public void remove(String tablename, String key, String columnFamily_column)
{
- throw new UnsupportedOperationException();
+ throw new UnsupportedOperationException("Remove is coming soon");
}
public boolean remove(String tablename, String key, String columnFamily_column, long timestamp, int block_for)
@@ -795,86 +720,16 @@
public boolean batch_insert_superColumn_blocking(batch_mutation_super_t batchMutationSuper)
{
- boolean result = false;
- try
- {
- logger_.warn(" batch_insert_SuperColumn_blocking");
- logger_.debug(" Creating the row mutation");
- validateTable(batchMutationSuper.table);
- RowMutation rm = new RowMutation(batchMutationSuper.table,
- batchMutationSuper.key.trim());
- Set keys = batchMutationSuper.cfmap.keySet();
- Iterator keyIter = keys.iterator();
- while (keyIter.hasNext())
- {
- Object key = keyIter.next(); // Get the next key.
- List<superColumn_t> list = batchMutationSuper.cfmap.get(key);
- for (superColumn_t superColumnData : list)
- {
- if(superColumnData.columns.size() != 0 )
- {
- for (column_t columnData : superColumnData.columns)
- {
- rm.add(key.toString() + ":" + superColumnData.name +":" + columnData.columnName,
- columnData.value.getBytes(), columnData.timestamp);
- }
- }
- else
- {
- rm.add(key.toString() + ":" + superColumnData.name, new byte[0], 0);
- }
- }
- }
- StorageProxy.insert(rm);
- }
- catch (Exception e)
- {
- logger_.info( LogUtil.throwableToString(e) );
- }
- return result;
-
+ logger_.debug("batch_insert_SuperColumn_blocking");
+ RowMutation rm = RowMutation.getRowMutation(batchMutationSuper);
+ return StorageProxy.insertBlocking(rm);
}
+
public void batch_insert_superColumn(batch_mutation_super_t batchMutationSuper)
{
- try
- {
- logger_.debug(" batch_insert");
- logger_.debug(" Creating the row mutation");
- validateTable(batchMutationSuper.table);
- RowMutation rm = new RowMutation(batchMutationSuper.table,
- batchMutationSuper.key.trim());
- if(batchMutationSuper.cfmap != null)
- {
- Set keys = batchMutationSuper.cfmap.keySet();
- Iterator keyIter = keys.iterator();
- while (keyIter.hasNext())
- {
- Object key = keyIter.next(); // Get the next key.
- List<superColumn_t> list = batchMutationSuper.cfmap.get(key);
- for (superColumn_t superColumnData : list)
- {
- if(superColumnData.columns.size() != 0 )
- {
- for (column_t columnData : superColumnData.columns)
- {
- rm.add(key.toString() + ":" + superColumnData.name +":" + columnData.columnName,
- columnData.value.getBytes(), columnData.timestamp);
- }
- }
- else
- {
- rm.add(key.toString() + ":" + superColumnData.name, new byte[0], 0);
- }
- }
- }
- }
- StorageProxy.insert(rm);
- }
- catch (Exception e)
- {
- logger_.info( LogUtil.throwableToString(e) );
- }
- return;
+ logger_.debug("batch_insert_SuperColumn");
+ RowMutation rm = RowMutation.getRowMutation(batchMutationSuper);
+ StorageProxy.insert(rm);
}
public String getStringProperty(String propertyName) throws TException
@@ -961,7 +816,7 @@
}
return result;
}
-
+
/*
* This method is used to ensure that all keys
* prior to the specified key, as determined by
@@ -999,6 +854,7 @@
public static void main(String[] args) throws Throwable
{
int port = 9160;
+
try
{
CassandraServer peerStorageServer = new CassandraServer();
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/service/ReadResponseResolver.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/ReadResponseResolver.java?rev=758993&r1=758992&r2=758993&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/ReadResponseResolver.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/ReadResponseResolver.java Fri Mar 27 02:43:53 2009
@@ -147,7 +147,7 @@
for ( String cfName : cfNames )
{
ColumnFamily cf = columnFamilies.get(cfName);
- rowMutation.add(cfName, cf);
+ rowMutation.add(cf);
}
RowMutationMessage rowMutationMessage = new RowMutationMessage(rowMutation);
// schedule the read repair
Modified: incubator/cassandra/trunk/test/org/apache/cassandra/db/ColumnFamilyStoreTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/org/apache/cassandra/db/ColumnFamilyStoreTest.java?rev=758993&r1=758992&r2=758993&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/org/apache/cassandra/db/ColumnFamilyStoreTest.java (original)
+++ incubator/cassandra/trunk/test/org/apache/cassandra/db/ColumnFamilyStoreTest.java Fri Mar 27 02:43:53 2009
@@ -73,7 +73,7 @@
rm = new RowMutation("Table1", "key1");
ColumnFamily cf = new ColumnFamily("Standard1");
cf.delete(1);
- rm.add(cf.name(), cf);
+ rm.add(cf);
rm.apply();
ColumnFamily retrieved = store.getColumnFamily("key1", "Standard1", new IdentityFilter());
@@ -98,7 +98,7 @@
SuperColumn sc = new SuperColumn("SC1");
sc.markForDeleteAt(1);
cf.addColumn(sc);
- rm.add(cf.name(), cf);
+ rm.add(cf);
rm.apply();
List<ColumnFamily> families = store.getColumnFamilies("key1", "Super1", new IdentityFilter());