You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/07/09 21:58:22 UTC

svn commit: r962675 - in /cassandra/trunk: ./ conf/ src/java/org/apache/cassandra/avro/ src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/scheduler/ src/java/org/apache/cassandra/thrift/ test/conf/ test/unit/org/apache/cassandra/sched...

Author: jbellis
Date: Fri Jul  9 19:58:21 2010
New Revision: 962675

URL: http://svn.apache.org/viewvc?rev=962675&view=rev
Log:
implement keyspace round-robin scheduler.  patch by Nirmal Ranganathan; reviewed by Stu Hood and jbellis for CASSANDRA-1035

Added:
    cassandra/trunk/src/java/org/apache/cassandra/scheduler/
    cassandra/trunk/src/java/org/apache/cassandra/scheduler/IRequestScheduler.java
    cassandra/trunk/src/java/org/apache/cassandra/scheduler/NoScheduler.java
    cassandra/trunk/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java
    cassandra/trunk/test/unit/org/apache/cassandra/scheduler/
    cassandra/trunk/test/unit/org/apache/cassandra/scheduler/RoundRobinSchedulerTest.java
Modified:
    cassandra/trunk/NEWS.txt
    cassandra/trunk/conf/cassandra.yaml
    cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java
    cassandra/trunk/src/java/org/apache/cassandra/config/Config.java
    cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
    cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java
    cassandra/trunk/test/conf/cassandra.yaml

Modified: cassandra/trunk/NEWS.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/NEWS.txt?rev=962675&r1=962674&r2=962675&view=diff
==============================================================================
--- cassandra/trunk/NEWS.txt (original)
+++ cassandra/trunk/NEWS.txt Fri Jul  9 19:58:21 2010
@@ -18,6 +18,8 @@ Features
     - Streaming data for repair or node movement no longer requires 
       anticompaction step first
     - keyspace is per-connection in the thrift API instead of per-call
+    - optional round-robin scheduling between keyspaces for multitenant
+      clusters
 
 Configuraton
 ------------

Modified: cassandra/trunk/conf/cassandra.yaml
URL: http://svn.apache.org/viewvc/cassandra/trunk/conf/cassandra.yaml?rev=962675&r1=962674&r2=962675&view=diff
==============================================================================
--- cassandra/trunk/conf/cassandra.yaml (original)
+++ cassandra/trunk/conf/cassandra.yaml Fri Jul  9 19:58:21 2010
@@ -141,6 +141,35 @@ gc_grace_seconds: 864000
 # org.apache.cassandra.locator.PropertyFileSnitch.
 endpoint_snitch: org.apache.cassandra.locator.SimpleSnitch
 
+# request_scheduler -- Set this to a class that implements
+# RequestScheduler, which will schedule incoming client requests
+# according to the specific policy. This is useful for multi-tenancy
+# with a single Cassandra cluster.
+# NOTE: This is specifically for requests from the client and does
+# not affect inter node communication.
+# org.apache.cassandra.scheduler.NoScheduler - No scheduling takes place
+# org.apache.cassandra.scheduler.RoundRobinScheduler - Round robin of
+# client requests to a node with a sepearte queue for each
+# reques_scheduler_id. The requests are throttled based on the limit set
+# in throttle_limit in the requeset_scheduler_options
+request_scheduler: org.apache.cassandra.scheduler.NoScheduler
+
+# Scheduler Options vary based on the type of scheduler
+# NoScheduler - Has no options
+# RoundRobin
+#  - throttle_limit -- The throttle_limit is the number of in-flight
+#                      requests per client.  Requests beyond 
+#                      that limit are queued up until
+#                      running requests can complete.
+#                      The value of 80 here is twice the number of
+#                      concurrent_reads + concurrent_writes.
+# request_scheduler_options:
+#    throttle_limit: 80
+
+# request_scheduler_id -- An identifer based on which to perform
+# the request scheduling. The current supported option is "keyspace"
+request_scheduler_id: keyspace
+
 # A ColumnFamily is the Cassandra concept closest to a relational table. 
 #
 # Keyspaces are separate groups of ColumnFamilies.  Except in very

Modified: cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java?rev=962675&r1=962674&r2=962675&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java Fri Jul  9 19:58:21 2010
@@ -43,6 +43,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.ConfigurationException;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.KSMetaData;
@@ -53,6 +54,7 @@ import org.apache.cassandra.db.filter.Qu
 import org.apache.cassandra.db.marshal.MarshalException;
 import org.apache.cassandra.db.migration.AddKeyspace;
 import org.apache.cassandra.locator.AbstractReplicationStrategy;
+import org.apache.cassandra.scheduler.IRequestScheduler;
 import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.service.StorageService;
 
@@ -84,12 +86,27 @@ public class CassandraServer implements 
     // Session keyspace.
     private ThreadLocal<String> curKeyspace = new ThreadLocal<String>();
 
+    /*
+     * An associated Id for scheduling the requests
+     */
+    private ThreadLocal<String> requestSchedulerId = new ThreadLocal<String>();
+
+    /*
+     * RequestScheduler to perform the scheduling of incoming requests
+     */
+    private final IRequestScheduler requestScheduler;
+
+    public CassandraServer()
+    {
+        requestScheduler = DatabaseDescriptor.getRequestScheduler();
+    }
+
     @Override
     public ColumnOrSuperColumn get(ByteBuffer key, ColumnPath columnPath, ConsistencyLevel consistencyLevel)
     throws AvroRemoteException, InvalidRequestException, NotFoundException, UnavailableException, TimedOutException {
         if (logger.isDebugEnabled())
             logger.debug("get");
-        
+
         AvroValidation.validateColumnPath(curKeyspace.get(), columnPath);
         
         // FIXME: This is repetitive.
@@ -129,6 +146,7 @@ public class CassandraServer implements 
         List<Row> rows;
         try
         {
+            schedule();
             rows = StorageProxy.readProtocol(commands, thriftConsistencyLevel(consistency));
         }
         catch (TimeoutException e) 
@@ -145,6 +163,10 @@ public class CassandraServer implements 
         {
             throw new UnavailableException();
         }
+        finally
+        {
+            release();
+        }
 
         for (Row row: rows)
         {
@@ -398,6 +420,7 @@ public class CassandraServer implements 
         {
             try
             {
+                schedule();
                 StorageProxy.mutateBlocking(Arrays.asList(rm), thriftConsistencyLevel(consistency));
             }
             catch (TimeoutException e)
@@ -408,10 +431,22 @@ public class CassandraServer implements 
             {
                 throw new UnavailableException();
             }
+            finally
+            {
+                release();
+            }
         }
         else
         {
-            StorageProxy.mutate(Arrays.asList(rm));
+            try
+            {
+                schedule();
+                StorageProxy.mutate(Arrays.asList(rm));
+            }
+            finally
+            {
+                release();
+            }
         }
     }
 
@@ -441,12 +476,21 @@ public class CassandraServer implements 
         
         if (consistencyLevel == ConsistencyLevel.ZERO)
         {
-            StorageProxy.mutate(rowMutations);
+            try
+            {
+                schedule();
+                StorageProxy.mutate(rowMutations);
+            }
+            finally
+            {
+                release();
+            }
         }
         else
         {
             try
             {
+                schedule();
                 StorageProxy.mutateBlocking(rowMutations, thriftConsistencyLevel(consistencyLevel));
             }
             catch (TimeoutException te)
@@ -458,6 +502,10 @@ public class CassandraServer implements 
             {
                 throw newUnavailableException();
             }
+            finally
+            {
+                release();
+            }
         }
         
         return null;
@@ -553,7 +601,11 @@ public class CassandraServer implements 
             loginDone.set(AccessLevel.NONE);
         
         this.curKeyspace.set(keyspaceStr);
-        
+
+        if (DatabaseDescriptor.getRequestSchedulerId().equals(Config.RequestSchedulerId.keyspace)) {
+            requestSchedulerId.set(curKeyspace.get());
+        }
+
         return null;
     }
 
@@ -663,4 +715,20 @@ public class CassandraServer implements 
         logger.debug("checking schema agreement");      
         return StorageProxy.checkSchemaAgreement();
     }
+
+    /**
+     * Schedule the current thread for access to the required services
+     */
+    private void schedule()
+    {
+        requestScheduler.queue(Thread.currentThread(), requestSchedulerId.get());
+    }
+
+    /**
+     * Release a count of resources used to the request scheduler
+     */
+    private void release()
+    {
+        requestScheduler.release();
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/config/Config.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/Config.java?rev=962675&r1=962674&r2=962675&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/Config.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/Config.java Fri Jul  9 19:58:21 2010
@@ -72,6 +72,10 @@ public class Config {
     
     public String endpoint_snitch;
     
+    public String request_scheduler;
+    public RequestSchedulerId request_scheduler_id;
+    public RequestSchedulerOptions request_scheduler_options;
+
     public List<Keyspace> keyspaces;
     
     public static enum CommitLogSync {
@@ -86,4 +90,8 @@ public class Config {
         standard,
     }
     
+    public static enum RequestSchedulerId
+    {
+        keyspace
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=962675&r1=962674&r2=962675&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Fri Jul  9 19:58:21 2010
@@ -32,6 +32,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.auth.AllowAllAuthenticator;
 import org.apache.cassandra.auth.IAuthenticator;
+import org.apache.cassandra.config.Config.RequestSchedulerId;
 import org.apache.cassandra.db.ClockType;
 import org.apache.cassandra.db.ColumnFamilyType;
 import org.apache.cassandra.db.DefsTable;
@@ -46,6 +47,8 @@ import org.apache.cassandra.dht.IPartiti
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.locator.IEndpointSnitch;
+import org.apache.cassandra.scheduler.IRequestScheduler;
+import org.apache.cassandra.scheduler.NoScheduler;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
@@ -85,6 +88,10 @@ public class DatabaseDescriptor
 
     private final static String STORAGE_CONF_FILE = "cassandra.yaml";
 
+    private static IRequestScheduler requestScheduler;
+    private static RequestSchedulerId requestSchedulerId;
+    private static RequestSchedulerOptions requestSchedulerOptions;
+
     public static final UUID INITIAL_VERSION = new UUID(4096, 0); // has type nibble set to 1, everything else to zero.
     private static UUID defsVersion = INITIAL_VERSION;
 
@@ -259,6 +266,39 @@ public class DatabaseDescriptor
             }
             snitch = createEndpointSnitch(conf.endpoint_snitch);
             
+            /* Request Scheduler setup */
+            requestSchedulerOptions = conf.request_scheduler_options;
+            if (conf.request_scheduler != null)
+            {
+                try
+                {
+                    if (requestSchedulerOptions == null)
+                    {
+                        requestSchedulerOptions = new RequestSchedulerOptions();
+                    }
+                    Class cls = Class.forName(conf.request_scheduler);
+                    requestScheduler = (IRequestScheduler) cls.getConstructor(RequestSchedulerOptions.class).newInstance(requestSchedulerOptions);
+                }
+                catch (ClassNotFoundException e)
+                {
+                    throw new ConfigurationException("Invalid Request Scheduler class " + conf.request_scheduler);
+                }
+            }
+            else
+            {
+                requestScheduler = new NoScheduler();
+            }
+
+            if (conf.request_scheduler_id == RequestSchedulerId.keyspace)
+            {
+                requestSchedulerId = conf.request_scheduler_id;
+            }
+            else
+            {
+                // Default to Keyspace
+                requestSchedulerId = RequestSchedulerId.keyspace;
+            }
+
             if (logger.isDebugEnabled() && conf.auto_bootstrap != null)
             {
                 logger.debug("setting auto_bootstrap to " + conf.auto_bootstrap);
@@ -716,6 +756,21 @@ public class DatabaseDescriptor
         return snitch;
     }
 
+    public static IRequestScheduler getRequestScheduler()
+    {
+        return requestScheduler;
+    }
+
+    public static RequestSchedulerOptions getRequestSchedulerOptions()
+    {
+        return requestSchedulerOptions;
+    }
+
+    public static RequestSchedulerId getRequestSchedulerId()
+    {
+        return requestSchedulerId;
+    }
+
     public static Class<? extends AbstractReplicationStrategy> getReplicaPlacementStrategyClass(String table)
     {
     	KSMetaData meta = tables.get(table);

Added: cassandra/trunk/src/java/org/apache/cassandra/scheduler/IRequestScheduler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/scheduler/IRequestScheduler.java?rev=962675&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/scheduler/IRequestScheduler.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/scheduler/IRequestScheduler.java Fri Jul  9 19:58:21 2010
@@ -0,0 +1,41 @@
+package org.apache.cassandra.scheduler;
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
+/**
+ * Implementors of IRequestScheduler must provide a constructor taking a RequestSchedulerOptions object.
+ */
+public interface IRequestScheduler
+{
+    /**
+     * Queue incoming request threads
+     * 
+     * @param t Thread handing the request
+     * @param id    Scheduling parameter, an id to distinguish profiles (users/keyspace)
+     */
+    public void queue(Thread t, String id);
+
+    /**
+     * A convenience method for indicating when a particular request has completed
+     * processing, and before a return to the client
+     */
+    public void release();
+}

Added: cassandra/trunk/src/java/org/apache/cassandra/scheduler/NoScheduler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/scheduler/NoScheduler.java?rev=962675&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/scheduler/NoScheduler.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/scheduler/NoScheduler.java Fri Jul  9 19:58:21 2010
@@ -0,0 +1,40 @@
+package org.apache.cassandra.scheduler;
+
+import org.apache.cassandra.config.RequestSchedulerOptions;
+
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
+/**
+ * This is basically not having a scheduler, the requests are
+ * processed as normally would be handled by the JVM. 
+ */
+public class NoScheduler implements IRequestScheduler
+{
+
+    public NoScheduler(RequestSchedulerOptions options) {}
+
+    public NoScheduler() {}
+
+    public void queue(Thread t, String id) {}
+
+    public void release() {}
+}

Added: cassandra/trunk/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java?rev=962675&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java Fri Jul  9 19:58:21 2010
@@ -0,0 +1,126 @@
+package org.apache.cassandra.scheduler;
+
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
+import java.util.Map;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.SynchronousQueue;
+
+import org.apache.cassandra.config.RequestSchedulerOptions;
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A very basic Round Robin implementation of the RequestScheduler. It handles 
+ * request groups identified on user/keyspace by placing them in separate 
+ * queues and servicing a request from each queue in a RoundRobin fashion. 
+ */
+public class RoundRobinScheduler implements IRequestScheduler
+{
+    private static final Logger logger = LoggerFactory.getLogger(RoundRobinScheduler.class);
+    private final NonBlockingHashMap<String, SynchronousQueue<Thread>> queues;
+    private static boolean started = false;
+
+    private final Semaphore taskCount;
+
+    // Used by the the scheduler thread so we don't need to busy-wait until there is a request to process
+    private final Semaphore queueSize = new Semaphore(0, false);
+
+    public RoundRobinScheduler(RequestSchedulerOptions options)
+    {
+        assert !started;
+
+        taskCount = new Semaphore(options.throttle_limit);
+        queues = new NonBlockingHashMap<String, SynchronousQueue<Thread>>();
+        Runnable runnable = new Runnable()
+        {
+            public void run()
+            {
+                while (true)
+                {
+                    schedule();
+                }
+            }
+        };
+        Thread scheduler = new Thread(runnable, "REQUEST-SCHEDULER");
+        scheduler.start();
+        logger.info("Started the RoundRobin Request Scheduler");
+        started = true;
+    }
+
+    public void queue(Thread t, String id)
+    {
+        SynchronousQueue<Thread> queue = getQueue(id);
+
+        try
+        {
+            queueSize.release();
+            queue.put(t);
+        }
+        catch (InterruptedException e)
+        {
+            throw new RuntimeException("Interrupted while queueing requests", e);
+        }
+    }
+
+    public void release()
+    {
+        taskCount.release();
+    }
+
+    private void schedule()
+    {
+        queueSize.acquireUninterruptibly();
+        for (SynchronousQueue<Thread> queue : queues.values())
+        {
+            Thread t = queue.poll();
+            if (t != null)
+            {
+                taskCount.acquireUninterruptibly();
+                queueSize.acquireUninterruptibly();
+            }
+        }
+        queueSize.release();
+    }
+
+    /*
+     * Get the Queue for the respective id, if one is not available 
+     * create a new queue for that corresponding id and return it
+     */
+    private SynchronousQueue<Thread> getQueue(String id)
+    {
+        SynchronousQueue<Thread> queue = queues.get(id);
+        if (queue != null)
+            // queue existed
+            return queue;
+
+        SynchronousQueue<Thread> maybenew = new SynchronousQueue<Thread>(true);
+        queue = queues.putIfAbsent(id, maybenew);
+        if (queue == null)
+            // created new queue
+            return maybenew;
+
+        // another thread created the queue
+        return queue;
+    }
+}

Modified: cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java?rev=962675&r1=962674&r2=962675&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java Fri Jul  9 19:58:21 2010
@@ -27,6 +27,15 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.auth.AllowAllAuthenticator;
 import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.config.ConfigurationException;
+import org.apache.cassandra.config.KSMetaData;
+import org.apache.cassandra.db.migration.AddColumnFamily;
+import org.apache.cassandra.db.migration.AddKeyspace;
+import org.apache.cassandra.db.migration.DropColumnFamily;
+import org.apache.cassandra.db.migration.DropKeyspace;
+import org.apache.cassandra.db.migration.RenameColumnFamily;
+import org.apache.cassandra.db.migration.RenameKeyspace;
+import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.config.*;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.ColumnFamily;
@@ -34,9 +43,12 @@ import org.apache.cassandra.db.clock.Abs
 import org.apache.cassandra.db.clock.TimestampReconciler;
 import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.db.marshal.MarshalException;
-import org.apache.cassandra.db.migration.*;
-import org.apache.cassandra.dht.*;
-import org.apache.cassandra.locator.AbstractReplicationStrategy;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.Bounds;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.scheduler.IRequestScheduler;
 import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.service.StorageService;
 import org.apache.thrift.TException;
@@ -64,6 +76,16 @@ public class CassandraServer implements 
     private ThreadLocal<String> keySpace = new ThreadLocal<String>();
 
     /*
+     * An associated Id for scheduling the requests
+     */
+    private ThreadLocal<String> requestSchedulerId = new ThreadLocal<String>();
+
+    /*
+     * RequestScheduler to perform the scheduling of incoming requests
+     */
+    private final IRequestScheduler requestScheduler;
+
+    /*
       * Handle to the storage service to interact with the other machines in the
       * cluster.
       */
@@ -72,6 +94,7 @@ public class CassandraServer implements 
     public CassandraServer()
     {
         storageService = StorageService.instance;
+        requestScheduler = DatabaseDescriptor.getRequestScheduler();
     }
     
     protected Map<DecoratedKey, ColumnFamily> readColumnFamily(List<ReadCommand> commands, ConsistencyLevel consistency_level)
@@ -92,7 +115,15 @@ public class CassandraServer implements 
         List<Row> rows;
         try
         {
-            rows = StorageProxy.readProtocol(commands, consistency_level);
+            try
+            {
+                schedule();
+                rows = StorageProxy.readProtocol(commands, consistency_level);
+            }
+            finally
+            {
+                release();
+            }
         }
         catch (TimeoutException e) 
         {
@@ -424,21 +455,30 @@ public class CassandraServer implements 
 
     private void doInsert(ConsistencyLevel consistency_level, List<RowMutation> mutations) throws UnavailableException, TimedOutException
     {
-        if (consistency_level == ConsistencyLevel.ZERO)
-        {
-            StorageProxy.mutate(mutations);
-        }
-        else
+        try
         {
-            try
+            schedule();
+
+            if (consistency_level == ConsistencyLevel.ZERO)
             {
-            	StorageProxy.mutateBlocking(mutations, consistency_level);
+                StorageProxy.mutate(mutations);
             }
-            catch (TimeoutException e)
+            else
             {
-            	throw new TimedOutException();
+                try
+                {
+                    StorageProxy.mutateBlocking(mutations, consistency_level);
+                }
+                catch (TimeoutException e)
+                {
+                    throw new TimedOutException();
+                }
             }
         }
+        finally
+        {
+            release();
+        }
     }
 
     public Map<String, Map<String, String>> describe_keyspace(String table) throws NotFoundException
@@ -503,7 +543,15 @@ public class CassandraServer implements 
             {
                 bounds = new Bounds(p.getToken(range.start_key), p.getToken(range.end_key));
             }
-            rows = StorageProxy.getRangeSlice(new RangeSliceCommand(keyspace, column_parent, predicate, bounds, range.count), consistency_level);
+            try
+            {
+                schedule();
+                rows = StorageProxy.getRangeSlice(new RangeSliceCommand(keyspace, column_parent, predicate, bounds, range.count), consistency_level);
+            }
+            finally
+            {
+                release();
+            }
             assert rows != null;
         }
         catch (TimeoutException e)
@@ -681,6 +729,22 @@ public class CassandraServer implements 
         }
     }
 
+    /**
+     * Schedule the current thread for access to the required services
+     */
+    private void schedule()
+    {
+        requestScheduler.queue(Thread.currentThread(), requestSchedulerId.get());
+    }
+
+    /**
+     * Release count for the used up resources
+     */
+    private void release()
+    {
+        requestScheduler.release();
+    }
+
     public String system_add_column_family(CfDef cf_def) throws InvalidRequestException, TException
     {
         checkKeyspaceAndLoginAuthorized(AccessLevel.FULL);
@@ -919,6 +983,7 @@ public class CassandraServer implements 
         checkKeyspaceAndLoginAuthorized(AccessLevel.FULL);
         try
         {
+            schedule();
             StorageProxy.truncateBlocking(keySpace.get(), cfname);
         }
         catch (TimeoutException e)
@@ -929,9 +994,14 @@ public class CassandraServer implements 
         {
             throw (UnavailableException) new UnavailableException().initCause(e);
         }
+        finally
+        {
+            release();
+        }
     }
 
-    public void set_keyspace(String keyspace) throws InvalidRequestException, TException {
+    public void set_keyspace(String keyspace) throws InvalidRequestException, TException
+    {
         if (DatabaseDescriptor.getTableDefinition(keyspace) == null)
         {
             throw new InvalidRequestException("Keyspace does not exist");
@@ -941,7 +1011,8 @@ public class CassandraServer implements 
         if (keySpace.get() != null && !keySpace.get().equals(keyspace))
             loginDone.set(AccessLevel.NONE);
         
-        keySpace.set(keyspace); 
+        keySpace.set(keyspace);
+        requestSchedulerId.set(keyspace);
     }
 
     public Map<String, List<String>> check_schema_agreement() throws TException, InvalidRequestException

Modified: cassandra/trunk/test/conf/cassandra.yaml
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/conf/cassandra.yaml?rev=962675&r1=962674&r2=962675&view=diff
==============================================================================
--- cassandra/trunk/test/conf/cassandra.yaml (original)
+++ cassandra/trunk/test/conf/cassandra.yaml Fri Jul  9 19:58:21 2010
@@ -19,6 +19,8 @@ memtable_operations_in_millions: 0.00002
 seeds:
     - 127.0.0.2
 endpoint_snitch: org.apache.cassandra.locator.SimpleSnitch
+request_scheduler: org.apache.cassandra.scheduler.RoundRobinScheduler
+request_scheduler_id: keyspace
 keyspaces:
     - name: Keyspace1
       replica_placement_strategy: org.apache.cassandra.locator.RackUnawareStrategy

Added: cassandra/trunk/test/unit/org/apache/cassandra/scheduler/RoundRobinSchedulerTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/scheduler/RoundRobinSchedulerTest.java?rev=962675&view=auto
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/scheduler/RoundRobinSchedulerTest.java (added)
+++ cassandra/trunk/test/unit/org/apache/cassandra/scheduler/RoundRobinSchedulerTest.java Fri Jul  9 19:58:21 2010
@@ -0,0 +1,150 @@
+package org.apache.cassandra.scheduler;
+
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
+import static org.junit.Assert.*;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.cassandra.config.RequestSchedulerOptions;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class RoundRobinSchedulerTest
+{
+    ExecutorService executor;
+    IRequestScheduler scheduler;
+    AtomicInteger counter = new AtomicInteger(0);
+    static final String KS1 = "TestKeyspace";
+    static final String KS2 = "DevKeyspace";
+    static final String KS3 = "ProdKeyspace";
+    
+    Map<Integer, Integer> testValues = new HashMap<Integer, Integer>();
+
+    @Before
+    public void setUp()
+    {
+        RequestSchedulerOptions options = new RequestSchedulerOptions();
+        options.throttle_limit = 5;
+        scheduler = new RoundRobinScheduler(options);
+        SynchronousQueue<Runnable> queue = new SynchronousQueue<Runnable>();
+
+        executor = new ThreadPoolExecutor(20,
+                                          Integer.MAX_VALUE,
+                                          60*1000,
+                                          TimeUnit.MILLISECONDS,
+                                          queue);
+        // When there are large no. of threads, the results become
+        // more unpredictable because of the JVM thread scheduling
+        // and that will be very hard to provide a consistent test
+        runKs1(1, 10);
+        runKs2(11, 13);
+        runKs3(14, 15);
+
+        try
+        {
+            Thread.sleep(3000);
+        }
+        catch (InterruptedException e)
+        {
+            throw new AssertionError(e);
+        }
+    }
+
+    @Test
+    public void testScheduling()
+    {
+        for (Integer initialValue : testValues.keySet())
+        {
+            // Makes sure, requests to each keyspace get an equal chance
+            // Requests from one keyspace will not block requests from
+            // another keyspacce
+            if (initialValue > 10)
+            {
+                assertTrue(initialValue >= testValues.get(initialValue));
+            }
+        }
+    }
+
+    @After
+    public void shutDown()
+    {
+        executor.shutdown();
+    }
+
+    private void runKs1(int start, int end)
+    {
+        for (int i=start; i<=end; i++)
+        {
+            executor.execute(new Worker(KS1, i));
+        }
+    }
+
+    private void runKs2(int start, int end)
+    {
+        for (int i=start; i<=end; i++)
+        {
+            executor.execute(new Worker(KS2, i)); 
+        }
+    }
+
+    private void runKs3(int start, int end)
+    {
+        for (int i=start; i<=end; i++)
+        {
+            executor.execute(new Worker(KS3, i)); 
+        }
+    }
+
+    class Worker implements Runnable
+    {
+        String id;
+        int initialCount;
+        int runCount;
+
+        public Worker(String id, int count)
+        {
+            this.id = id;
+            initialCount = count;
+        }
+
+        public void run()
+        {
+            scheduler.queue(Thread.currentThread(), id);
+
+            runCount = counter.incrementAndGet();
+
+            synchronized(scheduler)
+            {
+                testValues.put(initialCount, runCount);
+            }
+            scheduler.release();
+        }
+    }
+}