You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/07/09 21:58:22 UTC
svn commit: r962675 - in /cassandra/trunk: ./ conf/
src/java/org/apache/cassandra/avro/ src/java/org/apache/cassandra/config/
src/java/org/apache/cassandra/scheduler/
src/java/org/apache/cassandra/thrift/ test/conf/
test/unit/org/apache/cassandra/sched...
Author: jbellis
Date: Fri Jul 9 19:58:21 2010
New Revision: 962675
URL: http://svn.apache.org/viewvc?rev=962675&view=rev
Log:
implement keyspace round-robin scheduler. patch by Nirmal Ranganathan; reviewed by Stu Hood and jbellis for CASSANDRA-1035
Added:
cassandra/trunk/src/java/org/apache/cassandra/scheduler/
cassandra/trunk/src/java/org/apache/cassandra/scheduler/IRequestScheduler.java
cassandra/trunk/src/java/org/apache/cassandra/scheduler/NoScheduler.java
cassandra/trunk/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java
cassandra/trunk/test/unit/org/apache/cassandra/scheduler/
cassandra/trunk/test/unit/org/apache/cassandra/scheduler/RoundRobinSchedulerTest.java
Modified:
cassandra/trunk/NEWS.txt
cassandra/trunk/conf/cassandra.yaml
cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java
cassandra/trunk/src/java/org/apache/cassandra/config/Config.java
cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java
cassandra/trunk/test/conf/cassandra.yaml
Modified: cassandra/trunk/NEWS.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/NEWS.txt?rev=962675&r1=962674&r2=962675&view=diff
==============================================================================
--- cassandra/trunk/NEWS.txt (original)
+++ cassandra/trunk/NEWS.txt Fri Jul 9 19:58:21 2010
@@ -18,6 +18,8 @@ Features
- Streaming data for repair or node movement no longer requires
anticompaction step first
- keyspace is per-connection in the thrift API instead of per-call
+ - optional round-robin scheduling between keyspaces for multitenant
+ clusters
Configuraton
------------
Modified: cassandra/trunk/conf/cassandra.yaml
URL: http://svn.apache.org/viewvc/cassandra/trunk/conf/cassandra.yaml?rev=962675&r1=962674&r2=962675&view=diff
==============================================================================
--- cassandra/trunk/conf/cassandra.yaml (original)
+++ cassandra/trunk/conf/cassandra.yaml Fri Jul 9 19:58:21 2010
@@ -141,6 +141,35 @@ gc_grace_seconds: 864000
# org.apache.cassandra.locator.PropertyFileSnitch.
endpoint_snitch: org.apache.cassandra.locator.SimpleSnitch
+# request_scheduler -- Set this to a class that implements
+# RequestScheduler, which will schedule incoming client requests
+# according to the specific policy. This is useful for multi-tenancy
+# with a single Cassandra cluster.
+# NOTE: This is specifically for requests from the client and does
+# not affect inter node communication.
+# org.apache.cassandra.scheduler.NoScheduler - No scheduling takes place
+# org.apache.cassandra.scheduler.RoundRobinScheduler - Round robin of
+# client requests to a node with a sepearte queue for each
+# reques_scheduler_id. The requests are throttled based on the limit set
+# in throttle_limit in the requeset_scheduler_options
+request_scheduler: org.apache.cassandra.scheduler.NoScheduler
+
+# Scheduler Options vary based on the type of scheduler
+# NoScheduler - Has no options
+# RoundRobin
+# - throttle_limit -- The throttle_limit is the number of in-flight
+# requests per client. Requests beyond
+# that limit are queued up until
+# running requests can complete.
+# The value of 80 here is twice the number of
+# concurrent_reads + concurrent_writes.
+# request_scheduler_options:
+# throttle_limit: 80
+
+# request_scheduler_id -- An identifer based on which to perform
+# the request scheduling. The current supported option is "keyspace"
+request_scheduler_id: keyspace
+
# A ColumnFamily is the Cassandra concept closest to a relational table.
#
# Keyspaces are separate groups of ColumnFamilies. Except in very
Modified: cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java?rev=962675&r1=962674&r2=962675&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java Fri Jul 9 19:58:21 2010
@@ -43,6 +43,7 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.ConfigurationException;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.KSMetaData;
@@ -53,6 +54,7 @@ import org.apache.cassandra.db.filter.Qu
import org.apache.cassandra.db.marshal.MarshalException;
import org.apache.cassandra.db.migration.AddKeyspace;
import org.apache.cassandra.locator.AbstractReplicationStrategy;
+import org.apache.cassandra.scheduler.IRequestScheduler;
import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.service.StorageService;
@@ -84,12 +86,27 @@ public class CassandraServer implements
// Session keyspace.
private ThreadLocal<String> curKeyspace = new ThreadLocal<String>();
+ /*
+ * An associated Id for scheduling the requests
+ */
+ private ThreadLocal<String> requestSchedulerId = new ThreadLocal<String>();
+
+ /*
+ * RequestScheduler to perform the scheduling of incoming requests
+ */
+ private final IRequestScheduler requestScheduler;
+
+ public CassandraServer()
+ {
+ requestScheduler = DatabaseDescriptor.getRequestScheduler();
+ }
+
@Override
public ColumnOrSuperColumn get(ByteBuffer key, ColumnPath columnPath, ConsistencyLevel consistencyLevel)
throws AvroRemoteException, InvalidRequestException, NotFoundException, UnavailableException, TimedOutException {
if (logger.isDebugEnabled())
logger.debug("get");
-
+
AvroValidation.validateColumnPath(curKeyspace.get(), columnPath);
// FIXME: This is repetitive.
@@ -129,6 +146,7 @@ public class CassandraServer implements
List<Row> rows;
try
{
+ schedule();
rows = StorageProxy.readProtocol(commands, thriftConsistencyLevel(consistency));
}
catch (TimeoutException e)
@@ -145,6 +163,10 @@ public class CassandraServer implements
{
throw new UnavailableException();
}
+ finally
+ {
+ release();
+ }
for (Row row: rows)
{
@@ -398,6 +420,7 @@ public class CassandraServer implements
{
try
{
+ schedule();
StorageProxy.mutateBlocking(Arrays.asList(rm), thriftConsistencyLevel(consistency));
}
catch (TimeoutException e)
@@ -408,10 +431,22 @@ public class CassandraServer implements
{
throw new UnavailableException();
}
+ finally
+ {
+ release();
+ }
}
else
{
- StorageProxy.mutate(Arrays.asList(rm));
+ try
+ {
+ schedule();
+ StorageProxy.mutate(Arrays.asList(rm));
+ }
+ finally
+ {
+ release();
+ }
}
}
@@ -441,12 +476,21 @@ public class CassandraServer implements
if (consistencyLevel == ConsistencyLevel.ZERO)
{
- StorageProxy.mutate(rowMutations);
+ try
+ {
+ schedule();
+ StorageProxy.mutate(rowMutations);
+ }
+ finally
+ {
+ release();
+ }
}
else
{
try
{
+ schedule();
StorageProxy.mutateBlocking(rowMutations, thriftConsistencyLevel(consistencyLevel));
}
catch (TimeoutException te)
@@ -458,6 +502,10 @@ public class CassandraServer implements
{
throw newUnavailableException();
}
+ finally
+ {
+ release();
+ }
}
return null;
@@ -553,7 +601,11 @@ public class CassandraServer implements
loginDone.set(AccessLevel.NONE);
this.curKeyspace.set(keyspaceStr);
-
+
+ if (DatabaseDescriptor.getRequestSchedulerId().equals(Config.RequestSchedulerId.keyspace)) {
+ requestSchedulerId.set(curKeyspace.get());
+ }
+
return null;
}
@@ -663,4 +715,20 @@ public class CassandraServer implements
logger.debug("checking schema agreement");
return StorageProxy.checkSchemaAgreement();
}
+
+ /**
+ * Schedule the current thread for access to the required services
+ */
+ private void schedule()
+ {
+ requestScheduler.queue(Thread.currentThread(), requestSchedulerId.get());
+ }
+
+ /**
+ * Release a count of resources used to the request scheduler
+ */
+ private void release()
+ {
+ requestScheduler.release();
+ }
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/config/Config.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/Config.java?rev=962675&r1=962674&r2=962675&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/Config.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/Config.java Fri Jul 9 19:58:21 2010
@@ -72,6 +72,10 @@ public class Config {
public String endpoint_snitch;
+ public String request_scheduler;
+ public RequestSchedulerId request_scheduler_id;
+ public RequestSchedulerOptions request_scheduler_options;
+
public List<Keyspace> keyspaces;
public static enum CommitLogSync {
@@ -86,4 +90,8 @@ public class Config {
standard,
}
+ public static enum RequestSchedulerId
+ {
+ keyspace
+ }
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=962675&r1=962674&r2=962675&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Fri Jul 9 19:58:21 2010
@@ -32,6 +32,7 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.auth.AllowAllAuthenticator;
import org.apache.cassandra.auth.IAuthenticator;
+import org.apache.cassandra.config.Config.RequestSchedulerId;
import org.apache.cassandra.db.ClockType;
import org.apache.cassandra.db.ColumnFamilyType;
import org.apache.cassandra.db.DefsTable;
@@ -46,6 +47,8 @@ import org.apache.cassandra.dht.IPartiti
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.locator.IEndpointSnitch;
+import org.apache.cassandra.scheduler.IRequestScheduler;
+import org.apache.cassandra.scheduler.NoScheduler;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
@@ -85,6 +88,10 @@ public class DatabaseDescriptor
private final static String STORAGE_CONF_FILE = "cassandra.yaml";
+ private static IRequestScheduler requestScheduler;
+ private static RequestSchedulerId requestSchedulerId;
+ private static RequestSchedulerOptions requestSchedulerOptions;
+
public static final UUID INITIAL_VERSION = new UUID(4096, 0); // has type nibble set to 1, everything else to zero.
private static UUID defsVersion = INITIAL_VERSION;
@@ -259,6 +266,39 @@ public class DatabaseDescriptor
}
snitch = createEndpointSnitch(conf.endpoint_snitch);
+ /* Request Scheduler setup */
+ requestSchedulerOptions = conf.request_scheduler_options;
+ if (conf.request_scheduler != null)
+ {
+ try
+ {
+ if (requestSchedulerOptions == null)
+ {
+ requestSchedulerOptions = new RequestSchedulerOptions();
+ }
+ Class cls = Class.forName(conf.request_scheduler);
+ requestScheduler = (IRequestScheduler) cls.getConstructor(RequestSchedulerOptions.class).newInstance(requestSchedulerOptions);
+ }
+ catch (ClassNotFoundException e)
+ {
+ throw new ConfigurationException("Invalid Request Scheduler class " + conf.request_scheduler);
+ }
+ }
+ else
+ {
+ requestScheduler = new NoScheduler();
+ }
+
+ if (conf.request_scheduler_id == RequestSchedulerId.keyspace)
+ {
+ requestSchedulerId = conf.request_scheduler_id;
+ }
+ else
+ {
+ // Default to Keyspace
+ requestSchedulerId = RequestSchedulerId.keyspace;
+ }
+
if (logger.isDebugEnabled() && conf.auto_bootstrap != null)
{
logger.debug("setting auto_bootstrap to " + conf.auto_bootstrap);
@@ -716,6 +756,21 @@ public class DatabaseDescriptor
return snitch;
}
+ public static IRequestScheduler getRequestScheduler()
+ {
+ return requestScheduler;
+ }
+
+ public static RequestSchedulerOptions getRequestSchedulerOptions()
+ {
+ return requestSchedulerOptions;
+ }
+
+ public static RequestSchedulerId getRequestSchedulerId()
+ {
+ return requestSchedulerId;
+ }
+
public static Class<? extends AbstractReplicationStrategy> getReplicaPlacementStrategyClass(String table)
{
KSMetaData meta = tables.get(table);
Added: cassandra/trunk/src/java/org/apache/cassandra/scheduler/IRequestScheduler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/scheduler/IRequestScheduler.java?rev=962675&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/scheduler/IRequestScheduler.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/scheduler/IRequestScheduler.java Fri Jul 9 19:58:21 2010
@@ -0,0 +1,41 @@
+package org.apache.cassandra.scheduler;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/**
+ * Implementors of IRequestScheduler must provide a constructor taking a RequestSchedulerOptions object.
+ */
+public interface IRequestScheduler
+{
+ /**
+ * Queue incoming request threads
+ *
+ * @param t Thread handing the request
+ * @param id Scheduling parameter, an id to distinguish profiles (users/keyspace)
+ */
+ public void queue(Thread t, String id);
+
+ /**
+ * A convenience method for indicating when a particular request has completed
+ * processing, and before a return to the client
+ */
+ public void release();
+}
Added: cassandra/trunk/src/java/org/apache/cassandra/scheduler/NoScheduler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/scheduler/NoScheduler.java?rev=962675&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/scheduler/NoScheduler.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/scheduler/NoScheduler.java Fri Jul 9 19:58:21 2010
@@ -0,0 +1,40 @@
+package org.apache.cassandra.scheduler;
+
+import org.apache.cassandra.config.RequestSchedulerOptions;
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/**
+ * This is basically not having a scheduler, the requests are
+ * processed as normally would be handled by the JVM.
+ */
+public class NoScheduler implements IRequestScheduler
+{
+
+ public NoScheduler(RequestSchedulerOptions options) {}
+
+ public NoScheduler() {}
+
+ public void queue(Thread t, String id) {}
+
+ public void release() {}
+}
Added: cassandra/trunk/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java?rev=962675&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java Fri Jul 9 19:58:21 2010
@@ -0,0 +1,126 @@
+package org.apache.cassandra.scheduler;
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+import java.util.Map;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.SynchronousQueue;
+
+import org.apache.cassandra.config.RequestSchedulerOptions;
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A very basic Round Robin implementation of the RequestScheduler. It handles
+ * request groups identified on user/keyspace by placing them in separate
+ * queues and servicing a request from each queue in a RoundRobin fashion.
+ */
+public class RoundRobinScheduler implements IRequestScheduler
+{
+ private static final Logger logger = LoggerFactory.getLogger(RoundRobinScheduler.class);
+ private final NonBlockingHashMap<String, SynchronousQueue<Thread>> queues;
+ private static boolean started = false;
+
+ private final Semaphore taskCount;
+
+ // Used by the the scheduler thread so we don't need to busy-wait until there is a request to process
+ private final Semaphore queueSize = new Semaphore(0, false);
+
+ public RoundRobinScheduler(RequestSchedulerOptions options)
+ {
+ assert !started;
+
+ taskCount = new Semaphore(options.throttle_limit);
+ queues = new NonBlockingHashMap<String, SynchronousQueue<Thread>>();
+ Runnable runnable = new Runnable()
+ {
+ public void run()
+ {
+ while (true)
+ {
+ schedule();
+ }
+ }
+ };
+ Thread scheduler = new Thread(runnable, "REQUEST-SCHEDULER");
+ scheduler.start();
+ logger.info("Started the RoundRobin Request Scheduler");
+ started = true;
+ }
+
+ public void queue(Thread t, String id)
+ {
+ SynchronousQueue<Thread> queue = getQueue(id);
+
+ try
+ {
+ queueSize.release();
+ queue.put(t);
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException("Interrupted while queueing requests", e);
+ }
+ }
+
+ public void release()
+ {
+ taskCount.release();
+ }
+
+ private void schedule()
+ {
+ queueSize.acquireUninterruptibly();
+ for (SynchronousQueue<Thread> queue : queues.values())
+ {
+ Thread t = queue.poll();
+ if (t != null)
+ {
+ taskCount.acquireUninterruptibly();
+ queueSize.acquireUninterruptibly();
+ }
+ }
+ queueSize.release();
+ }
+
+ /*
+ * Get the Queue for the respective id, if one is not available
+ * create a new queue for that corresponding id and return it
+ */
+ private SynchronousQueue<Thread> getQueue(String id)
+ {
+ SynchronousQueue<Thread> queue = queues.get(id);
+ if (queue != null)
+ // queue existed
+ return queue;
+
+ SynchronousQueue<Thread> maybenew = new SynchronousQueue<Thread>(true);
+ queue = queues.putIfAbsent(id, maybenew);
+ if (queue == null)
+ // created new queue
+ return maybenew;
+
+ // another thread created the queue
+ return queue;
+ }
+}
Modified: cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java?rev=962675&r1=962674&r2=962675&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java Fri Jul 9 19:58:21 2010
@@ -27,6 +27,15 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.auth.AllowAllAuthenticator;
import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.config.ConfigurationException;
+import org.apache.cassandra.config.KSMetaData;
+import org.apache.cassandra.db.migration.AddColumnFamily;
+import org.apache.cassandra.db.migration.AddKeyspace;
+import org.apache.cassandra.db.migration.DropColumnFamily;
+import org.apache.cassandra.db.migration.DropKeyspace;
+import org.apache.cassandra.db.migration.RenameColumnFamily;
+import org.apache.cassandra.db.migration.RenameKeyspace;
+import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.config.*;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.ColumnFamily;
@@ -34,9 +43,12 @@ import org.apache.cassandra.db.clock.Abs
import org.apache.cassandra.db.clock.TimestampReconciler;
import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.db.marshal.MarshalException;
-import org.apache.cassandra.db.migration.*;
-import org.apache.cassandra.dht.*;
-import org.apache.cassandra.locator.AbstractReplicationStrategy;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.Bounds;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.scheduler.IRequestScheduler;
import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.service.StorageService;
import org.apache.thrift.TException;
@@ -64,6 +76,16 @@ public class CassandraServer implements
private ThreadLocal<String> keySpace = new ThreadLocal<String>();
/*
+ * An associated Id for scheduling the requests
+ */
+ private ThreadLocal<String> requestSchedulerId = new ThreadLocal<String>();
+
+ /*
+ * RequestScheduler to perform the scheduling of incoming requests
+ */
+ private final IRequestScheduler requestScheduler;
+
+ /*
* Handle to the storage service to interact with the other machines in the
* cluster.
*/
@@ -72,6 +94,7 @@ public class CassandraServer implements
public CassandraServer()
{
storageService = StorageService.instance;
+ requestScheduler = DatabaseDescriptor.getRequestScheduler();
}
protected Map<DecoratedKey, ColumnFamily> readColumnFamily(List<ReadCommand> commands, ConsistencyLevel consistency_level)
@@ -92,7 +115,15 @@ public class CassandraServer implements
List<Row> rows;
try
{
- rows = StorageProxy.readProtocol(commands, consistency_level);
+ try
+ {
+ schedule();
+ rows = StorageProxy.readProtocol(commands, consistency_level);
+ }
+ finally
+ {
+ release();
+ }
}
catch (TimeoutException e)
{
@@ -424,21 +455,30 @@ public class CassandraServer implements
private void doInsert(ConsistencyLevel consistency_level, List<RowMutation> mutations) throws UnavailableException, TimedOutException
{
- if (consistency_level == ConsistencyLevel.ZERO)
- {
- StorageProxy.mutate(mutations);
- }
- else
+ try
{
- try
+ schedule();
+
+ if (consistency_level == ConsistencyLevel.ZERO)
{
- StorageProxy.mutateBlocking(mutations, consistency_level);
+ StorageProxy.mutate(mutations);
}
- catch (TimeoutException e)
+ else
{
- throw new TimedOutException();
+ try
+ {
+ StorageProxy.mutateBlocking(mutations, consistency_level);
+ }
+ catch (TimeoutException e)
+ {
+ throw new TimedOutException();
+ }
}
}
+ finally
+ {
+ release();
+ }
}
public Map<String, Map<String, String>> describe_keyspace(String table) throws NotFoundException
@@ -503,7 +543,15 @@ public class CassandraServer implements
{
bounds = new Bounds(p.getToken(range.start_key), p.getToken(range.end_key));
}
- rows = StorageProxy.getRangeSlice(new RangeSliceCommand(keyspace, column_parent, predicate, bounds, range.count), consistency_level);
+ try
+ {
+ schedule();
+ rows = StorageProxy.getRangeSlice(new RangeSliceCommand(keyspace, column_parent, predicate, bounds, range.count), consistency_level);
+ }
+ finally
+ {
+ release();
+ }
assert rows != null;
}
catch (TimeoutException e)
@@ -681,6 +729,22 @@ public class CassandraServer implements
}
}
+ /**
+ * Schedule the current thread for access to the required services
+ */
+ private void schedule()
+ {
+ requestScheduler.queue(Thread.currentThread(), requestSchedulerId.get());
+ }
+
+ /**
+ * Release count for the used up resources
+ */
+ private void release()
+ {
+ requestScheduler.release();
+ }
+
public String system_add_column_family(CfDef cf_def) throws InvalidRequestException, TException
{
checkKeyspaceAndLoginAuthorized(AccessLevel.FULL);
@@ -919,6 +983,7 @@ public class CassandraServer implements
checkKeyspaceAndLoginAuthorized(AccessLevel.FULL);
try
{
+ schedule();
StorageProxy.truncateBlocking(keySpace.get(), cfname);
}
catch (TimeoutException e)
@@ -929,9 +994,14 @@ public class CassandraServer implements
{
throw (UnavailableException) new UnavailableException().initCause(e);
}
+ finally
+ {
+ release();
+ }
}
- public void set_keyspace(String keyspace) throws InvalidRequestException, TException {
+ public void set_keyspace(String keyspace) throws InvalidRequestException, TException
+ {
if (DatabaseDescriptor.getTableDefinition(keyspace) == null)
{
throw new InvalidRequestException("Keyspace does not exist");
@@ -941,7 +1011,8 @@ public class CassandraServer implements
if (keySpace.get() != null && !keySpace.get().equals(keyspace))
loginDone.set(AccessLevel.NONE);
- keySpace.set(keyspace);
+ keySpace.set(keyspace);
+ requestSchedulerId.set(keyspace);
}
public Map<String, List<String>> check_schema_agreement() throws TException, InvalidRequestException
Modified: cassandra/trunk/test/conf/cassandra.yaml
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/conf/cassandra.yaml?rev=962675&r1=962674&r2=962675&view=diff
==============================================================================
--- cassandra/trunk/test/conf/cassandra.yaml (original)
+++ cassandra/trunk/test/conf/cassandra.yaml Fri Jul 9 19:58:21 2010
@@ -19,6 +19,8 @@ memtable_operations_in_millions: 0.00002
seeds:
- 127.0.0.2
endpoint_snitch: org.apache.cassandra.locator.SimpleSnitch
+request_scheduler: org.apache.cassandra.scheduler.RoundRobinScheduler
+request_scheduler_id: keyspace
keyspaces:
- name: Keyspace1
replica_placement_strategy: org.apache.cassandra.locator.RackUnawareStrategy
Added: cassandra/trunk/test/unit/org/apache/cassandra/scheduler/RoundRobinSchedulerTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/scheduler/RoundRobinSchedulerTest.java?rev=962675&view=auto
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/scheduler/RoundRobinSchedulerTest.java (added)
+++ cassandra/trunk/test/unit/org/apache/cassandra/scheduler/RoundRobinSchedulerTest.java Fri Jul 9 19:58:21 2010
@@ -0,0 +1,150 @@
+package org.apache.cassandra.scheduler;
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+import static org.junit.Assert.*;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.cassandra.config.RequestSchedulerOptions;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class RoundRobinSchedulerTest
+{
+ ExecutorService executor;
+ IRequestScheduler scheduler;
+ AtomicInteger counter = new AtomicInteger(0);
+ static final String KS1 = "TestKeyspace";
+ static final String KS2 = "DevKeyspace";
+ static final String KS3 = "ProdKeyspace";
+
+ Map<Integer, Integer> testValues = new HashMap<Integer, Integer>();
+
+ @Before
+ public void setUp()
+ {
+ RequestSchedulerOptions options = new RequestSchedulerOptions();
+ options.throttle_limit = 5;
+ scheduler = new RoundRobinScheduler(options);
+ SynchronousQueue<Runnable> queue = new SynchronousQueue<Runnable>();
+
+ executor = new ThreadPoolExecutor(20,
+ Integer.MAX_VALUE,
+ 60*1000,
+ TimeUnit.MILLISECONDS,
+ queue);
+ // When there are large no. of threads, the results become
+ // more unpredictable because of the JVM thread scheduling
+ // and that will be very hard to provide a consistent test
+ runKs1(1, 10);
+ runKs2(11, 13);
+ runKs3(14, 15);
+
+ try
+ {
+ Thread.sleep(3000);
+ }
+ catch (InterruptedException e)
+ {
+ throw new AssertionError(e);
+ }
+ }
+
+ @Test
+ public void testScheduling()
+ {
+ for (Integer initialValue : testValues.keySet())
+ {
+ // Makes sure, requests to each keyspace get an equal chance
+ // Requests from one keyspace will not block requests from
+ // another keyspacce
+ if (initialValue > 10)
+ {
+ assertTrue(initialValue >= testValues.get(initialValue));
+ }
+ }
+ }
+
+ @After
+ public void shutDown()
+ {
+ executor.shutdown();
+ }
+
+ private void runKs1(int start, int end)
+ {
+ for (int i=start; i<=end; i++)
+ {
+ executor.execute(new Worker(KS1, i));
+ }
+ }
+
+ private void runKs2(int start, int end)
+ {
+ for (int i=start; i<=end; i++)
+ {
+ executor.execute(new Worker(KS2, i));
+ }
+ }
+
+ private void runKs3(int start, int end)
+ {
+ for (int i=start; i<=end; i++)
+ {
+ executor.execute(new Worker(KS3, i));
+ }
+ }
+
+ class Worker implements Runnable
+ {
+ String id;
+ int initialCount;
+ int runCount;
+
+ public Worker(String id, int count)
+ {
+ this.id = id;
+ initialCount = count;
+ }
+
+ public void run()
+ {
+ scheduler.queue(Thread.currentThread(), id);
+
+ runCount = counter.incrementAndGet();
+
+ synchronized(scheduler)
+ {
+ testValues.put(initialCount, runCount);
+ }
+ scheduler.release();
+ }
+ }
+}