You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by pm...@apache.org on 2009/03/02 08:57:31 UTC

svn commit: r749218 [5/34] - in /incubator/cassandra: branches/ dist/ nightly/ site/ tags/ trunk/ trunk/lib/ trunk/src/ trunk/src/org/ trunk/src/org/apache/ trunk/src/org/apache/cassandra/ trunk/src/org/apache/cassandra/analytics/ trunk/src/org/apache/...

Added: incubator/cassandra/trunk/src/org/apache/cassandra/cli/CliSessionState.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/cli/CliSessionState.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/cli/CliSessionState.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/cli/CliSessionState.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cli;
+
+import java.io.InputStream;
+import java.io.PrintStream;
+
+public class CliSessionState {
+
+    public boolean timingOn = false;
+    public String  hostName;       // cassandra server name
+    public int     thriftPort;     // cassandra server's thrift port
+
+    /*
+     * Streams to read/write from
+     */
+    public InputStream in;
+    public PrintStream out;
+    public PrintStream err;
+
+    public CliSessionState() {
+        in = System.in;
+        out = System.out;
+        err = System.err;
+    }
+}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/cli/Cli__.g
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/cli/Cli__.g?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/cli/Cli__.g (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/cli/Cli__.g Mon Mar  2 07:57:22 2009
@@ -0,0 +1,105 @@
+lexer grammar Cli;
+@header {
+package com.facebook.infrastructure.cli;
+}
+
+T43 : '?' ;
+T44 : '=' ;
+T45 : '[' ;
+T46 : ']' ;
+
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cli/Cli.g" 166
+K_CONFIG:     'CONFIG';
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cli/Cli.g" 167
+K_CONNECT:    'CONNECT';
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cli/Cli.g" 168
+K_CLUSTER:    'CLUSTER';
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cli/Cli.g" 169
+K_DESCRIBE:   'DESCRIBE';
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cli/Cli.g" 170
+K_GET:        'GET';
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cli/Cli.g" 171
+K_HELP:       'HELP';
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cli/Cli.g" 172
+K_EXIT:       'EXIT';
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cli/Cli.g" 173
+K_FILE:       'FILE';
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cli/Cli.g" 174
+K_NAME:       'NAME';
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cli/Cli.g" 175
+K_QUIT:       'QUIT';
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cli/Cli.g" 176
+K_SET:        'SET';
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cli/Cli.g" 177
+K_SHOW:       'SHOW';
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cli/Cli.g" 178
+K_TABLE:      'TABLE';
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cli/Cli.g" 179
+K_TABLES:     'TABLES';
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cli/Cli.g" 180
+K_THRIFT:     'THRIFT';
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cli/Cli.g" 181
+K_VERSION:    'VERSION';
+
+// private syntactic rules
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cli/Cli.g" 184
+fragment
+Letter
+    : 'a'..'z' 
+    | 'A'..'Z'
+    ;
+
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cli/Cli.g" 190
+fragment
+Digit
+    : '0'..'9'
+    ;
+
+// syntactic Elements
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cli/Cli.g" 196
+Identifier
+    : Letter ( Letter | Digit | '_')*
+    ;
+
+
+// literals
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cli/Cli.g" 202
+StringLiteral
+    :
+    '\'' (~'\'')* '\'' ( '\'' (~'\'')* '\'' )* 
+    ;
+
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cli/Cli.g" 207
+IntegerLiteral
+   : Digit+;
+
+
+//
+// syntactic elements
+//
+
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cli/Cli.g" 215
+DOT
+    : '.'
+    ;
+
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cli/Cli.g" 219
+SLASH
+    : '/'
+    ;
+
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cli/Cli.g" 223
+SEMICOLON
+    : ';'
+    ;
+
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cli/Cli.g" 227
+WS
+    :  (' '|'\r'|'\t'|'\n') {$channel=HIDDEN;}  // whitepace
+    ;
+
+// $ANTLR src "/home/kannan/fbomb/trunk/fbcode/cassandra/src/com/facebook/infrastructure/cli/Cli.g" 231
+COMMENT 
+    : '--' (~('\n'|'\r'))*                     { $channel=HIDDEN; }
+    | '/*' (options {greedy=false;} : .)* '*/' { $channel=HIDDEN; }
+    ;

Added: incubator/cassandra/trunk/src/org/apache/cassandra/concurrent/AIOExecutorService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/concurrent/AIOExecutorService.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/concurrent/AIOExecutorService.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/concurrent/AIOExecutorService.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,319 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.concurrent;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class AIOExecutorService implements ExecutorService
+{
+    private ExecutorService executorService_;
+    
+    public AIOExecutorService(int corePoolSize,
+            int maximumPoolSize,
+            long keepAliveTime,
+            TimeUnit unit,
+            BlockingQueue<Runnable> workQueue,
+            ThreadFactory threadFactory)
+    {
+        executorService_ = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);        
+    }
+    
+    /**
+     * Executes the given command at some time in the future.  The command
+     * may execute in a new thread, in a pooled thread, or in the calling
+     * thread, at the discretion of the <tt>Executor</tt> implementation.
+     *
+     * @param command the runnable task
+     * @throws RejectedExecutionException if this task cannot be
+     * accepted for execution.
+     * @throws NullPointerException if command is null
+     */
+    public void execute(Runnable command)
+    {
+        executorService_.execute(command);
+    }
+    
+    /**
+     * Initiates an orderly shutdown in which previously submitted
+     * tasks are executed, but no new tasks will be accepted.
+     * Invocation has no additional effect if already shut down.
+     *
+     * <p>This method does not wait for previously submitted tasks to
+     * complete execution.  Use {@link #awaitTermination awaitTermination}
+     * to do that.
+     *
+     * @throws SecurityException if a security manager exists and
+     *         shutting down this ExecutorService may manipulate
+     *         threads that the caller is not permitted to modify
+     *         because it does not hold {@link
+     *         java.lang.RuntimePermission}<tt>("modifyThread")</tt>,
+     *         or the security manager's <tt>checkAccess</tt> method
+     *         denies access.
+     */
+    public void shutdown()
+    {    
+        /* This is a noop. */     
+    }
+
+    /**
+     * Attempts to stop all actively executing tasks, halts the
+     * processing of waiting tasks, and returns a list of the tasks
+     * that were awaiting execution.
+     *
+     * <p>This method does not wait for actively executing tasks to
+     * terminate.  Use {@link #awaitTermination awaitTermination} to
+     * do that.
+     *
+     * <p>There are no guarantees beyond best-effort attempts to stop
+     * processing actively executing tasks.  For example, typical
+     * implementations will cancel via {@link Thread#interrupt}, so any
+     * task that fails to respond to interrupts may never terminate.
+     *
+     * @return list of tasks that never commenced execution
+     * @throws SecurityException if a security manager exists and
+     *         shutting down this ExecutorService may manipulate
+     *         threads that the caller is not permitted to modify
+     *         because it does not hold {@link
+     *         java.lang.RuntimePermission}<tt>("modifyThread")</tt>,
+     *         or the security manager's <tt>checkAccess</tt> method
+     *         denies access.
+     */
+    public List<Runnable> shutdownNow()
+    {
+        return executorService_.shutdownNow();
+    }
+
+    /**
+     * Returns <tt>true</tt> if this executor has been shut down.
+     *
+     * @return <tt>true</tt> if this executor has been shut down
+     */
+    public boolean isShutdown()
+    {
+        return executorService_.isShutdown();
+    }
+
+    /**
+     * Returns <tt>true</tt> if all tasks have completed following shut down.
+     * Note that <tt>isTerminated</tt> is never <tt>true</tt> unless
+     * either <tt>shutdown</tt> or <tt>shutdownNow</tt> was called first.
+     *
+     * @return <tt>true</tt> if all tasks have completed following shut down
+     */
+    public boolean isTerminated()
+    {
+        return executorService_.isTerminated();
+    }
+
+    /**
+     * Blocks until all tasks have completed execution after a shutdown
+     * request, or the timeout occurs, or the current thread is
+     * interrupted, whichever happens first.
+     *
+     * @param timeout the maximum time to wait
+     * @param unit the time unit of the timeout argument
+     * @return <tt>true</tt> if this executor terminated and
+     *         <tt>false</tt> if the timeout elapsed before termination
+     * @throws InterruptedException if interrupted while waiting
+     */
+    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException
+    {
+        return executorService_.awaitTermination(timeout, unit);
+    }
+
+    /**
+     * Submits a value-returning task for execution and returns a
+     * Future representing the pending results of the task. The
+     * Future's <tt>get</tt> method will return the task's result upon
+     * successful completion.
+     *
+     * <p>
+     * If you would like to immediately block waiting
+     * for a task, you can use constructions of the form
+     * <tt>result = exec.submit(aCallable).get();</tt>
+     *
+     * <p> Note: The {@link Executors} class includes a set of methods
+     * that can convert some other common closure-like objects,
+     * for example, {@link java.security.PrivilegedAction} to
+     * {@link Callable} form so they can be submitted.
+     *
+     * @param task the task to submit
+     * @return a Future representing pending completion of the task
+     * @throws RejectedExecutionException if the task cannot be
+     *         scheduled for execution
+     * @throws NullPointerException if the task is null
+     */
+    public <T> Future<T> submit(Callable<T> task)
+    {
+        return executorService_.submit(task);
+    }
+
+    /**
+     * Submits a Runnable task for execution and returns a Future
+     * representing that task. The Future's <tt>get</tt> method will
+     * return the given result upon successful completion.
+     *
+     * @param task the task to submit
+     * @param result the result to return
+     * @return a Future representing pending completion of the task
+     * @throws RejectedExecutionException if the task cannot be
+     *         scheduled for execution
+     * @throws NullPointerException if the task is null
+     */
+    public <T> Future<T> submit(Runnable task, T result)
+    {
+        return executorService_.submit(task, result);
+    }
+
+    /**
+     * Submits a Runnable task for execution and returns a Future
+     * representing that task. The Future's <tt>get</tt> method will
+     * return <tt>null</tt> upon <em>successful</em> completion.
+     *
+     * @param task the task to submit
+     * @return a Future representing pending completion of the task
+     * @throws RejectedExecutionException if the task cannot be
+     *         scheduled for execution
+     * @throws NullPointerException if the task is null
+     */
+    public Future<?> submit(Runnable task)
+    {
+        return executorService_.submit(task);
+    }
+
+    /**
+     * Executes the given tasks, returning a list of Futures holding
+     * their status and results when all complete.
+     * {@link Future#isDone} is <tt>true</tt> for each
+     * element of the returned list.
+     * Note that a <em>completed</em> task could have
+     * terminated either normally or by throwing an exception.
+     * The results of this method are undefined if the given
+     * collection is modified while this operation is in progress.
+     *
+     * @param tasks the collection of tasks
+     * @return A list of Futures representing the tasks, in the same
+     *         sequential order as produced by the iterator for the
+     *         given task list, each of which has completed.
+     * @throws InterruptedException if interrupted while waiting, in
+     *         which case unfinished tasks are cancelled.
+     * @throws NullPointerException if tasks or any of its elements are <tt>null</tt>
+     * @throws RejectedExecutionException if any task cannot be
+     *         scheduled for execution
+     */
+
+    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException
+    {
+        return executorService_.invokeAll(tasks);
+    }
+
+    /**
+     * Executes the given tasks, returning a list of Futures holding
+     * their status and results
+     * when all complete or the timeout expires, whichever happens first.
+     * {@link Future#isDone} is <tt>true</tt> for each
+     * element of the returned list.
+     * Upon return, tasks that have not completed are cancelled.
+     * Note that a <em>completed</em> task could have
+     * terminated either normally or by throwing an exception.
+     * The results of this method are undefined if the given
+     * collection is modified while this operation is in progress.
+     *
+     * @param tasks the collection of tasks
+     * @param timeout the maximum time to wait
+     * @param unit the time unit of the timeout argument
+     * @return a list of Futures representing the tasks, in the same
+     *         sequential order as produced by the iterator for the
+     *         given task list. If the operation did not time out,
+     *         each task will have completed. If it did time out, some
+     *         of these tasks will not have completed.
+     * @throws InterruptedException if interrupted while waiting, in
+     *         which case unfinished tasks are cancelled
+     * @throws NullPointerException if tasks, any of its elements, or
+     *         unit are <tt>null</tt>
+     * @throws RejectedExecutionException if any task cannot be scheduled
+     *         for execution
+     */
+    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException
+    {
+        return executorService_.invokeAll(tasks, timeout, unit);
+    }
+    
+    /**
+     * Executes the given tasks, returning the result
+     * of one that has completed successfully (i.e., without throwing
+     * an exception), if any do. Upon normal or exceptional return,
+     * tasks that have not completed are cancelled.
+     * The results of this method are undefined if the given
+     * collection is modified while this operation is in progress.
+     *
+     * @param tasks the collection of tasks
+     * @return the result returned by one of the tasks
+     * @throws InterruptedException if interrupted while waiting
+     * @throws NullPointerException if tasks or any of its elements
+     *         are <tt>null</tt>
+     * @throws IllegalArgumentException if tasks is empty
+     * @throws ExecutionException if no task successfully completes
+     * @throws RejectedExecutionException if tasks cannot be scheduled
+     *         for execution
+     */
+    public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException
+    {
+        return executorService_.invokeAny(tasks);
+    }
+
+    /**
+     * Executes the given tasks, returning the result
+     * of one that has completed successfully (i.e., without throwing
+     * an exception), if any do before the given timeout elapses.
+     * Upon normal or exceptional return, tasks that have not
+     * completed are cancelled.
+     * The results of this method are undefined if the given
+     * collection is modified while this operation is in progress.
+     *
+     * @param tasks the collection of tasks
+     * @param timeout the maximum time to wait
+     * @param unit the time unit of the timeout argument
+     * @return the result returned by one of the tasks.
+     * @throws InterruptedException if interrupted while waiting
+     * @throws NullPointerException if tasks, any of its elements, or
+     *         unit are <tt>null</tt>
+     * @throws TimeoutException if the given timeout elapses before
+     *         any task successfully completes
+     * @throws ExecutionException if no task successfully completes
+     * @throws RejectedExecutionException if tasks cannot be scheduled
+     *         for execution
+     */
+    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
+    {
+        return executorService_.invokeAny(tasks, timeout, unit);
+    }
+}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/concurrent/Context.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/concurrent/Context.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/concurrent/Context.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/concurrent/Context.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.concurrent;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Context object adding a collection of key/value pairs into ThreadLocalContext.
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class Context
+{
+    private Map<Object, Object> ht_;
+    
+    public Context()
+    {
+        ht_ = new HashMap<Object, Object>();
+    }
+    
+    public Object put(Object key, Object value)
+    {
+        return ht_.put(key, value);
+    }
+    
+    public Object get(Object key)
+    {
+        return ht_.get(key);
+    }
+    
+    public void remove(Object key)
+    {
+        ht_.remove(key);
+    }
+}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/concurrent/ContinuationContext.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/concurrent/ContinuationContext.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/concurrent/ContinuationContext.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/concurrent/ContinuationContext.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.concurrent;
+
+import org.apache.cassandra.continuations.Suspendable;
+import org.apache.commons.javaflow.Continuation;
+
+public class ContinuationContext
+{
+    private Continuation continuation_;
+    private Object result_;
+    
+    public ContinuationContext(Continuation continuation)
+    {
+        continuation_ = continuation;        
+    }
+    
+    public Continuation getContinuation()
+    {
+        return continuation_;
+    }
+    
+    public void setContinuation(Continuation continuation)
+    {
+        continuation_ = continuation;
+    }
+    
+    public Object result()
+    {
+        return result_;
+    }
+    
+    public void result(Object result)
+    {
+        result_ = result;
+    }
+}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/concurrent/ContinuationStage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/concurrent/ContinuationStage.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/concurrent/ContinuationStage.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/concurrent/ContinuationStage.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.concurrent;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.cassandra.continuations.Suspendable;
+
+
+public class ContinuationStage implements IStage
+{
+    private String name_;
+    private ContinuationsExecutor executorService_;
+            
+    public ContinuationStage(String name, int numThreads)
+    {        
+        name_ = name;        
+        executorService_ = new ContinuationsExecutor( numThreads,
+                numThreads,
+                Integer.MAX_VALUE,
+                TimeUnit.SECONDS,
+                new LinkedBlockingQueue<Runnable>(),
+                new ThreadFactoryImpl(name)
+                );        
+    }
+    
+    public String getName() 
+    {        
+        return name_;
+    }
+    
+    public ExecutorService getInternalThreadPool()
+    {
+        return executorService_;
+    }
+
+    public Future<Object> execute(Callable<Object> callable) {
+        return executorService_.submit(callable);
+    }
+    
+    public void execute(Runnable runnable) {
+        executorService_.execute(runnable);
+    }
+    
+    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
+    {
+        throw new UnsupportedOperationException("This operation is not supported");
+    }
+    
+    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
+        throw new UnsupportedOperationException("This operation is not supported");
+    }
+    
+    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
+        throw new UnsupportedOperationException("This operation is not supported");
+    }
+    
+    public void shutdown() {  
+        executorService_.shutdownNow(); 
+    }
+    
+    public boolean isShutdown()
+    {
+        return executorService_.isShutdown();
+    }
+    
+    public long getTaskCount(){
+        return (executorService_.getTaskCount() - executorService_.getCompletedTaskCount());
+    }
+}