You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by pm...@apache.org on 2009/03/02 07:12:49 UTC

svn commit: r749205 [6/16] - in /incubator/cassandra/src/org/apache/cassandra: analytics/ cli/ concurrent/ config/ continuations/ cql/ cql/common/ cql/compiler/ cql/compiler/common/ cql/compiler/parse/ cql/compiler/sem/ cql/driver/ cql/execution/ dht/ ...

Added: incubator/cassandra/src/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java Mon Mar  2 06:12:46 2009
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.concurrent;
+
+import java.util.concurrent.*;
+
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+import org.apache.cassandra.utils.*;
+
+/**
+ * This class extends <i>ThreadPoolExecutor</i>. It overrides the
+ * <i>afterExecute()</i> hook of <i>ThreadPoolExecutor</i> to log any Throwable
+ * that is passed to it once a task has finished.
+ * 
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public final class DebuggableThreadPoolExecutor extends ThreadPoolExecutor
+{
+    private static Logger logger_ = Logger.getLogger(DebuggableThreadPoolExecutor.class);    
+    
+    public DebuggableThreadPoolExecutor(int corePoolSize,
+            int maximumPoolSize,
+            long keepAliveTime,
+            TimeUnit unit,
+            BlockingQueue<Runnable> workQueue,
+            ThreadFactory threadFactory)
+    {
+        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
+        super.prestartAllCoreThreads();
+    }
+    
+    /*
+     * 
+     *  (non-Javadoc)
+     * @see java.util.concurrent.ThreadPoolExecutor#afterExecute(java.lang.Runnable, java.lang.Throwable)
+     * Helps us in figuring out why sometimes the threads are getting 
+     * killed and replaced by new ones.
+     */
+    public void afterExecute(Runnable r, Throwable t)
+    {
+        super.afterExecute(r,t);
+        if ( t != null )
+        {  
+            Context ctx = ThreadLocalContext.get();
+            if ( ctx != null )
+            {
+                Object object = ctx.get(r.getClass().getName());
+                
+                if ( object != null )
+                {
+                    logger_.info("**** In afterExecute() " + t.getClass().getName() + " occured while working with " + object + " ****");
+                }
+                else
+                {
+                    logger_.info("**** In afterExecute() " + t.getClass().getName() + " occured ****");
+                }
+            }
+            
+            Throwable cause = t.getCause();
+            if ( cause != null )
+            {
+                logger_.info( LogUtil.throwableToString(cause) );
+            }
+            logger_.info( LogUtil.throwableToString(t) );
+        }
+    }
+}

Added: incubator/cassandra/src/org/apache/cassandra/concurrent/IContinuable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/concurrent/IContinuable.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/concurrent/IContinuable.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/concurrent/IContinuable.java Mon Mar  2 06:12:46 2009
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.concurrent;
+
+import org.apache.commons.javaflow.Continuation;
+
+public interface IContinuable
+{
+    public void run(Continuation c);
+}

Added: incubator/cassandra/src/org/apache/cassandra/concurrent/IStage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/concurrent/IStage.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/concurrent/IStage.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/concurrent/IStage.java Mon Mar  2 06:12:46 2009
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.concurrent;
+
+import java.util.concurrent.*;
+
+/**
+ * An abstraction for stages as described in the SEDA paper by Matt Welsh. 
+ * For reference to the paper look over here 
+ * <a href="http://www.eecs.harvard.edu/~mdw/papers/seda-sosp01.pdf">SEDA: An Architecture for Well-Conditioned,
+   Scalable Internet Services</a>.
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public interface IStage 
+{
+    /**
+     * Get the name of the associated stage
+     * @return
+     */
+    public String getName();
+    
+    /**
+     * Get the thread pool used by this stage 
+     * internally.
+     */
+    public ExecutorService getInternalThreadPool();
+    
+    /**
+     * This method is used to execute a piece of code on
+     * this stage. The idea is that the <i>run()</i> method
+     * of this Runnable instance is invoked on a thread from a
+     * thread pool that belongs to this stage.
+     * @param runnable instance whose run() method needs to be invoked.
+     */
+    public void execute(Runnable runnable);
+    
+    /**
+     * This method is used to execute a piece of code on
+     * this stage which returns a Future handle. The idea
+     * is that the <i>call()</i> method of this Callable 
+     * instance is invoked on a thread from a thread pool 
+     * that belongs to this stage.
+     *
+     * @param callable instance that needs to be invoked.
+     * @return a Future for the result of the callable.
+     */
+    public Future<Object> execute(Callable<Object> callable);
+    
+    /**
+     * This method is used to submit tasks to this stage
+     * that execute after the given delay. 
+     * 
+     * @param command the task to execute.
+     * @param delay the time to delay execution 
+     * @param unit the time unit of the delay parameter 
+     * @return
+     */
+    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit); 
+      
+    /**
+     * This method is used to submit tasks to this stage
+     * that execute periodically. 
+     * @param command the task to execute.
+     * @param initialDelay the time to delay first execution
+     * @param period the period between successive executions
+     * @param unit the time unit of the initialDelay and period parameters 
+     * @return
+     */
+    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit); 
+    
+    /**
+     * This method is used to submit tasks to this stage
+     * that execute periodically. 
+     * @param command the task to execute.
+     * @param initialDelay the time to delay first execution
+     * @param delay  the delay between the termination of one execution and the commencement of the next.
+     * @param unit the time unit of the initialDelay and delay parameters 
+     * @return
+     */
+    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
+    
+    /**
+     * Shutdown the stage. All the threads of this stage
+     * are forcefully shutdown. Any pending tasks on this
+     * stage could be dropped or the stage could wait for 
+     * these tasks to be completed. This is however an 
+     * implementation detail.
+     */
+    public void shutdown();  
+    
+    /**
+     * Checks if the stage has been shutdown.
+     * @return
+     */
+    public boolean isShutdown();
+    
+    /**
+     * This method returns the number of tasks that are 
+     * pending on this stage to be executed.
+     * @return
+     */
+    public long getTaskCount();
+}

Added: incubator/cassandra/src/org/apache/cassandra/concurrent/MultiThreadedStage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/concurrent/MultiThreadedStage.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/concurrent/MultiThreadedStage.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/concurrent/MultiThreadedStage.java Mon Mar  2 06:12:46 2009
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.cassandra.concurrent;
+
+import java.util.concurrent.*;
+
+import javax.naming.OperationNotSupportedException;
+
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+/**
+ * This class is an implementation of the <i>IStage</i> interface. In particular
+ * it is for a stage that has a thread pool with multiple threads. For details 
+ * please refer to the <i>IStage</i> documentation.
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class MultiThreadedStage implements IStage 
+{    
+    private String name_;
+    private DebuggableThreadPoolExecutor executorService_;
+            
+    public MultiThreadedStage(String name, int numThreads)
+    {        
+        name_ = name;        
+        executorService_ = new DebuggableThreadPoolExecutor( numThreads,
+                numThreads,
+                Integer.MAX_VALUE,
+                TimeUnit.SECONDS,
+                new LinkedBlockingQueue<Runnable>(),
+                new ThreadFactoryImpl(name)
+                );        
+    }
+    
+    public String getName() 
+    {        
+        return name_;
+    }    
+    
+    public ExecutorService getInternalThreadPool()
+    {
+        return executorService_;
+    }
+
+    public Future<Object> execute(Callable<Object> callable) {
+        return executorService_.submit(callable);
+    }
+    
+    public void execute(Runnable runnable) {
+        executorService_.execute(runnable);
+    }
+    
+    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
+    {
+        throw new UnsupportedOperationException("This operation is not supported");
+    }
+    
+    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
+        throw new UnsupportedOperationException("This operation is not supported");
+    }
+    
+    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
+        throw new UnsupportedOperationException("This operation is not supported");
+    }
+    
+    public void shutdown() {  
+        executorService_.shutdownNow(); 
+    }
+    
+    public boolean isShutdown()
+    {
+        return executorService_.isShutdown();
+    }
+    
+    public long getTaskCount(){
+        return (executorService_.getTaskCount() - executorService_.getCompletedTaskCount());
+    }
+}

Added: incubator/cassandra/src/org/apache/cassandra/concurrent/RejectedExecutionHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/concurrent/RejectedExecutionHandler.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/concurrent/RejectedExecutionHandler.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/concurrent/RejectedExecutionHandler.java Mon Mar  2 06:12:46 2009
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.concurrent;
+
+interface RejectedExecutionHandler
+{
+    public void rejectedExecution(Runnable r, ContinuationsExecutor executor); 
+}

Added: incubator/cassandra/src/org/apache/cassandra/concurrent/SingleThreadedContinuationStage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/concurrent/SingleThreadedContinuationStage.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/concurrent/SingleThreadedContinuationStage.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/concurrent/SingleThreadedContinuationStage.java Mon Mar  2 06:12:46 2009
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.concurrent;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+public class SingleThreadedContinuationStage implements IStage 
+{
+    protected ContinuationsExecutor executorService_;
+    private String name_;
+
+    public SingleThreadedContinuationStage(String name)
+    {        
+        executorService_ = new ContinuationsExecutor( 1,
+                1,
+                Integer.MAX_VALUE,
+                TimeUnit.SECONDS,
+                new LinkedBlockingQueue<Runnable>(),
+                new ThreadFactoryImpl(name)
+                );        
+        name_ = name;        
+    }
+    
+    /* Implementing the IStage interface methods */
+    
+    public String getName()
+    {
+        return name_;
+    }
+    
+    public ExecutorService getInternalThreadPool()
+    {
+        return executorService_;
+    }
+    
+    public void execute(Runnable runnable)
+    {
+        executorService_.execute(runnable);
+    }
+    
+    public Future<Object> execute(Callable<Object> callable)
+    {
+        return executorService_.submit(callable);
+    }
+    
+    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
+    {
+        //return executorService_.schedule(command, delay, unit);
+        throw new UnsupportedOperationException("This operation is not supported");
+    }
+    
+    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
+    {
+        //return executorService_.scheduleAtFixedRate(command, initialDelay, period, unit);
+        throw new UnsupportedOperationException("This operation is not supported");
+    }
+    
+    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
+    {
+        //return executorService_.scheduleWithFixedDelay(command, initialDelay, delay, unit);
+        throw new UnsupportedOperationException("This operation is not supported");
+    }
+    
+    public void shutdown()
+    {
+        executorService_.shutdownNow();
+    }
+    
+    public boolean isShutdown()
+    {
+        return executorService_.isShutdown();
+    }    
+    
+    public long getTaskCount(){
+        return (executorService_.getTaskCount() - executorService_.getCompletedTaskCount());
+    }
+    /* Finished implementing the IStage interface methods */
+}
+

Added: incubator/cassandra/src/org/apache/cassandra/concurrent/SingleThreadedStage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/concurrent/SingleThreadedStage.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/concurrent/SingleThreadedStage.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/concurrent/SingleThreadedStage.java Mon Mar  2 06:12:46 2009
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.concurrent;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.cassandra.net.*;
+
+/**
+ * This class is an implementation of the <i>IStage</i> interface. In particular
+ * it is for a stage that has a thread pool with a single thread. For details 
+ * please refer to the <i>IStage</i> documentation.
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class SingleThreadedStage implements IStage 
+{
+    protected DebuggableThreadPoolExecutor executorService_;
+    private String name_;
+
+	public SingleThreadedStage(String name)
+    {
+        //executorService_ = new DebuggableScheduledThreadPoolExecutor(1,new ThreadFactoryImpl(name));
+        executorService_ = new DebuggableThreadPoolExecutor( 1,
+                1,
+                Integer.MAX_VALUE,
+                TimeUnit.SECONDS,
+                new LinkedBlockingQueue<Runnable>(),
+                new ThreadFactoryImpl(name)
+                );        
+        name_ = name;        
+	}
+	
+    /* Implementing the IStage interface methods */
+    
+    public String getName()
+    {
+        return name_;
+    }
+    
+    public ExecutorService getInternalThreadPool()
+    {
+        return executorService_;
+    }
+    
+    public void execute(Runnable runnable)
+    {
+        executorService_.execute(runnable);
+    }
+    
+    public Future<Object> execute(Callable<Object> callable)
+    {
+        return executorService_.submit(callable);
+    }
+    
+    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
+    {
+        //return executorService_.schedule(command, delay, unit);
+        throw new UnsupportedOperationException("This operation is not supported");
+    }
+    
+    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
+    {
+        //return executorService_.scheduleAtFixedRate(command, initialDelay, period, unit);
+        throw new UnsupportedOperationException("This operation is not supported");
+    }
+    
+    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
+    {
+        //return executorService_.scheduleWithFixedDelay(command, initialDelay, delay, unit);
+        throw new UnsupportedOperationException("This operation is not supported");
+    }
+    
+    public void shutdown()
+    {
+        executorService_.shutdownNow();
+    }
+    
+    public boolean isShutdown()
+    {
+        return executorService_.isShutdown();
+    }    
+    
+    public long getTaskCount(){
+        return (executorService_.getTaskCount() - executorService_.getCompletedTaskCount());
+    }
+    /* Finished implementing the IStage interface methods */
+}

Added: incubator/cassandra/src/org/apache/cassandra/concurrent/StageManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/concurrent/StageManager.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/concurrent/StageManager.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/concurrent/StageManager.java Mon Mar  2 06:12:46 2009
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.concurrent;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.cassandra.continuations.Suspendable;
+
+
+/**
+ * This class manages all stages that exist within a process. The application registers
+ * and de-registers stages with this abstraction. Any component that has the <i>ID</i> 
+ * associated with a stage can obtain a handle to the actual stage.
+ * 
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class StageManager
+{
+    private static Map<String, IStage > stageQueues_ = new HashMap<String, IStage>();
+    
+    /**
+     * Register a stage with the StageManager
+     * @param stageName stage name.
+     * @param stage stage for the respective message types.
+     */
+    public static void registerStage(String stageName, IStage stage)
+    {
+        stageQueues_.put(stageName, stage);
+    }
+    
+    /**
+     * Returns the stage that we are currently executing on.
+     * This relies on the fact that the thread names in the
+     * stage have the name of the stage as the prefix.
+     * @return
+     */
+    public static IStage getCurrentStage()
+    {
+        String name = Thread.currentThread().getName();
+        String[] peices = name.split(":");
+        IStage stage = getStage(peices[0]);
+        return stage;
+    }
+
+    /**
+     * Retrieve a stage from the StageManager
+     * @param stageName name of the stage to be retrieved.
+    */
+    public static IStage getStage(String stageName)
+    {
+        return stageQueues_.get(stageName);
+    }
+    
+    /**
+     * Retrieve the internal thread pool associated with the
+     * specified stage name.
+     * @param stageName name of the stage.
+     */
+    public static ExecutorService getStageInternalThreadPool(String stageName)
+    {
+        IStage stage = getStage(stageName);
+        if ( stage == null )
+            throw new IllegalArgumentException("No stage registered with name " + stageName);
+        return stage.getInternalThreadPool();
+    }
+
+    /**
+     * Deregister a stage from StageManager
+     * @param stageName stage name.
+     */
+    public static void deregisterStage(String stageName)
+    {
+        stageQueues_.remove(stageName);
+    }
+
+    /**
+     * This method gets the number of tasks on the
+     * stage's internal queue.
+     * @param stage name of the stage
+     * @return
+     */
+    public static long getStageTaskCount(String stage)
+    {
+        return stageQueues_.get(stage).getTaskCount();
+    }
+
+    /**
+     * This method shuts down all registered stages.
+     */
+    public static void shutdown()
+    {
+        Set<String> stages = stageQueues_.keySet();
+        for ( String stage : stages )
+        {
+            IStage registeredStage = stageQueues_.get(stage);
+            registeredStage.shutdown();
+        }
+    }
+}

Added: incubator/cassandra/src/org/apache/cassandra/concurrent/ThreadFactoryImpl.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/concurrent/ThreadFactoryImpl.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/concurrent/ThreadFactoryImpl.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/concurrent/ThreadFactoryImpl.java Mon Mar  2 06:12:46 2009
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.concurrent;
+
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+import org.apache.cassandra.utils.*;
+
+/**
+ * This class is an implementation of the <i>ThreadFactory</i> interface. It 
+ * gives Java threads meaningful names, which is useful when using 
+ * a tool like JConsole.
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class ThreadFactoryImpl implements ThreadFactory
+{
+    protected String id_;
+    protected ThreadGroup threadGroup_;
+    protected final AtomicInteger threadNbr_ = new AtomicInteger(1);
+    
+    public ThreadFactoryImpl(String id)
+    {
+        SecurityManager sm = System.getSecurityManager();
+        threadGroup_ = ( sm != null ) ? sm.getThreadGroup() : Thread.currentThread().getThreadGroup();
+        id_ = id;
+    }    
+    
+    public Thread newThread(Runnable runnable)
+    {        
+        String name = id_ + ":" + threadNbr_.getAndIncrement();       
+        Thread thread = new Thread(threadGroup_, runnable, name);        
+        return thread;
+    }
+}

Added: incubator/cassandra/src/org/apache/cassandra/concurrent/ThreadLocalContext.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/concurrent/ThreadLocalContext.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/concurrent/ThreadLocalContext.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/concurrent/ThreadLocalContext.java Mon Mar  2 06:12:46 2009
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.concurrent;
+
+/**
+ * Use this implementation over Java's ThreadLocal or InheritableThreadLocal when 
+ * you need to add multiple key/value pairs into ThreadLocalContext for a given thread.
+ * 
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+
+public class ThreadLocalContext
+{
+    /*
+     * Holds one Context per thread. Being an InheritableThreadLocal, a newly
+     * created thread starts out with the value its parent thread had at the
+     * time the child was created.
+     */
+    private static InheritableThreadLocal<Context> context_ = new InheritableThreadLocal<Context>();
+
+    /**
+     * Associates the given context with the calling thread, replacing any
+     * context previously stored for it.
+     *
+     * @param value context to store for the current thread
+     */
+    public static void put(Context value)
+    {
+        context_.set(value);
+    }
+
+    /**
+     * @return the context visible to the calling thread, or null when none
+     *         has been stored or inherited
+     */
+    public static Context get()
+    {
+        Context current = context_.get();
+        return current;
+    }
+}
\ No newline at end of file

Added: incubator/cassandra/src/org/apache/cassandra/config/CFMetaData.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/config/CFMetaData.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/config/CFMetaData.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/config/CFMetaData.java Mon Mar  2 06:12:46 2009
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.config;
+
+public class CFMetaData
+{
+    public String tableName;            // table that owns this column family
+    public String cfName;               // column family name
+    public String columnType;           // "Super" is treated specially by pretty(); other values are printed as-is
+    public String indexProperty_;       // how columns are sorted (by name, by time stamp, ...)
+
+    // Logical, user chosen names (prefix n_) for the parts of the data held
+    // in a column family. CQL queries, for instance, address data within the
+    // column family through these names.
+    public String n_rowKey;
+    public String n_superColumnMap;     // meaningful for super column families only
+    public String n_superColumnKey;     // meaningful for super column families only
+    public String n_columnMap;
+    public String n_columnKey;
+    public String n_columnValue;
+    public String n_columnTimestamp;
+
+    /**
+     * Quick and dirty description of the column family: one line showing the
+     * nesting of the logical names, followed by the column family type and
+     * the column sort order, each terminated by a newline. Unset fields are
+     * rendered as "null".
+     *
+     * @return the multi-line description
+     */
+    public String pretty()
+    {
+        // innermost level: the column map with its key, value and timestamp names
+        StringBuilder columns = new StringBuilder();
+        columns.append(n_columnMap).append("(")
+               .append(n_columnKey).append(", ")
+               .append(n_columnValue).append(", ")
+               .append(n_columnTimestamp).append(")");
+        String nested = columns.toString();
+
+        // super column families wrap the column map in one more level
+        if ("Super".equals(columnType))
+        {
+            nested = n_superColumnMap + "(" + n_superColumnKey + ", " + nested + ")";
+        }
+
+        StringBuilder description = new StringBuilder();
+        description.append(tableName).append(".").append(cfName)
+                   .append("(").append(n_rowKey).append(", ").append(nested).append(")\n");
+        description.append("Column Family Type: ").append(columnType).append("\n");
+        description.append("Columns Sorted By: ").append(indexProperty_).append("\n");
+        return description.toString();
+    }
+}

Added: incubator/cassandra/src/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/config/DatabaseDescriptor.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/config/DatabaseDescriptor.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/config/DatabaseDescriptor.java Mon Mar  2 06:12:46 2009
@@ -0,0 +1,743 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.config;
+
+import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.io.*;
+
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.db.TypeInfo;
+import org.apache.cassandra.db.Table.TableMetadata;
+import org.apache.cassandra.utils.FileUtils;
+import org.apache.cassandra.utils.XMLUtils;
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
+import org.apache.cassandra.io.*;
+
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class DatabaseDescriptor
+{
+    /* Values accepted for the HashingStrategy setting. */
+    public static final String random_ = "RANDOM";
+    public static final String ophf_ = "OPHF";
+    private static int storagePort_ = 7000;
+    private static int controlPort_ = 7001;
+    private static int httpPort_ = 7002;
+    private static String clusterName_ = "Test";
+    private static int replicationFactor_ = 3;
+    private static long rpcTimeoutInMillis_ = 2000;
+    private static Set<String> seeds_ = new HashSet<String>();
+    private static String metadataDirectory_;
+    private static String snapshotDirectory_;
+    /* Keeps the list of Ganglia servers to contact */
+    private static String[] gangliaServers_ ;
+    /* Keeps the list of map output directories */
+    private static String[] mapOutputDirectories_;
+    /* Keeps the list of data file directories */
+    private static String[] dataFileDirectories_;
+    /* Current index into the list of data file directories */
+    private static int currentIndex_ = 0;
+    private static String logFileDirectory_;
+    private static String bootstrapFileDirectory_;
+    /* Commit log rotation threshold in bytes */
+    private static int logRotationThreshold_ = 128*1024*1024;
+    private static boolean fastSync_ = false;
+    private static boolean rackAware_ = false;
+    private static int threadsPerPool_ = 4;
+    private static List<String> tables_ = new ArrayList<String>();
+    private static Set<String> applicationColumnFamilies_ = new HashSet<String>();
+
+    // Default descriptive names for use in CQL. The user can override
+    // these choices in the config file. These are not case sensitive.
+    // Hence, these are stored in UPPER case for easy comparison.
+    private static String d_rowKey_           = "ROW_KEY";
+    private static String d_superColumnMap_   = "SUPER_COLUMN_MAP";
+    private static String d_superColumnKey_   = "SUPER_COLUMN_KEY";
+    private static String d_columnMap_        = "COLUMN_MAP";
+    private static String d_columnKey_        = "COLUMN_KEY";
+    private static String d_columnValue_      = "COLUMN_VALUE";
+    private static String d_columnTimestamp_  = "COLUMN_TIMESTAMP";
+
+    /*
+     * A map from table names to the set of column families for the table and the
+     * corresponding meta data for that column family.
+    */
+    private static Map<String, Map<String, CFMetaData>> tableToCFMetaDataMap_;
+    /* Hashing strategy Random or OPHF */
+    private static String hashingStrategy_ = DatabaseDescriptor.random_;
+    /* if the size of columns or super-columns are more than this, indexing will kick in */
+    private static int columnIndexSizeInKB_;
+    /* Size of touch key cache */
+    private static int touchKeyCacheSize_ = 1024;
+    /*
+     * How long to keep a memtable in memory.
+     * NOTE(review): this was documented as hours, but the value is read from
+     * the MemtableLifetimeInDays setting - confirm the unit with the consumer.
+    */
+    private static int memtableLifetime_ = 6;
+    /* Size of the memtable in memory (MemtableSizeInMB) before it is dumped */
+    private static int memtableSize_ = 128;
+    /* Number of objects in millions in the memtable before it is dumped */
+    private static int memtableObjectCount_ = 1;
+    /*
+     * This parameter enables or disables consistency checks.
+     * If set to false the read repairs are disabled for very
+     * high throughput on reads but at the cost of consistency.
+    */
+    private static boolean doConsistencyCheck_ = true;
+    /* Address of ZooKeeper cell */
+    private static String zkAddress_;
+    /* Callout directories */
+    private static String calloutLocation_;
+    /* Job Jar Location */
+    private static String jobJarFileLocation_;
+    /* Address where to run the job tracker */
+    private static String jobTrackerHost_;
+    /* Zookeeper session timeout. */
+    private static int zkSessionTimeout_ = 30000;
+
+    // the path qualified config file (storage-conf.xml) name
+    private static String configFileName_;
+
+    /**
+     * Loads storage-conf.xml from the given directory.
+     *
+     * @param filePath directory that contains storage-conf.xml
+     * @return map from table name to the meta data of its column families
+     */
+    public static Map<String, Map<String, CFMetaData>> init(String filePath) throws Throwable
+    {
+        /* Read the configuration file to retrieve DB related properties. */
+        return initInternal(filePath + File.separator + "storage-conf.xml");
+    }
+
+    /**
+     * Loads storage-conf.xml from the directory named by the "storage-config"
+     * system property.
+     *
+     * @return map from table name to the meta data of its column families
+     */
+    public static Map<String, Map<String, CFMetaData>> init() throws Throwable
+    {
+        /* Read the configuration file to retrieve DB related properties. */
+        return initInternal(System.getProperty("storage-config") + File.separator + "storage-conf.xml");
+    }
+
+    /**
+     * Parses the given configuration file and populates every static setting
+     * of this class. A setting that has a built-in default keeps its current
+     * value when the file does not specify it. Configured directories are
+     * created as they are read.
+     *
+     * @param file path of the XML configuration file
+     * @return map from table name to the meta data of its column families
+     * @throws IllegalArgumentException if the commit log rotation threshold
+     *         does not fit in an int once converted to bytes
+     */
+    public static Map<String, Map<String, CFMetaData>> initInternal(String file) throws Throwable
+    {
+        // Remember the file for every entry point, not only for init(), so
+        // that getConfigFileName() is always meaningful after a load.
+        configFileName_ = file;
+        XMLUtils xmlUtils = new XMLUtils(file);
+
+        /* Cluster Name; the built-in default survives a missing element */
+        clusterName_ = readString(xmlUtils, "/Storage/ClusterName", clusterName_);
+
+        /* Ganglia servers contact list */
+        gangliaServers_ = xmlUtils.getNodeValues("/Storage/GangliaServers/GangliaServer");
+
+        /* ZooKeeper's address */
+        zkAddress_ = xmlUtils.getNodeValue("/Storage/ZookeeperAddress");
+
+        /* Hashing strategy; the built-in default survives a missing element */
+        hashingStrategy_ = readString(xmlUtils, "/Storage/HashingStrategy", hashingStrategy_);
+        /* Callout location */
+        calloutLocation_ = xmlUtils.getNodeValue("/Storage/CalloutLocation");
+
+        /* JobTracker address */
+        jobTrackerHost_ = xmlUtils.getNodeValue("/Storage/JobTrackerHost");
+
+        /* Job Jar file location */
+        jobJarFileLocation_ = xmlUtils.getNodeValue("/Storage/JobJarFileLocation");
+
+        /* Zookeeper's session timeout */
+        zkSessionTimeout_ = readInt(xmlUtils, "/Storage/ZookeeperSessionTimeout", zkSessionTimeout_);
+
+        /* Data replication factor */
+        replicationFactor_ = readInt(xmlUtils, "/Storage/ReplicationFactor", replicationFactor_);
+
+        /* RPC Timeout; the field is a long, so parse it as one */
+        String rpcTimeoutInMillis = xmlUtils.getNodeValue("/Storage/RpcTimeoutInMillis");
+        if ( rpcTimeoutInMillis != null )
+            rpcTimeoutInMillis_ = Long.parseLong(rpcTimeoutInMillis);
+
+        /* Thread per pool */
+        threadsPerPool_ = readInt(xmlUtils, "/Storage/ThreadsPerPool", threadsPerPool_);
+
+        /* TCP port on which the storage system listens */
+        storagePort_ = readInt(xmlUtils, "/Storage/StoragePort", storagePort_);
+
+        /* UDP port for control messages */
+        controlPort_ = readInt(xmlUtils, "/Storage/ControlPort", controlPort_);
+
+        /* HTTP port for HTTP messages */
+        httpPort_ = readInt(xmlUtils, "/Storage/HttpPort", httpPort_);
+
+        /* Touch Key Cache Size */
+        touchKeyCacheSize_ = readInt(xmlUtils, "/Storage/TouchKeyCacheSize", touchKeyCacheSize_);
+
+        /* How long to keep the memtable around w/o flushing */
+        memtableLifetime_ = readInt(xmlUtils, "/Storage/MemtableLifetimeInDays", memtableLifetime_);
+
+        /* Size of the memtable in memory in MB before it is dumped */
+        memtableSize_ = readInt(xmlUtils, "/Storage/MemtableSizeInMB", memtableSize_);
+        /* Number of objects in millions in the memtable before it is dumped */
+        memtableObjectCount_ = readInt(xmlUtils, "/Storage/MemtableObjectCountInMillions", memtableObjectCount_);
+
+        /* This parameter enables or disables consistency checks.
+         * If set to false the read repairs are disabled for very
+         * high throughput on reads but at the cost of consistency.*/
+        doConsistencyCheck_ = readBoolean(xmlUtils, "/Storage/DoConsistencyChecksBoolean", doConsistencyCheck_);
+
+        /* read the size at which we should do column indexes; 64 KB unless configured */
+        columnIndexSizeInKB_ = readInt(xmlUtils, "/Storage/ColumnIndexSizeInKB", 64);
+
+        /* metadata directory */
+        metadataDirectory_ = readDirectory(xmlUtils, "/Storage/MetadataDirectory", "/var/storage/system");
+
+        /* snapshot directory */
+        snapshotDirectory_ = xmlUtils.getNodeValue("/Storage/SnapshotDirectory");
+        if ( snapshotDirectory_ != null )
+        {
+            FileUtils.createDirectory(snapshotDirectory_);
+        }
+        else if ( metadataDirectory_ != null )
+        {
+            // Only derive the location when there is a metadata directory;
+            // concatenating a null would yield the literal path "null/snapshot".
+            snapshotDirectory_ = metadataDirectory_ + File.separator + "snapshot";
+        }
+
+        /* map output directory */
+        mapOutputDirectories_ = xmlUtils.getNodeValues("/Storage/MapOutputDirectories/MapOutputDirectory");
+        for ( String mapOutputDirectory : mapOutputDirectories_ )
+            FileUtils.createDirectory(mapOutputDirectory);
+
+        /* data file directory */
+        dataFileDirectories_ = xmlUtils.getNodeValues("/Storage/DataFileDirectories/DataFileDirectory");
+        if ( dataFileDirectories_.length > 0 )
+        {
+            for ( String dataFileDirectory : dataFileDirectories_ )
+                FileUtils.createDirectory(dataFileDirectory);
+        }
+        else if ( "Linux".equals(System.getProperty("os.name")) )
+        {
+            dataFileDirectories_ = new String[]{"/var/storage/data"};
+        }
+
+        /* bootstrap file directory */
+        bootstrapFileDirectory_ = readDirectory(xmlUtils, "/Storage/BootstrapFileDirectory", "/var/storage/bootstrap");
+
+        /* commit log directory */
+        logFileDirectory_ = readDirectory(xmlUtils, "/Storage/CommitLogDirectory", "/var/storage/commitlog");
+
+        /* threshold after which commit log should be rotated. */
+        String value = xmlUtils.getNodeValue("/Storage/CommitLogRotationThresholdInMB");
+        if ( value != null )
+        {
+            // Multiply as a long: the int product silently wraps around for
+            // thresholds of 2048 MB and above.
+            long thresholdInBytes = Integer.parseInt(value) * 1024L * 1024L;
+            if ( thresholdInBytes > Integer.MAX_VALUE )
+                throw new IllegalArgumentException("CommitLogRotationThresholdInMB is too large: " + value);
+            logRotationThreshold_ = (int)thresholdInBytes;
+        }
+
+        /* fast sync option */
+        fastSync_ = readBoolean(xmlUtils, "/Storage/CommitLogFastSync", fastSync_);
+
+        tableToCFMetaDataMap_ = new HashMap<String, Map<String, CFMetaData>>();
+
+        /* Rack Aware option */
+        rackAware_ = readBoolean(xmlUtils, "/Storage/RackAware", rackAware_);
+
+        /* Read the table related stuff from config */
+        loadTables(xmlUtils);
+
+        /* Load the seeds for node contact points */
+        for ( String seed : xmlUtils.getNodeValues("/Storage/Seeds/Seed") )
+        {
+            seeds_.add(seed);
+        }
+        return tableToCFMetaDataMap_;
+    }
+
+    /*
+     * The helpers below declare "throws Throwable" only because the checked
+     * exceptions of XMLUtils/FileUtils are not visible from this class and
+     * initInternal() itself is declared that way.
+     */
+
+    /* Returns the text at the given XPath, or defaultValue when it is absent. */
+    private static String readString(XMLUtils xmlUtils, String xpath, String defaultValue) throws Throwable
+    {
+        String value = xmlUtils.getNodeValue(xpath);
+        return ( value != null ) ? value : defaultValue;
+    }
+
+    /* Returns the integer at the given XPath, or defaultValue when it is absent. */
+    private static int readInt(XMLUtils xmlUtils, String xpath, int defaultValue) throws Throwable
+    {
+        String value = xmlUtils.getNodeValue(xpath);
+        return ( value != null ) ? Integer.parseInt(value) : defaultValue;
+    }
+
+    /* Returns the boolean at the given XPath, or defaultValue when it is absent. */
+    private static boolean readBoolean(XMLUtils xmlUtils, String xpath, boolean defaultValue) throws Throwable
+    {
+        String value = xmlUtils.getNodeValue(xpath);
+        return ( value != null ) ? Boolean.parseBoolean(value) : defaultValue;
+    }
+
+    /*
+     * Returns the directory configured at the given XPath after creating it.
+     * When nothing is configured the result is linuxDefault on Linux (the
+     * default is not created here) and null on any other platform.
+     */
+    private static String readDirectory(XMLUtils xmlUtils, String xpath, String linuxDefault) throws Throwable
+    {
+        String directory = xmlUtils.getNodeValue(xpath);
+        if ( directory != null )
+        {
+            FileUtils.createDirectory(directory);
+            return directory;
+        }
+        return "Linux".equals(System.getProperty("os.name")) ? linuxDefault : null;
+    }
+
+    /*
+     * Reads every /Storage/Tables/Table element and records its column
+     * families in tables_, applicationColumnFamilies_ and tableToCFMetaDataMap_.
+     */
+    private static void loadTables(XMLUtils xmlUtils) throws Throwable
+    {
+        NodeList tables = xmlUtils.getRequestedNodeList("/Storage/Tables/Table");
+        int tableCount = tables.getLength();
+        for ( int i = 0; i < tableCount; ++i )
+        {
+            Node table = tables.item(i);
+
+            /* parsing out the table name */
+            String tName = XMLUtils.getAttributeValue(table, "Name");
+            // tables_ outlives a single load; do not list a table twice when
+            // the configuration is read again.
+            if ( !tables_.contains(tName) )
+                tables_.add(tName);
+            Map<String, CFMetaData> cfMetaDataByName = new HashMap<String, CFMetaData>();
+            tableToCFMetaDataMap_.put(tName, cfMetaDataByName);
+
+            String xqlTable = "/Storage/Tables/Table[@Name='" + tName + "']/";
+            NodeList columnFamilies = xmlUtils.getRequestedNodeList(xqlTable + "ColumnFamily");
+
+            // get name of the rowKey for this table
+            String n_rowKey = readString(xmlUtils, xqlTable + "RowKey", d_rowKey_);
+
+            int cfCount = columnFamilies.getLength();
+            for ( int j = 0; j < cfCount; ++j )
+            {
+                Node columnFamily = columnFamilies.item(j);
+                String cName = XMLUtils.getAttributeValue(columnFamily, "Name");
+                String xqlCF = xqlTable + "ColumnFamily[@Name='" + cName + "']/";
+
+                /* squirrel away the application column families */
+                applicationColumnFamilies_.add(cName);
+
+                // Parse out the column type
+                String columnType = XMLUtils.getAttributeValue(columnFamily, "ColumnType");
+                columnType = ColumnFamily.getColumnType(columnType);
+
+                // Parse out the column family sorting property for columns
+                String columnIndexProperty = XMLUtils.getAttributeValue(columnFamily, "ColumnSort");
+                String columnIndexType = ColumnFamily.getColumnSortProperty(columnIndexProperty);
+
+                // Parse out user-specified logical names for the various dimensions
+                // of the column family from the config, falling back to the defaults.
+                String n_superColumnMap = readString(xmlUtils, xqlCF + "SuperColumnMap", d_superColumnMap_);
+                String n_superColumnKey = readString(xmlUtils, xqlCF + "SuperColumnKey", d_superColumnKey_);
+                String n_columnMap = readString(xmlUtils, xqlCF + "ColumnMap", d_columnMap_);
+                String n_columnKey = readString(xmlUtils, xqlCF + "ColumnKey", d_columnKey_);
+                String n_columnValue = readString(xmlUtils, xqlCF + "ColumnValue", d_columnValue_);
+                String n_columnTimestamp = readString(xmlUtils, xqlCF + "ColumnTimestamp", d_columnTimestamp_);
+
+                // now populate the column family meta data and
+                // insert it into the table dictionary.
+                CFMetaData cfMetaData = new CFMetaData();
+
+                cfMetaData.tableName = tName;
+                cfMetaData.cfName = cName;
+
+                cfMetaData.columnType = columnType;
+                cfMetaData.indexProperty_ = columnIndexType;
+
+                cfMetaData.n_rowKey = n_rowKey;
+                cfMetaData.n_columnMap = n_columnMap;
+                cfMetaData.n_columnKey = n_columnKey;
+                cfMetaData.n_columnValue = n_columnValue;
+                cfMetaData.n_columnTimestamp = n_columnTimestamp;
+                // the super column names stay null for non-super column families
+                if ("Super".equals(columnType))
+                {
+                    cfMetaData.n_superColumnKey = n_superColumnKey;
+                    cfMetaData.n_superColumnMap = n_superColumnMap;
+                }
+
+                cfMetaDataByName.put(cName, cfMetaData);
+            }
+        }
+    }
+
+    public static String getHashingStrategy()
+    {
+        return hashingStrategy_;
+    }
+
+    public static String getZkAddress()
+    {
+        return zkAddress_;
+    }
+
+    public static String getCalloutLocation()
+    {
+        return calloutLocation_;
+    }
+
+    public static String getJobTrackerAddress()
+    {
+        return jobTrackerHost_;
+    }
+
+    public static int getZkSessionTimeout()
+    {
+        return zkSessionTimeout_;
+    }
+
+    /** @return the column index threshold in bytes (the configured KB value times 1024) */
+    public static int getColumnIndexSize()
+    {
+        return columnIndexSizeInKB_ * 1024;
+    }
+
+    public static int getMemtableLifetime()
+    {
+        return memtableLifetime_;
+    }
+
+    public static int getMemtableSize()
+    {
+        return memtableSize_;
+    }
+
+    public static int getMemtableObjectCount()
+    {
+        return memtableObjectCount_;
+    }
+
+    public static boolean getConsistencyCheck()
+    {
+        return doConsistencyCheck_;
+    }
+
+    public static String getClusterName()
+    {
+        return clusterName_;
+    }
+
+    /** @return the path of the configuration file most recently handed to initInternal() */
+    public static String getConfigFileName()
+    {
+        return configFileName_;
+    }
+
+    public static boolean isApplicationColumnFamily(String columnFamily)
+    {
+        return applicationColumnFamilies_.contains(columnFamily);
+    }
+
+    public static int getTouchKeyCacheSize()
+    {
+        return touchKeyCacheSize_;
+    }
+
+    public static String getJobJarLocation()
+    {
+        return jobJarFileLocation_;
+    }
+
+    /** @return the configured Ganglia servers joined with ", " */
+    public static String getGangliaServers()
+    {
+        StringBuilder sb = new StringBuilder();
+        for ( int i = 0; i < gangliaServers_.length; ++i )
+        {
+            if ( i > 0 )
+                sb.append(", ");
+            sb.append(gangliaServers_[i]);
+        }
+        return sb.toString();
+    }
+
+    public static Map<String, CFMetaData> getTableMetaData(String table)
+    {
+        return tableToCFMetaDataMap_.get(table);
+    }
+
+    /*
+     * Given a table name & column family name, get the column family
+     * meta data. If the table name or column family name is not valid
+     * this function returns null.
+     */
+    public static CFMetaData getCFMetaData(String table, String cfName)
+    {
+        Map<String, CFMetaData> cfInfo = tableToCFMetaDataMap_.get(table);
+        if (cfInfo == null)
+            return null;
+
+        return cfInfo.get(cfName);
+    }
+
+    /*
+     * Looks the column family up in the first table of getTables(); the
+     * lookups by column family name alone all go through the first table.
+     */
+    private static CFMetaData getCFMetaDataInFirstTable(String cfName)
+    {
+        return getCFMetaData(getTables().get(0), cfName);
+    }
+
+    /** @return the column type of the column family in the first table, or null if it is unknown */
+    public static String getColumnType(String cfName)
+    {
+        CFMetaData cfMetaData = getCFMetaDataInFirstTable(cfName);
+        return ( cfMetaData == null ) ? null : cfMetaData.columnType;
+    }
+
+    /** @return true if the column family is known and its sort property is "Name" */
+    public static boolean isNameSortingEnabled(String cfName)
+    {
+        CFMetaData cfMetaData = getCFMetaDataInFirstTable(cfName);
+        return cfMetaData != null && "Name".equals(cfMetaData.indexProperty_);
+    }
+
+    /** @return true if the column family is known and its sort property is "Time" */
+    public static boolean isTimeSortingEnabled(String cfName)
+    {
+        CFMetaData cfMetaData = getCFMetaDataInFirstTable(cfName);
+        return cfMetaData != null && "Time".equals(cfMetaData.indexProperty_);
+    }
+
+    public static List<String> getTables()
+    {
+        return tables_;
+    }
+
+    /* Despite the name this appends one table to the list of known tables. */
+    public static void  setTables(String table)
+    {
+        tables_.add(table);
+    }
+
+    public static int getStoragePort()
+    {
+        return storagePort_;
+    }
+
+    public static int getControlPort()
+    {
+        return controlPort_;
+    }
+
+    public static int getHttpPort()
+    {
+        return httpPort_;
+    }
+
+    public static int getReplicationFactor()
+    {
+        return replicationFactor_;
+    }
+
+    public static long getRpcTimeout()
+    {
+        return rpcTimeoutInMillis_;
+    }
+
+    public static int getThreadsPerPool()
+    {
+        return threadsPerPool_;
+    }
+
+    public static String getMetadataDirectory()
+    {
+        return metadataDirectory_;
+    }
+
+    public static void setMetadataDirectory(String metadataDirectory)
+    {
+        metadataDirectory_ = metadataDirectory;
+    }
+
+    public static String getSnapshotDirectory()
+    {
+        return snapshotDirectory_;
+    }
+
+    public static void setSnapshotDirectory(String snapshotDirectory)
+    {
+        snapshotDirectory_ = snapshotDirectory;
+    }
+
+    public static String[] getAllMapOutputDirectories()
+    {
+        return mapOutputDirectories_;
+    }
+
+    /** @return the map output directory selected by the current directory index */
+    public static String getMapOutputLocation()
+    {
+        // currentIndex_ cycles over the data file directories; wrap it so it
+        // stays in range when there are fewer map output directories.
+        return mapOutputDirectories_[currentIndex_ % mapOutputDirectories_.length];
+    }
+
+    public static String[] getAllDataFileLocations()
+    {
+        return dataFileDirectories_;
+    }
+
+    /** @return the data file directory at the current index; the index is not advanced */
+    public static String getDataFileLocation()
+    {
+        return dataFileDirectories_[currentIndex_];
+    }
+
+    /** @return the data file directory at the current index; the index then moves to the next directory */
+    public static String getCompactionFileLocation()
+    {
+        String dataFileDirectory = dataFileDirectories_[currentIndex_];
+        currentIndex_ = (currentIndex_ + 1 )%dataFileDirectories_.length ;
+        return dataFileDirectory;
+    }
+
+    public static String getBootstrapFileLocation()
+    {
+        return bootstrapFileDirectory_;
+    }
+
+    public static void setBootstrapFileLocation(String bfLocation)
+    {
+        bootstrapFileDirectory_ = bfLocation;
+    }
+
+    /** @return the commit log rotation threshold in bytes */
+    public static int getLogFileSizeThreshold()
+    {
+        return logRotationThreshold_;
+    }
+
+    public static String getLogFileLocation()
+    {
+        return logFileDirectory_;
+    }
+
+    public static void setLogFileLocation(String logLocation)
+    {
+        logFileDirectory_ = logLocation;
+    }
+
+    public static boolean isFastSync()
+    {
+        return fastSync_;
+    }
+
+    public static boolean isRackAware()
+    {
+        return rackAware_;
+    }
+
+    public static Set<String> getSeeds()
+    {
+        return seeds_;
+    }
+
+    /** @return the column type of the column family, or "Standard" when it is unknown */
+    public static String getColumnFamilyType(String cfName)
+    {
+        String cfType = getColumnType(cfName);
+        return ( cfType != null ) ? cfType : "Standard";
+    }
+
+    /*
+     * Loop through all the disks to see which disk has the max free space
+     * return the disk with max free space for compactions. If the size of the expected
+     * compacted file is greater than the max disk space available return null, we cannot
+     * do compaction in this case.
+     */
+    public static String getCompactionFileLocation(long expectedCompactedFileSize)
+    {
+        long maxFreeDisk = 0;
+        int maxDiskIndex = 0;
+        for ( int i = 0 ; i < dataFileDirectories_.length ; i++ )
+        {
+            // query the disk once so the comparison and the recorded value agree
+            long usableSpace = new File(dataFileDirectories_[i]).getUsableSpace();
+            if ( maxFreeDisk < usableSpace )
+            {
+                maxFreeDisk = usableSpace;
+                maxDiskIndex = i;
+            }
+        }
+        // Load factor of 0.9 we do not want to use the entire disk that is too risky.
+        maxFreeDisk = (long)(0.9 * maxFreeDisk);
+        if ( expectedCompactedFileSize < maxFreeDisk )
+        {
+            // next caller starts with the directory after the one handed out
+            currentIndex_ = (maxDiskIndex + 1 )%dataFileDirectories_.length ;
+            return dataFileDirectories_[maxDiskIndex];
+        }
+        currentIndex_ = maxDiskIndex;
+        return null;
+    }
+
+    /**
+     * @return TypeInfo.STRING when the column family is sorted by name,
+     *         TypeInfo.LONG otherwise (including for an unknown column family,
+     *         in line with isNameSortingEnabled())
+     */
+    public static TypeInfo getTypeInfo(String cfName)
+    {
+        return isNameSortingEnabled(cfName) ? TypeInfo.STRING : TypeInfo.LONG;
+    }
+
+    /*
+     * Loads the configuration file named by the first argument; without
+     * arguments the original development path is used.
+     */
+    public static void main(String[] args) throws Throwable
+    {
+        String file = ( args.length > 0 ) ? args[0] : "C:\\Engagements\\Cassandra-Golden\\storage-conf.xml";
+        DatabaseDescriptor.initInternal(file);
+    }
+}

Added: incubator/cassandra/src/org/apache/cassandra/continuations/CAgent.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/continuations/CAgent.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/continuations/CAgent.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/continuations/CAgent.java Mon Mar  2 06:12:46 2009
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.continuations;
+
+import java.lang.instrument.Instrumentation;
+
+import org.apache.commons.javaflow.bytecode.transformation.bcel.BcelClassTransformer;
+
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+/**
+ * Java agent entry point that registers a ContinuationClassTransformer,
+ * backed by a BcelClassTransformer, with the JVM instrumentation.
+ */
+public class CAgent
+{
+    /**
+     * Agent hook with the standard premain signature.
+     *
+     * @param agentArguments  agent option string, passed through to the transformer
+     * @param instrumentation instrumentation handle the transformer is added to
+     */
+    public static void premain(String agentArguments, Instrumentation instrumentation)
+    {
+        BcelClassTransformer bcelTransformer = new BcelClassTransformer();
+        ContinuationClassTransformer continuationTransformer =
+                new ContinuationClassTransformer(agentArguments, bcelTransformer);
+        instrumentation.addTransformer(continuationTransformer);
+    }
+}

Added: incubator/cassandra/src/org/apache/cassandra/continuations/ContinuationClassTransformer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/continuations/ContinuationClassTransformer.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/continuations/ContinuationClassTransformer.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/continuations/ContinuationClassTransformer.java Mon Mar  2 06:12:46 2009
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.continuations;
+
+import java.lang.annotation.Annotation;
+import java.lang.instrument.ClassFileTransformer;
+import java.lang.instrument.IllegalClassFormatException;
+import java.security.ProtectionDomain;
+import java.util.List;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.StringTokenizer;
+import org.objectweb.asm.ClassReader;
+import org.objectweb.asm.tree.AnnotationNode;
+import org.objectweb.asm.tree.ClassNode;
+import org.objectweb.asm.tree.MethodNode;
+import org.apache.commons.javaflow.bytecode.transformation.ResourceTransformer;
+import org.apache.commons.javaflow.bytecode.transformation.bcel.BcelClassTransformer;
+
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+/**
+ * Class-file transformer that hands a class to the wrapped ResourceTransformer
+ * when one of its visible annotations has a descriptor containing "Suspendable".
+ * All other classes are returned unchanged.
+ */
+class ContinuationClassTransformer implements ClassFileTransformer
+{
+    private static final String targetAnnotation_ = "Suspendable";
+    private ResourceTransformer transformer_;
+
+    /**
+     * @param agentArguments agent option string; currently unused
+     * @param transformer    transformer applied to annotated classes
+     */
+    public ContinuationClassTransformer(String agentArguments, ResourceTransformer transformer)
+    {
+        super();
+        transformer_ = transformer;
+    }
+
+    /**
+     * Inspects the class bytes with the ASM class reader and, if the class
+     * carries the target annotation, returns the transformed bytecode.
+     *
+     * @param bytes the class file contents as loaded
+     * @return the transformed bytes for annotated classes, otherwise the
+     *         input array itself
+     */
+    public byte[] transform(ClassLoader classLoader, String className, Class<?> redefiningClass, ProtectionDomain domain, byte[] bytes) throws IllegalClassFormatException
+    {
+        ClassReader classReader = new ClassReader(bytes);
+        ClassNode classNode = new ClassNode();
+        classReader.accept(classNode, true);
+        List<AnnotationNode> annotationNodes = classNode.visibleAnnotations;
+
+        // ASM documents visibleAnnotations as possibly null, which is the case
+        // for a class without runtime-visible annotations; nothing to do then.
+        if ( annotationNodes == null )
+        {
+            return bytes;
+        }
+
+        for ( AnnotationNode annotationNode : annotationNodes )
+        {
+            if ( annotationNode.desc.indexOf(ContinuationClassTransformer.targetAnnotation_) != -1 )
+            {
+                // Transform once and stop: a second matching annotation must
+                // not feed already-transformed bytes back into the transformer.
+                return transformer_.transform(bytes);
+            }
+        }
+        return bytes;
+    }
+}

Added: incubator/cassandra/src/org/apache/cassandra/continuations/Suspendable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/continuations/Suspendable.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/continuations/Suspendable.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/continuations/Suspendable.java Mon Mar  2 06:12:46 2009
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.continuations;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+
+@Retention(RetentionPolicy.RUNTIME)
+
+/**
+ * Marker annotation with no members, retained at runtime.
+ * ContinuationClassTransformer hands a class to its bytecode transformer when
+ * one of the class's visible annotations has a descriptor containing "Suspendable".
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public @interface Suspendable
+{
+
+}

Added: incubator/cassandra/src/org/apache/cassandra/cql/common/BindOperand.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/cql/common/BindOperand.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/cql/common/BindOperand.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/cql/common/BindOperand.java Mon Mar  2 06:12:46 2009
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql.common;
+
+import org.apache.cassandra.cql.execution.RuntimeErrorMsg;
+
+/**
+ * BindOperand: 
+ * Represents a bind variable in the CQL statement. Lives
+ * in the shared execution plan.
+ */
+public class BindOperand implements OperandDef
+{
+    /** Position of the bind variable. */
+    int bindIndex_;
+
+    /**
+     * @param bindIndex position of the bind variable this operand stands for
+     */
+    public BindOperand(int bindIndex)
+    {
+        this.bindIndex_ = bindIndex;
+    }
+
+    /**
+     * Always throws: bind variables are not implemented yet.
+     *
+     * @throws RuntimeException carrying the IMPLEMENTATION_RESTRICTION message
+     */
+    public Object get()
+    {
+        // TODO: once bind variables are supported, look up the value bound at
+        // position bindIndex_ in the execution context and return it here.
+        String reason = RuntimeErrorMsg.IMPLEMENTATION_RESTRICTION.getMsg("bind params not yet supported");
+        throw new RuntimeException(reason);
+    }
+
+    /**
+     * @return a short description of this operand for plan output
+     */
+    public String explain()
+    {
+        StringBuilder description = new StringBuilder("Bind #: ");
+        description.append(bindIndex_);
+        return description.toString();
+    }
+
+}
\ No newline at end of file

Added: incubator/cassandra/src/org/apache/cassandra/cql/common/CExpr.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/cql/common/CExpr.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/cql/common/CExpr.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/cql/common/CExpr.java Mon Mar  2 06:12:46 2009
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql.common;
+
+//Note: This class is CQL related work in progress.
+/**
+ * Holder class for the CQL expression abstraction; it declares only the
+ * nested Expr interface.
+ */
+public class CExpr
+{
+    /**
+     * An expression that can report its type and render itself as text.
+     */
+    public static interface Expr
+    {
+        /** Returns the CType associated with this expression. */
+        CType  getType();
+        /** Returns a textual form of this expression. */
+        String toString();
+    };
+}
+

Added: incubator/cassandra/src/org/apache/cassandra/cql/common/CType.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/cql/common/CType.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/cql/common/CType.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/cql/common/CType.java Mon Mar  2 06:12:46 2009
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql.common;
+
+import java.util.ArrayList;
+
+//Note: This class is CQL related work in progress.
+/**
+ * Holder class for the CQL type descriptors. Each nested type renders a
+ * human-readable name through toString().
+ */
+public class CType
+{
+    /** A type descriptor that can render its own name. */
+    public static interface Type
+    {
+        String toString();
+    }
+
+    /** Descriptor rendered as "Integer". */
+    public static class IntegerType implements Type
+    {
+        public String toString() { return "Integer"; }
+    }
+
+    /** Descriptor rendered as "String". */
+    public static class StringType implements Type
+    {
+        public String toString() { return "String"; }
+    }
+
+    /** Descriptor for a row made up of an ordered list of element types. */
+    public static class RowType implements Type
+    {
+        ArrayList<Type> types_;
+
+        /**
+         * @param types element types of the row, in order; the list is kept
+         *              by reference, not copied
+         */
+        public RowType(ArrayList<Type> types)
+        {
+            types_ = types;
+        }
+
+        /**
+         * Renders the row as its element types in list order, separated by
+         * ", " and wrapped in angle brackets, e.g. "<Integer, String>".
+         */
+        public String toString()
+        {
+            StringBuilder sb = new StringBuilder("<");
+            for (int idx = 0; idx < types_.size(); idx++)
+            {
+                if (idx > 0)
+                {
+                    sb.append(", ");
+                }
+                // Append the element itself; appending types_ here would
+                // print the whole list once per element.
+                sb.append(types_.get(idx));
+            }
+            sb.append(">");
+            return sb.toString();
+        }
+    }
+
+    /**
+     * Descriptor for an array of a single element type.
+     * NOTE(review): unlike the other descriptors this class does not
+     * implement Type; confirm whether that is intentional.
+     */
+    public static class ArrayType
+    {
+        Type elementType_;
+
+        /**
+         * @param elementType type of every element in the array
+         */
+        public ArrayType(Type elementType)
+        {
+            elementType_ = elementType;
+        }
+
+        /** Renders as "Array(" + element type + ")". */
+        public String toString()
+        {
+            return "Array(" + elementType_.toString() + ")";
+        }
+    }
+}

Added: incubator/cassandra/src/org/apache/cassandra/cql/common/ColumnMapExpr.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/cql/common/ColumnMapExpr.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/cql/common/ColumnMapExpr.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/cql/common/ColumnMapExpr.java Mon Mar  2 06:12:46 2009
@@ -0,0 +1,25 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql.common;
+
+import java.util.ArrayList;
+
+/**
+ * A list of operand pairs; adds nothing to ArrayList beyond a concrete name.
+ * NOTE(review): presumably each pair is a (column key, column value)
+ * expression -- confirm against the code that builds it.
+ */
+public class ColumnMapExpr extends ArrayList<Pair<OperandDef, OperandDef>>
+{
+    // Fixed serialization id, declared because ArrayList is Serializable.
+    private static final long serialVersionUID = 1L;
+};
\ No newline at end of file

Added: incubator/cassandra/src/org/apache/cassandra/cql/common/ColumnRangeQueryRSD.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/cql/common/ColumnRangeQueryRSD.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/cql/common/ColumnRangeQueryRSD.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/cql/common/ColumnRangeQueryRSD.java Mon Mar  2 06:12:46 2009
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql.common;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.List;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.cql.execution.RuntimeErrorMsg;
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.db.Row;
+import org.apache.cassandra.service.StorageProxy;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+import org.apache.cassandra.db.*;
+
+/**
+ * A Row Source Definition (RSD) for doing a range query on a column map
+ * (in Standard or Super Column Family).
+ */
+public class ColumnRangeQueryRSD extends RowSourceDef
+{
+    private final static Logger logger_ = Logger.getLogger(ColumnRangeQueryRSD.class);
+    private CFMetaData cfMetaData_;
+    private OperandDef rowKey_;
+    private OperandDef superColumnKey_;   // null for a simple column family
+    private int        offset_;
+    private int        limit_;
+
+    /**
+     * Set up a range query on column map in a simple column family.
+     * The column map in a simple column family is identified by the rowKey.
+     *
+     * Note: "limit" of -1 is the equivalent of no limit.
+     *       "offset" specifies the number of rows to skip. An offset of 0 implies from the first row.
+     */
+    public ColumnRangeQueryRSD(CFMetaData cfMetaData, OperandDef rowKey, int offset, int limit)
+    {
+        cfMetaData_     = cfMetaData;
+        rowKey_         = rowKey;
+        superColumnKey_ = null;
+        offset_         = offset;
+        limit_          = limit;
+    }
+
+    /**
+     * Setup a range query on a column map in a super column family.
+     * The column map in a super column family is identified by the rowKey & superColumnKey.
+     *
+     * Note: "limit" of -1 is the equivalent of no limit.
+     *       "offset" specifies the number of rows to skip. An offset of 0 implies the first row.
+     */
+    public ColumnRangeQueryRSD(CFMetaData cfMetaData, ConstantOperand rowKey, ConstantOperand superColumnKey,
+                               int offset, int limit)
+    {
+        cfMetaData_     = cfMetaData;
+        rowKey_         = rowKey;
+        superColumnKey_ = superColumnKey;
+        offset_         = offset;
+        limit_          = limit;
+    }
+
+    /**
+     * Reads the row through StorageProxy.readProtocol at WEAK consistency and
+     * returns one map per column, keyed by the column-key, column-value and
+     * column-timestamp names held in the column family metadata.
+     *
+     * @return one entry per column found; an empty list when the row, the
+     *         column family, the super column or its columns are missing
+     * @throws RuntimeException with the GENERIC_ERROR message, wrapping the
+     *         original failure, when the read fails
+     */
+    public List<Map<String,String>> getRows()
+    {
+        String superColumnKey = null;
+        String columnFamily_column = cfMetaData_.cfName;
+        if (superColumnKey_ != null)
+        {
+            // Super column case: address the column map as "cfName:superColumnKey".
+            superColumnKey = (String)(superColumnKey_.get());
+            columnFamily_column = cfMetaData_.cfName + ":" + superColumnKey;
+        }
+
+        Row row = null;
+        try
+        {
+            String key = (String)(rowKey_.get());
+            row = StorageProxy.readProtocol(cfMetaData_.tableName, key, columnFamily_column,
+                                            offset_, limit_, StorageService.ConsistencyLevel.WEAK);
+        }
+        catch (Exception e)
+        {
+            logger_.error(LogUtil.throwableToString(e));
+            // Keep the original failure as the cause so callers can diagnose it.
+            throw new RuntimeException(RuntimeErrorMsg.GENERIC_ERROR.getMsg(), e);
+        }
+
+        List<Map<String, String>> rows = new LinkedList<Map<String, String>>();
+        if (row == null)
+        {
+            return rows;
+        }
+
+        Map<String, ColumnFamily> cfMap = row.getColumnFamilies();
+        if (cfMap == null || cfMap.isEmpty())
+        {
+            return rows;
+        }
+
+        ColumnFamily cfamily = cfMap.get(cfMetaData_.cfName);
+        if (cfamily == null)
+        {
+            return rows;
+        }
+
+        Collection<IColumn> columns = null;
+        if (superColumnKey_ != null)
+        {
+            // this is the super column case
+            IColumn superColumn = cfamily.getColumn(superColumnKey);
+            if (superColumn != null)
+            {
+                columns = superColumn.getSubColumns();
+            }
+        }
+        else
+        {
+            columns = cfamily.getAllColumns();
+        }
+
+        if (columns == null)
+        {
+            return rows;
+        }
+
+        for (IColumn column : columns)
+        {
+            Map<String, String> result = new HashMap<String, String>();
+
+            result.put(cfMetaData_.n_columnKey, column.name());
+            // NOTE(review): new String(...) decodes with the platform default
+            // charset -- confirm the encoding the values were written with.
+            result.put(cfMetaData_.n_columnValue, new String(column.value()));
+            result.put(cfMetaData_.n_columnTimestamp, Long.toString(column.timestamp()));
+
+            rows.add(result);
+        }
+        return rows;
+    }
+
+    /**
+     * Describes this row source: column type, table, column family, row key,
+     * the super column key when one is set, offset, limit and index property.
+     */
+    public String explainPlan()
+    {
+        return String.format("%s Column Family: Column Range Query: \n" +
+                "  Table Name:       %s\n" +
+                "  Column Family:    %s\n" +
+                "  RowKey:           %s\n" +
+                "%s"                   +
+                "  Offset:           %d\n" +
+                "  Limit:            %d\n" +
+                "  Order By:         %s",
+                cfMetaData_.columnType,
+                cfMetaData_.tableName,
+                cfMetaData_.cfName,
+                rowKey_.explain(),
+                (superColumnKey_ == null) ? "" : "  SuperColumnKey:   " + superColumnKey_.explain() + "\n",
+                offset_, limit_,
+                cfMetaData_.indexProperty_);
+    }
+}
\ No newline at end of file

Added: incubator/cassandra/src/org/apache/cassandra/cql/common/ConstantOperand.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/cql/common/ConstantOperand.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/cql/common/ConstantOperand.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/cql/common/ConstantOperand.java Mon Mar  2 06:12:46 2009
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql.common;
+
+/**
+ * ConstantOperand:
+ * Represents a literal/constant operand in the CQL statement.
+ * Lives as part of the shared execution plan.
+ */
+public class ConstantOperand implements OperandDef
+{
+    /** The literal value this operand stands for. */
+    Object value_;
+
+    /**
+     * @param value the constant to hand back from get()
+     */
+    public ConstantOperand(Object value)
+    {
+        this.value_ = value;
+    }
+
+    /**
+     * @return the value supplied at construction
+     */
+    public Object get()
+    {
+        return this.value_;
+    }
+
+    /**
+     * @return the value wrapped in single quotes and prefixed with "Constant: "
+     */
+    public String explain()
+    {
+        StringBuilder description = new StringBuilder("Constant: '");
+        description.append(value_);
+        description.append("'");
+        return description.toString();
+    }
+}
\ No newline at end of file