You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ee...@apache.org on 2010/08/20 23:30:59 UTC

svn commit: r987640 - in /cassandra/trunk: interface/cassandra.genavro src/java/org/apache/cassandra/avro/CassandraServer.java src/java/org/apache/cassandra/config/ColumnDefinition.java test/system/test_avro_server.py

Author: eevans
Date: Fri Aug 20 21:30:59 2010
New Revision: 987640

URL: http://svn.apache.org/viewvc?rev=987640&view=rev
Log:
several new avro rpc method implementations

Patch by Antoine Toulme w/ some changes by eevans for CASSANDRA-1238

Modified:
    cassandra/trunk/interface/cassandra.genavro
    cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java
    cassandra/trunk/src/java/org/apache/cassandra/config/ColumnDefinition.java
    cassandra/trunk/test/system/test_avro_server.py

Modified: cassandra/trunk/interface/cassandra.genavro
URL: http://svn.apache.org/viewvc/cassandra/trunk/interface/cassandra.genavro?rev=987640&r1=987639&r2=987640&view=diff
==============================================================================
--- cassandra/trunk/interface/cassandra.genavro (original)
+++ cassandra/trunk/interface/cassandra.genavro Fri Aug 20 21:30:59 2010
@@ -23,7 +23,7 @@
 
 protocol Cassandra {
     enum AccessLevel {
-        NONE, READONLY, READWRITE, FALL
+        NONE, READONLY, READWRITE, FULL
     }
 
     record ColumnPath {
@@ -213,11 +213,22 @@ protocol Cassandra {
                       ConsistencyLevel consistency_level)
     throws InvalidRequestException, UnavailableException, TimedOutException;
 
+    string system_add_column_family(CfDef cf_def)
+    throws InvalidRequestException;
+
     void system_add_keyspace(KsDef ks_def) throws InvalidRequestException;
+    
+    string system_rename_column_family(string old_name, string new_name)
+    throws InvalidRequestException;
+    
+    string system_drop_column_family(string column_family)
+    throws InvalidRequestException;
 
     void set_keyspace(string keyspace) throws InvalidRequestException;
 
     array<string> describe_keyspaces();
+    
+    KsDef describe_keyspace(string keyspace) throws NotFoundException;
 
     string describe_cluster_name();
 

Modified: cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java?rev=987640&r1=987639&r2=987640&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java Fri Aug 20 21:30:59 2010
@@ -30,6 +30,9 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeoutException;
 
 import org.apache.avro.Schema;
@@ -37,10 +40,11 @@ import org.apache.avro.generic.GenericAr
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.ipc.AvroRemoteException;
 import org.apache.avro.util.Utf8;
-import org.apache.cassandra.thrift.*;
+import org.apache.cassandra.avro.InvalidRequestException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.auth.AllowAllAuthenticator;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
@@ -53,7 +57,11 @@ import org.apache.cassandra.db.clock.Abs
 import org.apache.cassandra.db.clock.TimestampReconciler;
 import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.db.marshal.MarshalException;
+import org.apache.cassandra.db.migration.AddColumnFamily;
 import org.apache.cassandra.db.migration.AddKeyspace;
+import org.apache.cassandra.db.migration.DropColumnFamily;
+import org.apache.cassandra.db.migration.Migration;
+import org.apache.cassandra.db.migration.RenameColumnFamily;
 import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.scheduler.IRequestScheduler;
 import org.apache.cassandra.service.StorageProxy;
@@ -76,6 +84,10 @@ public class CassandraServer implements 
     private final static String D_CF_SUBCOMPTYPE = "";
     private final static String D_CF_RECONCILER = null;
     
+    //ColumnDef default values
+    public final static String D_COLDEF_INDEXTYPE = "KEYS";
+    public final static String D_COLDEF_INDEXNAME = null;
+    
     private ThreadLocal<AccessLevel> loginDone = new ThreadLocal<AccessLevel>()
     {
         @Override
@@ -548,6 +560,46 @@ public class CassandraServer implements 
         }
     }
     
+    // Copy-pasted from the thrift CassandraServer, using the factory methods to create exceptions.
+    // helper method to apply migration on the migration stage. typical migration failures will throw an 
+    // InvalidRequestException. atypical failures will throw a RuntimeException.
+    private static void applyMigrationOnStage(final Migration m) throws InvalidRequestException
+    {
+        Future f = StageManager.getStage(StageManager.MIGRATION_STAGE).submit(new Callable()
+        {
+            public Object call() throws Exception
+            {
+                m.apply();
+                m.announce();
+                return null;
+            }
+        });
+        try
+        {
+            f.get();
+        }
+        catch (InterruptedException e)
+        {
+            throw new RuntimeException(e);
+        }
+        catch (ExecutionException e)
+        {
+            // this means call() threw an exception. deal with it directly.
+            if (e.getCause() != null)
+            {
+                InvalidRequestException ex = newInvalidRequestException(e.getCause().getMessage());
+                ex.initCause(e.getCause());
+                throw ex;
+            }
+            else
+            {
+                InvalidRequestException ex = newInvalidRequestException(e.getMessage());
+                ex.initCause(e);
+                throw ex;
+            }
+        }
+    }
+    
     private org.apache.cassandra.thrift.ConsistencyLevel thriftConsistencyLevel(ConsistencyLevel consistency)
     {
         switch (consistency)
@@ -678,6 +730,28 @@ public class CassandraServer implements 
     }
 
     @Override
+    public CharSequence system_add_column_family(CfDef cfDef) throws AvroRemoteException, InvalidRequestException
+    {
+        checkKeyspaceAndLoginAuthorized(AccessLevel.FULL);
+        try
+        {
+            applyMigrationOnStage(new AddColumnFamily(convertToCFMetaData(cfDef)));
+            return DatabaseDescriptor.getDefsVersion().toString();
+        } catch (ConfigurationException e)
+        {
+            InvalidRequestException ex = newInvalidRequestException(e.getMessage());
+            ex.initCause(e);
+            throw ex;
+        }
+        catch (IOException e)
+        {
+            InvalidRequestException ex = newInvalidRequestException(e.getMessage());
+            ex.initCause(e);
+            throw ex;
+        }
+    }
+
+    @Override
     public GenericArray<CharSequence> describe_keyspaces() throws AvroRemoteException
     {
         Set<String> keyspaces = DatabaseDescriptor.getTables();
@@ -708,6 +782,22 @@ public class CassandraServer implements 
         logger.debug("checking schema agreement");      
         return StorageProxy.checkSchemaAgreement();
     }
+    
+    protected void checkKeyspaceAndLoginAuthorized(AccessLevel level) throws InvalidRequestException
+    {
+        if (curKeyspace.get() == null)
+        {
+            throw newInvalidRequestException("You have not assigned a keyspace; please use set_keyspace (and login if necessary)");
+        }
+        
+        if (!(DatabaseDescriptor.getAuthenticator() instanceof AllowAllAuthenticator))
+        {
+            if (loginDone.get() == null)
+                throw newInvalidRequestException("You have not logged into keyspace " + curKeyspace.get());
+            if (loginDone.get().compareTo(level) < 0)
+                throw newInvalidRequestException("Your credentials are not sufficient to perform " + level + " operations");
+        }
+    }
 
     /**
      * Schedule the current thread for access to the required services
@@ -724,4 +814,142 @@ public class CassandraServer implements 
     {
         requestScheduler.release();
     }
+    
+    private CFMetaData convertToCFMetaData(CfDef cf_def) throws InvalidRequestException, ConfigurationException
+    {
+        String cfType = cf_def.column_type == null ? D_CF_CFTYPE : cf_def.column_type.toString();
+        ClockType clockType = ClockType.create(cf_def.clock_type == null ? D_CF_CFCLOCKTYPE : cf_def.clock_type.toString());
+        String compare = cf_def.comparator_type == null ? D_CF_COMPTYPE : cf_def.comparator_type.toString();
+        String subCompare = cf_def.subcomparator_type == null ? D_CF_SUBCOMPTYPE : cf_def.subcomparator_type.toString();
+        String reconcilerName = cf_def.reconciler == null  ? D_CF_RECONCILER : cf_def.reconciler.toString();
+        
+        AbstractReconciler reconciler = DatabaseDescriptor.getReconciler(reconcilerName);
+        if (reconciler == null)
+        {
+            if (clockType == ClockType.Timestamp)    
+                reconciler = TimestampReconciler.instance; // default
+            else
+                throw new ConfigurationException("No reconciler specified for column family " + cf_def.name);
+        }
+
+        return new CFMetaData(cf_def.keyspace.toString(),
+                              cf_def.name.toString(),
+                              ColumnFamilyType.create(cfType),
+                              clockType,
+                              DatabaseDescriptor.getComparator(compare),
+                              subCompare.length() == 0 ? null : DatabaseDescriptor.getComparator(subCompare),
+                              reconciler,
+                              cf_def.comment == null ? "" : cf_def.comment.toString(),
+                              cf_def.row_cache_size == null ? CFMetaData.DEFAULT_ROW_CACHE_SIZE : cf_def.row_cache_size,
+                              cf_def.preload_row_cache == null ? CFMetaData.DEFAULT_PRELOAD_ROW_CACHE : cf_def.preload_row_cache,
+                              cf_def.key_cache_size == null ? CFMetaData.DEFAULT_KEY_CACHE_SIZE : cf_def.key_cache_size,
+                              cf_def.read_repair_chance == null ? CFMetaData.DEFAULT_READ_REPAIR_CHANCE : cf_def.read_repair_chance,
+                              cf_def.gc_grace_seconds != null ? cf_def.gc_grace_seconds : CFMetaData.DEFAULT_GC_GRACE_SECONDS,
+                              ColumnDefinition.fromColumnDefs((Iterable<ColumnDef>) cf_def.column_metadata));
+    }
+    
+    private CfDef convertToCfDef(CFMetaData cfMetadata) throws InvalidRequestException
+    {
+        CfDef cfDef = new CfDef();
+        if (cfMetadata.subcolumnComparator != null)
+        {
+            cfDef.subcomparator_type = cfMetadata.subcolumnComparator.getClass().getName();
+            cfDef.column_type = "Super";
+        }
+        cfDef.keyspace = cfMetadata.tableName;
+        cfDef.name = cfMetadata.cfName;
+        cfDef.clock_type = cfMetadata.clockType.name();
+        cfDef.column_type = cfMetadata.cfType.name();
+        cfDef.comment = cfMetadata.comment;
+        cfDef.comparator_type = cfMetadata.comparator.getClass().getName();
+        
+        GenericArray<ColumnDef> column_metadata = new GenericData.Array<ColumnDef>(cfMetadata.column_metadata.size(), Schema.createArray(ColumnDef.SCHEMA$));
+        for (ColumnDefinition col_definition : cfMetadata.column_metadata.values())
+        {
+            ColumnDef cdef = new ColumnDef();
+            cdef.name = ByteBuffer.wrap(col_definition.name);
+            cdef.validation_class = col_definition.validator.getClass().getName();
+            cdef.index_name = col_definition.index_name;
+            cdef.index_type = IndexType.valueOf(col_definition.index_type.name());
+            column_metadata.add(cdef);
+        }
+        cfDef.column_metadata = column_metadata;
+        return cfDef;
+    }
+
+    @Override
+    public KsDef describe_keyspace(CharSequence keyspace) throws AvroRemoteException, NotFoundException
+    {
+        KSMetaData ksMetadata = DatabaseDescriptor.getTableDefinition(keyspace.toString());
+        if (ksMetadata == null)
+            throw new NotFoundException();
+        
+        KsDef ksDef = new KsDef();
+        ksDef.name = keyspace;
+        ksDef.replication_factor = ksMetadata.replicationFactor;
+        ksDef.strategy_class = ksMetadata.strategyClass.getName();
+        if (ksMetadata.strategyOptions != null)
+        {
+            ksDef.strategy_options = new HashMap<CharSequence, CharSequence>();
+            ksDef.strategy_options.putAll(ksMetadata.strategyOptions);
+        }
+        
+        GenericArray<CfDef> cfDefs = new GenericData.Array<CfDef>(ksMetadata.cfMetaData().size(), Schema.createArray(CfDef.SCHEMA$));
+        for (CFMetaData cfm : ksMetadata.cfMetaData().values())
+        {
+            cfDefs.add(convertToCfDef(cfm));
+        }
+        ksDef.cf_defs = cfDefs;
+        
+        return ksDef;
+    }
+
+    @Override
+    public CharSequence system_rename_column_family(CharSequence old_name, CharSequence new_name)
+    throws AvroRemoteException, InvalidRequestException
+    {
+        checkKeyspaceAndLoginAuthorized(AccessLevel.FULL);
+        
+        try
+        {
+            applyMigrationOnStage(new RenameColumnFamily(curKeyspace.get(), old_name.toString(), new_name.toString()));
+            return DatabaseDescriptor.getDefsVersion().toString();
+        }
+        catch (ConfigurationException e)
+        {
+            InvalidRequestException ex = newInvalidRequestException(e.getMessage());
+            ex.initCause(e);
+            throw ex;
+        }
+        catch (IOException e)
+        {
+            InvalidRequestException ex = newInvalidRequestException(e.getMessage());
+            ex.initCause(e);
+            throw ex;
+        }
+    }
+
+    @Override
+    public CharSequence system_drop_column_family(CharSequence column_family) throws AvroRemoteException, InvalidRequestException
+    {
+        checkKeyspaceAndLoginAuthorized(AccessLevel.FULL);
+        
+        try
+        {
+            applyMigrationOnStage(new DropColumnFamily(curKeyspace.get(), column_family.toString(), true));
+            return DatabaseDescriptor.getDefsVersion().toString();
+        }
+        catch (ConfigurationException e)
+        {
+            InvalidRequestException ex = newInvalidRequestException(e.getMessage());
+            ex.initCause(e);
+            throw ex;
+        }
+        catch (IOException e)
+        {
+            InvalidRequestException ex = newInvalidRequestException(e.getMessage());
+            ex.initCause(e);
+            throw ex;
+        }
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/config/ColumnDefinition.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/ColumnDefinition.java?rev=987640&r1=987639&r2=987640&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/ColumnDefinition.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/ColumnDefinition.java Fri Aug 20 21:30:59 2010
@@ -107,6 +107,14 @@ public class ColumnDefinition {
     {
         return new ColumnDefinition(cd.name, cd.validation_class, cd.index_type, cd.index_name);
     }
+    
+    public static ColumnDefinition fromColumnDef(org.apache.cassandra.avro.ColumnDef cd) throws ConfigurationException
+    {
+        return new ColumnDefinition(cd.name.array(),
+                cd.validation_class.toString(),
+                IndexType.valueOf(cd.index_type == null ? org.apache.cassandra.avro.CassandraServer.D_COLDEF_INDEXTYPE : cd.index_type.name()),
+                cd.index_name == null ? org.apache.cassandra.avro.CassandraServer.D_COLDEF_INDEXNAME : cd.index_name.toString());
+    }
 
     public static Map<byte[], ColumnDefinition> fromColumnDef(List<ColumnDef> thriftDefs) throws ConfigurationException
     {
@@ -121,4 +129,18 @@ public class ColumnDefinition {
 
         return Collections.unmodifiableMap(cds);
     }
+    
+    public static Map<byte[], ColumnDefinition> fromColumnDefs(Iterable<org.apache.cassandra.avro.ColumnDef> avroDefs) throws ConfigurationException
+    {
+        if (avroDefs == null)
+            return Collections.emptyMap();
+
+        Map<byte[], ColumnDefinition> cds = new TreeMap<byte[], ColumnDefinition>(FBUtilities.byteArrayComparator);
+        for (org.apache.cassandra.avro.ColumnDef avroColumnDef : avroDefs)
+        {
+            cds.put(avroColumnDef.name.array(), fromColumnDef(avroColumnDef));
+        }
+
+        return Collections.unmodifiableMap(cds);
+    }
 }

Modified: cassandra/trunk/test/system/test_avro_server.py
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/system/test_avro_server.py?rev=987640&r1=987639&r2=987640&view=diff
==============================================================================
--- cassandra/trunk/test/system/test_avro_server.py (original)
+++ cassandra/trunk/test/system/test_avro_server.py Fri Aug 20 21:30:59 2010
@@ -315,6 +315,14 @@ class TestRpcOperations(AvroTester):
         keyspaces = self.client.request('describe_keyspaces', {})
         assert 'Keyspace1' in keyspaces, "Keyspace1 not in " + keyspaces
 
+    def test_describe_keyspace(self):
+        "retrieving a keyspace metadata"
+        ks1 = self.client.request('describe_keyspace',
+                {'keyspace': "Keyspace1"})
+        assert ks1['replication_factor'] == 1
+        cf0 = ks1['cf_defs'][0]
+        assert cf0['comparator_type'] == "org.apache.cassandra.db.marshal.BytesType"
+
     def test_describe_cluster_name(self):
         "retrieving the cluster name"
         name = self.client.request('describe_cluster_name', {})
@@ -327,6 +335,43 @@ class TestRpcOperations(AvroTester):
         segs = vers.split('.')
         assert len(segs) == 3 and len([i for i in segs if i.isdigit()]) == 3, \
                "incorrect api version format: " + vers
+              
+    def test_system_column_family_operations(self):
+        "adding, renaming, and removing column families"
+        self.__set_keyspace('Keyspace1')
+        
+        # create
+        columnDef = dict()
+        columnDef['name'] = b'ValidationColumn'
+        columnDef['validation_class'] = 'BytesType'
+        
+        cfDef = dict()
+        cfDef['keyspace'] = 'Keyspace1'
+        cfDef['name'] = 'NewColumnFamily'
+        cfDef['column_metadata'] = [columnDef]
+        s = self.client.request('system_add_column_family', {'cf_def' : cfDef})
+        assert isinstance(s, unicode), \
+            'returned type is %s, (not \'unicode\')' % type(s)
+        
+        ks1 = self.client.request(
+            'describe_keyspace', {'keyspace' : 'Keyspace1'})
+        assert 'NewColumnFamily' in [x['name'] for x in ks1['cf_defs']]
+
+        # rename
+        self.client.request('system_rename_column_family',
+            {'old_name' : 'NewColumnFamily', 'new_name': 'RenameColumnFamily'})
+        ks1 = self.client.request(
+            'describe_keyspace', {'keyspace' : 'Keyspace1'})
+        assert 'RenameColumnFamily' in [x['name'] for x in ks1['cf_defs']]
+
+        # drop
+        self.client.request('system_drop_column_family',
+            {'column_family' : 'RenameColumnFamily'})
+        ks1 = self.client.request(
+                'describe_keyspace', {'keyspace' : 'Keyspace1'})
+        assert 'RenameColumnFamily' not in [x['name'] for x in ks1['cf_defs']]
+        assert 'NewColumnFamily' not in [x['name'] for x in ks1['cf_defs']]
+        assert 'Standard1' in [x['name'] for x in ks1['cf_defs']]
 
     def __get(self, key, cf, super_name, col_name, consistency_level='ONE'):
         """