Posted to commits@cassandra.apache.org by pm...@apache.org on 2009/03/02 08:57:31 UTC

svn commit: r749218 [28/34] - in /incubator/cassandra: branches/ dist/ nightly/ site/ tags/ trunk/ trunk/lib/ trunk/src/ trunk/src/org/ trunk/src/org/apache/ trunk/src/org/apache/cassandra/ trunk/src/org/apache/cassandra/analytics/ trunk/src/org/apache...

Added: incubator/cassandra/trunk/src/org/apache/cassandra/service/IPartitioner.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/IPartitioner.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/IPartitioner.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/IPartitioner.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.math.BigInteger;
+
+public interface IPartitioner
+{
+    public BigInteger hash(String key);
+}
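
For illustration, a minimal conforming implementation of this interface
(a toy sketch, not one of the partitioners added by this commit):

    import java.math.BigInteger;

    public class ToyPartitioner implements IPartitioner
    {
        // Map a key onto the token space by treating its bytes as a
        // big-endian integer. Placeholder logic only; the real
        // partitioners below use MD5 or an order-preserving hash.
        public BigInteger hash(String key)
        {
            BigInteger h = BigInteger.ZERO;
            for (byte b : key.getBytes())
            {
                h = h.shiftLeft(8).add(BigInteger.valueOf(b & 0xFF));
            }
            return h;
        }
    }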

Added: incubator/cassandra/trunk/src/org/apache/cassandra/service/IResponseResolver.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/IResponseResolver.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/IResponseResolver.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/IResponseResolver.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.util.List;
+
+import org.apache.cassandra.net.Message;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public interface IResponseResolver<T> {
+
+	/*
+	 * Resolves the responses that are passed in. For example, if
+	 * these are write responses, all we get back is a true or false value
+	 * indicating whether the writes succeeded. Reads are more complicated:
+	 * the responses must be compared and, based on the differences, repairs
+	 * scheduled. Hence you derive a response resolver suited to your
+	 * needs from this interface.
+	 */
+	public T resolve(List<Message> responses) throws DigestMismatchException;
+	public boolean isDataPresent(List<Message> responses);
+
+}
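
To make the contract concrete, here is a sketch of the write-side
specialization the comment describes; for writes any acknowledgement
already carries the outcome, so isDataPresent is trivially true (an
illustration, not the WriteResponseResolver referenced elsewhere in
this commit):

    import java.util.List;
    import org.apache.cassandra.net.Message;

    class BooleanResponseResolver implements IResponseResolver<Boolean>
    {
        // A write either succeeded or it did not; receiving responses is
        // itself the signal, so no repair scheduling is required here.
        public Boolean resolve(List<Message> responses)
        {
            return !responses.isEmpty();
        }

        public boolean isDataPresent(List<Message> responses)
        {
            return true;
        }
    }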

Added: incubator/cassandra/trunk/src/org/apache/cassandra/service/LeaderElector.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/LeaderElector.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/LeaderElector.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/LeaderElector.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,272 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
+import org.apache.cassandra.concurrent.ThreadFactoryImpl;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.gms.ApplicationState;
+import org.apache.cassandra.gms.EndPointState;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.gms.IEndPointStateChangeSubscriber;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.*;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.data.Stat;
+
+class LeaderElector implements IEndPointStateChangeSubscriber
+{
+    private static Logger logger_ = Logger.getLogger(LeaderElector.class);
+    protected static final String leaderState_ = "LEADER";
+    /* volatile so that double-checked locking in instance() is safe */
+    private static volatile LeaderElector instance_ = null;
+    private static Lock createLock_ = new ReentrantLock();
+    
+    /*
+     * Factory method that returns the singleton instance of the
+     * LeaderElector class.
+    */
+    public static LeaderElector instance()
+    {
+        if ( instance_ == null )
+        {
+            LeaderElector.createLock_.lock();
+            try
+            {
+                if ( instance_ == null )
+                {
+                    instance_ = new LeaderElector();
+                }
+            }
+            finally
+            {
+                createLock_.unlock();
+            }
+        }
+        return instance_;
+    }
+    
+    /* The elected leader. */
+    private AtomicReference<EndPoint> leader_;
+    private Condition condition_;
+    private ExecutorService leaderElectionService_ = new DebuggableThreadPoolExecutor(1,
+            1,
+            Integer.MAX_VALUE,
+            TimeUnit.SECONDS,
+            new LinkedBlockingQueue<Runnable>(),
+            new ThreadFactoryImpl("LEADER-ELECTOR")
+            );
+    
+    private class LeaderDeathMonitor implements Runnable
+    {
+        private String pathCreated_;
+        
+        LeaderDeathMonitor(String pathCreated)
+        {
+            pathCreated_ = pathCreated;
+        }
+        
+        public void run()
+        {            
+            ZooKeeper zk = StorageService.instance().getZooKeeperHandle();
+            String path = "/Cassandra/" + DatabaseDescriptor.getClusterName() + "/Leader";
+            try
+            {
+                LeaderElector.createLock_.lock();
+                while( true )
+                {
+                    /* Get all znodes under the Leader znode */
+                    List<String> values = zk.getChildren(path, false);
+                    SortedMap<Integer, String> suffixToZnode = getSuffixToZnodeMapping(values);
+                    String value = suffixToZnode.get( suffixToZnode.firstKey() );
+                    /*
+                     * Get the first znode and if it is the 
+                     * pathCreated created above then the data
+                     * in that znode is the leader's identity. 
+                    */
+                    if ( leader_ == null )
+                    {
+                        leader_ = new AtomicReference<EndPoint>( EndPoint.fromBytes( zk.getData(path + "/" + value, false, null) ) );
+                    }
+                    else
+                    {
+                        leader_.set( EndPoint.fromBytes( zk.getData(path + "/" + value, false, null) ) );
+                        /* Disseminate the state as to who the leader is. */
+                        onLeaderElection();
+                    }
+                    logger_.debug("Elected leader is " + leader_ + " @ znode " + ( path + "/" + value ) );                                     
+                    /* We need only the last portion of this znode */
+                    int index = getLocalSuffix();                   
+                    if ( index > suffixToZnode.firstKey() )
+                    {
+                        String pathToCheck = path + "/" + getImmediatelyPrecedingZnode(suffixToZnode, index);
+                        Stat stat = zk.exists(pathToCheck, true);
+                        if ( stat != null )
+                        {
+                            logger_.debug("Awaiting my turn ...");
+                            condition_.await();
+                            logger_.debug("Checking to see if leader is around ...");
+                        }
+                    }
+                    else
+                    {
+                        break;
+                    }
+                }
+            }
+            catch ( InterruptedException ex )
+            {
+                logger_.warn(LogUtil.throwableToString(ex));
+            }
+            catch ( IOException ex )
+            {
+                logger_.warn(LogUtil.throwableToString(ex));
+            }
+            catch ( KeeperException ex )
+            {
+                logger_.warn(LogUtil.throwableToString(ex));
+            }
+            finally
+            {
+                LeaderElector.createLock_.unlock();
+            }
+        }
+        
+        private SortedMap<Integer, String> getSuffixToZnodeMapping(List<String> values)
+        {
+            SortedMap<Integer, String> suffixZNodeMap = new TreeMap<Integer, String>();
+            for ( String znode : values )
+            {
+                String[] pieces = znode.split("-");
+                suffixZNodeMap.put(Integer.parseInt( pieces[1] ), znode);
+            }
+            return suffixZNodeMap;
+        }
+        
+        private String getImmediatelyPrecedingZnode(SortedMap<Integer, String> suffixToZnode, int index)
+        {
+            List<Integer> suffixes = new ArrayList<Integer>( suffixToZnode.keySet() );            
+            Collections.sort(suffixes);
+            int position = Collections.binarySearch(suffixes, index);
+            return suffixToZnode.get( suffixes.get( position - 1 ) );
+        }
+        
+        /**
+         * If the local node's leader-related znode is L-3,
+         * this method will return 3.
+         * @return the suffix portion of L-3.
+         */
+        private int getLocalSuffix()
+        {
+            String[] pieces = pathCreated_.split("/");
+            String leaderPiece = pieces[pieces.length - 1];
+            String[] leaderPieces = leaderPiece.split("-");
+            return Integer.parseInt( leaderPieces[1] );
+        }
+    }
+    
+    private LeaderElector()
+    {
+        condition_ = LeaderElector.createLock_.newCondition();
+    }
+    
+    /**
+     * Used to inform interested parties about a change in the state
+     * of the specified endpoint.
+     * 
+     * @param endpoint endpoint for which the state change occurred.
+     * @param epState state that actually changed for the above endpoint.
+     */
+    public void onChange(EndPoint endpoint, EndPointState epState)
+    {        
+        /* node identifier for this endpoint on the identifier space */
+        ApplicationState leaderState = epState.getApplicationState(LeaderElector.leaderState_);
+        if (leaderState != null)
+        {
+            EndPoint leader = EndPoint.fromString( leaderState.getState() );
+            if ( leader_ == null || !leader_.get().equals(leader) )
+            {
+                logger_.debug("New leader in the cluster is " + leader);
+                leader_ = new AtomicReference<EndPoint>(leader);
+            }
+        }
+    }
+    
+    void start() throws Throwable
+    {
+        /* Register with the Gossiper for EndPointState notifications */
+        Gossiper.instance().register(this);
+        logger_.debug("Starting the leader election process ...");
+        ZooKeeper zk = StorageService.instance().getZooKeeperHandle();
+        String path = "/Cassandra/" + DatabaseDescriptor.getClusterName() + "/Leader";
+        String createPath = path + "/L-";
+        
+        /* Create the znodes under the Leader znode */       
+        logger_.debug("Attempting to create znode " + createPath);
+        String pathCreated = zk.create(createPath, EndPoint.toBytes( StorageService.getLocalControlEndPoint() ), Ids.OPEN_ACL_UNSAFE, (CreateMode.EPHEMERAL_SEQUENTIAL) );             
+        logger_.debug("Created znode under leader znode " + pathCreated);            
+        leaderElectionService_.submit(new LeaderDeathMonitor(pathCreated));
+    }
+    
+    void signal()
+    {
+        logger_.debug("Signalling others to check on leader ...");
+        try
+        {
+            LeaderElector.createLock_.lock();
+            condition_.signal();
+        }
+        finally
+        {
+            LeaderElector.createLock_.unlock();
+        }
+    }
+    
+    EndPoint getLeader()
+    {
+        return (leader_ != null ) ? leader_.get() : StorageService.getLocalStorageEndPoint();
+    }
+    
+    private void onLeaderElection() throws InterruptedException, IOException
+    {
+        /*
+         * If the local node is the leader then it not only
+         * disseminates that information but also starts the M/R job
+         * tracker. Non-leader nodes start the M/R task tracker,
+         * thereby initializing the M/R subsystem.
+        */
+        if ( StorageService.instance().isLeader(leader_.get()) )
+        {
+            Gossiper.instance().addApplicationState(LeaderElector.leaderState_, new ApplicationState(leader_.get().toString()));              
+        }
+    }
+}
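
LeaderElector implements the standard ZooKeeper leader-election recipe:
every node creates an EPHEMERAL_SEQUENTIAL child under the Leader znode,
the lowest sequence number wins, and each other node watches only the
znode immediately preceding its own, so a leader death wakes exactly one
successor. A condensed sketch of that loop, with error handling omitted
and helper names hypothetical:

    // Assumes an open ZooKeeper handle zk, an election root `path`, and
    // this node's identity serialized into `myIdentity`.
    String me = zk.create(path + "/L-", myIdentity,
                          ZooDefs.Ids.OPEN_ACL_UNSAFE,
                          CreateMode.EPHEMERAL_SEQUENTIAL);
    String myZnode = me.substring(me.lastIndexOf('/') + 1);
    while (true)
    {
        List<String> children = zk.getChildren(path, false);
        // Sequence numbers are zero-padded, so lexical order works here.
        Collections.sort(children);
        if (myZnode.equals(children.get(0)))
            break;                        // lowest suffix: we lead
        // Watch only the immediate predecessor and wait for it to die.
        int i = children.indexOf(myZnode);
        if (zk.exists(path + "/" + children.get(i - 1), true) != null)
            awaitWatchNotification();     // e.g. the Condition used above
    }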

Added: incubator/cassandra/trunk/src/org/apache/cassandra/service/LoadDisseminator.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/LoadDisseminator.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/LoadDisseminator.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/LoadDisseminator.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service;
+
+import java.util.TimerTask;
+
+import org.apache.cassandra.gms.ApplicationState;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.utils.FileUtils;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+class LoadDisseminator extends TimerTask
+{
+    private final static Logger logger_ = Logger.getLogger(LoadDisseminator.class);
+    protected final static String loadInfo_= "LOAD-INFORMATION";
+    
+    public void run()
+    {
+        try
+        {
+            long diskSpace = FileUtils.getUsedDiskSpace();                
+            String diskUtilization = FileUtils.stringifyFileSize(diskSpace);
+            logger_.debug("Disseminating load info ...");
+            Gossiper.instance().addApplicationState(LoadDisseminator.loadInfo_, new ApplicationState(diskUtilization));
+        }
+        catch ( Throwable ex )
+        {
+            logger_.warn( LogUtil.throwableToString(ex) );
+        }
+    }
+}
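
Since LoadDisseminator extends java.util.TimerTask, wiring it up is a
one-liner; the period below is an arbitrary illustration, not the
interval StorageService actually uses:

    import java.util.Timer;

    Timer loadTimer = new Timer("LOAD-DISSEMINATOR", true); // daemon thread
    // Gossip this node's disk utilization once a minute.
    loadTimer.schedule(new LoadDisseminator(), 0, 60 * 1000);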

Added: incubator/cassandra/trunk/src/org/apache/cassandra/service/LoadInfo.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/LoadInfo.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/LoadInfo.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/LoadInfo.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.util.Comparator;
+
+import org.apache.cassandra.utils.FileUtils;
+
+
+class LoadInfo
+{
+    protected static class DiskSpaceComparator implements Comparator<LoadInfo>
+    {
+        public int compare(LoadInfo li, LoadInfo li2)
+        {
+            if ( li == null || li2 == null )
+                throw new IllegalArgumentException("Cannot pass in values that are NULL.");
+            
+            double space = FileUtils.stringToFileSize(li.diskSpace_);
+            double space2 = FileUtils.stringToFileSize(li2.diskSpace_);
+            // Double.compare avoids truncating fractional differences to zero.
+            return Double.compare(space, space2);
+        }
+    }
+        
+    private String diskSpace_;
+    
+    LoadInfo(long diskSpace)
+    {       
+        diskSpace_ = FileUtils.stringifyFileSize(diskSpace);
+    }
+    
+    LoadInfo(String loadInfo)
+    {
+        diskSpace_ = loadInfo;
+    }
+    
+    String diskSpace()
+    {
+        return diskSpace_;
+    }
+    
+    public String toString()
+    {
+        return diskSpace_;
+    }
+}
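
The comparator exists so that endpoints can be ranked by reported disk
usage. A usage sketch (the size strings are illustrative; the accepted
format is whatever FileUtils.stringToFileSize parses):

    List<LoadInfo> loads = new ArrayList<LoadInfo>();
    loads.add(new LoadInfo("10.5 GB"));
    loads.add(new LoadInfo("2.1 GB"));
    // Sort lightest-loaded first.
    Collections.sort(loads, new LoadInfo.DiskSpaceComparator());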

Added: incubator/cassandra/trunk/src/org/apache/cassandra/service/LocationInfoVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/LocationInfoVerbHandler.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/LocationInfoVerbHandler.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/LocationInfoVerbHandler.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.net.CompactEndPointSerializationHelper;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class LocationInfoVerbHandler implements IVerbHandler
+{
+    private static Logger logger_ = Logger.getLogger( LocationInfoVerbHandler.class );
+    
+    public void doVerb(Message message)
+    {            
+        EndPoint from = message.getFrom();
+        logger_.info("Received a location download request from " + from);
+        
+        Object[] body = message.getMessageBody();
+        byte[] bytes = (byte[])body[0];
+        DataInputBuffer bufIn = new DataInputBuffer();
+        bufIn.reset(bytes, bytes.length);
+        try
+        {
+            Range range = Range.serializer().deserialize(bufIn);
+            /* Get the replicas for the given range */
+            EndPoint[] replicas = StorageService.instance().getNStorageEndPoint(range.right());
+            ByteArrayOutputStream bos = new ByteArrayOutputStream();
+            DataOutputStream dos = new DataOutputStream(bos);
+            
+            for ( EndPoint replica : replicas )
+            {       
+                replica.setPort(DatabaseDescriptor.getControlPort());
+                if ( FailureDetector.instance().isAlive(replica) )
+                {
+                    replica.setPort(DatabaseDescriptor.getStoragePort());
+                    CompactEndPointSerializationHelper.serialize(replica, dos);
+                    break;
+                }
+            }
+            
+            Message response = message.getReply(StorageService.getLocalStorageEndPoint(), new Object[]{bos.toByteArray()});
+            logger_.info("Sending the token download response to " + from);
+            MessagingService.getMessagingInstance().sendOneWay(response, from);
+        }
+        catch (IOException ex)
+        {
+            logger_.warn(LogUtil.throwableToString(ex));
+        }        
+    }
+}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/service/OrderPreservingHashPartitioner.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/OrderPreservingHashPartitioner.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/OrderPreservingHashPartitioner.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/OrderPreservingHashPartitioner.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.math.BigInteger;
+
+public class OrderPreservingHashPartitioner implements IPartitioner
+{
+    private final static int maxKeyHashLength_ = 24;
+    private static final BigInteger prime_ = BigInteger.valueOf(31);
+    
+    public BigInteger hash(String key)
+    {
+        BigInteger h = BigInteger.ZERO;
+        char val[] = key.toCharArray();
+        
+        for (int i = 0; i < OrderPreservingHashPartitioner.maxKeyHashLength_; i++)
+        {
+            if( i < val.length )
+                h = OrderPreservingHashPartitioner.prime_.multiply(h).add( BigInteger.valueOf(val[i]) );
+            else
+                h = OrderPreservingHashPartitioner.prime_.multiply(h).add( OrderPreservingHashPartitioner.prime_ );
+        }
+        return h;
+    }
+}
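
Each key is hashed over a fixed window of 24 characters, with shorter
keys padded by the prime, so every key maps to a polynomial of the same
degree and the resulting tokens track the lexical order of the keys
(approximately: with a base of only 31, a large character can outweigh a
difference in an earlier position). A quick check (illustrative only):

    OrderPreservingHashPartitioner p = new OrderPreservingHashPartitioner();
    // "apple" < "banana" lexically, and the tokens come out in that order.
    assert p.hash("apple").compareTo(p.hash("banana")) < 0;
    assert p.hash("app").compareTo(p.hash("apple")) < 0;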

Added: incubator/cassandra/trunk/src/org/apache/cassandra/service/PartitionerType.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/PartitionerType.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/PartitionerType.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/PartitionerType.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+public enum PartitionerType
+{
+    RANDOM,
+    OPHF
+}
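
Presumably configuration maps this enum onto an IPartitioner; a
hypothetical factory sketch (not code from this commit):

    static IPartitioner partitionerFor(PartitionerType type)
    {
        switch (type)
        {
            case OPHF:
                return new OrderPreservingHashPartitioner();
            case RANDOM:
            default:
                return new RandomPartitioner();
        }
    }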

Added: incubator/cassandra/trunk/src/org/apache/cassandra/service/QuorumResponseHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/QuorumResponseHandler.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/QuorumResponseHandler.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/QuorumResponseHandler.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.*;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.WriteResponseMessage;
+import org.apache.cassandra.net.IAsyncCallback;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+import org.apache.cassandra.utils.*;
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class QuorumResponseHandler<T> implements IAsyncCallback
+{
+    private static Logger logger_ = Logger.getLogger( QuorumResponseHandler.class );
+    private Lock lock_ = new ReentrantLock();
+    private Condition condition_;
+    private int responseCount_;
+    private List<Message> responses_ = new ArrayList<Message>();
+    private IResponseResolver<T> responseResolver_;
+    private AtomicBoolean done_ = new AtomicBoolean(false);
+    
+    public QuorumResponseHandler(int responseCount, IResponseResolver<T> responseResolver)
+    {        
+        condition_ = lock_.newCondition();
+        responseCount_ = responseCount;
+        responseResolver_ =  responseResolver;
+    }
+    
+    public void  setResponseCount(int responseCount)
+    {
+        responseCount_ = responseCount;
+    }
+    
+    public T get() throws TimeoutException, DigestMismatchException
+    {
+        long startTime = System.currentTimeMillis();
+    	lock_.lock();
+        try
+        {            
+            boolean bVal = true;            
+            try
+            {
+            	if ( !done_.get() )
+                {            		
+            		bVal = condition_.await(DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
+                }
+            }
+            catch ( InterruptedException ex )
+            {
+                logger_.debug( LogUtil.throwableToString(ex) );
+            }
+            
+            if ( !bVal && !done_.get() )
+            {
+                StringBuilder sb = new StringBuilder();
+                for ( Message message : responses_ )
+                {
+                    sb.append(message.getFrom()).append(" ");                    
+                }                
+                throw new TimeoutException("Operation timed out - received only " + responses_.size() + " responses from " + sb.toString().trim() + ".");
+            }
+        }
+        finally
+        {
+            lock_.unlock();
+            for(Message response : responses_)
+            {
+            	MessagingService.removeRegisteredCallback( response.getMessageId() );
+            }
+        }
+        logger_.info("QuorumResponseHandler: " + (System.currentTimeMillis() - startTime)
+                + " ms.");
+
+    	return responseResolver_.resolve( responses_);
+    }
+    
+    public void response(Message message)
+    {
+        lock_.lock();
+        try
+        {
+        	int majority = (responseCount_ >> 1) + 1;            
+            if ( !done_.get() )
+            {
+            	responses_.add( message );
+            	if ( responses_.size() >= majority && responseResolver_.isDataPresent(responses_))
+            	{
+            		done_.set(true);
+            		condition_.signal();            	
+            	}
+            }
+        }
+        finally
+        {
+            lock_.unlock();
+        }
+    }
+}
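
Callers dispatch the request messages themselves and then block on
get(); a sketch of the intended call pattern (message construction and
callback registration elided, identifiers illustrative):

    // Wait for a majority of 3 replicas, resolving rows on arrival.
    QuorumResponseHandler<Row> handler =
            new QuorumResponseHandler<Row>(3, new ReadResponseResolver());
    // ... send the read messages to the three replicas, registering
    // `handler` as the async callback with MessagingService ...
    try
    {
        Row row = handler.get();   // blocks up to the RPC timeout
    }
    catch (TimeoutException e)
    {
        // fewer than a quorum responded in time
    }
    catch (DigestMismatchException e)
    {
        // a digest disagreed with the data; the caller can retry with
        // full data reads and let read repair reconcile the replicas
    }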

Added: incubator/cassandra/trunk/src/org/apache/cassandra/service/RandomPartitioner.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/RandomPartitioner.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/RandomPartitioner.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/RandomPartitioner.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.math.BigInteger;
+
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * This class generates a MD5 hash of the key. It uses the standard technique
+ * used in all DHT's.
+ * 
+ * @author alakshman
+ * 
+ */
+public class RandomPartitioner implements IPartitioner
+{
+	public BigInteger hash(String key)
+	{
+		return FBUtilities.hash(key);
+	}
+}
\ No newline at end of file
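
FBUtilities.hash is not shown in this hunk; the "standard technique" the
comment refers to is an MD5 digest of the key interpreted as a
non-negative integer, along these lines (a sketch, not necessarily
FBUtilities' exact code):

    import java.math.BigInteger;
    import java.security.MessageDigest;

    static BigInteger md5hash(String key) throws Exception
    {
        MessageDigest md = MessageDigest.getInstance("MD5");
        byte[] digest = md.digest(key.getBytes("UTF-8"));
        // Signum 1 yields a non-negative token on the hash ring.
        return new BigInteger(1, digest);
    }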

Added: incubator/cassandra/trunk/src/org/apache/cassandra/service/ReadRepairManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/ReadRepairManager.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/ReadRepairManager.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/ReadRepairManager.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.concurrent.locks.*;
+
+import org.apache.cassandra.db.Column;
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.db.RowMutationMessage;
+import org.apache.cassandra.db.SuperColumn;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.Header;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.Cachetable;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.ICacheExpungeHook;
+import org.apache.cassandra.utils.ICachetable;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+
+
+/*
+ * This class manages read repairs. It is a singleton that
+ * uses the cache table construct to schedule the writes that have to be
+ * made for read repairs.
+ * A cache table is created that wakes up every n milliseconds, as specified
+ * by expirationTimeInMillis, and calls a global hook function on pending
+ * entries. That function sends a message to the appropriate servers to
+ * update them with the latest changes.
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+class ReadRepairManager
+{
+    private static Logger logger_ = Logger.getLogger(ReadRepairManager.class);
+	private static final long expirationTimeInMillis = 2000;
+	private static Lock lock_ = new ReentrantLock();
+	/* volatile so that double-checked locking in instance() is safe */
+	private static volatile ReadRepairManager self_ = null;
+
+	/*
+	 * This is the internal class that actually implements the global
+	 * hook function invoked by the read repair manager.
+	 */
+	static class ReadRepairPerformer implements
+			ICacheExpungeHook<String, Message>
+	{
+		private static Logger logger_ = Logger.getLogger(ReadRepairPerformer.class);
+		/*
+		 * The hook function: takes the endpoint and the row mutation
+		 * message that needs to be sent to that endpoint in order
+		 * to perform the read repair.
+		 */
+		public void callMe(String target,
+				Message message)
+		{
+			String[] pieces = FBUtilities.strip(target, ":");
+			EndPoint to = new EndPoint(pieces[0], Integer.parseInt(pieces[1]));
+			MessagingService.getMessagingInstance().sendOneWay(message, to);			
+		}
+
+	}
+
+	private ICachetable<String, Message> readRepairTable_ = new Cachetable<String, Message>(expirationTimeInMillis, new ReadRepairManager.ReadRepairPerformer());
+
+	protected ReadRepairManager()
+	{
+
+	}
+
+	public  static ReadRepairManager instance()
+	{
+		if (self_ == null)
+		{
+            lock_.lock();
+            try
+            {
+                if ( self_ == null )
+                    self_ = new ReadRepairManager();
+            }
+            finally
+            {
+                lock_.unlock();
+            }
+		}
+		return self_;
+	}
+
+	/*
+	 * This is the function that should be used to schedule a read repair.
+	 * Specify the endpoint on which the read repair should happen and the
+	 * row mutation message that carries the repaired row.
+	 */
+	public void schedule(EndPoint target, RowMutationMessage rowMutationMessage)
+	{
+        /*
+		Message message = new Message(StorageService.getLocalStorageEndPoint(),
+				StorageService.mutationStage_,
+				StorageService.readRepairVerbHandler_, new Object[]
+				{ rowMutationMessage });
+        */
+        try
+        {
+            Message message = RowMutationMessage.makeRowMutationMessage(rowMutationMessage, StorageService.readRepairVerbHandler_);
+    		String key = target + ":" + message.getMessageId();
+    		readRepairTable_.put(key, message);
+        }
+        catch ( IOException ex )
+        {
+            logger_.info(LogUtil.throwableToString(ex));
+        }
+	}
+}
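
The effect of schedule() is a short, coalesced deferral: the repair
mutation sits in the cache table for up to expirationTimeInMillis and
the expunge hook then pushes it to the stale replica. A usage sketch
(endpoint and mutation construction elided):

    // After read resolution finds a replica that returned stale data:
    RowMutationMessage repair = new RowMutationMessage(rowMutation);
    ReadRepairManager.instance().schedule(staleReplica, repair);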

Added: incubator/cassandra/trunk/src/org/apache/cassandra/service/ReadResponseResolver.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/ReadResponseResolver.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/ReadResponseResolver.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/ReadResponseResolver.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.ReadResponseMessage;
+import org.apache.cassandra.db.Row;
+import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.db.RowMutationMessage;
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+
+/*
+ * This class is used by all read functions and is called by the quorum
+ * response handler when at least the required number of servers (the count
+ * specified by the quorum) have sent responses. The resolve function then
+ * schedules read repair and resolves the read data from the various servers.
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+public class ReadResponseResolver implements IResponseResolver<Row>
+{
+
+	private static Logger logger_ = Logger.getLogger(ReadResponseResolver.class);
+
+	/*
+	 * This method for resolving read data should look at the timestamps of each
+	 * of the columns that are read and should pick up columns with the latest
+	 * timestamp. For those columns where the timestamp is not the latest a
+	 * repair request should be scheduled.
+	 * 
+	 */
+	public Row resolve(List<Message> responses) throws DigestMismatchException
+	{
+        long startTime = System.currentTimeMillis();
+		Row retRow = null;
+		List<Row> rowList = new ArrayList<Row>();
+		List<EndPoint> endPoints = new ArrayList<EndPoint>();
+		String key = null;
+		String table = null;
+		byte[] digest = new byte[0];
+		boolean isDigestQuery = false;
+        
+        /*
+		 * Populate the list of rows from each of the messages
+		 * Check to see if there is a digest query. If a digest 
+         * query exists then we need to compare the digest with 
+         * the digest of the data that is received.
+        */
+        DataInputBuffer bufIn = new DataInputBuffer();
+		for (Message response : responses)
+		{					            
+            byte[] body = (byte[])response.getMessageBody()[0];            
+            bufIn.reset(body, body.length);
+            try
+            {
+                long start = System.currentTimeMillis();
+                ReadResponseMessage result = ReadResponseMessage.serializer().deserialize(bufIn);            
+                logger_.debug( "Response deserialization time : " + (System.currentTimeMillis() - start) + " ms.");
+    			if(!result.isDigestQuery())
+    			{
+    				rowList.add(result.row());
+    				endPoints.add(response.getFrom());
+    				key = result.row().key();
+    				table = result.table();
+    			}
+    			else
+    			{
+    				digest = result.digest();
+    				isDigestQuery = true;
+    			}
+            }
+            catch( IOException ex )
+            {
+                logger_.info(LogUtil.throwableToString(ex));
+            }
+		}
+		// If there was a digest query, compare it with all the data digests.
+		// If there is a mismatch, throw an exception so that read repair can happen.
+		if(isDigestQuery)
+		{
+			for(Row row: rowList)
+			{
+				if( !Arrays.equals(row.digest(), digest) )
+				{
+					throw new DigestMismatchException("The Digest does not match");
+				}
+			}
+		}
+		
+        /* If the rowList is empty then we had some exception above. */
+        if ( rowList.size() == 0 )
+        {
+            return retRow;
+        }
+        
+        /* Now calculate the resolved row */
+		retRow = new Row(key);		
+		for (int i = 0 ; i < rowList.size(); i++)
+		{
+			retRow.repair(rowList.get(i));			
+		}
+		// At this point we have the resolved row;
+		// now we need to calculate the difference
+		// so that we can schedule read repairs.
+		
+		for (int i = 0 ; i < rowList.size(); i++)
+		{
+			// Calculate the difference: since retRow is the resolved
+			// row it can be used as the superset. Remember, no deletes
+			// happen with diff; it is only for additions so far.
+			// TODO : handle deletes 
+			Row diffRow = rowList.get(i).diff(retRow);
+			if(diffRow == null) // no repair needs to happen
+				continue;
+			// create the row mutation message based on the diff and schedule a read repair 
+			RowMutation rowMutation = new RowMutation(table, key);            			
+	    	Map<String, ColumnFamily> columnFamilies = diffRow.getColumnFamilies();
+	        Set<String> cfNames = columnFamilies.keySet();
+	        
+	        for ( String cfName : cfNames )
+	        {
+	            ColumnFamily cf = columnFamilies.get(cfName);
+	            rowMutation.add(cfName, cf);
+	        }
+            RowMutationMessage rowMutationMessage = new RowMutationMessage(rowMutation);
+	        // schedule the read repair
+	        ReadRepairManager.instance().schedule(endPoints.get(i),rowMutationMessage);
+		}
+        logger_.info("resolve: " + (System.currentTimeMillis() - startTime)
+                + " ms.");
+		return retRow;
+	}
+
+	public boolean isDataPresent(List<Message> responses)
+	{
+		boolean isDataPresent = false;
+		for (Message response : responses)
+		{
+            byte[] body = (byte[])response.getMessageBody()[0];
+			DataInputBuffer bufIn = new DataInputBuffer();
+            bufIn.reset(body, body.length);
+            try
+            {
+    			ReadResponseMessage result = ReadResponseMessage.serializer().deserialize(bufIn);
+    			if(!result.isDigestQuery())
+    			{
+    				isDataPresent = true;
+    			}
+                bufIn.close();
+            }
+            catch(IOException ex)
+            {
+                logger_.info(LogUtil.throwableToString(ex));
+            }                        
+		}
+		return isDataPresent;
+	}
+}
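
The repair()/diff() pair implements last-write-wins reconciliation: for
each column the highest timestamp is kept, and diff() reports what a
given replica was missing so only that delta is shipped back. The
per-column rule reduces to a comparison like this sketch (accessor names
hypothetical; the real logic lives in Row and ColumnFamily):

    // Keep whichever version of a column carries the later timestamp.
    static Column latest(Column a, Column b)
    {
        return a.timestamp() >= b.timestamp() ? a : b;
    }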

Added: incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageLoadBalancer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageLoadBalancer.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageLoadBalancer.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageLoadBalancer.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,414 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.io.Serializable;
+import java.math.BigInteger;
+import java.util.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
+import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
+import org.apache.cassandra.concurrent.SingleThreadedStage;
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.concurrent.ThreadFactoryImpl;
+import org.apache.cassandra.dht.LeaveJoinProtocolImpl;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.gms.ApplicationState;
+import org.apache.cassandra.gms.EndPointState;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.gms.IEndPointStateChangeSubscriber;
+import org.apache.cassandra.io.SSTable;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.log4j.Logger;
+import org.apache.cassandra.net.*;
+import org.apache.cassandra.utils.*;
+
+/*
+ * The load balancing algorithm here is an implementation of
+ * the algorithm described in the paper "Scalable range query
+ * processing for large-scale distributed database applications".
+ * This class keeps track of load information across the system.
+ * It registers itself with the Gossiper for the ApplicationState carrying
+ * load information, i.e. the number of requests processed w.r.t. distinct
+ * keys at an endpoint. It monitors load information over a 5-minute
+ * interval and then performs load balancing operations if necessary.
+ * 
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+final class StorageLoadBalancer implements IEndPointStateChangeSubscriber, IComponentShutdown
+{
+    class LoadBalancer implements Runnable
+    {
+        LoadBalancer()
+        {
+            /* Copy the entries in loadInfo_ into loadInfo2_ and use it for all calculations */
+            loadInfo2_.putAll(loadInfo_);
+        }
+
+        /**
+         * Obtain a node which is a potential target. Start with
+         * the neighbours, i.e. either the successor or the predecessor.
+         * Send the target a MoveMessage. If the node cannot be
+         * relocated on the ring then we pick another candidate for
+         * relocation.
+        */        
+        public void run()
+        {
+            /*
+            int threshold = (int)(StorageLoadBalancer.ratio_ * averageSystemLoad());
+            int myLoad = localLoad();            
+            EndPoint predecessor = storageService_.getPredecessor(StorageService.getLocalStorageEndPoint());
+            logger_.debug("Trying to relocate the predecessor " + predecessor);
+            boolean value = tryThisNode(myLoad, threshold, predecessor);
+            if ( !value )
+            {
+                loadInfo2_.remove(predecessor);
+                EndPoint successor = storageService_.getSuccessor(StorageService.getLocalStorageEndPoint());
+                logger_.debug("Trying to relocate the successor " + successor);
+                value = tryThisNode(myLoad, threshold, successor);
+                if ( !value )
+                {
+                    loadInfo2_.remove(successor);
+                    while ( !loadInfo2_.isEmpty() )
+                    {
+                        EndPoint target = findARandomLightNode();
+                        if ( target != null )
+                        {
+                            logger_.debug("Trying to relocate the random node " + target);
+                            value = tryThisNode(myLoad, threshold, target);
+                            if ( !value )
+                            {
+                                loadInfo2_.remove(target);
+                            }
+                            else
+                            {
+                                break;
+                            }
+                        }
+                        else
+                        {
+                            // No light nodes available - this is NOT good.
+                            logger_.warn("Not even a single lightly loaded node is available ...");
+                            break;
+                        }
+                    }
+
+                    loadInfo2_.clear();                    
+                     // If we are here and no node was available to
+                     // perform load balance with we need to report and bail.                    
+                    if ( !value )
+                    {
+                        logger_.warn("Load Balancing operations weren't performed for this node");
+                    }
+                }                
+            }
+            */        
+        }
+
+        /*
+        private boolean tryThisNode(int myLoad, int threshold, EndPoint target)
+        {
+            boolean value = false;
+            LoadInfo li = loadInfo2_.get(target);
+            int pLoad = li.count();
+            if ( ((myLoad + pLoad) >> 1) <= threshold )
+            {
+                //calculate the number of keys to be transferred
+                int keyCount = ( (myLoad - pLoad) >> 1 );
+                logger_.debug("Number of keys we attempt to transfer to " + target + " " + keyCount);
+                // Determine the token that the target should join at.         
+                BigInteger targetToken = BootstrapAndLbHelper.getTokenBasedOnPrimaryCount(keyCount);
+                // Send a MoveMessage and see if this node is relocateable
+                MoveMessage moveMessage = new MoveMessage(targetToken);
+                Message message = new Message(StorageService.getLocalStorageEndPoint(), StorageLoadBalancer.lbStage_, StorageLoadBalancer.moveMessageVerbHandler_, new Object[]{moveMessage});
+                logger_.debug("Sending a move message to " + target);
+                IAsyncResult result = MessagingService.getMessagingInstance().sendRR(message, target);
+                value = (Boolean)result.get()[0];
+                logger_.debug("Response for query to relocate " + target + " is " + value);
+            }
+            return value;
+        }
+        */
+    }
+
+    class MoveMessageVerbHandler implements IVerbHandler
+    {
+        public void doVerb(Message message)
+        {
+            Message reply = message.getReply(StorageService.getLocalStorageEndPoint(), new Object[]{isMoveable_.get()});
+            MessagingService.getMessagingInstance().sendOneWay(reply, message.getFrom());
+            if ( isMoveable_.get() )
+            {
+                MoveMessage moveMessage = (MoveMessage)message.getMessageBody()[0];
+                BigInteger targetToken = moveMessage.getTargetToken();
+                /* Start the leave operation and join the ring at the position specified */
+                isMoveable_.set(false);
+            }
+        }
+    }
+
+    private static final Logger logger_ = Logger.getLogger(StorageLoadBalancer.class);
+    private static final String lbStage_ = "LOAD-BALANCER-STAGE";
+    private static final String moveMessageVerbHandler_ = "MOVE-MESSAGE-VERB-HANDLER";
+    /* time to delay in minutes the actual load balance procedure if heavily loaded */
+    private static final int delay_ = 5;
+    /* Ratio of highest loaded node and the average load. */
+    private static final double ratio_ = 1.5;
+
+    private StorageService storageService_;
+    /* this indicates whether this node is already helping someone else */
+    private AtomicBoolean isMoveable_ = new AtomicBoolean(false);
+    private Map<EndPoint, LoadInfo> loadInfo_ = new HashMap<EndPoint, LoadInfo>();
+    /* This map is a clone of the one above and is used for various calculations during LB operation */
+    private Map<EndPoint, LoadInfo> loadInfo2_ = new HashMap<EndPoint, LoadInfo>();
+    /* This thread pool is used for initiating load balancing operations */
+    private ScheduledThreadPoolExecutor lb_ = new DebuggableScheduledThreadPoolExecutor(
+            1,
+            new ThreadFactoryImpl("LB-OPERATIONS")
+            );
+    /* This thread pool is used by target node to leave the ring. */
+    private ExecutorService lbOperations_ = new DebuggableThreadPoolExecutor(1,
+            1,
+            Integer.MAX_VALUE,
+            TimeUnit.SECONDS,
+            new LinkedBlockingQueue<Runnable>(),
+            new ThreadFactoryImpl("LB-TARGET")
+            );
+
+    StorageLoadBalancer(StorageService storageService)
+    {
+        storageService_ = storageService;
+        /* register the load balancer stage */
+        StageManager.registerStage(StorageLoadBalancer.lbStage_, new SingleThreadedStage(StorageLoadBalancer.lbStage_));
+        /* register the load balancer verb handler */
+        MessagingService.getMessagingInstance().registerVerbHandlers(StorageLoadBalancer.moveMessageVerbHandler_, new MoveMessageVerbHandler());
+        /* register with the StorageService */
+        storageService_.registerComponentForShutdown(this);
+    }
+
+    public void start()
+    {
+        /* Register with the Gossiper for EndPointState notifications */
+        Gossiper.instance().register(this);
+    }
+
+    public void shutdown()
+    {
+        lbOperations_.shutdownNow();
+        lb_.shutdownNow();
+    }
+
+    public void onChange(EndPoint endpoint, EndPointState epState)
+    {
+        logger_.debug("CHANGE IN STATE FOR @ StorageLoadBalancer " + endpoint);
+        // load information for this specified endpoint for load balancing 
+        ApplicationState loadInfoState = epState.getApplicationState(LoadDisseminator.loadInfo_);
+        if ( loadInfoState != null )
+        {
+            String lInfoState = loadInfoState.getState();
+            LoadInfo lInfo = new LoadInfo(lInfoState);
+            loadInfo_.put(endpoint, lInfo);
+            
+            /*
+            int currentLoad = Integer.parseInt(loadInfoState.getState());
+            // update load information for this endpoint
+            loadInfo_.put(endpoint, currentLoad);
+
+            // clone load information to perform calculations
+            loadInfo2_.putAll(loadInfo_);
+            // Perform the analysis for load balance operations
+            if ( isHeavyNode() )
+            {
+                logger_.debug(StorageService.getLocalStorageEndPoint() + " is a heavy node with load " + localLoad());
+                // lb_.schedule( new LoadBalancer(), StorageLoadBalancer.delay_, TimeUnit.MINUTES );
+            }
+            */
+        }       
+    }
+
+    /*
+     * Load information associated with a given endpoint.
+    */
+    LoadInfo getLoad(EndPoint ep)
+    {
+        return loadInfo_.get(ep);
+    }
+
+    /*
+    private boolean isMoveable()
+    {
+        if ( !isMoveable_.get() )
+            return false;
+        int myload = localLoad();
+        EndPoint successor = storageService_.getSuccessor(StorageService.getLocalStorageEndPoint());
+        LoadInfo li = loadInfo2_.get(successor);
+        // "load" is NULL means that the successor node has not
+        // yet gossiped its load information. We should return
+        // false in this case since we want to err on the side
+        // of caution.
+        if ( li == null )
+            return false;
+        else
+        {            
+            if ( ( myload + li.count() ) > StorageLoadBalancer.ratio_*averageSystemLoad() )
+                return false;
+            else
+                return true;
+        }
+    }
+    */
+
+    /*
+    private int localLoad()
+    {
+        LoadInfo value = loadInfo2_.get(StorageService.getLocalStorageEndPoint());
+        return (value == null) ? 0 : value.count();
+    }
+    */
+
+    /*
+    private int averageSystemLoad()
+    {
+        int nodeCount = loadInfo2_.size();
+        Set<EndPoint> nodes = loadInfo2_.keySet();
+
+        int systemLoad = 0;
+        for ( EndPoint node : nodes )
+        {
+            LoadInfo load = loadInfo2_.get(node);
+            if ( load != null )
+                systemLoad += load.count();
+        }
+        int averageLoad = (nodeCount > 0) ? (systemLoad / nodeCount) : 0;
+        logger_.debug("Average system load should be " + averageLoad);
+        return averageLoad;
+    }
+    */
+    
+    /*
+    private boolean isHeavyNode()
+    {
+        return ( localLoad() > ( StorageLoadBalancer.ratio_ * averageSystemLoad() ) );
+    }
+    */
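+    /*
+     * A worked example of the arithmetic in the commented-out logic above,
+     * with illustrative numbers only: given per-node loads { 900, 300, 300 },
+     * averageSystemLoad() is 500 and the threshold is ratio_ * 500 = 750.
+     * The node with load 900 is heavy (900 > 750). Pairing it with a target
+     * of load 300 is acceptable because the post-transfer average
+     * ((900 + 300) >> 1) = 600 stays at or below the threshold, and
+     * ((900 - 300) >> 1) = 300 keys would be transferred.
+     */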
+    
+    /*
+    private boolean isMoveable(EndPoint target)
+    {
+        int threshold = (int)(StorageLoadBalancer.ratio_ * averageSystemLoad());
+        if ( isANeighbour(target) )
+        {
+                // If the target is a neighbour then it is moveable only if
+                // the average of its load and ours is at or below the threshold.
+            LoadInfo load = loadInfo2_.get(target);
+            if ( load == null )
+                return false;
+            else
+            {
+                int myload = localLoad();
+                int avgLoad = (load.count() + myload) >> 1;
+                if ( avgLoad <= threshold )
+                    return true;
+                else
+                    return false;
+            }
+        }
+        else
+        {
+            EndPoint successor = storageService_.getSuccessor(target);
+            LoadInfo sLoad = loadInfo2_.get(successor);
+            LoadInfo targetLoad = loadInfo2_.get(target);
+            if ( (sLoad.count() + targetLoad.count()) > threshold )
+                return false;
+            else
+                return true;
+        }
+    }
+    */
+
+    private boolean isANeighbour(EndPoint neighbour)
+    {
+        EndPoint predecessor = storageService_.getPredecessor(StorageService.getLocalStorageEndPoint());
+        if ( predecessor.equals(neighbour) )
+            return true;
+
+        EndPoint successor = storageService_.getSuccessor(StorageService.getLocalStorageEndPoint());
+        if ( successor.equals(neighbour) )
+            return true;
+
+        return false;
+    }
+
+    /*
+     * Determine the nodes that are lightly loaded. Choose one of
+     * the lightly loaded nodes at random and use it as a potential
+     * target for load balance.
+    */
+    /*
+    private EndPoint findARandomLightNode()
+    {
+        List<EndPoint> potentialCandidates = new ArrayList<EndPoint>();
+        Set<EndPoint> allTargets = loadInfo2_.keySet();
+        int avgLoad =  averageSystemLoad();
+
+        for( EndPoint target : allTargets )
+        {
+            LoadInfo load = loadInfo2_.get(target);
+            if ( load.count() < avgLoad )
+                potentialCandidates.add(target);
+        }
+
+        if ( potentialCandidates.size() > 0 )
+        {
+            Random random = new Random();
+            int index = random.nextInt(potentialCandidates.size());
+            return potentialCandidates.get(index);
+        }
+        return null;
+    }
+    */
+}
+
+class MoveMessage implements Serializable
+{
+    private BigInteger targetToken_;
+
+    private MoveMessage()
+    {
+    }
+
+    MoveMessage(BigInteger targetToken)
+    {
+        targetToken_ = targetToken;
+    }
+
+    BigInteger getTargetToken()
+    {
+        return targetToken_;
+    }
+}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageProxy.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageProxy.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageProxy.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,560 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ReadMessage;
+import org.apache.cassandra.db.ReadResponseMessage;
+import org.apache.cassandra.db.Row;
+import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.db.RowMutationMessage;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.db.TouchMessage;
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.IAsyncResult;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+
+public class StorageProxy
+{
+    private static Logger logger_ = Logger.getLogger(StorageProxy.class);
+    
+    /**
+     * This method is responsible for creating the Messages to be
+     * sent over the wire to the N replicas, where some of the
+     * messages may carry hints.
+     */
+    private static Map<EndPoint, Message> createWriteMessages(RowMutationMessage rmMessage, Map<EndPoint, EndPoint> endpointMap) throws IOException
+    {
+        Map<EndPoint, Message> messageMap = new HashMap<EndPoint, Message>();
+        Message message = RowMutationMessage.makeRowMutationMessage(rmMessage);
+        
+        Set<EndPoint> targets = endpointMap.keySet();
+        for( EndPoint target : targets )
+        {
+            EndPoint hint = endpointMap.get(target);
+            if ( !target.equals(hint) )
+            {
+                Message hintedMessage = RowMutationMessage.makeRowMutationMessage(rmMessage);
+                hintedMessage.addHeader(RowMutationMessage.hint_, EndPoint.toBytes(hint) );
+                logger_.debug("Sending the hint of " + target.getHost() + " to " + hint.getHost());
+                messageMap.put(target, hintedMessage);
+            }
+            else
+            {
+                messageMap.put(target, message);
+            }
+        }
+        return messageMap;
+    }
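+    /*
+     * For example, with a hypothetical endpointMap { A -> A, B -> B, C -> X },
+     * endpoints A and B receive the plain mutation message, while C receives
+     * a copy whose hint header names X.
+     */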
+    
+    /**
+     * Use this method to have this RowMutation applied
+     * across all replicas. This method will take care
+     * of the possibility of a replica being down and hint
+     * the data across to some other replica. 
+     * @param rm the mutation to be applied
+     *           across the replicas
+     */
+    public static void insert(RowMutation rm)
+    {
+        /*
+         * Get the N nodes from storage service where the data needs to be
+         * replicated
+         * Construct a message for write
+         * Send them asynchronously to the replicas.
+        */
+        try
+        {
+            logger_.debug(" insert");
+            Map<EndPoint, EndPoint> endpointMap = StorageService.instance().getNStorageEndPointMap(rm.key());
+            // TODO: throw a thrift exception if we do not have N nodes
+            RowMutationMessage rmMsg = new RowMutationMessage(rm); 
+            /* Create the write messages to be sent */
+            Map<EndPoint, Message> messageMap = createWriteMessages(rmMsg, endpointMap);
+            Set<EndPoint> endpoints = messageMap.keySet();
+            for(EndPoint endpoint : endpoints)
+            {
+                MessagingService.getMessagingInstance().sendOneWay(messageMap.get(endpoint), endpoint);
+            }
+        }
+        catch (Exception e)
+        {
+            logger_.info( LogUtil.throwableToString(e) );
+        }
+        return;
+    }
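+    /*
+     * A minimal usage sketch. The table name, key, column path and add()
+     * signature below are illustrative assumptions, not verified against
+     * this revision's RowMutation API:
+     *
+     *     RowMutation rm = new RowMutation("Table1", "key1");
+     *     rm.add("Standard1:name", "bar".getBytes(), System.currentTimeMillis());
+     *     StorageProxy.insert(rm);
+     */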
+
+    
+    public static Row doReadProtocol(String key, ReadMessage readMessage) throws IOException,TimeoutException
+    {
+        EndPoint endPoint = null;
+        try
+        {
+            endPoint = StorageService.instance().findSuitableEndPoint(key);
+        }
+        catch( Throwable ex)
+        {
+            ex.printStackTrace();
+        }
+        if(endPoint != null)
+        {
+            Message message = ReadMessage.makeReadMessage(readMessage);
+            IAsyncResult iar = MessagingService.getMessagingInstance().sendRR(message, endPoint);
+            Object[] result = iar.get(DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
+            byte[] body = (byte[])result[0];
+            DataInputBuffer bufIn = new DataInputBuffer();
+            bufIn.reset(body, body.length);
+            ReadResponseMessage responseMessage = ReadResponseMessage.serializer().deserialize(bufIn);
+            return responseMessage.row();
+        }
+        else
+        {
+            logger_.warn(" Alert : Unable to find a suitable end point for the key : " + key );
+        }
+        return null;
+    }
+
+    static void touch_local (String tablename, String key, boolean fData ) throws IOException
+    {
+		Table table = Table.open( tablename );
+		table.touch(key, fData);
+    }
+
+    static void weakTouchProtocol(String tablename, String key, boolean fData) throws Exception
+    {
+    	EndPoint endPoint = null;
+    	try
+    	{
+    		endPoint = StorageService.instance().findSuitableEndPoint(key);
+    	}
+    	catch( Throwable ex)
+    	{
+    		ex.printStackTrace();
+    	}
+    	if(endPoint != null)
+    	{
+    		if(endPoint.equals(StorageService.getLocalStorageEndPoint()))
+    		{
+    	    	touch_local(tablename, key, fData);
+    	    	return;
+    	    }
+            TouchMessage touchMessage = new TouchMessage(tablename, key, fData);
+            Message message = TouchMessage.makeTouchMessage(touchMessage);
+            MessagingService.getMessagingInstance().sendOneWay(message, endPoint);
+    	}
+    	return ;
+    }
+    
+    static void strongTouchProtocol(String tablename, String key, boolean fData) throws Exception
+    {
+        Map<EndPoint, EndPoint> endpointMap = StorageService.instance().getNStorageEndPointMap(key);
+        Set<EndPoint> endpoints = endpointMap.keySet();
+        TouchMessage touchMessage = new TouchMessage(tablename, key, fData);
+        Message message = TouchMessage.makeTouchMessage(touchMessage);
+        for(EndPoint endpoint : endpoints)
+        {
+            MessagingService.getMessagingInstance().sendOneWay(message, endpoint);
+        }    	
+    }
+    
+    /*
+     * Only touch data on the most suitable end point.
+     */
+    public static void touchProtocol(String tablename, String key, boolean fData, StorageService.ConsistencyLevel consistencyLevel) throws Exception
+    {
+        switch ( consistencyLevel )
+        {
+	        case WEAK:
+	            weakTouchProtocol(tablename, key, fData);
+	            break;
+	            
+	        case STRONG:
+	            strongTouchProtocol(tablename, key, fData);
+	            break;
+	            
+	        default:
+	            weakTouchProtocol(tablename, key, fData);
+	            break;
+        }
+    }
+        
+    
+    
+    public static Row readProtocol(String tablename, String key, String columnFamily, List<String> columnNames, StorageService.ConsistencyLevel consistencyLevel) throws Exception
+    {
+        Row row = null;
+        boolean foundLocal = false;
+        EndPoint[] endpoints = StorageService.instance().getNStorageEndPoint(key);
+        for(EndPoint endPoint: endpoints)
+        {
+            if(endPoint.equals(StorageService.getLocalStorageEndPoint()))
+            {
+                foundLocal = true;
+                break;
+            }
+        }   
+        if(!foundLocal && consistencyLevel == StorageService.ConsistencyLevel.WEAK)
+        {
+            ReadMessage readMessage = new ReadMessage(tablename, key, columnFamily, columnNames);
+            return doReadProtocol(key, readMessage);
+        }
+        else
+        {
+            switch ( consistencyLevel )
+            {
+            case WEAK:
+                row = weakReadProtocol(tablename, key, columnFamily, columnNames);
+                break;
+                
+            case STRONG:
+                row = strongReadProtocol(tablename, key, columnFamily, columnNames);
+                break;
+                
+            default:
+                row = weakReadProtocol(tablename, key, columnFamily, columnNames);
+                break;
+            }
+        }
+        return row;
+        
+        
+    }
+        
+    public static Row readProtocol(String tablename, String key, String columnFamily, int start, int count, StorageService.ConsistencyLevel consistencyLevel) throws Exception
+    {
+        Row row = null;
+        boolean foundLocal = false;
+        EndPoint[] endpoints = StorageService.instance().getNStorageEndPoint(key);
+        for(EndPoint endPoint: endpoints)
+        {
+            if(endPoint.equals(StorageService.getLocalStorageEndPoint()))
+            {
+                foundLocal = true;
+                break;
+            }
+        }   
+        if(!foundLocal && consistencyLevel == StorageService.ConsistencyLevel.WEAK)
+        {
+            ReadMessage readMessage = new ReadMessage(tablename, key, columnFamily, start, count);
+            return doReadProtocol(key, readMessage);
+        }
+        else
+        {
+            switch ( consistencyLevel )
+            {
+            case WEAK:
+                row = weakReadProtocol(tablename, key, columnFamily, start, count);
+                break;
+                
+            case STRONG:
+                row = strongReadProtocol(tablename, key, columnFamily, start, count);
+                break;
+                
+            default:
+                row = weakReadProtocol(tablename, key, columnFamily, start, count);
+                break;
+            }
+        }
+        return row;
+    }
+    
+    public static Row readProtocol(String tablename, String key, String columnFamily, long sinceTimestamp, StorageService.ConsistencyLevel consistencyLevel) throws Exception
+    {
+        Row row = null;
+        boolean foundLocal = false;
+        EndPoint[] endpoints = StorageService.instance().getNStorageEndPoint(key);
+        for(EndPoint endPoint: endpoints)
+        {
+            if(endPoint.equals(StorageService.getLocalStorageEndPoint()))
+            {
+                foundLocal = true;
+                break;
+            }
+        }   
+        if(!foundLocal && consistencyLevel == StorageService.ConsistencyLevel.WEAK)
+        {
+            ReadMessage readMessage = new ReadMessage(tablename, key, columnFamily, sinceTimestamp);
+            return doReadProtocol(key, readMessage);
+        }
+        else
+        {
+            switch ( consistencyLevel )
+            {
+            case WEAK:
+                row = weakReadProtocol(tablename, key, columnFamily, sinceTimestamp);
+                break;
+                
+            case STRONG:
+                row = strongReadProtocol(tablename, key, columnFamily, sinceTimestamp);
+                break;
+                
+            default:
+                row = weakReadProtocol(tablename, key, columnFamily, sinceTimestamp);
+                break;
+            }
+        }
+        return row;
+    }
+
+    public static Row strongReadProtocol(String tablename, String key, String columnFamily, List<String> columns) throws Exception
+    {       
+        long startTime = System.currentTimeMillis();        
+        // TODO: throw a thrift exception if we do not have N nodes
+        ReadMessage readMessage = new ReadMessage(tablename, key, columnFamily, columns);               
+        
+        ReadMessage readMessageDigestOnly = new ReadMessage(tablename, key, columnFamily, columns);     
+        readMessageDigestOnly.setIsDigestQuery(true);        
+        
+        Row row = StorageProxy.doStrongReadProtocol(key, readMessage, readMessageDigestOnly);
+        logger_.debug("readProtocol: " + (System.currentTimeMillis() - startTime) + " ms.");     
+        return row;
+    }
+    
+    /*
+     * This function executes the read protocol:
+     * 1. Get the N nodes from the storage service where the data is replicated.
+     * 2. Construct one message to fetch the data and the rest to fetch digests.
+     * 3. SendRR to all the nodes above.
+     * 4. Wait for responses from at least X nodes, where X <= N, including the data node.
+     * 5. If the digests match, return the data.
+     * 6. Else carry out read repair by getting the data from all the nodes.
+     */
+    public static Row strongReadProtocol(String tablename, String key, String columnFamily, int start, int count) throws IOException, TimeoutException
+    {       
+        long startTime = System.currentTimeMillis();        
+        // TODO: throw a thrift exception if we do not have N nodes
+        ReadMessage readMessage = null;
+        ReadMessage readMessageDigestOnly = null;
+        if( start >= 0 && count < Integer.MAX_VALUE)
+        {
+            readMessage = new ReadMessage(tablename, key, columnFamily, start, count);
+        }
+        else
+        {
+            readMessage = new ReadMessage(tablename, key, columnFamily);
+        }
+        if( start >= 0 && count < Integer.MAX_VALUE)
+        {
+            readMessageDigestOnly = new ReadMessage(tablename, key, columnFamily, start, count);
+        }
+        else
+        {
+            readMessageDigestOnly = new ReadMessage(tablename, key, columnFamily);
+        }
+        readMessageDigestOnly.setIsDigestQuery(true);        
+        Row row = doStrongReadProtocol(key, readMessage, readMessageDigestOnly);
+        logger_.debug("readProtocol: " + (System.currentTimeMillis() - startTime) + " ms.");
+        return row;
+    }
+    
+    public static Row strongReadProtocol(String tablename, String key, String columnFamily, long sinceTimestamp) throws IOException, TimeoutException
+    {       
+        long startTime = System.currentTimeMillis();        
+        // TODO: throw a thrift exception if we do not have N nodes
+        ReadMessage readMessage = new ReadMessage(tablename, key, columnFamily, sinceTimestamp);
+        ReadMessage readMessageDigestOnly = new ReadMessage(tablename, key, columnFamily, sinceTimestamp);
+        readMessageDigestOnly.setIsDigestQuery(true);        
+        Row row = doStrongReadProtocol(key, readMessage, readMessageDigestOnly);
+        logger_.debug("readProtocol: " + (System.currentTimeMillis() - startTime) + " ms.");
+        return row;
+    }
+
+    /*
+     * This method performs the actual read from the replicas.
+     * @param key key for which the data is required
+     * @param readMessage the read message to get the actual data
+     * @param readMessageDigest the read message to get the digest
+     */
+    private static Row doStrongReadProtocol(String key, ReadMessage readMessage, ReadMessage readMessageDigest) throws IOException, TimeoutException
+    {
+        Row row = null;
+        Message message = ReadMessage.makeReadMessage(readMessage);
+        Message messageDigestOnly = ReadMessage.makeReadMessage(readMessageDigest);
+        
+        IResponseResolver<Row> readResponseResolver = new ReadResponseResolver();
+        QuorumResponseHandler<Row> quorumResponseHandler = new QuorumResponseHandler<Row>(
+                DatabaseDescriptor.getReplicationFactor(),
+                readResponseResolver);
+        EndPoint dataPoint = StorageService.instance().findSuitableEndPoint(key);
+        List<EndPoint> endpointList = new ArrayList<EndPoint>( Arrays.asList( StorageService.instance().getNStorageEndPoint(key) ) );
+        /* Remove the chosen data endpoint from the list; it gets the data read while the rest get digest reads. */
+        endpointList.remove( dataPoint );
+        EndPoint[] endPoints = new EndPoint[endpointList.size() + 1];
+        Message messages[] = new Message[endpointList.size() + 1];
+        
+        // The first message goes to the data endpoint.
+        endPoints[0] = dataPoint;
+        messages[0] = message;
+        
+        for(int i=1; i < endPoints.length ; i++)
+        {
+            endPoints[i] = endpointList.get(i-1);
+            messages[i] = messageDigestOnly;
+        }
+        
+        try
+        {
+            MessagingService.getMessagingInstance().sendRR(messages, endPoints, quorumResponseHandler);
+            
+            long startTime2 = System.currentTimeMillis();
+            row = quorumResponseHandler.get();
+            logger_.debug("quorumResponseHandler: " + (System.currentTimeMillis() - startTime2)
+                    + " ms.");
+            if (row == null)
+            {
+                logger_.info("ERROR No row for this key .....: " + key);
+                // TODO: throw a thrift exception 
+                return row;
+            }
+        }
+        catch (DigestMismatchException ex)
+        {
+            if ( DatabaseDescriptor.getConsistencyCheck())
+            {
+	            IResponseResolver<Row> readResponseResolverRepair = new ReadResponseResolver();
+	            QuorumResponseHandler<Row> quorumResponseHandlerRepair = new QuorumResponseHandler<Row>(
+	                    DatabaseDescriptor.getReplicationFactor(),
+	                    readResponseResolverRepair);
+	            readMessage.setIsDigestQuery(false);
+	            logger_.info("DigestMismatchException: " + key);            
+	            Message messageRepair = ReadMessage.makeReadMessage(readMessage);
+	            MessagingService.getMessagingInstance().sendRR(messageRepair, endPoints,
+	                    quorumResponseHandlerRepair);
+	            try
+	            {
+	                row = quorumResponseHandlerRepair.get();
+	            }
+	            catch(DigestMismatchException dex)
+	            {
+	                logger_.warn(LogUtil.throwableToString(dex));
+	            }
+	            if (row == null)
+	            {
+	                logger_.info("No row found for key: " + key);
+	            }
+            }
+        }        
+        return row;
+    }
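+    /*
+     * A sketch of the digest comparison that drives the repair path above,
+     * using hypothetical values (the digest algorithm the resolver uses is
+     * an assumption here, not taken from this file):
+     *
+     *     byte[] d1 = MessageDigest.getInstance("MD5").digest(rowFromDataEndPoint);
+     *     byte[] d2 = digestFromReplica; // returned by a digest-only read
+     *     if ( !Arrays.equals(d1, d2) )
+     *     {
+     *         // DigestMismatchException path: re-issue readMessage with
+     *         // setIsDigestQuery(false) and resolve the full rows.
+     *     }
+     */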
+    
+    public static Row weakReadProtocol(String tablename, String key, String columnFamily, List<String> columns) throws Exception
+    {       
+        long startTime = System.currentTimeMillis();
+        List<EndPoint> endpoints = StorageService.instance().getNLiveStorageEndPoint(key);
+        /* Remove the local storage endpoint from the list. */ 
+        endpoints.remove( StorageService.getLocalStorageEndPoint() );
+        // TODO: throw a thrift exception if we do not have N nodes
+        
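+        // Note that this opens the first configured table; the tablename parameter is not consulted here.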
+        Table table = Table.open( DatabaseDescriptor.getTables().get(0) );
+        Row row = table.getRow(key, columnFamily, columns);
+        
+        logger_.debug("Local Read Protocol: " + (System.currentTimeMillis() - startTime) + " ms.");
+        /*
+         * Do the consistency checks in the background and return the
+         * non NULL row.
+         */
+        if ( endpoints.size() > 0 && DatabaseDescriptor.getConsistencyCheck())
+            StorageService.instance().doConsistencyCheck(row, endpoints, columnFamily, columns);
+        return row;
+    }
+    
+    /*
+     * This function executes the read protocol locally and should be used only if consistency is not a concern. 
+     * Read the data from the local disk and return if the row is NOT NULL. If the data is NULL do the read from
+     * one of the other replicas (in the same data center if possible) till we get the data. In the event we get
+     * the data we perform consistency checks and figure out if any repairs need to be done to the replicas. 
+     */
+    public static Row weakReadProtocol(String tablename, String key, String columnFamily, int start, int count) throws Exception
+    {
+        Row row = null;
+        long startTime = System.currentTimeMillis();
+        List<EndPoint> endpoints = StorageService.instance().getNLiveStorageEndPoint(key);
+        /* Remove the local storage endpoint from the list. */ 
+        endpoints.remove( StorageService.getLocalStorageEndPoint() );
+        // TODO: throw a thrift exception if we do not have N nodes
+        
+        Table table = Table.open( DatabaseDescriptor.getTables().get(0) );
+        if( start >= 0 && count < Integer.MAX_VALUE)
+        {
+            row = table.getRow(key, columnFamily, start, count);
+        }
+        else
+        {
+            row = table.getRow(key, columnFamily);
+        }
+        
+        logger_.debug("Local Read Protocol: " + (System.currentTimeMillis() - startTime) + " ms.");
+        /*
+         * Do the consistency checks in the background and return the
+         * non NULL row.
+         */
+        if ( endpoints.size() > 0 && DatabaseDescriptor.getConsistencyCheck())
+        	StorageService.instance().doConsistencyCheck(row, endpoints, columnFamily, start, count);
+        return row;         
+    }
+    
+    public static Row weakReadProtocol(String tablename, String key, String columnFamily, long sinceTimestamp) throws Exception
+    {
+        Row row = null;
+        long startTime = System.currentTimeMillis();
+        List<EndPoint> endpoints = StorageService.instance().getNLiveStorageEndPoint(key);
+        /* Remove the local storage endpoint from the list. */ 
+        endpoints.remove( StorageService.getLocalStorageEndPoint() );
+        // TODO: throw a thrift exception if we do not have N nodes
+        
+        Table table = Table.open( DatabaseDescriptor.getTables().get(0) );
+        row = table.getRow(key, columnFamily,sinceTimestamp);
+        logger_.debug("Local Read Protocol: " + (System.currentTimeMillis() - startTime) + " ms.");
+        /*
+         * Do the consistency checks in the background and return the
+         * non NULL row.
+         */
+        if ( endpoints.size() > 0 && DatabaseDescriptor.getConsistencyCheck())
+        	StorageService.instance().doConsistencyCheck(row, endpoints, columnFamily, sinceTimestamp);
+        return row;         
+    }
+
+}