You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by pm...@apache.org on 2009/03/02 07:12:49 UTC
svn commit: r749205 [6/16] - in
/incubator/cassandra/src/org/apache/cassandra: analytics/ cli/ concurrent/
config/ continuations/ cql/ cql/common/ cql/compiler/ cql/compiler/common/
cql/compiler/parse/ cql/compiler/sem/ cql/driver/ cql/execution/ dht/ ...
Added: incubator/cassandra/src/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java Mon Mar 2 06:12:46 2009
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.concurrent;
+
+import java.util.concurrent.*;
+
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+import org.apache.cassandra.utils.*;
+
+/**
+ * A <i>ThreadPoolExecutor</i> that logs the failures of the tasks it runs. It provides an
+ * implementation for the <i>afterExecute()</i> hook of the <i>ThreadPoolExecutor</i> class
+ * so that unexpected exceptions end up in the log, both for tasks handed over through
+ * <i>execute()</i> and for tasks handed over through <i>submit()</i>.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public final class DebuggableThreadPoolExecutor extends ThreadPoolExecutor
+{
+    private static Logger logger_ = Logger.getLogger(DebuggableThreadPoolExecutor.class);
+
+    /**
+     * Creates the pool and starts all of its core threads right away.
+     * All arguments are passed unchanged to the <i>ThreadPoolExecutor</i> constructor.
+     *
+     * @param corePoolSize number of core threads
+     * @param maximumPoolSize maximum number of threads
+     * @param keepAliveTime keep alive time of the threads, expressed in <i>unit</i>
+     * @param unit time unit of <i>keepAliveTime</i>
+     * @param workQueue queue holding the tasks that wait for execution
+     * @param threadFactory factory used to create the threads of the pool
+     */
+    public DebuggableThreadPoolExecutor(int corePoolSize,
+                                        int maximumPoolSize,
+                                        long keepAliveTime,
+                                        TimeUnit unit,
+                                        BlockingQueue<Runnable> workQueue,
+                                        ThreadFactory threadFactory)
+    {
+        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
+        super.prestartAllCoreThreads();
+    }
+
+    /**
+     * Logs the failure of a finished task, if there was one.
+     * Helps us in figuring out why sometimes the threads are getting
+     * killed and replaced by new ones.
+     *
+     * A task handed over through <i>submit()</i> is wrapped in a <i>Future</i> that keeps
+     * the exception to itself, in which case <i>t</i> is null. For such a task the
+     * exception is taken out of the completed <i>Future</i>.
+     *
+     * @param r the task that has completed
+     * @param t the exception that terminated the task, or null
+     * @see java.util.concurrent.ThreadPoolExecutor#afterExecute(java.lang.Runnable, java.lang.Throwable)
+     */
+    @Override
+    public void afterExecute(Runnable r, Throwable t)
+    {
+        super.afterExecute(r,t);
+
+        Throwable failure = ( t != null ) ? t : extractFailure(r);
+        if ( failure == null )
+        {
+            return;
+        }
+
+        Context ctx = ThreadLocalContext.get();
+        if ( ctx != null )
+        {
+            /* The context is looked up with the class name of the task as the key. */
+            Object object = ctx.get(r.getClass().getName());
+
+            if ( object != null )
+            {
+                logger_.info("**** In afterExecute() " + failure.getClass().getName() + " occured while working with " + object + " ****");
+            }
+            else
+            {
+                logger_.info("**** In afterExecute() " + failure.getClass().getName() + " occured ****");
+            }
+        }
+
+        Throwable cause = failure.getCause();
+        if ( cause != null )
+        {
+            logger_.info( LogUtil.throwableToString(cause) );
+        }
+        logger_.info( LogUtil.throwableToString(failure) );
+    }
+
+    /**
+     * Takes the exception out of a task that is a completed <i>Future</i>.
+     *
+     * @param r the task that has completed
+     * @return the exception the task failed with, or null if the task is not a
+     *         <i>Future</i>, is not done, was cancelled or completed normally
+     */
+    private static Throwable extractFailure(Runnable r)
+    {
+        if ( !(r instanceof Future) )
+        {
+            return null;
+        }
+
+        Future<?> future = (Future<?>) r;
+        /* Never wait here: only a finished, non-cancelled task is inspected. */
+        if ( !future.isDone() || future.isCancelled() )
+        {
+            return null;
+        }
+
+        try
+        {
+            future.get();
+        }
+        catch ( ExecutionException e )
+        {
+            return ( e.getCause() != null ) ? e.getCause() : e;
+        }
+        catch ( CancellationException e )
+        {
+            /* A cancelled task is not a failure. */
+            return null;
+        }
+        catch ( InterruptedException e )
+        {
+            /* Keep the interrupt visible to the worker thread. */
+            Thread.currentThread().interrupt();
+        }
+        return null;
+    }
+}
Added: incubator/cassandra/src/org/apache/cassandra/concurrent/IContinuable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/concurrent/IContinuable.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/concurrent/IContinuable.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/concurrent/IContinuable.java Mon Mar 2 06:12:46 2009
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.concurrent;
+
+import org.apache.commons.javaflow.Continuation;
+
+/**
+ * A piece of work that is run with a javaflow <i>Continuation</i>.
+ */
+public interface IContinuable
+{
+    /**
+     * Runs this piece of work.
+     * @param c the continuation handed to the work.
+     *          NOTE(review): how implementations use it is not visible here - confirm.
+     */
+    void run(Continuation c);
+}
Added: incubator/cassandra/src/org/apache/cassandra/concurrent/IStage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/concurrent/IStage.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/concurrent/IStage.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/concurrent/IStage.java Mon Mar 2 06:12:46 2009
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.concurrent;
+
+import java.util.concurrent.*;
+
+/**
+ * An abstraction for stages as described in the SEDA paper by Matt Welsh.
+ * For reference to the paper look over here
+ * <a href="http://www.eecs.harvard.edu/~mdw/papers/seda-sosp01.pdf">SEDA: An Architecture for Well-Conditioned,
+ * Scalable Internet Services</a>.
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public interface IStage
+{
+ /**
+ * Get the name of the associated stage.
+ * @return the name of this stage.
+ */
+ public String getName();
+
+ /**
+ * Get the thread pool used by this stage
+ * internally.
+ * @return the executor service backing this stage.
+ */
+ public ExecutorService getInternalThreadPool();
+
+ /**
+ * This method is used to execute a piece of code on
+ * this stage. The idea is that the <i>run()</i> method
+ * of this Runnable instance is invoked on a thread from a
+ * thread pool that belongs to this stage.
+ * @param runnable instance whose run() method needs to be invoked.
+ */
+ public void execute(Runnable runnable);
+
+ /**
+ * This method is used to execute a piece of code on
+ * this stage which returns a Future. The idea
+ * is that the <i>call()</i> method of this Callable
+ * instance is invoked on a thread from a thread pool
+ * that belongs to this stage.
+ *
+ * @param callable instance that needs to be invoked.
+ * @return a Future for the submitted callable.
+ */
+ public Future<Object> execute(Callable<Object> callable);
+
+ /**
+ * This method is used to submit a task to this stage
+ * that executes after a delay.
+ * NOTE(review): the signature mirrors ScheduledExecutorService.schedule(),
+ * which runs the task once - confirm against the implementations.
+ *
+ * @param command the task to execute.
+ * @param delay the time to delay execution
+ * @param unit the time unit of the delay parameter
+ * @return a ScheduledFuture for the submitted task.
+ */
+ public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
+
+ /**
+ * This method is used to submit tasks to this stage
+ * that execute periodically.
+ * @param command the task to execute.
+ * @param initialDelay the time to delay first execution
+ * @param period the period between successive executions
+ * @param unit the time unit of the initialDelay and period parameters
+ * @return a ScheduledFuture for the submitted task.
+ */
+ public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);
+
+ /**
+ * This method is used to submit tasks to this stage
+ * that execute periodically.
+ * @param command the task to execute.
+ * @param initialDelay the time to delay first execution
+ * @param delay the delay between the termination of one execution and the commencement of the next.
+ * @param unit the time unit of the initialDelay and delay parameters
+ * @return a ScheduledFuture for the submitted task.
+ */
+ public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
+
+ /**
+ * Shutdown the stage. All the threads of this stage
+ * are forcefully shutdown. Any pending tasks on this
+ * stage could be dropped or the stage could wait for
+ * these tasks to be completed. This is however an
+ * implementation detail.
+ */
+ public void shutdown();
+
+ /**
+ * Checks if the stage has been shutdown.
+ * @return true if the stage has been shutdown.
+ */
+ public boolean isShutdown();
+
+ /**
+ * This method returns the number of tasks that are
+ * pending on this stage to be executed.
+ * @return the number of pending tasks.
+ */
+ public long getTaskCount();
+}
Added: incubator/cassandra/src/org/apache/cassandra/concurrent/MultiThreadedStage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/concurrent/MultiThreadedStage.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/concurrent/MultiThreadedStage.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/concurrent/MultiThreadedStage.java Mon Mar 2 06:12:46 2009
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.cassandra.concurrent;
+
+import java.util.concurrent.*;
+
+import javax.naming.OperationNotSupportedException;
+
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+/**
+ * This class is an implementation of the <i>IStage</i> interface. In particular
+ * it is for a stage that has a thread pool with multiple threads. For details
+ * please refer to the <i>IStage</i> documentation.
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class MultiThreadedStage implements IStage
+{
+ private String name_;
+ private DebuggableThreadPoolExecutor executorService_;
+
+ public MultiThreadedStage(String name, int numThreads)
+ {
+ name_ = name;
+ executorService_ = new DebuggableThreadPoolExecutor( numThreads,
+ numThreads,
+ Integer.MAX_VALUE,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue<Runnable>(),
+ new ThreadFactoryImpl(name)
+ );
+ }
+
+ public String getName()
+ {
+ return name_;
+ }
+
+ public ExecutorService getInternalThreadPool()
+ {
+ return executorService_;
+ }
+
+ public Future<Object> execute(Callable<Object> callable) {
+ return executorService_.submit(callable);
+ }
+
+ public void execute(Runnable runnable) {
+ executorService_.execute(runnable);
+ }
+
+ public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
+ {
+ throw new UnsupportedOperationException("This operation is not supported");
+ }
+
+ public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
+ throw new UnsupportedOperationException("This operation is not supported");
+ }
+
+ public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
+ throw new UnsupportedOperationException("This operation is not supported");
+ }
+
+ public void shutdown() {
+ executorService_.shutdownNow();
+ }
+
+ public boolean isShutdown()
+ {
+ return executorService_.isShutdown();
+ }
+
+ public long getTaskCount(){
+ return (executorService_.getTaskCount() - executorService_.getCompletedTaskCount());
+ }
+}
Added: incubator/cassandra/src/org/apache/cassandra/concurrent/RejectedExecutionHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/concurrent/RejectedExecutionHandler.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/concurrent/RejectedExecutionHandler.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/concurrent/RejectedExecutionHandler.java Mon Mar 2 06:12:46 2009
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.concurrent;
+
+/**
+ * Package private callback that receives a task together with a <i>ContinuationsExecutor</i>.
+ * NOTE(review): presumably invoked when the executor cannot accept the task, like
+ * java.util.concurrent.RejectedExecutionHandler - confirm against ContinuationsExecutor.
+ */
+interface RejectedExecutionHandler
+{
+    /**
+     * @param r the task in question.
+     * @param executor the executor the task was handed to.
+     */
+    void rejectedExecution(Runnable r, ContinuationsExecutor executor);
+}
Added: incubator/cassandra/src/org/apache/cassandra/concurrent/SingleThreadedContinuationStage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/concurrent/SingleThreadedContinuationStage.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/concurrent/SingleThreadedContinuationStage.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/concurrent/SingleThreadedContinuationStage.java Mon Mar 2 06:12:46 2009
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.concurrent;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+public class SingleThreadedContinuationStage implements IStage
+{
+ protected ContinuationsExecutor executorService_;
+ private String name_;
+
+ public SingleThreadedContinuationStage(String name)
+ {
+ executorService_ = new ContinuationsExecutor( 1,
+ 1,
+ Integer.MAX_VALUE,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue<Runnable>(),
+ new ThreadFactoryImpl(name)
+ );
+ name_ = name;
+ }
+
+ /* Implementing the IStage interface methods */
+
+ public String getName()
+ {
+ return name_;
+ }
+
+ public ExecutorService getInternalThreadPool()
+ {
+ return executorService_;
+ }
+
+ public void execute(Runnable runnable)
+ {
+ executorService_.execute(runnable);
+ }
+
+ public Future<Object> execute(Callable<Object> callable)
+ {
+ return executorService_.submit(callable);
+ }
+
+ public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
+ {
+ //return executorService_.schedule(command, delay, unit);
+ throw new UnsupportedOperationException("This operation is not supported");
+ }
+
+ public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
+ {
+ //return executorService_.scheduleAtFixedRate(command, initialDelay, period, unit);
+ throw new UnsupportedOperationException("This operation is not supported");
+ }
+
+ public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
+ {
+ //return executorService_.scheduleWithFixedDelay(command, initialDelay, delay, unit);
+ throw new UnsupportedOperationException("This operation is not supported");
+ }
+
+ public void shutdown()
+ {
+ executorService_.shutdownNow();
+ }
+
+ public boolean isShutdown()
+ {
+ return executorService_.isShutdown();
+ }
+
+ public long getTaskCount(){
+ return (executorService_.getTaskCount() - executorService_.getCompletedTaskCount());
+ }
+ /* Finished implementing the IStage interface methods */
+}
+
Added: incubator/cassandra/src/org/apache/cassandra/concurrent/SingleThreadedStage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/concurrent/SingleThreadedStage.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/concurrent/SingleThreadedStage.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/concurrent/SingleThreadedStage.java Mon Mar 2 06:12:46 2009
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.concurrent;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.cassandra.net.*;
+
+/**
+ * This class is an implementation of the <i>IStage</i> interface. In particular
+ * it is for a stage that has a thread pool with a single thread. For details
+ * please refer to the <i>IStage</i> documentation.
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class SingleThreadedStage implements IStage
+{
+ protected DebuggableThreadPoolExecutor executorService_;
+ private String name_;
+
+ public SingleThreadedStage(String name)
+ {
+ //executorService_ = new DebuggableScheduledThreadPoolExecutor(1,new ThreadFactoryImpl(name));
+ executorService_ = new DebuggableThreadPoolExecutor( 1,
+ 1,
+ Integer.MAX_VALUE,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue<Runnable>(),
+ new ThreadFactoryImpl(name)
+ );
+ name_ = name;
+ }
+
+ /* Implementing the IStage interface methods */
+
+ public String getName()
+ {
+ return name_;
+ }
+
+ public ExecutorService getInternalThreadPool()
+ {
+ return executorService_;
+ }
+
+ public void execute(Runnable runnable)
+ {
+ executorService_.execute(runnable);
+ }
+
+ public Future<Object> execute(Callable<Object> callable)
+ {
+ return executorService_.submit(callable);
+ }
+
+ public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
+ {
+ //return executorService_.schedule(command, delay, unit);
+ throw new UnsupportedOperationException("This operation is not supported");
+ }
+
+ public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
+ {
+ //return executorService_.scheduleAtFixedRate(command, initialDelay, period, unit);
+ throw new UnsupportedOperationException("This operation is not supported");
+ }
+
+ public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
+ {
+ //return executorService_.scheduleWithFixedDelay(command, initialDelay, delay, unit);
+ throw new UnsupportedOperationException("This operation is not supported");
+ }
+
+ public void shutdown()
+ {
+ executorService_.shutdownNow();
+ }
+
+ public boolean isShutdown()
+ {
+ return executorService_.isShutdown();
+ }
+
+ public long getTaskCount(){
+ return (executorService_.getTaskCount() - executorService_.getCompletedTaskCount());
+ }
+ /* Finished implementing the IStage interface methods */
+}
Added: incubator/cassandra/src/org/apache/cassandra/concurrent/StageManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/concurrent/StageManager.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/concurrent/StageManager.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/concurrent/StageManager.java Mon Mar 2 06:12:46 2009
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.concurrent;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.cassandra.continuations.Suspendable;
+
+
+/**
+ * This class manages all stages that exist within a process. The application registers
+ * and de-registers stages with this abstraction. Any component that has the <i>ID</i>
+ * associated with a stage can obtain a handle to actual stage.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class StageManager
+{
+    /*
+     * Registered stages keyed by stage name.
+     * NOTE(review): this is a plain HashMap and no synchronization is visible here -
+     * confirm that stages are registered before the map is read from several threads.
+     */
+    private static Map<String, IStage > stageQueues_ = new HashMap<String, IStage>();
+
+    /**
+     * Register a stage with the StageManager
+     * @param stageName stage name.
+     * @param stage stage for the respective message types.
+     */
+    public static void registerStage(String stageName, IStage stage)
+    {
+        stageQueues_.put(stageName, stage);
+    }
+
+    /**
+     * Returns the stage that we are currently executing on.
+     * This relies on the fact that the thread names in the
+     * stage have the name of the stage as the prefix, followed
+     * by a ':'.
+     * @return the stage registered under the part of the current thread's
+     *         name that precedes the first ':' (the whole name if there is
+     *         no ':'), or null if no such stage is registered.
+     */
+    public static IStage getCurrentStage()
+    {
+        String threadName = Thread.currentThread().getName();
+        /* A plain prefix cut: unlike split(":") it also copes with a name made of colons only. */
+        int separator = threadName.indexOf(':');
+        String stageName = ( separator < 0 ) ? threadName : threadName.substring(0, separator);
+        return getStage(stageName);
+    }
+
+    /**
+     * Retrieve a stage from the StageManager
+     * @param stageName name of the stage to be retrieved.
+     * @return the registered stage, or null if there is none with this name.
+     */
+    public static IStage getStage(String stageName)
+    {
+        return stageQueues_.get(stageName);
+    }
+
+    /**
+     * Retrieve the internal thread pool associated with the
+     * specified stage name.
+     * @param stageName name of the stage.
+     * @return the internal thread pool of the stage.
+     * @throws IllegalArgumentException if no stage is registered with this name.
+     */
+    public static ExecutorService getStageInternalThreadPool(String stageName)
+    {
+        return requireStage(stageName).getInternalThreadPool();
+    }
+
+    /**
+     * Deregister a stage from StageManager
+     * @param stageName stage name.
+     */
+    public static void deregisterStage(String stageName)
+    {
+        stageQueues_.remove(stageName);
+    }
+
+    /**
+     * This method gets the number of tasks on the
+     * stage's internal queue.
+     * @param stage name of the stage
+     * @return the task count reported by the stage.
+     * @throws IllegalArgumentException if no stage is registered with this name.
+     */
+    public static long getStageTaskCount(String stage)
+    {
+        return requireStage(stage).getTaskCount();
+    }
+
+    /**
+     * This method shuts down all registered stages.
+     */
+    public static void shutdown()
+    {
+        /* Work on a snapshot so that a stage deregistering itself while it shuts down cannot break the iteration. */
+        IStage[] registeredStages = stageQueues_.values().toArray(new IStage[stageQueues_.size()]);
+        for ( IStage registeredStage : registeredStages )
+        {
+            registeredStage.shutdown();
+        }
+    }
+
+    /**
+     * Looks up a stage that has to exist.
+     * @param stageName name of the stage.
+     * @return the registered stage, never null.
+     * @throws IllegalArgumentException if no stage is registered with this name.
+     */
+    private static IStage requireStage(String stageName)
+    {
+        IStage stage = getStage(stageName);
+        if ( stage == null )
+            throw new IllegalArgumentException("No stage registered with name " + stageName);
+        return stage;
+    }
+}
Added: incubator/cassandra/src/org/apache/cassandra/concurrent/ThreadFactoryImpl.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/concurrent/ThreadFactoryImpl.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/concurrent/ThreadFactoryImpl.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/concurrent/ThreadFactoryImpl.java Mon Mar 2 06:12:46 2009
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.concurrent;
+
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+import org.apache.cassandra.utils.*;
+
+/**
+ * This class is an implementation of the <i>ThreadFactory</i> interface. This
+ * is useful to give Java threads meaningful names which is useful when using
+ * a tool like JConsole.
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class ThreadFactoryImpl implements ThreadFactory
+{
+    /* Prefix of the names of the threads created by this factory. */
+    protected String id_;
+    /* Thread group every created thread is put into. */
+    protected ThreadGroup threadGroup_;
+    /* Number appended to the name of the next thread; starts at 1. */
+    protected final AtomicInteger threadNbr_ = new AtomicInteger(1);
+
+    /**
+     * The thread group is taken from the security manager if one is installed,
+     * otherwise from the thread that creates the factory.
+     * @param id prefix used for the names of the created threads.
+     */
+    public ThreadFactoryImpl(String id)
+    {
+        SecurityManager manager = System.getSecurityManager();
+        if ( manager == null )
+        {
+            threadGroup_ = Thread.currentThread().getThreadGroup();
+        }
+        else
+        {
+            threadGroup_ = manager.getThreadGroup();
+        }
+        id_ = id;
+    }
+
+    /**
+     * Creates a thread named <i>id:n</i>, where n is increased with every call.
+     * @param runnable the code the new thread runs.
+     * @return the new thread, which has not been started.
+     */
+    public Thread newThread(Runnable runnable)
+    {
+        String prefix = id_ + ":";
+        String threadName = prefix + threadNbr_.getAndIncrement();
+        return new Thread(threadGroup_, runnable, threadName);
+    }
+}
Added: incubator/cassandra/src/org/apache/cassandra/concurrent/ThreadLocalContext.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/concurrent/ThreadLocalContext.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/concurrent/ThreadLocalContext.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/concurrent/ThreadLocalContext.java Mon Mar 2 06:12:46 2009
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.concurrent;
+
+/**
+ * Use this implementation over Java's ThreadLocal or InheritableThreadLocal when
+ * you need to add multiple key/value pairs into ThreadLocalContext for a given thread.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+
+public class ThreadLocalContext
+{
+    /* Holds the Context of the current thread; an InheritableThreadLocal, so child threads start with the value of their parent. */
+    private static final InheritableThreadLocal<Context> holder_ = new InheritableThreadLocal<Context>();
+
+    /**
+     * Sets the Context of the calling thread.
+     * @param value the context to associate with the calling thread.
+     */
+    public static void put(Context value)
+    {
+        holder_.set(value);
+    }
+
+    /**
+     * @return the Context of the calling thread, or null if none has been set.
+     */
+    public static Context get()
+    {
+        Context current = holder_.get();
+        return current;
+    }
+}
\ No newline at end of file
Added: incubator/cassandra/src/org/apache/cassandra/config/CFMetaData.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/config/CFMetaData.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/config/CFMetaData.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/config/CFMetaData.java Mon Mar 2 06:12:46 2009
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.config;
+
+public class CFMetaData
+{
+ public String tableName; // name of table which has this column family
+ public String cfName; // name of the column family
+ public String columnType; // type: super, standard, etc.
+ public String indexProperty_; // name sorted, time stamp sorted etc.
+
+ // The user chosen names (n_) for various parts of data in a column family.
+ // CQL queries, for instance, will refer to/extract data within a column
+ // family using these logical names.
+ public String n_rowKey;
+ public String n_superColumnMap; // only used if this is a super column family
+ public String n_superColumnKey; // only used if this is a super column family
+ public String n_columnMap;
+ public String n_columnKey;
+ public String n_columnValue;
+ public String n_columnTimestamp;
+
+ // a quick and dirty pretty printer for describing the column family...
+ public String pretty()
+ {
+ String desc;
+ desc = n_columnMap + "(" + n_columnKey + ", " + n_columnValue + ", " + n_columnTimestamp + ")";
+ if ("Super".equals(columnType))
+ {
+ desc = n_superColumnMap + "(" + n_superColumnKey + ", " + desc + ")";
+ }
+ desc = tableName + "." + cfName + "(" + n_rowKey + ", " + desc + ")\n";
+
+ desc += "Column Family Type: " + columnType + "\n" +
+ "Columns Sorted By: " + indexProperty_ + "\n";
+ return desc;
+ }
+}
Added: incubator/cassandra/src/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/config/DatabaseDescriptor.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/config/DatabaseDescriptor.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/config/DatabaseDescriptor.java Mon Mar 2 06:12:46 2009
@@ -0,0 +1,743 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.config;
+
+import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.io.*;
+
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.db.TypeInfo;
+import org.apache.cassandra.db.Table.TableMetadata;
+import org.apache.cassandra.utils.FileUtils;
+import org.apache.cassandra.utils.XMLUtils;
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
+import org.apache.cassandra.io.*;
+
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class DatabaseDescriptor
+{
+ public static final String random_ = "RANDOM";
+ public static final String ophf_ = "OPHF";
+ private static int storagePort_ = 7000;
+ private static int controlPort_ = 7001;
+ private static int httpPort_ = 7002;
+ private static String clusterName_ = "Test";
+ private static int replicationFactor_ = 3;
+ private static long rpcTimeoutInMillis_ = 2000;
+ private static Set<String> seeds_ = new HashSet<String>();
+ private static String metadataDirectory_;
+ private static String snapshotDirectory_;
+ /* Keeps the list of Ganglia servers to contact */
+ private static String[] gangliaServers_ ;
+ /* Keeps the list of map output directories */
+ private static String[] mapOutputDirectories_;
+ /* Keeps the list of data file directories */
+ private static String[] dataFileDirectories_;
+ /* Current index into the above list of directories */
+ private static int currentIndex_ = 0;
+ private static String logFileDirectory_;
+ private static String bootstrapFileDirectory_;
+ private static int logRotationThreshold_ = 128*1024*1024;
+ private static boolean fastSync_ = false;
+ private static boolean rackAware_ = false;
+ private static int threadsPerPool_ = 4;
+ private static List<String> tables_ = new ArrayList<String>();
+ private static Set<String> applicationColumnFamilies_ = new HashSet<String>();
+
+ // Default descriptive names for use in CQL. The user can override
+ // these choices in the config file. These are not case sensitive.
+ // Hence, these are stored in UPPER case for easy comparison.
+ private static String d_rowKey_ = "ROW_KEY";
+ private static String d_superColumnMap_ = "SUPER_COLUMN_MAP";
+ private static String d_superColumnKey_ = "SUPER_COLUMN_KEY";
+ private static String d_columnMap_ = "COLUMN_MAP";
+ private static String d_columnKey_ = "COLUMN_KEY";
+ private static String d_columnValue_ = "COLUMN_VALUE";
+ private static String d_columnTimestamp_ = "COLUMN_TIMESTAMP";
+
+ /*
+ * A map from table names to the set of column families for the table and the
+ * corresponding meta data for that column family.
+ */
+ private static Map<String, Map<String, CFMetaData>> tableToCFMetaDataMap_;
+ /* Hashing strategy Random or OPHF */
+ private static String hashingStrategy_ = DatabaseDescriptor.random_;
+ /* if the size of columns or super-columns are more than this, indexing will kick in */
+ private static int columnIndexSizeInKB_;
+ /* Size of touch key cache */
+ private static int touchKeyCacheSize_ = 1024;
+ /* Number of hours to keep a memtable in memory */
+ private static int memtableLifetime_ = 6;
+ /* Size of the memtable in memory before it is dumped */
+ private static int memtableSize_ = 128;
+ /* Number of objects in millions in the memtable before it is dumped */
+ private static int memtableObjectCount_ = 1;
+ /*
+ * This parameter enables or disables consistency checks.
+ * If set to false the read repairs are disable for very
+ * high throughput on reads but at the cost of consistency.
+ */
+ private static boolean doConsistencyCheck_ = true;
+ /* Address of ZooKeeper cell */
+ private static String zkAddress_;
+ /* Callout directories */
+ private static String calloutLocation_;
+ /* Job Jar Location */
+ private static String jobJarFileLocation_;
+ /* Address where to run the job tracker */
+ private static String jobTrackerHost_;
+ /* Zookeeper session timeout. */
+ private static int zkSessionTimeout_ = 30000;
+
+ // the path qualified config file (storage-conf.xml) name
+ private static String configFileName_;
+
+ public static Map<String, Map<String, CFMetaData>> init(String filePath) throws Throwable
+ {
+ /* Read the configuration file to retrieve DB related properties. */
+ String file = filePath + System.getProperty("file.separator") + "storage-conf.xml";
+ return initInternal(file);
+ }
+
+ public static Map<String, Map<String, CFMetaData>> init() throws Throwable
+ {
+ /* Read the configuration file to retrieve DB related properties. */
+ configFileName_ = System.getProperty("storage-config") + System.getProperty("file.separator") + "storage-conf.xml";
+ return initInternal(configFileName_);
+ }
+
+ public static Map<String, Map<String, CFMetaData>> initInternal(String file) throws Throwable
+ {
+ String os = System.getProperty("os.name");
+ XMLUtils xmlUtils = new XMLUtils(file);
+
+ /* Cluster Name */
+ clusterName_ = xmlUtils.getNodeValue("/Storage/ClusterName");
+
+ /* Ganglia servers contact list */
+ gangliaServers_ = xmlUtils.getNodeValues("/Storage/GangliaServers/GangliaServer");
+
+ /* ZooKeeper's address */
+ zkAddress_ = xmlUtils.getNodeValue("/Storage/ZookeeperAddress");
+
+ /* Hashing strategy */
+ hashingStrategy_ = xmlUtils.getNodeValue("/Storage/HashingStrategy");
+ /* Callout location */
+ calloutLocation_ = xmlUtils.getNodeValue("/Storage/CalloutLocation");
+
+ /* JobTracker address */
+ jobTrackerHost_ = xmlUtils.getNodeValue("/Storage/JobTrackerHost");
+
+ /* Job Jar file location */
+ jobJarFileLocation_ = xmlUtils.getNodeValue("/Storage/JobJarFileLocation");
+
+ /* Zookeeper's session timeout */
+ String zkSessionTimeout = xmlUtils.getNodeValue("/Storage/ZookeeperSessionTimeout");
+ if ( zkSessionTimeout != null )
+ zkSessionTimeout_ = Integer.parseInt(zkSessionTimeout);
+
+ /* Data replication factor */
+ String replicationFactor = xmlUtils.getNodeValue("/Storage/ReplicationFactor");
+ if ( replicationFactor != null )
+ replicationFactor_ = Integer.parseInt(replicationFactor);
+
+ /* RPC Timeout */
+ String rpcTimeoutInMillis = xmlUtils.getNodeValue("/Storage/RpcTimeoutInMillis");
+ if ( rpcTimeoutInMillis != null )
+ rpcTimeoutInMillis_ = Integer.parseInt(rpcTimeoutInMillis);
+
+ /* Thread per pool */
+ String threadsPerPool = xmlUtils.getNodeValue("/Storage/ThreadsPerPool");
+ if ( threadsPerPool != null )
+ threadsPerPool_ = Integer.parseInt(threadsPerPool);
+
+ /* TCP port on which the storage system listens */
+ String port = xmlUtils.getNodeValue("/Storage/StoragePort");
+ if ( port != null )
+ storagePort_ = Integer.parseInt(port);
+
+ /* UDP port for control messages */
+ port = xmlUtils.getNodeValue("/Storage/ControlPort");
+ if ( port != null )
+ controlPort_ = Integer.parseInt(port);
+
+ /* HTTP port for HTTP messages */
+ port = xmlUtils.getNodeValue("/Storage/HttpPort");
+ if ( port != null )
+ httpPort_ = Integer.parseInt(port);
+
+ /* Touch Key Cache Size */
+ String touchKeyCacheSize = xmlUtils.getNodeValue("/Storage/TouchKeyCacheSize");
+ if ( touchKeyCacheSize != null )
+ touchKeyCacheSize_ = Integer.parseInt(touchKeyCacheSize);
+
+ /* Number of days to keep the memtable around w/o flushing */
+ String lifetime = xmlUtils.getNodeValue("/Storage/MemtableLifetimeInDays");
+ if ( lifetime != null )
+ memtableLifetime_ = Integer.parseInt(lifetime);
+
+ /* Size of the memtable in memory in MB before it is dumped */
+ String memtableSize = xmlUtils.getNodeValue("/Storage/MemtableSizeInMB");
+ if ( memtableSize != null )
+ memtableSize_ = Integer.parseInt(memtableSize);
+ /* Number of objects in millions in the memtable before it is dumped */
+ String memtableObjectCount = xmlUtils.getNodeValue("/Storage/MemtableObjectCountInMillions");
+ if ( memtableObjectCount != null )
+ memtableObjectCount_ = Integer.parseInt(memtableObjectCount);
+
+ /* This parameter enables or disables consistency checks.
+ * If set to false the read repairs are disable for very
+ * high throughput on reads but at the cost of consistency.*/
+ String doConsistencyCheck = xmlUtils.getNodeValue("/Storage/DoConsistencyChecksBoolean");
+ if ( doConsistencyCheck != null )
+ doConsistencyCheck_ = Boolean.parseBoolean(doConsistencyCheck);
+
+
+ /* read the size at which we should do column indexes */
+ String columnIndexSizeInKB = xmlUtils.getNodeValue("/Storage/ColumnIndexSizeInKB");
+ if(columnIndexSizeInKB == null)
+ {
+ columnIndexSizeInKB_ = 64;
+ }
+ else
+ {
+ columnIndexSizeInKB_ = Integer.parseInt(columnIndexSizeInKB);
+ }
+
+ /* metadata directory */
+ metadataDirectory_ = xmlUtils.getNodeValue("/Storage/MetadataDirectory");
+ if ( metadataDirectory_ != null )
+ FileUtils.createDirectory(metadataDirectory_);
+ else
+ {
+ if ( os.equals("Linux") )
+ {
+ metadataDirectory_ = "/var/storage/system";
+ }
+ }
+
+ /* snapshot directory */
+ snapshotDirectory_ = xmlUtils.getNodeValue("/Storage/SnapshotDirectory");
+ if ( snapshotDirectory_ != null )
+ FileUtils.createDirectory(snapshotDirectory_);
+ else
+ {
+ snapshotDirectory_ = metadataDirectory_ + System.getProperty("file.separator") + "snapshot";
+ }
+
+ /* map output directory */
+ mapOutputDirectories_ = xmlUtils.getNodeValues("/Storage/MapOutputDirectories/MapOutputDirectory");
+ if ( mapOutputDirectories_.length > 0 )
+ {
+ for ( String mapOutputDirectory : mapOutputDirectories_ )
+ FileUtils.createDirectory(mapOutputDirectory);
+ }
+
+ /* data file directory */
+ dataFileDirectories_ = xmlUtils.getNodeValues("/Storage/DataFileDirectories/DataFileDirectory");
+ if ( dataFileDirectories_.length > 0 )
+ {
+ for ( String dataFileDirectory : dataFileDirectories_ )
+ FileUtils.createDirectory(dataFileDirectory);
+ }
+ else
+ {
+ if ( os.equals("Linux") )
+ {
+ dataFileDirectories_ = new String[]{"/var/storage/data"};
+ }
+ }
+
+ /* bootstrap file directory */
+ bootstrapFileDirectory_ = xmlUtils.getNodeValue("/Storage/BootstrapFileDirectory");
+ if ( bootstrapFileDirectory_ != null )
+ FileUtils.createDirectory(bootstrapFileDirectory_);
+ else
+ {
+ if ( os.equals("Linux") )
+ {
+ bootstrapFileDirectory_ = "/var/storage/bootstrap";
+ }
+ }
+
+ /* commit log directory */
+ logFileDirectory_ = xmlUtils.getNodeValue("/Storage/CommitLogDirectory");
+ if ( logFileDirectory_ != null )
+ FileUtils.createDirectory(logFileDirectory_);
+ else
+ {
+ if ( os.equals("Linux") )
+ {
+ logFileDirectory_ = "/var/storage/commitlog";
+ }
+ }
+
+ /* threshold after which commit log should be rotated. */
+ String value = xmlUtils.getNodeValue("/Storage/CommitLogRotationThresholdInMB");
+ if ( value != null)
+ logRotationThreshold_ = Integer.parseInt(value) * 1024 * 1024;
+
+ /* fast sync option */
+ value = xmlUtils.getNodeValue("/Storage/CommitLogFastSync");
+ if ( value != null )
+ fastSync_ = Boolean.parseBoolean(value);
+
+ tableToCFMetaDataMap_ = new HashMap<String, Map<String, CFMetaData>>();
+
+ /* Rack Aware option */
+ value = xmlUtils.getNodeValue("/Storage/RackAware");
+ if ( value != null )
+ rackAware_ = Boolean.parseBoolean(value);
+
+ /* Read the table related stuff from config */
+ NodeList tables = xmlUtils.getRequestedNodeList("/Storage/Tables/Table");
+ int size = tables.getLength();
+ for ( int i = 0; i < size; ++i )
+ {
+ Node table = tables.item(i);
+
+ /* parsing out the table name */
+ String tName = XMLUtils.getAttributeValue(table, "Name");
+ tables_.add(tName);
+ tableToCFMetaDataMap_.put(tName, new HashMap<String, CFMetaData>());
+
+ String xqlTable = "/Storage/Tables/Table[@Name='" + tName + "']/";
+ NodeList columnFamilies = xmlUtils.getRequestedNodeList(xqlTable + "ColumnFamily");
+
+ // get name of the rowKey for this table
+ String n_rowKey = xmlUtils.getNodeValue(xqlTable + "RowKey");
+ if (n_rowKey == null)
+ n_rowKey = d_rowKey_;
+
+ //NodeList columnFamilies = xmlUtils.getRequestedNodeList(table, "ColumnFamily");
+ int size2 = columnFamilies.getLength();
+
+ for ( int j = 0; j < size2; ++j )
+ {
+ Node columnFamily = columnFamilies.item(j);
+ String cName = XMLUtils.getAttributeValue(columnFamily, "Name");
+ String xqlCF = xqlTable + "ColumnFamily[@Name='" + cName + "']/";
+
+ /* squirrel away the application column families */
+ applicationColumnFamilies_.add(cName);
+
+ // Parse out the column type
+ String columnType = xmlUtils.getAttributeValue(columnFamily, "ColumnType");
+ columnType = ColumnFamily.getColumnType(columnType);
+
+ // Parse out the column family sorting property for columns
+ String columnIndexProperty = XMLUtils.getAttributeValue(columnFamily, "ColumnSort");
+ String columnIndexType = ColumnFamily.getColumnSortProperty(columnIndexProperty);
+
+ // Parse out user-specified logical names for the various dimensions
+ // of a the column family from the config.
+ String n_superColumnMap = xmlUtils.getNodeValue(xqlCF + "SuperColumnMap");
+ if (n_superColumnMap == null)
+ n_superColumnMap = d_superColumnMap_;
+
+ String n_superColumnKey = xmlUtils.getNodeValue(xqlCF + "SuperColumnKey");
+ if (n_superColumnKey == null)
+ n_superColumnKey = d_superColumnKey_;
+
+ String n_columnMap = xmlUtils.getNodeValue(xqlCF + "ColumnMap");
+ if (n_columnMap == null)
+ n_columnMap = d_columnMap_;
+
+ String n_columnKey = xmlUtils.getNodeValue(xqlCF + "ColumnKey");
+ if (n_columnKey == null)
+ n_columnKey = d_columnKey_;
+
+ String n_columnValue = xmlUtils.getNodeValue(xqlCF + "ColumnValue");
+ if (n_columnValue == null)
+ n_columnValue = d_columnValue_;
+
+ String n_columnTimestamp = xmlUtils.getNodeValue(xqlCF + "ColumnTimestamp");
+ if (n_columnTimestamp == null)
+ n_columnTimestamp = d_columnTimestamp_;
+
+ // now populate the column family meta data and
+ // insert it into the table dictionary.
+ CFMetaData cfMetaData = new CFMetaData();
+
+ cfMetaData.tableName = tName;
+ cfMetaData.cfName = cName;
+
+ cfMetaData.columnType = columnType;
+ cfMetaData.indexProperty_ = columnIndexType;
+
+ cfMetaData.n_rowKey = n_rowKey;
+ cfMetaData.n_columnMap = n_columnMap;
+ cfMetaData.n_columnKey = n_columnKey;
+ cfMetaData.n_columnValue = n_columnValue;
+ cfMetaData.n_columnTimestamp = n_columnTimestamp;
+ if ("Super".equals(columnType))
+ {
+ cfMetaData.n_superColumnKey = n_superColumnKey;
+ cfMetaData.n_superColumnMap = n_superColumnMap;
+ }
+
+ tableToCFMetaDataMap_.get(tName).put(cName, cfMetaData);
+ }
+ }
+
+ /* Load the seeds for node contact points */
+ String[] seeds = xmlUtils.getNodeValues("/Storage/Seeds/Seed");
+ for( int i = 0; i < seeds.length; ++i )
+ {
+ seeds_.add( seeds[i] );
+ }
+ return tableToCFMetaDataMap_;
+ }
+
+ public static String getHashingStrategy()
+ {
+ return hashingStrategy_;
+ }
+
+ public static String getZkAddress()
+ {
+ return zkAddress_;
+ }
+
+ public static String getCalloutLocation()
+ {
+ return calloutLocation_;
+ }
+
+ public static String getJobTrackerAddress()
+ {
+ return jobTrackerHost_;
+ }
+
+ public static int getZkSessionTimeout()
+ {
+ return zkSessionTimeout_;
+ }
+
+ public static int getColumnIndexSize()
+ {
+ return columnIndexSizeInKB_ * 1024;
+ }
+
+
+ public static int getMemtableLifetime()
+ {
+ return memtableLifetime_;
+ }
+
+ public static int getMemtableSize()
+ {
+ return memtableSize_;
+ }
+
+ public static int getMemtableObjectCount()
+ {
+ return memtableObjectCount_;
+ }
+
+ public static boolean getConsistencyCheck()
+ {
+ return doConsistencyCheck_;
+ }
+
+ public static String getClusterName()
+ {
+ return clusterName_;
+ }
+
+ public static String getConfigFileName() {
+ return configFileName_;
+ }
+
+ public static boolean isApplicationColumnFamily(String columnFamily)
+ {
+ return applicationColumnFamilies_.contains(columnFamily);
+ }
+
+ public static int getTouchKeyCacheSize()
+ {
+ return touchKeyCacheSize_;
+ }
+
+ public static String getJobJarLocation()
+ {
+ return jobJarFileLocation_;
+ }
+
+ public static String getGangliaServers()
+ {
+ StringBuilder sb = new StringBuilder();
+ for ( int i = 0; i < gangliaServers_.length; ++i )
+ {
+ sb.append(gangliaServers_[i]);
+ if ( i != (gangliaServers_.length - 1) )
+ sb.append(", ");
+ }
+ return sb.toString();
+ }
+
+ public static Map<String, CFMetaData> getTableMetaData(String table)
+ {
+ return tableToCFMetaDataMap_.get(table);
+ }
+
+ /*
+ * Given a table name & column family name, get the column family
+ * meta data. If the table name or column family name is not valid
+ * this function returns null.
+ */
+ public static CFMetaData getCFMetaData(String table, String cfName)
+ {
+ Map<String, CFMetaData> cfInfo = tableToCFMetaDataMap_.get(table);
+ if (cfInfo == null)
+ return null;
+
+ return cfInfo.get(cfName);
+ }
+
+ public static String getColumnType(String cfName)
+ {
+ String table = getTables().get(0);
+ CFMetaData cfMetaData = getCFMetaData(table, cfName);
+
+ if (cfMetaData == null)
+ return null;
+ return cfMetaData.columnType;
+ }
+
+ public static boolean isNameSortingEnabled(String cfName)
+ {
+ String table = getTables().get(0);
+ CFMetaData cfMetaData = getCFMetaData(table, cfName);
+
+ if (cfMetaData == null)
+ return false;
+
+ return "Name".equals(cfMetaData.indexProperty_);
+ }
+
+ public static boolean isTimeSortingEnabled(String cfName)
+ {
+ String table = getTables().get(0);
+ CFMetaData cfMetaData = getCFMetaData(table, cfName);
+
+ if (cfMetaData == null)
+ return false;
+
+ return "Time".equals(cfMetaData.indexProperty_);
+ }
+
+
+ public static List<String> getTables()
+ {
+ return tables_;
+ }
+
+ public static void setTables(String table)
+ {
+ tables_.add(table);
+ }
+
+ public static int getStoragePort()
+ {
+ return storagePort_;
+ }
+
+ public static int getControlPort()
+ {
+ return controlPort_;
+ }
+
+ public static int getHttpPort()
+ {
+ return httpPort_;
+ }
+
+ public static int getReplicationFactor()
+ {
+ return replicationFactor_;
+ }
+
+ public static long getRpcTimeout()
+ {
+ return rpcTimeoutInMillis_;
+ }
+
+ public static int getThreadsPerPool()
+ {
+ return threadsPerPool_;
+ }
+
+ public static String getMetadataDirectory()
+ {
+ return metadataDirectory_;
+ }
+
+ public static void setMetadataDirectory(String metadataDirectory)
+ {
+ metadataDirectory_ = metadataDirectory;
+ }
+
+ public static String getSnapshotDirectory()
+ {
+ return snapshotDirectory_;
+ }
+
+ public static void setSnapshotDirectory(String snapshotDirectory)
+ {
+ snapshotDirectory_ = snapshotDirectory;
+ }
+
+ public static String[] getAllMapOutputDirectories()
+ {
+ return mapOutputDirectories_;
+ }
+
+ public static String getMapOutputLocation()
+ {
+ String mapOutputDirectory = mapOutputDirectories_[currentIndex_];
+ return mapOutputDirectory;
+ }
+
+ public static String[] getAllDataFileLocations()
+ {
+ return dataFileDirectories_;
+ }
+
+ public static String getDataFileLocation()
+ {
+ String dataFileDirectory = dataFileDirectories_[currentIndex_];
+ return dataFileDirectory;
+ }
+
+ public static String getCompactionFileLocation()
+ {
+ String dataFileDirectory = dataFileDirectories_[currentIndex_];
+ currentIndex_ = (currentIndex_ + 1 )%dataFileDirectories_.length ;
+ return dataFileDirectory;
+ }
+
+ public static String getBootstrapFileLocation()
+ {
+ return bootstrapFileDirectory_;
+ }
+
+ public static void setBootstrapFileLocation(String bfLocation)
+ {
+ bootstrapFileDirectory_ = bfLocation;
+ }
+
+ public static int getLogFileSizeThreshold()
+ {
+ return logRotationThreshold_;
+ }
+
+ public static String getLogFileLocation()
+ {
+ return logFileDirectory_;
+ }
+
+ public static void setLogFileLocation(String logLocation)
+ {
+ logFileDirectory_ = logLocation;
+ }
+
+ public static boolean isFastSync()
+ {
+ return fastSync_;
+ }
+
+ public static boolean isRackAware()
+ {
+ return rackAware_;
+ }
+
+ public static Set<String> getSeeds()
+ {
+ return seeds_;
+ }
+
+ public static String getColumnFamilyType(String cfName)
+ {
+ String cfType = getColumnType(cfName);
+ if ( cfType == null )
+ cfType = "Standard";
+ return cfType;
+ }
+
+ /*
+ * Loop through all the disks to see which disk has the max free space
+ * return the disk with max free space for compactions. If the size of the expected
+ * compacted file is greater than the max disk space available return null, we cannot
+ * do compaction in this case.
+ */
+ public static String getCompactionFileLocation(long expectedCompactedFileSize)
+ {
+ long maxFreeDisk = 0;
+ int maxDiskIndex = 0;
+ String dataFileDirectory = null;
+ for ( int i = 0 ; i < dataFileDirectories_.length ; i++ )
+ {
+ File f = new File(dataFileDirectories_[i]);
+ if( maxFreeDisk < f.getUsableSpace())
+ {
+ maxFreeDisk = f.getUsableSpace();
+ maxDiskIndex = i;
+ }
+ }
+ // Load factor of 0.9 we do not want to use the entire disk that is too risky.
+ maxFreeDisk = (long)(0.9 * maxFreeDisk);
+ if( expectedCompactedFileSize < maxFreeDisk )
+ {
+ dataFileDirectory = dataFileDirectories_[maxDiskIndex];
+ currentIndex_ = (maxDiskIndex + 1 )%dataFileDirectories_.length ;
+ }
+ else
+ {
+ currentIndex_ = maxDiskIndex;
+ }
+ return dataFileDirectory;
+ }
+
+ public static TypeInfo getTypeInfo(String cfName)
+ {
+ String table = DatabaseDescriptor.getTables().get(0);
+ CFMetaData cfMetadata = DatabaseDescriptor.getCFMetaData(table, cfName);
+ if ( cfMetadata.indexProperty_.equals("Name") )
+ {
+ return TypeInfo.STRING;
+ }
+ else
+ {
+ return TypeInfo.LONG;
+ }
+ }
+
+ public static void main(String[] args) throws Throwable
+ {
+ DatabaseDescriptor.initInternal("C:\\Engagements\\Cassandra-Golden\\storage-conf.xml");
+ }
+}
Added: incubator/cassandra/src/org/apache/cassandra/continuations/CAgent.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/continuations/CAgent.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/continuations/CAgent.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/continuations/CAgent.java Mon Mar 2 06:12:46 2009
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.continuations;
+
+import java.lang.instrument.Instrumentation;
+
+import org.apache.commons.javaflow.bytecode.transformation.bcel.BcelClassTransformer;
+
+
+/**
+ * Java agent entry point. Registers a {@link ContinuationClassTransformer}
+ * backed by a {@link BcelClassTransformer} with the JVM instrumentation.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class CAgent
+{
+    /**
+     * Called by the JVM before the application's main method when this class
+     * is named as the agent's Premain-Class.
+     *
+     * @param agentArguments the option string given to the agent, passed through to the transformer
+     * @param instrumentation the instrumentation service to register the transformer with
+     */
+    public static void premain(String agentArguments, Instrumentation instrumentation)
+    {
+        BcelClassTransformer bytecodeRewriter = new BcelClassTransformer();
+        ContinuationClassTransformer suspendableFilter = new ContinuationClassTransformer(agentArguments, bytecodeRewriter);
+        instrumentation.addTransformer(suspendableFilter);
+    }
+}
Added: incubator/cassandra/src/org/apache/cassandra/continuations/ContinuationClassTransformer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/continuations/ContinuationClassTransformer.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/continuations/ContinuationClassTransformer.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/continuations/ContinuationClassTransformer.java Mon Mar 2 06:12:46 2009
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.continuations;
+
+import java.lang.annotation.Annotation;
+import java.lang.instrument.ClassFileTransformer;
+import java.lang.instrument.IllegalClassFormatException;
+import java.security.ProtectionDomain;
+import java.util.List;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.StringTokenizer;
+import org.objectweb.asm.ClassReader;
+import org.objectweb.asm.tree.AnnotationNode;
+import org.objectweb.asm.tree.ClassNode;
+import org.objectweb.asm.tree.MethodNode;
+import org.apache.commons.javaflow.bytecode.transformation.ResourceTransformer;
+import org.apache.commons.javaflow.bytecode.transformation.bcel.BcelClassTransformer;
+
+
+/**
+ * Class file transformer that hands a class to the wrapped
+ * {@link ResourceTransformer} when one of the class's visible annotations has
+ * a descriptor containing "Suspendable"; every other class is returned untouched.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+class ContinuationClassTransformer implements ClassFileTransformer
+{
+    /* Substring looked for in the descriptor of each visible class annotation. */
+    private static final String targetAnnotation_ = "Suspendable";
+    private ResourceTransformer transformer_;
+
+    /**
+     * @param agentArguments the agent option string; currently not used
+     * @param transformer the transformer applied to annotated classes
+     */
+    public ContinuationClassTransformer(String agentArguments, ResourceTransformer transformer)
+    {
+        super();
+        transformer_ = transformer;
+    }
+
+    /**
+     * @param classLoader loader of the class being defined; not used
+     * @param className internal name of the class; not used
+     * @param redefiningClass the class being redefined, if any; not used
+     * @param domain protection domain of the class; not used
+     * @param bytes the class file as loaded
+     * @return the transformed class file if the class carries the target
+     *         annotation, otherwise the array that was passed in
+     */
+    public byte[] transform(ClassLoader classLoader, String className, Class<?> redefiningClass, ProtectionDomain domain, byte[] bytes) throws IllegalClassFormatException
+    {
+        /*
+         * Use the ASM class reader to see which classes support
+         * the Suspendable annotation. If they do then those
+         * classes need to have their bytecodes transformed for
+         * Continuation support.
+         */
+        ClassReader classReader = new ClassReader(bytes);
+        ClassNode classNode = new ClassNode();
+        classReader.accept(classNode, true);
+        List<AnnotationNode> annotationNodes = classNode.visibleAnnotations;
+
+        /* ASM leaves the list null for classes without runtime visible annotations. */
+        if ( annotationNodes == null )
+            return bytes;
+
+        for ( AnnotationNode annotationNode : annotationNodes )
+        {
+            if ( annotationNode.desc.indexOf(ContinuationClassTransformer.targetAnnotation_) != -1 )
+            {
+                /* One match is enough; transforming the already transformed bytes again is not wanted. */
+                return transformer_.transform(bytes);
+            }
+        }
+        return bytes;
+    }
+}
Added: incubator/cassandra/src/org/apache/cassandra/continuations/Suspendable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/continuations/Suspendable.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/continuations/Suspendable.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/continuations/Suspendable.java Mon Mar 2 06:12:46 2009
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.continuations;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+
+/**
+ * Marker annotation without members. It is retained at runtime, which keeps it
+ * among the visible annotations in the class file; ContinuationClassTransformer
+ * looks for an annotation with this name to decide which classes to transform.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+@Retention(RetentionPolicy.RUNTIME)
+public @interface Suspendable
+{
+
+}
Added: incubator/cassandra/src/org/apache/cassandra/cql/common/BindOperand.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/cql/common/BindOperand.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/cql/common/BindOperand.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/cql/common/BindOperand.java Mon Mar 2 06:12:46 2009
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql.common;
+
+import org.apache.cassandra.cql.execution.RuntimeErrorMsg;
+
+/**
+ * BindOperand:
+ * Operand that stands for a bind variable in the CQL statement. Lives
+ * in the shared execution plan.
+ */
+public class BindOperand implements OperandDef
+{
+    int bindIndex_; // bind position
+
+    public BindOperand(int bindIndex)
+    {
+        this.bindIndex_ = bindIndex;
+    }
+
+    /** Always throws a RuntimeException: bind params are not yet supported. */
+    public Object get()
+    {
+        // TODO: Once bind variables are supported, get() will extract the value of the
+        // bind at position "bindIndex_" from the execution context.
+        String restriction = RuntimeErrorMsg.IMPLEMENTATION_RESTRICTION.getMsg("bind params not yet supported");
+        throw new RuntimeException(restriction);
+    }
+
+    /** Returns the text "Bind #: " followed by the bind position. */
+    public String explain()
+    {
+        return "Bind #: " + bindIndex_;
+    }
+}
\ No newline at end of file
Added: incubator/cassandra/src/org/apache/cassandra/cql/common/CExpr.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/cql/common/CExpr.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/cql/common/CExpr.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/cql/common/CExpr.java Mon Mar 2 06:12:46 2009
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql.common;
+
+// Note: This class is CQL related work in progress.
+public class CExpr
+{
+    // Contract for a CQL expression: it reports its type and renders itself as text.
+    public interface Expr
+    {
+        CType getType();
+        String toString();
+    }
+}
Added: incubator/cassandra/src/org/apache/cassandra/cql/common/CType.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/cql/common/CType.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/cql/common/CType.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/cql/common/CType.java Mon Mar 2 06:12:46 2009
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql.common;
+
+import java.util.ArrayList;
+
+// Note: This class is CQL related work in progress; e.g. ArrayType does not implement Type.
+public class CType
+{
+    /** Common contract of the CQL types below; toString() names the type. */
+    public static interface Type
+    {
+        String toString();
+    }
+
+    public static class IntegerType implements Type
+    {
+        public String toString() { return "Integer"; }
+    }
+
+    public static class StringType implements Type
+    {
+        public String toString() { return "String"; }
+    }
+
+    /** A row type: an ordered list of element types. */
+    public static class RowType implements Type
+    {
+        ArrayList<Type> types_;
+        public RowType(ArrayList<Type> types)
+        {
+            types_ = types;
+        }
+
+        /** Renders each element type once, in list order, e.g. {@code <Integer, String>}. */
+        public String toString()
+        {
+            StringBuilder sb = new StringBuilder("<");
+            String separator = "";
+            for (Type type : types_)
+            {
+                sb.append(separator).append(type);
+                separator = ", ";
+            }
+            return sb.append(">").toString();
+        }
+    }
+
+    public static class ArrayType
+    {
+        Type elementType_;
+        public ArrayType(Type elementType)
+        {
+            elementType_ = elementType;
+        }
+
+        public String toString()
+        {
+            return "Array(" + elementType_.toString() + ")";
+        }
+    }
+}
Added: incubator/cassandra/src/org/apache/cassandra/cql/common/ColumnMapExpr.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/cql/common/ColumnMapExpr.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/cql/common/ColumnMapExpr.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/cql/common/ColumnMapExpr.java Mon Mar 2 06:12:46 2009
@@ -0,0 +1,25 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql.common;
+
+import java.util.ArrayList;
+
+public class ColumnMapExpr extends ArrayList<Pair<OperandDef, OperandDef>>
+{
+    private static final long serialVersionUID = 1L; // ArrayList is Serializable, so pin the version id
+}
\ No newline at end of file
Added: incubator/cassandra/src/org/apache/cassandra/cql/common/ColumnRangeQueryRSD.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/cql/common/ColumnRangeQueryRSD.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/cql/common/ColumnRangeQueryRSD.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/cql/common/ColumnRangeQueryRSD.java Mon Mar 2 06:12:46 2009
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql.common;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.List;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.cql.execution.RuntimeErrorMsg;
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.db.Row;
+import org.apache.cassandra.service.StorageProxy;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+import org.apache.cassandra.db.*;
+
+/**
+ * A Row Source Definition (RSD) for doing a range query on a column map
+ * (in Standard or Super Column Family).
+ */
+public class ColumnRangeQueryRSD extends RowSourceDef
+{
+    private final static Logger logger_ = Logger.getLogger(ColumnRangeQueryRSD.class);
+    private CFMetaData cfMetaData_;      // metadata of the column family being queried
+    private OperandDef rowKey_;          // yields the row key
+    private OperandDef superColumnKey_;  // yields the super column key; null for a simple column family
+    private int offset_;
+    private int limit_;
+
+    /**
+     * Set up a range query on a column map in a simple column family.
+     * The column map in a simple column family is identified by the rowKey.
+     *
+     * Note: "limit" of -1 is the equivalent of no limit.
+     * "offset" specifies the number of rows to skip. An offset of 0 implies from the first row.
+     */
+    public ColumnRangeQueryRSD(CFMetaData cfMetaData, OperandDef rowKey, int offset, int limit)
+    {
+        cfMetaData_ = cfMetaData;
+        rowKey_ = rowKey;
+        superColumnKey_ = null;
+        offset_ = offset;
+        limit_ = limit;
+    }
+
+    /**
+     * Set up a range query on a column map in a super column family.
+     * The column map in a super column family is identified by the rowKey & superColumnKey.
+     *
+     * Note: "limit" of -1 is the equivalent of no limit.
+     * "offset" specifies the number of rows to skip. An offset of 0 implies from the first row.
+     */
+    public ColumnRangeQueryRSD(CFMetaData cfMetaData, ConstantOperand rowKey, ConstantOperand superColumnKey,
+                               int offset, int limit)
+    {
+        this(cfMetaData, rowKey, offset, limit);
+        superColumnKey_ = superColumnKey;
+    }
+
+    /**
+     * Reads the row addressed by the row key and returns one map per column
+     * found, keyed by cfMetaData_'s n_columnKey, n_columnValue and
+     * n_columnTimestamp. The list is empty when no columns are found.
+     *
+     * @throws RuntimeException if the underlying read fails; the failure is its cause
+     */
+    public List<Map<String,String>> getRows()
+    {
+        String superColumnKey = null;
+        String columnFamily_column = cfMetaData_.cfName;
+        if (superColumnKey_ != null)
+        {
+            // Super column family: the column map is addressed as "cfName:superColumnKey".
+            superColumnKey = (String)(superColumnKey_.get());
+            columnFamily_column = cfMetaData_.cfName + ":" + superColumnKey;
+        }
+
+        Row row = null;
+        try
+        {
+            String key = (String)(rowKey_.get());
+            row = StorageProxy.readProtocol(cfMetaData_.tableName, key, columnFamily_column,
+                                            offset_, limit_, StorageService.ConsistencyLevel.WEAK);
+        }
+        catch (Exception e)
+        {
+            logger_.error(LogUtil.throwableToString(e));
+            // Chain the original failure so callers can still see what went wrong.
+            throw new RuntimeException(RuntimeErrorMsg.GENERIC_ERROR.getMsg(), e);
+        }
+
+        List<Map<String, String>> rows = new LinkedList<Map<String, String>>();
+        if (row == null)
+            return rows;
+
+        Map<String, ColumnFamily> cfMap = row.getColumnFamilies();
+        ColumnFamily cfamily = (cfMap == null) ? null : cfMap.get(cfMetaData_.cfName);
+        if (cfamily == null)
+            return rows;
+
+        Collection<IColumn> columns = null;
+        if (superColumnKey_ == null)
+        {
+            columns = cfamily.getAllColumns();
+        }
+        else
+        {
+            // Super column case: return the sub-columns of the named super column.
+            IColumn superColumn = cfamily.getColumn(superColumnKey);
+            if (superColumn != null)
+                columns = superColumn.getSubColumns();
+        }
+        if (columns == null)
+            return rows;
+
+        for (IColumn column : columns)
+        {
+            Map<String, String> result = new HashMap<String, String>();
+            result.put(cfMetaData_.n_columnKey, column.name());
+            // NOTE(review): decodes with the platform default charset -- confirm it matches the writer.
+            result.put(cfMetaData_.n_columnValue, new String(column.value()));
+            result.put(cfMetaData_.n_columnTimestamp, Long.toString(column.timestamp()));
+            rows.add(result);
+        }
+        return rows;
+    }
+
+    /**
+     * Describes this row source as text: column type, table, column family,
+     * row key, optional super column key, offset, limit and index property.
+     */
+    public String explainPlan()
+    {
+        return String.format("%s Column Family: Column Range Query: \n" +
+                             " Table Name: %s\n" +
+                             " Column Family: %s\n" +
+                             " RowKey: %s\n" +
+                             "%s" +
+                             " Offset: %d\n" +
+                             " Limit: %d\n" +
+                             " Order By: %s",
+                             cfMetaData_.columnType,
+                             cfMetaData_.tableName,
+                             cfMetaData_.cfName,
+                             rowKey_.explain(),
+                             (superColumnKey_ == null) ? "" : " SuperColumnKey: " + superColumnKey_.explain() + "\n",
+                             offset_, limit_,
+                             cfMetaData_.indexProperty_);
+    }
+}
\ No newline at end of file
Added: incubator/cassandra/src/org/apache/cassandra/cql/common/ConstantOperand.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/cql/common/ConstantOperand.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/cql/common/ConstantOperand.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/cql/common/ConstantOperand.java Mon Mar 2 06:12:46 2009
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql.common;
+
+/**
+ * ConstantOperand:
+ * Wraps a literal/constant operand of the CQL statement; lives as part of the shared execution plan.
+ */
+public class ConstantOperand implements OperandDef
+{
+    Object value_; // the constant; get() hands it back unchanged
+    public ConstantOperand(Object value)
+    {
+        this.value_ = value;
+    }
+
+    public Object get()
+    {
+        return this.value_;
+    }
+
+    /** Returns the constant's text in single quotes, prefixed with "Constant: ". */
+    public String explain()
+    {
+        return "Constant: '" + String.valueOf(value_) + "'";
+    }
+}
\ No newline at end of file