You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ee...@apache.org on 2010/08/20 23:30:59 UTC
svn commit: r987640 - in /cassandra/trunk: interface/cassandra.genavro
src/java/org/apache/cassandra/avro/CassandraServer.java
src/java/org/apache/cassandra/config/ColumnDefinition.java
test/system/test_avro_server.py
Author: eevans
Date: Fri Aug 20 21:30:59 2010
New Revision: 987640
URL: http://svn.apache.org/viewvc?rev=987640&view=rev
Log:
several new avro rpc method implementations
Patch by Antoine Toulme w/ some changes by eevans for CASSANDRA-1238
Modified:
cassandra/trunk/interface/cassandra.genavro
cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java
cassandra/trunk/src/java/org/apache/cassandra/config/ColumnDefinition.java
cassandra/trunk/test/system/test_avro_server.py
Modified: cassandra/trunk/interface/cassandra.genavro
URL: http://svn.apache.org/viewvc/cassandra/trunk/interface/cassandra.genavro?rev=987640&r1=987639&r2=987640&view=diff
==============================================================================
--- cassandra/trunk/interface/cassandra.genavro (original)
+++ cassandra/trunk/interface/cassandra.genavro Fri Aug 20 21:30:59 2010
@@ -23,7 +23,7 @@
protocol Cassandra {
enum AccessLevel {
- NONE, READONLY, READWRITE, FALL
+ NONE, READONLY, READWRITE, FULL
}
record ColumnPath {
@@ -213,11 +213,22 @@ protocol Cassandra {
ConsistencyLevel consistency_level)
throws InvalidRequestException, UnavailableException, TimedOutException;
+ string system_add_column_family(CfDef cf_def)
+ throws InvalidRequestException;
+
void system_add_keyspace(KsDef ks_def) throws InvalidRequestException;
+
+ string system_rename_column_family(string old_name, string new_name)
+ throws InvalidRequestException;
+
+ string system_drop_column_family(string column_family)
+ throws InvalidRequestException;
void set_keyspace(string keyspace) throws InvalidRequestException;
array<string> describe_keyspaces();
+
+ KsDef describe_keyspace(string keyspace) throws NotFoundException;
string describe_cluster_name();
Modified: cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java?rev=987640&r1=987639&r2=987640&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java Fri Aug 20 21:30:59 2010
@@ -30,6 +30,9 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import org.apache.avro.Schema;
@@ -37,10 +40,11 @@ import org.apache.avro.generic.GenericAr
import org.apache.avro.generic.GenericData;
import org.apache.avro.ipc.AvroRemoteException;
import org.apache.avro.util.Utf8;
-import org.apache.cassandra.thrift.*;
+import org.apache.cassandra.avro.InvalidRequestException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.auth.AllowAllAuthenticator;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
@@ -53,7 +57,11 @@ import org.apache.cassandra.db.clock.Abs
import org.apache.cassandra.db.clock.TimestampReconciler;
import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.db.marshal.MarshalException;
+import org.apache.cassandra.db.migration.AddColumnFamily;
import org.apache.cassandra.db.migration.AddKeyspace;
+import org.apache.cassandra.db.migration.DropColumnFamily;
+import org.apache.cassandra.db.migration.Migration;
+import org.apache.cassandra.db.migration.RenameColumnFamily;
import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.scheduler.IRequestScheduler;
import org.apache.cassandra.service.StorageProxy;
@@ -76,6 +84,10 @@ public class CassandraServer implements
private final static String D_CF_SUBCOMPTYPE = "";
private final static String D_CF_RECONCILER = null;
+ //ColumnDef default values
+ public final static String D_COLDEF_INDEXTYPE = "KEYS";
+ public final static String D_COLDEF_INDEXNAME = null;
+
private ThreadLocal<AccessLevel> loginDone = new ThreadLocal<AccessLevel>()
{
@Override
@@ -548,6 +560,46 @@ public class CassandraServer implements
}
}
+ // Copy-pasted from the thrift CassandraServer, using the factory methods to create exceptions.
+ // Helper method to apply a migration on the migration stage. Typical migration failures will throw an
+ // InvalidRequestException; atypical failures will throw a RuntimeException.
+ private static void applyMigrationOnStage(final Migration m) throws InvalidRequestException
+ {
+ Future f = StageManager.getStage(StageManager.MIGRATION_STAGE).submit(new Callable()
+ {
+ public Object call() throws Exception
+ {
+ m.apply();
+ m.announce();
+ return null;
+ }
+ });
+ try
+ {
+ f.get();
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
+ catch (ExecutionException e)
+ {
+ // this means call() threw an exception. deal with it directly.
+ if (e.getCause() != null)
+ {
+ InvalidRequestException ex = newInvalidRequestException(e.getCause().getMessage());
+ ex.initCause(e.getCause());
+ throw ex;
+ }
+ else
+ {
+ InvalidRequestException ex = newInvalidRequestException(e.getMessage());
+ ex.initCause(e);
+ throw ex;
+ }
+ }
+ }
+
private org.apache.cassandra.thrift.ConsistencyLevel thriftConsistencyLevel(ConsistencyLevel consistency)
{
switch (consistency)
@@ -678,6 +730,28 @@ public class CassandraServer implements
}
@Override
+ public CharSequence system_add_column_family(CfDef cfDef) throws AvroRemoteException, InvalidRequestException
+ {
+ checkKeyspaceAndLoginAuthorized(AccessLevel.FULL);
+ try
+ {
+ applyMigrationOnStage(new AddColumnFamily(convertToCFMetaData(cfDef)));
+ return DatabaseDescriptor.getDefsVersion().toString();
+ } catch (ConfigurationException e)
+ {
+ InvalidRequestException ex = newInvalidRequestException(e.getMessage());
+ ex.initCause(e);
+ throw ex;
+ }
+ catch (IOException e)
+ {
+ InvalidRequestException ex = newInvalidRequestException(e.getMessage());
+ ex.initCause(e);
+ throw ex;
+ }
+ }
+
+ @Override
public GenericArray<CharSequence> describe_keyspaces() throws AvroRemoteException
{
Set<String> keyspaces = DatabaseDescriptor.getTables();
@@ -708,6 +782,22 @@ public class CassandraServer implements
logger.debug("checking schema agreement");
return StorageProxy.checkSchemaAgreement();
}
+
+ protected void checkKeyspaceAndLoginAuthorized(AccessLevel level) throws InvalidRequestException
+ {
+ if (curKeyspace.get() == null)
+ {
+ throw newInvalidRequestException("You have not assigned a keyspace; please use set_keyspace (and login if necessary)");
+ }
+
+ if (!(DatabaseDescriptor.getAuthenticator() instanceof AllowAllAuthenticator))
+ {
+ if (loginDone.get() == null)
+ throw newInvalidRequestException("You have not logged into keyspace " + curKeyspace.get());
+ if (loginDone.get().compareTo(level) < 0)
+ throw newInvalidRequestException("Your credentials are not sufficient to perform " + level + " operations");
+ }
+ }
/**
* Schedule the current thread for access to the required services
@@ -724,4 +814,142 @@ public class CassandraServer implements
{
requestScheduler.release();
}
+
+ private CFMetaData convertToCFMetaData(CfDef cf_def) throws InvalidRequestException, ConfigurationException
+ {
+ String cfType = cf_def.column_type == null ? D_CF_CFTYPE : cf_def.column_type.toString();
+ ClockType clockType = ClockType.create(cf_def.clock_type == null ? D_CF_CFCLOCKTYPE : cf_def.clock_type.toString());
+ String compare = cf_def.comparator_type == null ? D_CF_COMPTYPE : cf_def.comparator_type.toString();
+ String subCompare = cf_def.subcomparator_type == null ? D_CF_SUBCOMPTYPE : cf_def.subcomparator_type.toString();
+ String reconcilerName = cf_def.reconciler == null ? D_CF_RECONCILER : cf_def.reconciler.toString();
+
+ AbstractReconciler reconciler = DatabaseDescriptor.getReconciler(reconcilerName);
+ if (reconciler == null)
+ {
+ if (clockType == ClockType.Timestamp)
+ reconciler = TimestampReconciler.instance; // default
+ else
+ throw new ConfigurationException("No reconciler specified for column family " + cf_def.name);
+ }
+
+ return new CFMetaData(cf_def.keyspace.toString(),
+ cf_def.name.toString(),
+ ColumnFamilyType.create(cfType),
+ clockType,
+ DatabaseDescriptor.getComparator(compare),
+ subCompare.length() == 0 ? null : DatabaseDescriptor.getComparator(subCompare),
+ reconciler,
+ cf_def.comment == null ? "" : cf_def.comment.toString(),
+ cf_def.row_cache_size == null ? CFMetaData.DEFAULT_ROW_CACHE_SIZE : cf_def.row_cache_size,
+ cf_def.preload_row_cache == null ? CFMetaData.DEFAULT_PRELOAD_ROW_CACHE : cf_def.preload_row_cache,
+ cf_def.key_cache_size == null ? CFMetaData.DEFAULT_KEY_CACHE_SIZE : cf_def.key_cache_size,
+ cf_def.read_repair_chance == null ? CFMetaData.DEFAULT_READ_REPAIR_CHANCE : cf_def.read_repair_chance,
+ cf_def.gc_grace_seconds != null ? cf_def.gc_grace_seconds : CFMetaData.DEFAULT_GC_GRACE_SECONDS,
+ ColumnDefinition.fromColumnDefs((Iterable<ColumnDef>) cf_def.column_metadata));
+ }
+
+ private CfDef convertToCfDef(CFMetaData cfMetadata) throws InvalidRequestException
+ {
+ CfDef cfDef = new CfDef();
+ if (cfMetadata.subcolumnComparator != null)
+ {
+ cfDef.subcomparator_type = cfMetadata.subcolumnComparator.getClass().getName();
+ cfDef.column_type = "Super";
+ }
+ cfDef.keyspace = cfMetadata.tableName;
+ cfDef.name = cfMetadata.cfName;
+ cfDef.clock_type = cfMetadata.clockType.name();
+ cfDef.column_type = cfMetadata.cfType.name();
+ cfDef.comment = cfMetadata.comment;
+ cfDef.comparator_type = cfMetadata.comparator.getClass().getName();
+
+ GenericArray<ColumnDef> column_metadata = new GenericData.Array<ColumnDef>(cfMetadata.column_metadata.size(), Schema.createArray(ColumnDef.SCHEMA$));
+ for (ColumnDefinition col_definition : cfMetadata.column_metadata.values())
+ {
+ ColumnDef cdef = new ColumnDef();
+ cdef.name = ByteBuffer.wrap(col_definition.name);
+ cdef.validation_class = col_definition.validator.getClass().getName();
+ cdef.index_name = col_definition.index_name;
+ cdef.index_type = IndexType.valueOf(col_definition.index_type.name());
+ column_metadata.add(cdef);
+ }
+ cfDef.column_metadata = column_metadata;
+ return cfDef;
+ }
+
+ @Override
+ public KsDef describe_keyspace(CharSequence keyspace) throws AvroRemoteException, NotFoundException
+ {
+ KSMetaData ksMetadata = DatabaseDescriptor.getTableDefinition(keyspace.toString());
+ if (ksMetadata == null)
+ throw new NotFoundException();
+
+ KsDef ksDef = new KsDef();
+ ksDef.name = keyspace;
+ ksDef.replication_factor = ksMetadata.replicationFactor;
+ ksDef.strategy_class = ksMetadata.strategyClass.getName();
+ if (ksMetadata.strategyOptions != null)
+ {
+ ksDef.strategy_options = new HashMap<CharSequence, CharSequence>();
+ ksDef.strategy_options.putAll(ksMetadata.strategyOptions);
+ }
+
+ GenericArray<CfDef> cfDefs = new GenericData.Array<CfDef>(ksMetadata.cfMetaData().size(), Schema.createArray(CfDef.SCHEMA$));
+ for (CFMetaData cfm : ksMetadata.cfMetaData().values())
+ {
+ cfDefs.add(convertToCfDef(cfm));
+ }
+ ksDef.cf_defs = cfDefs;
+
+ return ksDef;
+ }
+
+ @Override
+ public CharSequence system_rename_column_family(CharSequence old_name, CharSequence new_name)
+ throws AvroRemoteException, InvalidRequestException
+ {
+ checkKeyspaceAndLoginAuthorized(AccessLevel.FULL);
+
+ try
+ {
+ applyMigrationOnStage(new RenameColumnFamily(curKeyspace.get(), old_name.toString(), new_name.toString()));
+ return DatabaseDescriptor.getDefsVersion().toString();
+ }
+ catch (ConfigurationException e)
+ {
+ InvalidRequestException ex = newInvalidRequestException(e.getMessage());
+ ex.initCause(e);
+ throw ex;
+ }
+ catch (IOException e)
+ {
+ InvalidRequestException ex = newInvalidRequestException(e.getMessage());
+ ex.initCause(e);
+ throw ex;
+ }
+ }
+
+ @Override
+ public CharSequence system_drop_column_family(CharSequence column_family) throws AvroRemoteException, InvalidRequestException
+ {
+ checkKeyspaceAndLoginAuthorized(AccessLevel.FULL);
+
+ try
+ {
+ applyMigrationOnStage(new DropColumnFamily(curKeyspace.get(), column_family.toString(), true));
+ return DatabaseDescriptor.getDefsVersion().toString();
+ }
+ catch (ConfigurationException e)
+ {
+ InvalidRequestException ex = newInvalidRequestException(e.getMessage());
+ ex.initCause(e);
+ throw ex;
+ }
+ catch (IOException e)
+ {
+ InvalidRequestException ex = newInvalidRequestException(e.getMessage());
+ ex.initCause(e);
+ throw ex;
+ }
+ }
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/config/ColumnDefinition.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/ColumnDefinition.java?rev=987640&r1=987639&r2=987640&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/ColumnDefinition.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/ColumnDefinition.java Fri Aug 20 21:30:59 2010
@@ -107,6 +107,14 @@ public class ColumnDefinition {
{
return new ColumnDefinition(cd.name, cd.validation_class, cd.index_type, cd.index_name);
}
+
+ public static ColumnDefinition fromColumnDef(org.apache.cassandra.avro.ColumnDef cd) throws ConfigurationException
+ {
+ return new ColumnDefinition(cd.name.array(),
+ cd.validation_class.toString(),
+ IndexType.valueOf(cd.index_type == null ? org.apache.cassandra.avro.CassandraServer.D_COLDEF_INDEXTYPE : cd.index_type.name()),
+ cd.index_name == null ? org.apache.cassandra.avro.CassandraServer.D_COLDEF_INDEXNAME : cd.index_name.toString());
+ }
public static Map<byte[], ColumnDefinition> fromColumnDef(List<ColumnDef> thriftDefs) throws ConfigurationException
{
@@ -121,4 +129,18 @@ public class ColumnDefinition {
return Collections.unmodifiableMap(cds);
}
+
+ public static Map<byte[], ColumnDefinition> fromColumnDefs(Iterable<org.apache.cassandra.avro.ColumnDef> avroDefs) throws ConfigurationException
+ {
+ if (avroDefs == null)
+ return Collections.emptyMap();
+
+ Map<byte[], ColumnDefinition> cds = new TreeMap<byte[], ColumnDefinition>(FBUtilities.byteArrayComparator);
+ for (org.apache.cassandra.avro.ColumnDef avroColumnDef : avroDefs)
+ {
+ cds.put(avroColumnDef.name.array(), fromColumnDef(avroColumnDef));
+ }
+
+ return Collections.unmodifiableMap(cds);
+ }
}
Modified: cassandra/trunk/test/system/test_avro_server.py
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/system/test_avro_server.py?rev=987640&r1=987639&r2=987640&view=diff
==============================================================================
--- cassandra/trunk/test/system/test_avro_server.py (original)
+++ cassandra/trunk/test/system/test_avro_server.py Fri Aug 20 21:30:59 2010
@@ -315,6 +315,14 @@ class TestRpcOperations(AvroTester):
keyspaces = self.client.request('describe_keyspaces', {})
assert 'Keyspace1' in keyspaces, "Keyspace1 not in " + keyspaces
+ def test_describe_keyspace(self):
+ "retrieving a keyspace metadata"
+ ks1 = self.client.request('describe_keyspace',
+ {'keyspace': "Keyspace1"})
+ assert ks1['replication_factor'] == 1
+ cf0 = ks1['cf_defs'][0]
+ assert cf0['comparator_type'] == "org.apache.cassandra.db.marshal.BytesType"
+
def test_describe_cluster_name(self):
"retrieving the cluster name"
name = self.client.request('describe_cluster_name', {})
@@ -327,6 +335,43 @@ class TestRpcOperations(AvroTester):
segs = vers.split('.')
assert len(segs) == 3 and len([i for i in segs if i.isdigit()]) == 3, \
"incorrect api version format: " + vers
+
+ def test_system_column_family_operations(self):
+ "adding, renaming, and removing column families"
+ self.__set_keyspace('Keyspace1')
+
+ # create
+ columnDef = dict()
+ columnDef['name'] = b'ValidationColumn'
+ columnDef['validation_class'] = 'BytesType'
+
+ cfDef = dict()
+ cfDef['keyspace'] = 'Keyspace1'
+ cfDef['name'] = 'NewColumnFamily'
+ cfDef['column_metadata'] = [columnDef]
+ s = self.client.request('system_add_column_family', {'cf_def' : cfDef})
+ assert isinstance(s, unicode), \
+ 'returned type is %s, (not \'unicode\')' % type(s)
+
+ ks1 = self.client.request(
+ 'describe_keyspace', {'keyspace' : 'Keyspace1'})
+ assert 'NewColumnFamily' in [x['name'] for x in ks1['cf_defs']]
+
+ # rename
+ self.client.request('system_rename_column_family',
+ {'old_name' : 'NewColumnFamily', 'new_name': 'RenameColumnFamily'})
+ ks1 = self.client.request(
+ 'describe_keyspace', {'keyspace' : 'Keyspace1'})
+ assert 'RenameColumnFamily' in [x['name'] for x in ks1['cf_defs']]
+
+ # drop
+ self.client.request('system_drop_column_family',
+ {'column_family' : 'RenameColumnFamily'})
+ ks1 = self.client.request(
+ 'describe_keyspace', {'keyspace' : 'Keyspace1'})
+ assert 'RenameColumnFamily' not in [x['name'] for x in ks1['cf_defs']]
+ assert 'NewColumnFamily' not in [x['name'] for x in ks1['cf_defs']]
+ assert 'Standard1' in [x['name'] for x in ks1['cf_defs']]
def __get(self, key, cf, super_name, col_name, consistency_level='ONE'):
"""