You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by pm...@apache.org on 2009/03/02 08:57:31 UTC
svn commit: r749218 [28/34] - in /incubator/cassandra: branches/ dist/
nightly/ site/ tags/ trunk/ trunk/lib/ trunk/src/ trunk/src/org/
trunk/src/org/apache/ trunk/src/org/apache/cassandra/
trunk/src/org/apache/cassandra/analytics/ trunk/src/org/apache...
Added: incubator/cassandra/trunk/src/org/apache/cassandra/service/IPartitioner.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/IPartitioner.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/IPartitioner.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/IPartitioner.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.math.BigInteger;
+
+/**
+ * Strategy for turning a row key into the BigInteger value used to place
+ * that key among the nodes of the cluster.
+ */
+public interface IPartitioner
+{
+    /**
+     * Computes the placement value for a key.
+     *
+     * @param key the key to hash
+     * @return the BigInteger hash of {@code key}
+     */
+    BigInteger hash(String key);
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/service/IResponseResolver.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/IResponseResolver.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/IResponseResolver.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/IResponseResolver.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.util.List;
+
+import org.apache.cassandra.net.Message;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public interface IResponseResolver<T> {
+
+ /*
+ * This Method resolves the responses that are passed in . for example : if
+ * its write response then all we get is true or false return values which
+ * implies if the writes were successful but for reads its more complicated
+ * you need to look at the responses and then based on differences schedule
+ * repairs . Hence you need to derive a response resolver based on your
+ * needs from this interface.
+ */
+ public T resolve(List<Message> responses) throws DigestMismatchException;
+ public boolean isDataPresent(List<Message> responses);
+
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/service/LeaderElector.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/LeaderElector.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/LeaderElector.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/LeaderElector.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,272 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
+import org.apache.cassandra.concurrent.ThreadFactoryImpl;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.gms.ApplicationState;
+import org.apache.cassandra.gms.EndPointState;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.gms.IEndPointStateChangeSubscriber;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.*;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * Elects a cluster leader using ZooKeeper ephemeral sequential znodes and
+ * tracks the currently known leader.
+ *
+ * start() creates a znode under /Cassandra/<cluster>/Leader; the znode with
+ * the smallest sequence suffix identifies the leader. A node that is not
+ * first sets a watch on the znode immediately preceding its own and waits on
+ * condition_ until signal() is invoked (presumably by the ZooKeeper watch
+ * callback - confirm in StorageService). Leader announcements received through
+ * gossip (the LEADER application state) are applied in onChange().
+ */
+class LeaderElector implements IEndPointStateChangeSubscriber
+{
+    private static Logger logger_ = Logger.getLogger(LeaderElector.class);
+    protected static final String leaderState_ = "LEADER";
+    /* volatile so the unlocked first check in instance() never observes a partially constructed object. */
+    private static volatile LeaderElector instance_ = null;
+    /*
+     * Guards singleton creation and also owns condition_, so the leader-death
+     * monitor and signal() are serialized on this lock.
+     */
+    private static final Lock createLock_ = new ReentrantLock();
+
+    /**
+     * Factory method that returns the process-wide LeaderElector, creating it
+     * on first use.
+     */
+    public static LeaderElector instance()
+    {
+        if ( instance_ == null )
+        {
+            createLock_.lock();
+            try
+            {
+                if ( instance_ == null )
+                {
+                    instance_ = new LeaderElector();
+                }
+            }
+            finally
+            {
+                createLock_.unlock();
+            }
+        }
+        return instance_;
+    }
+
+    /* The elected leader; holds null until the first election result is known. */
+    private final AtomicReference<EndPoint> leader_ = new AtomicReference<EndPoint>();
+    private final Condition condition_;
+    private final ExecutorService leaderElectionService_ = new DebuggableThreadPoolExecutor(1,
+            1,
+            Integer.MAX_VALUE,
+            TimeUnit.SECONDS,
+            new LinkedBlockingQueue<Runnable>(),
+            new ThreadFactoryImpl("LEADER-ELECTOR")
+            );
+
+    /**
+     * Runs on leaderElectionService_: repeatedly reads the children of the
+     * Leader znode, records the owner of the lowest-numbered znode as leader
+     * and, unless the local znode is the lowest, waits for the immediately
+     * preceding znode to disappear before checking again.
+     */
+    private class LeaderDeathMonitor implements Runnable
+    {
+        /* Full path of the znode this node created in start(). */
+        private final String pathCreated_;
+
+        LeaderDeathMonitor(String pathCreated)
+        {
+            pathCreated_ = pathCreated;
+        }
+
+        public void run()
+        {
+            ZooKeeper zk = StorageService.instance().getZooKeeperHandle();
+            String path = "/Cassandra/" + DatabaseDescriptor.getClusterName() + "/Leader";
+            /* Acquire before the try so the finally block never unlocks a lock we do not hold. */
+            createLock_.lock();
+            try
+            {
+                while ( true )
+                {
+                    /* Get all znodes under the Leader znode, keyed by sequence suffix. */
+                    List<String> values = zk.getChildren(path, false);
+                    SortedMap<Integer, String> suffixToZnode = getSuffixToZnodeMapping(values);
+                    if ( suffixToZnode.isEmpty() )
+                    {
+                        /* Not even our own ephemeral znode exists (presumably the ZooKeeper session was lost). */
+                        logger_.warn("No znodes found under " + path + "; leader election monitor exiting");
+                        break;
+                    }
+
+                    /* The znode with the smallest suffix holds the leader's identity. */
+                    String leaderZnode = path + "/" + suffixToZnode.get( suffixToZnode.firstKey() );
+                    EndPoint elected = EndPoint.fromBytes( zk.getData(leaderZnode, false, null) );
+                    /*
+                     * The very first result is only recorded; every later
+                     * result is also disseminated via onLeaderElection().
+                     */
+                    if ( leader_.getAndSet(elected) != null )
+                    {
+                        onLeaderElection();
+                    }
+                    logger_.debug("Elected leader is " + elected + " @ znode " + leaderZnode);
+
+                    int index = getLocalSuffix();
+                    if ( index <= suffixToZnode.firstKey() )
+                    {
+                        /* Nothing precedes the local znode, so there is nobody to watch. */
+                        break;
+                    }
+
+                    String pathToCheck = path + "/" + getImmediatelyPrecedingZnode(suffixToZnode, index);
+                    Stat stat = zk.exists(pathToCheck, true);
+                    if ( stat != null )
+                    {
+                        logger_.debug("Awaiting my turn ...");
+                        /* Releases createLock_ while waiting, which lets signal() get in. */
+                        condition_.await();
+                        logger_.debug("Checking to see if leader is around ...");
+                    }
+                    /* If the predecessor already vanished, loop and re-evaluate immediately. */
+                }
+            }
+            catch ( InterruptedException ex )
+            {
+                logger_.warn(LogUtil.throwableToString(ex));
+                Thread.currentThread().interrupt();
+            }
+            catch ( IOException ex )
+            {
+                logger_.warn(LogUtil.throwableToString(ex));
+            }
+            catch ( KeeperException ex )
+            {
+                logger_.warn(LogUtil.throwableToString(ex));
+            }
+            finally
+            {
+                createLock_.unlock();
+            }
+        }
+
+        /**
+         * Maps the numeric suffix of each znode name (the part after '-',
+         * e.g. 3 for "L-3") to the znode name, ordered by suffix.
+         */
+        private SortedMap<Integer, String> getSuffixToZnodeMapping(List<String> values)
+        {
+            SortedMap<Integer, String> suffixZNodeMap = new TreeMap<Integer, String>();
+            for ( String znode : values )
+            {
+                String[] pieces = znode.split("-");
+                suffixZNodeMap.put(Integer.parseInt( pieces[1] ), znode);
+            }
+            return suffixZNodeMap;
+        }
+
+        /**
+         * Returns the znode whose suffix is the largest one strictly smaller
+         * than {@code index}. The caller guarantees index is greater than the
+         * smallest suffix, so such a znode exists.
+         */
+        private String getImmediatelyPrecedingZnode(SortedMap<Integer, String> suffixToZnode, int index)
+        {
+            /* headMap holds exactly the suffixes below index; this also works if index itself is absent. */
+            SortedMap<Integer, String> preceding = suffixToZnode.headMap(index);
+            return preceding.get( preceding.lastKey() );
+        }
+
+        /**
+         * If the local node's leader related znode is L-3
+         * this method will return 3.
+         * @return suffix portion of L-3.
+         */
+        private int getLocalSuffix()
+        {
+            String[] pieces = pathCreated_.split("/");
+            String leaderPiece = pieces[pieces.length - 1];
+            String[] leaderPieces = leaderPiece.split("-");
+            return Integer.parseInt( leaderPieces[1] );
+        }
+    }
+
+    private LeaderElector()
+    {
+        condition_ = createLock_.newCondition();
+    }
+
+    /**
+     * Use to inform interested parties about the change in the state
+     * for specified endpoint. If the endpoint's state carries a LEADER
+     * application state, the announced leader becomes the known leader.
+     *
+     * @param endpoint endpoint for which the state change occured.
+     * @param epState state that actually changed for the above endpoint.
+     */
+    public void onChange(EndPoint endpoint, EndPointState epState)
+    {
+        ApplicationState leaderState = epState.getApplicationState(LeaderElector.leaderState_);
+        if ( leaderState == null )
+        {
+            return;
+        }
+        EndPoint leader = EndPoint.fromString( leaderState.getState() );
+        EndPoint current = leader_.get();
+        /* Compare the endpoint values, not the AtomicReference holder (which never equals an EndPoint). */
+        if ( current == null || !current.equals(leader) )
+        {
+            logger_.debug("New leader in the cluster is " + leader);
+            leader_.set(leader);
+        }
+    }
+
+    /**
+     * Registers for gossip notifications, creates this node's ephemeral
+     * sequential znode under the Leader znode (its data is the local control
+     * endpoint) and starts the leader-death monitor on the election thread.
+     */
+    void start() throws Throwable
+    {
+        /* Register with the Gossiper for EndPointState notifications */
+        Gossiper.instance().register(this);
+        logger_.debug("Starting the leader election process ...");
+        ZooKeeper zk = StorageService.instance().getZooKeeperHandle();
+        String path = "/Cassandra/" + DatabaseDescriptor.getClusterName() + "/Leader";
+        String createPath = path + "/L-";
+
+        /* Create the znodes under the Leader znode */
+        logger_.debug("Attempting to create znode " + createPath);
+        String pathCreated = zk.create(createPath, EndPoint.toBytes( StorageService.getLocalControlEndPoint() ), Ids.OPEN_ACL_UNSAFE, (CreateMode.EPHEMERAL_SEQUENTIAL) );
+        logger_.debug("Created znode under leader znode " + pathCreated);
+        leaderElectionService_.submit(new LeaderDeathMonitor(pathCreated));
+    }
+
+    /**
+     * Wakes the leader-death monitor so it re-reads the Leader znode.
+     */
+    void signal()
+    {
+        logger_.debug("Signalling others to check on leader ...");
+        createLock_.lock();
+        try
+        {
+            condition_.signal();
+        }
+        finally
+        {
+            createLock_.unlock();
+        }
+    }
+
+    /**
+     * @return the known leader, or the local storage endpoint if no leader
+     *         has been determined yet.
+     */
+    EndPoint getLeader()
+    {
+        EndPoint leader = leader_.get();
+        return ( leader != null ) ? leader : StorageService.getLocalStorageEndPoint();
+    }
+
+    /**
+     * Called after a (re-)election: if the local node is the leader it
+     * publishes itself through gossip under the LEADER application state.
+     */
+    private void onLeaderElection() throws InterruptedException, IOException
+    {
+        /* Read once so the check and the announcement use the same value. */
+        EndPoint leader = leader_.get();
+        if ( StorageService.instance().isLeader(leader) )
+        {
+            Gossiper.instance().addApplicationState(LeaderElector.leaderState_, new ApplicationState(String.valueOf(leader)));
+        }
+    }
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/service/LoadDisseminator.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/LoadDisseminator.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/LoadDisseminator.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/LoadDisseminator.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service;
+
+import java.util.TimerTask;
+
+import org.apache.cassandra.gms.ApplicationState;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.utils.FileUtils;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+/**
+ * Timer task that publishes this node's used disk space to the cluster by
+ * adding it to the gossip state under the {@code LOAD-INFORMATION} key.
+ */
+class LoadDisseminator extends TimerTask
+{
+    private final static Logger logger_ = Logger.getLogger(LoadDisseminator.class);
+    protected final static String loadInfo_= "LOAD-INFORMATION";
+
+    @Override
+    public void run()
+    {
+        try
+        {
+            /* Human-readable size string, e.g. as produced by FileUtils.stringifyFileSize. */
+            String usage = FileUtils.stringifyFileSize( FileUtils.getUsedDiskSpace() );
+            logger_.debug("Disseminating load info ...");
+            Gossiper.instance().addApplicationState(loadInfo_, new ApplicationState(usage));
+        }
+        catch ( Throwable t )
+        {
+            /* Never let anything escape: an exception thrown from a TimerTask terminates a java.util.Timer's thread. */
+            logger_.warn( LogUtil.throwableToString(t) );
+        }
+    }
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/service/LoadInfo.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/LoadInfo.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/LoadInfo.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/LoadInfo.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,64 @@
+ /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.util.Comparator;
+
+import org.apache.cassandra.utils.FileUtils;
+
+
+/**
+ * Load report for a node: currently just its disk usage, stored as the
+ * human-readable string produced by FileUtils.stringifyFileSize (or as
+ * supplied directly by the caller).
+ */
+class LoadInfo
+{
+    /**
+     * Orders LoadInfo objects by the disk space they report, smallest first.
+     * Null arguments are rejected with IllegalArgumentException.
+     */
+    protected static class DiskSpaceComparator implements Comparator<LoadInfo>
+    {
+        public int compare(LoadInfo li, LoadInfo li2)
+        {
+            if ( li == null || li2 == null )
+                throw new IllegalArgumentException("Cannot pass in values that are NULL.");
+
+            double space = FileUtils.stringToFileSize(li.diskSpace_);
+            double space2 = FileUtils.stringToFileSize(li2.diskSpace_);
+            /*
+             * Double.compare rather than (int)(space - space2): the cast
+             * truncates differences below 1 (and NaN) to 0, which breaks the
+             * Comparator contract.
+             */
+            return Double.compare(space, space2);
+        }
+    }
+
+    /* Disk usage in the string form accepted by FileUtils.stringToFileSize. */
+    private final String diskSpace_;
+
+    /**
+     * @param diskSpace disk usage as a number, converted with FileUtils.stringifyFileSize
+     */
+    LoadInfo(long diskSpace)
+    {
+        diskSpace_ = FileUtils.stringifyFileSize(diskSpace);
+    }
+
+    /**
+     * @param loadInfo disk usage already in string form
+     */
+    LoadInfo(String loadInfo)
+    {
+        diskSpace_ = loadInfo;
+    }
+
+    /** @return the disk usage string */
+    String diskSpace()
+    {
+        return diskSpace_;
+    }
+
+    /** @return the disk usage string, or "null" if none was given */
+    public String toString()
+    {
+        return String.valueOf(diskSpace_);
+    }
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/service/LocationInfoVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/LocationInfoVerbHandler.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/LocationInfoVerbHandler.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/LocationInfoVerbHandler.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.net.CompactEndPointSerializationHelper;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class LocationInfoVerbHandler implements IVerbHandler
+{
+    private static Logger logger_ = Logger.getLogger( LocationInfoVerbHandler.class );
+
+    /**
+     * Answers a location request. The first element of the message body is
+     * a serialized Range; the handler looks up the replicas for the range's
+     * right end and replies with the first replica the failure detector
+     * considers alive, serialized with the storage port. If no replica is
+     * alive the reply body is an empty byte array.
+     *
+     * @param message request whose body[0] is the serialized Range
+     */
+    public void doVerb(Message message)
+    {
+        EndPoint from = message.getFrom();
+        logger_.info("Received a location download request from " + from);
+
+        Object[] body = message.getMessageBody();
+        byte[] bytes = (byte[])body[0];
+        DataInputBuffer bufIn = new DataInputBuffer();
+        bufIn.reset(bytes, bytes.length);
+        try
+        {
+            Range range = Range.serializer().deserialize(bufIn);
+            /* Get the replicas for the given range */
+            EndPoint[] replicas = StorageService.instance().getNStorageEndPoint(range.right());
+            ByteArrayOutputStream bos = new ByteArrayOutputStream();
+            DataOutputStream dos = new DataOutputStream(bos);
+
+            for ( EndPoint replica : replicas )
+            {
+                /* The failure detector is queried with the control-port form of the endpoint. */
+                replica.setPort(DatabaseDescriptor.getControlPort());
+                boolean alive = FailureDetector.instance().isAlive(replica);
+                /*
+                 * Always switch back to the storage port so replicas judged dead
+                 * are not left with their port mutated (assumes they arrived with
+                 * the storage port - confirm in getNStorageEndPoint).
+                 */
+                replica.setPort(DatabaseDescriptor.getStoragePort());
+                if ( alive )
+                {
+                    CompactEndPointSerializationHelper.serialize(replica, dos);
+                    break;
+                }
+            }
+
+            Message response = message.getReply(StorageService.getLocalStorageEndPoint(), new Object[]{bos.toByteArray()});
+            logger_.info("Sending the token download response to " + from);
+            MessagingService.getMessagingInstance().sendOneWay(response, from);
+        }
+        catch (IOException ex)
+        {
+            logger_.warn(LogUtil.throwableToString(ex));
+        }
+    }
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/service/OrderPreservingHashPartitioner.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/OrderPreservingHashPartitioner.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/OrderPreservingHashPartitioner.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/OrderPreservingHashPartitioner.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.math.BigInteger;
+
+/**
+ * Partitioner whose hash is a base-31 polynomial over the first 24
+ * characters of the key. Keys shorter than 24 characters are padded with
+ * the value 31, and characters beyond position 24 are ignored.
+ * NOTE(review): because padding uses 31, characters with code points below
+ * 31 sort after a shorter key, so ordering is only approximately preserved.
+ */
+public class OrderPreservingHashPartitioner implements IPartitioner
+{
+    private final static int maxKeyHashLength_ = 24;
+    private static final BigInteger prime_ = BigInteger.valueOf(31);
+
+    public BigInteger hash(String key)
+    {
+        BigInteger result = BigInteger.ZERO;
+        int keyLength = key.length();
+        for ( int position = 0; position < maxKeyHashLength_; ++position )
+        {
+            /* Positions past the end of the key contribute the prime itself as padding. */
+            BigInteger digit = ( position < keyLength ) ? BigInteger.valueOf(key.charAt(position)) : prime_;
+            result = result.multiply(prime_).add(digit);
+        }
+        return result;
+    }
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/service/PartitionerType.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/PartitionerType.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/PartitionerType.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/PartitionerType.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+/**
+ * Identifies which key partitioning strategy is in use.
+ */
+public enum PartitionerType
+{
+    /** Random (hash-based) placement - presumably RandomPartitioner. */
+    RANDOM,
+    /** Order-preserving hash function - presumably OrderPreservingHashPartitioner. */
+    OPHF
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/service/QuorumResponseHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/QuorumResponseHandler.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/QuorumResponseHandler.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/QuorumResponseHandler.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.*;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.WriteResponseMessage;
+import org.apache.cassandra.net.IAsyncCallback;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+import org.apache.cassandra.utils.*;
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+/**
+ * Callback that collects responses for one request and blocks a caller in
+ * get() until a majority of the expected responses has arrived and the
+ * resolver reports that data is present, or until the RPC timeout elapses.
+ */
+public class QuorumResponseHandler<T> implements IAsyncCallback
+{
+    private static Logger logger_ = Logger.getLogger( QuorumResponseHandler.class );
+    private final Lock lock_ = new ReentrantLock();
+    private final Condition condition_;
+    private int responseCount_;
+    /* Guarded by lock_. */
+    private final List<Message> responses_ = new ArrayList<Message>();
+    private final IResponseResolver<T> responseResolver_;
+    private final AtomicBoolean done_ = new AtomicBoolean(false);
+
+    /**
+     * @param responseCount number of responses expected; a majority of it must arrive
+     * @param responseResolver combines the collected responses into a result
+     */
+    public QuorumResponseHandler(int responseCount, IResponseResolver<T> responseResolver)
+    {
+        condition_ = lock_.newCondition();
+        responseCount_ = responseCount;
+        responseResolver_ = responseResolver;
+    }
+
+    /** Changes the number of expected responses from which the majority is computed. */
+    public void setResponseCount(int responseCount)
+    {
+        responseCount_ = responseCount;
+    }
+
+    /**
+     * Waits, at most the configured RPC timeout, for a quorum of responses
+     * and then resolves them. The callbacks registered for every received
+     * response are removed whether or not the wait succeeded.
+     *
+     * @return the resolver's result for the collected responses
+     * @throws TimeoutException if no quorum was reached within the timeout
+     *         (an interrupt ends the wait early and is reported the same way,
+     *         with the thread's interrupt status restored)
+     * @throws DigestMismatchException propagated from the resolver
+     */
+    public T get() throws TimeoutException, DigestMismatchException
+    {
+        long startTime = System.currentTimeMillis();
+        boolean completed;
+        List<Message> received;
+        lock_.lock();
+        try
+        {
+            /*
+             * Loop until done or out of time: a single await may return early
+             * on a spurious wakeup, which must not be mistaken for completion.
+             */
+            long remainingNanos = TimeUnit.MILLISECONDS.toNanos(DatabaseDescriptor.getRpcTimeout());
+            try
+            {
+                while ( !done_.get() && remainingNanos > 0 )
+                {
+                    remainingNanos = condition_.awaitNanos(remainingNanos);
+                }
+            }
+            catch ( InterruptedException ex )
+            {
+                logger_.debug( LogUtil.throwableToString(ex) );
+                Thread.currentThread().interrupt();
+            }
+            completed = done_.get();
+            /* Snapshot under the lock: response() may still append after we release it. */
+            received = new ArrayList<Message>(responses_);
+        }
+        finally
+        {
+            lock_.unlock();
+        }
+
+        for ( Message response : received )
+        {
+            MessagingService.removeRegisteredCallback( response.getMessageId() );
+        }
+
+        if ( !completed )
+        {
+            StringBuilder sb = new StringBuilder("");
+            for ( Message message : received )
+            {
+                sb.append(message.getFrom());
+            }
+            throw new TimeoutException("Operation timed out - received only " + received.size() + " responses from " + sb.toString() + " .");
+        }
+
+        logger_.info("QuorumResponseHandler: " + (System.currentTimeMillis() - startTime)
+                + " ms.");
+        return responseResolver_.resolve(received);
+    }
+
+    /**
+     * Records a response. Once a majority of the expected responses has
+     * arrived and the resolver reports data present, marks the handler done
+     * and wakes the waiter; responses arriving after that are ignored.
+     */
+    public void response(Message message)
+    {
+        lock_.lock();
+        try
+        {
+            int majority = (responseCount_ >> 1) + 1;
+            if ( !done_.get() )
+            {
+                responses_.add( message );
+                if ( responses_.size() >= majority && responseResolver_.isDataPresent(responses_))
+                {
+                    done_.set(true);
+                    condition_.signal();
+                }
+            }
+        }
+        finally
+        {
+            lock_.unlock();
+        }
+    }
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/service/RandomPartitioner.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/RandomPartitioner.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/RandomPartitioner.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/RandomPartitioner.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.math.BigInteger;
+
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * Partitioner that places keys by the value of {@code FBUtilities.hash(key)},
+ * described by the original author as an MD5 hash of the key - the standard
+ * placement technique in DHTs.
+ *
+ * @author alakshman
+ */
+public class RandomPartitioner implements IPartitioner
+{
+    public BigInteger hash(String key)
+    {
+        BigInteger digest = FBUtilities.hash(key);
+        return digest;
+    }
+}
\ No newline at end of file
Added: incubator/cassandra/trunk/src/org/apache/cassandra/service/ReadRepairManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/ReadRepairManager.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/ReadRepairManager.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/ReadRepairManager.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.concurrent.locks.*;
+
+import org.apache.cassandra.db.Column;
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.db.RowMutationMessage;
+import org.apache.cassandra.db.SuperColumn;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.Header;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.Cachetable;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.ICacheExpungeHook;
+import org.apache.cassandra.utils.ICachetable;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+
+
+/*
+ * This class manages the read repairs . This is a singleton class
+ * it basically uses the cache table construct to schedule writes that have to be
+ * made for read repairs.
+ * A cachetable is created which wakes up every n milliseconds specified by
+ * expirationTimeInMillis and calls a global hook fn on pending entries
+ * This fn basically sends the message to the appropriate servers to update them
+ * with the latest changes.
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+class ReadRepairManager
+{
+ private static Logger logger_ = Logger.getLogger(ReadRepairManager.class);
+ private static final long expirationTimeInMillis = 2000;
+ private static Lock lock_ = new ReentrantLock();
+ private static ReadRepairManager self_ = null;
+
+ /*
+ * This is the internal class which actually
+ * implements the global hook fn called by the readrepair manager
+ */
+ static class ReadRepairPerformer implements
+ ICacheExpungeHook<String, Message>
+ {
+ private static Logger logger_ = Logger.getLogger(ReadRepairPerformer.class);
+ /*
+ * The hook fn which takes the end point and the row mutation that
+ * needs to be sent to the end point in order
+ * to perform read repair.
+ */
+ public void callMe(String target,
+ Message message)
+ {
+ String[] pieces = FBUtilities.strip(target, ":");
+ EndPoint to = new EndPoint(pieces[0], Integer.parseInt(pieces[1]));
+ MessagingService.getMessagingInstance().sendOneWay(message, to);
+ }
+
+ }
+
+ private ICachetable<String, Message> readRepairTable_ = new Cachetable<String, Message>(expirationTimeInMillis, new ReadRepairManager.ReadRepairPerformer());
+
+ protected ReadRepairManager()
+ {
+
+ }
+
+ public static ReadRepairManager instance()
+ {
+ if (self_ == null)
+ {
+ lock_.lock();
+ try
+ {
+ if ( self_ == null )
+ self_ = new ReadRepairManager();
+ }
+ finally
+ {
+ lock_.unlock();
+ }
+ }
+ return self_;
+ }
+
+ /*
+ * This is the fn that should be used to scheule a read repair
+ * specify a endpoint on whcih the read repair should happen and the row mutaion
+ * message that has the repaired row.
+ */
+ public void schedule(EndPoint target, RowMutationMessage rowMutationMessage)
+ {
+ /*
+ Message message = new Message(StorageService.getLocalStorageEndPoint(),
+ StorageService.mutationStage_,
+ StorageService.readRepairVerbHandler_, new Object[]
+ { rowMutationMessage });
+ */
+ try
+ {
+ Message message = RowMutationMessage.makeRowMutationMessage(rowMutationMessage, StorageService.readRepairVerbHandler_);
+ String key = target + ":" + message.getMessageId();
+ readRepairTable_.put(key, message);
+ }
+ catch ( IOException ex )
+ {
+ logger_.info(LogUtil.throwableToString(ex));
+ }
+ }
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/service/ReadResponseResolver.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/ReadResponseResolver.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/ReadResponseResolver.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/ReadResponseResolver.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.ReadResponseMessage;
+import org.apache.cassandra.db.Row;
+import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.db.RowMutationMessage;
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+
+/*
+ * This class is used by all read functions and is called by the Qorum
+ * when atleast a few of the servers ( few is specified in Quorum)
+ * have sent the response . The resolve fn then schedules read repair
+ * and resolution of read data from the various servers.
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+public class ReadResponseResolver implements IResponseResolver<Row>
+{
+
+ private static Logger logger_ = Logger.getLogger(WriteResponseResolver.class);
+
+ /*
+ * This method for resolving read data should look at the timestamps of each
+ * of the columns that are read and should pick up columns with the latest
+ * timestamp. For those columns where the timestamp is not the latest a
+ * repair request should be scheduled.
+ *
+ */
+ public Row resolve(List<Message> responses) throws DigestMismatchException
+ {
+ long startTime = System.currentTimeMillis();
+ Row retRow = null;
+ List<Row> rowList = new ArrayList<Row>();
+ List<EndPoint> endPoints = new ArrayList<EndPoint>();
+ String key = null;
+ String table = null;
+ byte[] digest = new byte[0];
+ boolean isDigestQuery = false;
+
+ /*
+ * Populate the list of rows from each of the messages
+ * Check to see if there is a digest query. If a digest
+ * query exists then we need to compare the digest with
+ * the digest of the data that is received.
+ */
+ DataInputBuffer bufIn = new DataInputBuffer();
+ for (Message response : responses)
+ {
+ byte[] body = (byte[])response.getMessageBody()[0];
+ bufIn.reset(body, body.length);
+ try
+ {
+ long start = System.currentTimeMillis();
+ ReadResponseMessage result = ReadResponseMessage.serializer().deserialize(bufIn);
+ logger_.debug( "Response deserialization time : " + (System.currentTimeMillis() - start) + " ms.");
+ if(!result.isDigestQuery())
+ {
+ rowList.add(result.row());
+ endPoints.add(response.getFrom());
+ key = result.row().key();
+ table = result.table();
+ }
+ else
+ {
+ digest = result.digest();
+ isDigestQuery = true;
+ }
+ }
+ catch( IOException ex )
+ {
+ logger_.info(LogUtil.throwableToString(ex));
+ }
+ }
+ // If there was a digest query compare it withh all teh data digests
+ // If there is a mismatch then thwrow an exception so that read repair can happen.
+ if(isDigestQuery)
+ {
+ for(Row row: rowList)
+ {
+ if( !Arrays.equals(row.digest(), digest) )
+ {
+ throw new DigestMismatchException("The Digest does not match");
+ }
+ }
+ }
+
+ /* If the rowList is empty then we had some exception above. */
+ if ( rowList.size() == 0 )
+ {
+ return retRow;
+ }
+
+ /* Now calculate the resolved row */
+ retRow = new Row(key);
+ for (int i = 0 ; i < rowList.size(); i++)
+ {
+ retRow.repair(rowList.get(i));
+ }
+ // At this point we have the return row .
+ // Now we need to calculate the differnce
+ // so that we can schedule read repairs
+
+ for (int i = 0 ; i < rowList.size(); i++)
+ {
+ // calculate the difference , since retRow is the resolved
+ // row it can be used as the super set , remember no deletes
+ // will happen with diff its only for additions so far
+ // TODO : handle deletes
+ Row diffRow = rowList.get(i).diff(retRow);
+ if(diffRow == null) // no repair needs to happen
+ continue;
+ // create the row mutation message based on the diff and schedule a read repair
+ RowMutation rowMutation = new RowMutation(table, key);
+ Map<String, ColumnFamily> columnFamilies = diffRow.getColumnFamilies();
+ Set<String> cfNames = columnFamilies.keySet();
+
+ for ( String cfName : cfNames )
+ {
+ ColumnFamily cf = columnFamilies.get(cfName);
+ rowMutation.add(cfName, cf);
+ }
+ RowMutationMessage rowMutationMessage = new RowMutationMessage(rowMutation);
+ // schedule the read repair
+ ReadRepairManager.instance().schedule(endPoints.get(i),rowMutationMessage);
+ }
+ logger_.info("resolve: " + (System.currentTimeMillis() - startTime)
+ + " ms.");
+ return retRow;
+ }
+
+ public boolean isDataPresent(List<Message> responses)
+ {
+ boolean isDataPresent = false;
+ for (Message response : responses)
+ {
+ byte[] body = (byte[])response.getMessageBody()[0];
+ DataInputBuffer bufIn = new DataInputBuffer();
+ bufIn.reset(body, body.length);
+ try
+ {
+ ReadResponseMessage result = ReadResponseMessage.serializer().deserialize(bufIn);
+ if(!result.isDigestQuery())
+ {
+ isDataPresent = true;
+ }
+ bufIn.close();
+ }
+ catch(IOException ex)
+ {
+ logger_.info(LogUtil.throwableToString(ex));
+ }
+ }
+ return isDataPresent;
+ }
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageLoadBalancer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageLoadBalancer.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageLoadBalancer.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageLoadBalancer.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,414 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.io.Serializable;
+import java.math.BigInteger;
+import java.util.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
+import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
+import org.apache.cassandra.concurrent.SingleThreadedStage;
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.concurrent.ThreadFactoryImpl;
+import org.apache.cassandra.dht.LeaveJoinProtocolImpl;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.gms.ApplicationState;
+import org.apache.cassandra.gms.EndPointState;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.gms.IEndPointStateChangeSubscriber;
+import org.apache.cassandra.io.SSTable;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.log4j.Logger;
+import org.apache.cassandra.net.*;
+import org.apache.cassandra.utils.*;
+
+/*
+ * The load balancing algorithm here is an implementation of
+ * the algorithm as described in the paper "Scalable range query
+ * processing for large-scale distributed database applications".
+ * This class keeps track of load information across the system.
+ * It registers itself with the Gossiper for ApplicationState namely
+ * load information i.e number of requests processed w.r.t distinct
+ * keys at an Endpoint. Monitor load infomation for a 5 minute
+ * interval and then do load balancing operations if necessary.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+final class StorageLoadBalancer implements IEndPointStateChangeSubscriber, IComponentShutdown
+{
+ class LoadBalancer implements Runnable
+ {
+ LoadBalancer()
+ {
+ /* Copy the entries in loadInfo_ into loadInfo2_ and use it for all calculations */
+ loadInfo2_.putAll(loadInfo_);
+ }
+
+ /**
+ * Obtain a node which is a potential target. Start with
+ * the neighbours i.e either successor or predecessor.
+ * Send the target a MoveMessage. If the node cannot be
+ * relocated on the ring then we pick another candidate for
+ * relocation.
+ */
+ public void run()
+ {
+ /*
+ int threshold = (int)(StorageLoadBalancer.ratio_ * averageSystemLoad());
+ int myLoad = localLoad();
+ EndPoint predecessor = storageService_.getPredecessor(StorageService.getLocalStorageEndPoint());
+ logger_.debug("Trying to relocate the predecessor " + predecessor);
+ boolean value = tryThisNode(myLoad, threshold, predecessor);
+ if ( !value )
+ {
+ loadInfo2_.remove(predecessor);
+ EndPoint successor = storageService_.getSuccessor(StorageService.getLocalStorageEndPoint());
+ logger_.debug("Trying to relocate the successor " + successor);
+ value = tryThisNode(myLoad, threshold, successor);
+ if ( !value )
+ {
+ loadInfo2_.remove(successor);
+ while ( !loadInfo2_.isEmpty() )
+ {
+ EndPoint target = findARandomLightNode();
+ if ( target != null )
+ {
+ logger_.debug("Trying to relocate the random node " + target);
+ value = tryThisNode(myLoad, threshold, target);
+ if ( !value )
+ {
+ loadInfo2_.remove(target);
+ }
+ else
+ {
+ break;
+ }
+ }
+ else
+ {
+ // No light nodes available - this is NOT good.
+ logger_.warn("Not even a single lightly loaded node is available ...");
+ break;
+ }
+ }
+
+ loadInfo2_.clear();
+ // If we are here and no node was available to
+ // perform load balance with we need to report and bail.
+ if ( !value )
+ {
+ logger_.warn("Load Balancing operations weren't performed for this node");
+ }
+ }
+ }
+ */
+ }
+
+ /*
+ private boolean tryThisNode(int myLoad, int threshold, EndPoint target)
+ {
+ boolean value = false;
+ LoadInfo li = loadInfo2_.get(target);
+ int pLoad = li.count();
+ if ( ((myLoad + pLoad) >> 1) <= threshold )
+ {
+ //calculate the number of keys to be transferred
+ int keyCount = ( (myLoad - pLoad) >> 1 );
+ logger_.debug("Number of keys we attempt to transfer to " + target + " " + keyCount);
+ // Determine the token that the target should join at.
+ BigInteger targetToken = BootstrapAndLbHelper.getTokenBasedOnPrimaryCount(keyCount);
+ // Send a MoveMessage and see if this node is relocateable
+ MoveMessage moveMessage = new MoveMessage(targetToken);
+ Message message = new Message(StorageService.getLocalStorageEndPoint(), StorageLoadBalancer.lbStage_, StorageLoadBalancer.moveMessageVerbHandler_, new Object[]{moveMessage});
+ logger_.debug("Sending a move message to " + target);
+ IAsyncResult result = MessagingService.getMessagingInstance().sendRR(message, target);
+ value = (Boolean)result.get()[0];
+ logger_.debug("Response for query to relocate " + target + " is " + value);
+ }
+ return value;
+ }
+ */
+ }
+
+ class MoveMessageVerbHandler implements IVerbHandler
+ {
+ public void doVerb(Message message)
+ {
+ Message reply = message.getReply(StorageService.getLocalStorageEndPoint(), new Object[]{isMoveable_.get()});
+ MessagingService.getMessagingInstance().sendOneWay(reply, message.getFrom());
+ if ( isMoveable_.get() )
+ {
+ MoveMessage moveMessage = (MoveMessage)message.getMessageBody()[0];
+ BigInteger targetToken = moveMessage.getTargetToken();
+ /* Start the leave operation and join the ring at the position specified */
+ isMoveable_.set(false);
+ }
+ }
+ }
+
+ private static final Logger logger_ = Logger.getLogger(StorageLoadBalancer.class);
+ private static final String lbStage_ = "LOAD-BALANCER-STAGE";
+ private static final String moveMessageVerbHandler_ = "MOVE-MESSAGE-VERB-HANDLER";
+ /* time to delay in minutes the actual load balance procedure if heavily loaded */
+ private static final int delay_ = 5;
+ /* Ratio of highest loaded node and the average load. */
+ private static final double ratio_ = 1.5;
+
+ private StorageService storageService_;
+ /* this indicates whether this node is already helping someone else */
+ private AtomicBoolean isMoveable_ = new AtomicBoolean(false);
+ private Map<EndPoint, LoadInfo> loadInfo_ = new HashMap<EndPoint, LoadInfo>();
+ /* This map is a clone of the one above and is used for various calculations during LB operation */
+ private Map<EndPoint, LoadInfo> loadInfo2_ = new HashMap<EndPoint, LoadInfo>();
+ /* This thread pool is used for initiating load balancing operations */
+ private ScheduledThreadPoolExecutor lb_ = new DebuggableScheduledThreadPoolExecutor(
+ 1,
+ new ThreadFactoryImpl("LB-OPERATIONS")
+ );
+ /* This thread pool is used by target node to leave the ring. */
+ private ExecutorService lbOperations_ = new DebuggableThreadPoolExecutor(1,
+ 1,
+ Integer.MAX_VALUE,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue<Runnable>(),
+ new ThreadFactoryImpl("LB-TARGET")
+ );
+
+ StorageLoadBalancer(StorageService storageService)
+ {
+ storageService_ = storageService;
+ /* register the load balancer stage */
+ StageManager.registerStage(StorageLoadBalancer.lbStage_, new SingleThreadedStage(StorageLoadBalancer.lbStage_));
+ /* register the load balancer verb handler */
+ MessagingService.getMessagingInstance().registerVerbHandlers(StorageLoadBalancer.moveMessageVerbHandler_, new MoveMessageVerbHandler());
+ /* register with the StorageService */
+ storageService_.registerComponentForShutdown(this);
+ }
+
+ public void start()
+ {
+ /* Register with the Gossiper for EndPointState notifications */
+ Gossiper.instance().register(this);
+ }
+
+ public void shutdown()
+ {
+ lbOperations_.shutdownNow();
+ lb_.shutdownNow();
+ }
+
+ public void onChange(EndPoint endpoint, EndPointState epState)
+ {
+ logger_.debug("CHANGE IN STATE FOR @ StorageLoadBalancer " + endpoint);
+ // load information for this specified endpoint for load balancing
+ ApplicationState loadInfoState = epState.getApplicationState(LoadDisseminator.loadInfo_);
+ if ( loadInfoState != null )
+ {
+ String lInfoState = loadInfoState.getState();
+ LoadInfo lInfo = new LoadInfo(lInfoState);
+ loadInfo_.put(endpoint, lInfo);
+
+ /*
+ int currentLoad = Integer.parseInt(loadInfoState.getState());
+ // update load information for this endpoint
+ loadInfo_.put(endpoint, currentLoad);
+
+ // clone load information to perform calculations
+ loadInfo2_.putAll(loadInfo_);
+ // Perform the analysis for load balance operations
+ if ( isHeavyNode() )
+ {
+ logger_.debug(StorageService.getLocalStorageEndPoint() + " is a heavy node with load " + localLoad());
+ // lb_.schedule( new LoadBalancer(), StorageLoadBalancer.delay_, TimeUnit.MINUTES );
+ }
+ */
+ }
+ }
+
+ /*
+ * Load information associated with a given endpoint.
+ */
+ LoadInfo getLoad(EndPoint ep)
+ {
+ LoadInfo li = loadInfo_.get(ep);
+ return li;
+ }
+
+ /*
+ private boolean isMoveable()
+ {
+ if ( !isMoveable_.get() )
+ return false;
+ int myload = localLoad();
+ EndPoint successor = storageService_.getSuccessor(StorageService.getLocalStorageEndPoint());
+ LoadInfo li = loadInfo2_.get(successor);
+ // "load" is NULL means that the successor node has not
+ // yet gossiped its load information. We should return
+ // false in this case since we want to err on the side
+ // of caution.
+ if ( li == null )
+ return false;
+ else
+ {
+ if ( ( myload + li.count() ) > StorageLoadBalancer.ratio_*averageSystemLoad() )
+ return false;
+ else
+ return true;
+ }
+ }
+ */
+
+ /*
+ private int localLoad()
+ {
+ LoadInfo value = loadInfo2_.get(StorageService.getLocalStorageEndPoint());
+ return (value == null) ? 0 : value.count();
+ }
+ */
+
+ /*
+ private int averageSystemLoad()
+ {
+ int nodeCount = loadInfo2_.size();
+ Set<EndPoint> nodes = loadInfo2_.keySet();
+
+ int systemLoad = 0;
+ for ( EndPoint node : nodes )
+ {
+ LoadInfo load = loadInfo2_.get(node);
+ if ( load != null )
+ systemLoad += load.count();
+ }
+ int averageLoad = (nodeCount > 0) ? (systemLoad / nodeCount) : 0;
+ logger_.debug("Average system load should be " + averageLoad);
+ return averageLoad;
+ }
+ */
+
+ /*
+ private boolean isHeavyNode()
+ {
+ return ( localLoad() > ( StorageLoadBalancer.ratio_ * averageSystemLoad() ) );
+ }
+ */
+
+ /*
+ private boolean isMoveable(EndPoint target)
+ {
+ int threshold = (int)(StorageLoadBalancer.ratio_ * averageSystemLoad());
+ if ( isANeighbour(target) )
+ {
+ // If the target is a neighbour then it is
+ // moveable if its
+ LoadInfo load = loadInfo2_.get(target);
+ if ( load == null )
+ return false;
+ else
+ {
+ int myload = localLoad();
+ int avgLoad = (load.count() + myload) >> 1;
+ if ( avgLoad <= threshold )
+ return true;
+ else
+ return false;
+ }
+ }
+ else
+ {
+ EndPoint successor = storageService_.getSuccessor(target);
+ LoadInfo sLoad = loadInfo2_.get(successor);
+ LoadInfo targetLoad = loadInfo2_.get(target);
+ if ( (sLoad.count() + targetLoad.count()) > threshold )
+ return false;
+ else
+ return true;
+ }
+ }
+ */
+
+ private boolean isANeighbour(EndPoint neighbour)
+ {
+ EndPoint predecessor = storageService_.getPredecessor(StorageService.getLocalStorageEndPoint());
+ if ( predecessor.equals(neighbour) )
+ return true;
+
+ EndPoint successor = storageService_.getSuccessor(StorageService.getLocalStorageEndPoint());
+ if ( successor.equals(neighbour) )
+ return true;
+
+ return false;
+ }
+
+ /*
+ * Determine the nodes that are lightly loaded. Choose at
+ * random one of the lightly loaded nodes and use them as
+ * a potential target for load balance.
+ */
+ /*
+ private EndPoint findARandomLightNode()
+ {
+ List<EndPoint> potentialCandidates = new ArrayList<EndPoint>();
+ Set<EndPoint> allTargets = loadInfo2_.keySet();
+ int avgLoad = averageSystemLoad();
+
+ for( EndPoint target : allTargets )
+ {
+ LoadInfo load = loadInfo2_.get(target);
+ if ( load.count() < avgLoad )
+ potentialCandidates.add(target);
+ }
+
+ if ( potentialCandidates.size() > 0 )
+ {
+ Random random = new Random();
+ int index = random.nextInt(potentialCandidates.size());
+ return potentialCandidates.get(index);
+ }
+ return null;
+ }
+ */
+}
+
/*
 * Payload of a load-balancing move request: the ring token at which the
 * receiving node is asked to rejoin. Serializable, so the field name and
 * member signatures are part of its wire form and must not change.
 */
class MoveMessage implements Serializable
{
    private BigInteger targetToken_;

    /* No-arg constructor kept for serialization frameworks. */
    private MoveMessage()
    {
    }

    MoveMessage(BigInteger targetToken)
    {
        this.targetToken_ = targetToken;
    }

    /* The token supplied at construction (may be null). */
    BigInteger getTargetToken()
    {
        return this.targetToken_;
    }
}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageProxy.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageProxy.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageProxy.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,560 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ReadMessage;
+import org.apache.cassandra.db.ReadResponseMessage;
+import org.apache.cassandra.db.Row;
+import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.db.RowMutationMessage;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.db.TouchMessage;
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.IAsyncResult;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+
+public class StorageProxy
+{
+ private static Logger logger_ = Logger.getLogger(StorageProxy.class);
+
+ /**
+ * This method is responsible for creating Message to be
+ * sent over the wire to N replicas where some of the replicas
+ * may be hints.
+ */
+ private static Map<EndPoint, Message> createWriteMessages(RowMutationMessage rmMessage, Map<EndPoint, EndPoint> endpointMap) throws IOException
+ {
+ Map<EndPoint, Message> messageMap = new HashMap<EndPoint, Message>();
+ Message message = RowMutationMessage.makeRowMutationMessage(rmMessage);
+
+ Set<EndPoint> targets = endpointMap.keySet();
+ for( EndPoint target : targets )
+ {
+ EndPoint hint = endpointMap.get(target);
+ if ( !target.equals(hint) )
+ {
+ Message hintedMessage = RowMutationMessage.makeRowMutationMessage(rmMessage);
+ hintedMessage.addHeader(RowMutationMessage.hint_, EndPoint.toBytes(hint) );
+ logger_.debug("Sending the hint of " + target.getHost() + " to " + hint.getHost());
+ messageMap.put(target, hintedMessage);
+ }
+ else
+ {
+ messageMap.put(target, message);
+ }
+ }
+ return messageMap;
+ }
+
+ /**
+ * Use this method to have this RowMutation applied
+ * across all replicas. This method will take care
+ * of the possibility of a replica being down and hint
+ * the data across to some other replica.
+ * @param RowMutation the mutation to be applied
+ * across the replicas
+ */
+ public static void insert(RowMutation rm)
+ {
+ /*
+ * Get the N nodes from storage service where the data needs to be
+ * replicated
+ * Construct a message for write
+ * Send them asynchronously to the replicas.
+ */
+ try
+ {
+ logger_.debug(" insert");
+ Map<EndPoint, EndPoint> endpointMap = StorageService.instance().getNStorageEndPointMap(rm.key());
+ // TODO: throw a thrift exception if we do not have N nodes
+ RowMutationMessage rmMsg = new RowMutationMessage(rm);
+ /* Create the write messages to be sent */
+ Map<EndPoint, Message> messageMap = createWriteMessages(rmMsg, endpointMap);
+ Set<EndPoint> endpoints = messageMap.keySet();
+ for(EndPoint endpoint : endpoints)
+ {
+ MessagingService.getMessagingInstance().sendOneWay(messageMap.get(endpoint), endpoint);
+ }
+ }
+ catch (Exception e)
+ {
+ logger_.info( LogUtil.throwableToString(e) );
+ }
+ return;
+ }
+
+
+ public static Row doReadProtocol(String key, ReadMessage readMessage) throws IOException,TimeoutException
+ {
+ EndPoint endPoint = null;
+ try
+ {
+ endPoint = StorageService.instance().findSuitableEndPoint(key);
+ }
+ catch( Throwable ex)
+ {
+ ex.printStackTrace();
+ }
+ if(endPoint != null)
+ {
+ Message message = ReadMessage.makeReadMessage(readMessage);
+ IAsyncResult iar = MessagingService.getMessagingInstance().sendRR(message, endPoint);
+ Object[] result = iar.get(DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
+ byte[] body = (byte[])result[0];
+ DataInputBuffer bufIn = new DataInputBuffer();
+ bufIn.reset(body, body.length);
+ ReadResponseMessage responseMessage = ReadResponseMessage.serializer().deserialize(bufIn);
+ return responseMessage.row();
+ }
+ else
+ {
+ logger_.warn(" Alert : Unable to find a suitable end point for the key : " + key );
+ }
+ return null;
+ }
+
+ static void touch_local (String tablename, String key, boolean fData ) throws IOException
+ {
+ Table table = Table.open( tablename );
+ table.touch(key, fData);
+ }
+
+ static void weakTouchProtocol(String tablename, String key, boolean fData) throws Exception
+ {
+ EndPoint endPoint = null;
+ try
+ {
+ endPoint = StorageService.instance().findSuitableEndPoint(key);
+ }
+ catch( Throwable ex)
+ {
+ ex.printStackTrace();
+ }
+ if(endPoint != null)
+ {
+ if(endPoint.equals(StorageService.getLocalStorageEndPoint()))
+ {
+ touch_local(tablename, key, fData);
+ return;
+ }
+ TouchMessage touchMessage = null;
+ touchMessage = new TouchMessage(tablename, key, fData);
+ Message message = TouchMessage.makeTouchMessage(touchMessage);
+ MessagingService.getMessagingInstance().sendOneWay(message, endPoint);
+ }
+ return ;
+ }
+
+ static void strongTouchProtocol(String tablename, String key, boolean fData) throws Exception
+ {
+ Map<EndPoint, EndPoint> endpointMap = StorageService.instance().getNStorageEndPointMap(key);
+ Set<EndPoint> endpoints = endpointMap.keySet();
+ TouchMessage touchMessage = null;
+ touchMessage = new TouchMessage(tablename, key, fData);
+ Message message = TouchMessage.makeTouchMessage(touchMessage);
+ for(EndPoint endpoint : endpoints)
+ {
+ MessagingService.getMessagingInstance().sendOneWay(message, endpoint);
+ }
+ }
+
+ /*
+ * Only touch data on the most suitable end point.
+ */
+ public static void touchProtocol(String tablename, String key, boolean fData, StorageService.ConsistencyLevel consistencyLevel) throws Exception
+ {
+ switch ( consistencyLevel )
+ {
+ case WEAK:
+ weakTouchProtocol(tablename, key, fData);
+ break;
+
+ case STRONG:
+ strongTouchProtocol(tablename, key, fData);
+ break;
+
+ default:
+ weakTouchProtocol(tablename, key, fData);
+ break;
+ }
+ }
+
+
+
+ public static Row readProtocol(String tablename, String key, String columnFamily, List<String> columnNames, StorageService.ConsistencyLevel consistencyLevel) throws Exception
+ {
+ Row row = null;
+ boolean foundLocal = false;
+ EndPoint[] endpoints = StorageService.instance().getNStorageEndPoint(key);
+ for(EndPoint endPoint: endpoints)
+ {
+ if(endPoint.equals(StorageService.getLocalStorageEndPoint()))
+ {
+ foundLocal = true;
+ break;
+ }
+ }
+ if(!foundLocal && consistencyLevel == StorageService.ConsistencyLevel.WEAK)
+ {
+ ReadMessage readMessage = null;
+ readMessage = new ReadMessage(tablename, key, columnFamily, columnNames);
+ return doReadProtocol(key, readMessage);
+ }
+ else
+ {
+ switch ( consistencyLevel )
+ {
+ case WEAK:
+ row = weakReadProtocol(tablename, key, columnFamily, columnNames);
+ break;
+
+ case STRONG:
+ row = strongReadProtocol(tablename, key, columnFamily, columnNames);
+ break;
+
+ default:
+ row = weakReadProtocol(tablename, key, columnFamily, columnNames);
+ break;
+ }
+ }
+ return row;
+
+
+ }
+
+ public static Row readProtocol(String tablename, String key, String columnFamily, int start, int count, StorageService.ConsistencyLevel consistencyLevel) throws Exception
+ {
+ Row row = null;
+ boolean foundLocal = false;
+ EndPoint[] endpoints = StorageService.instance().getNStorageEndPoint(key);
+ for(EndPoint endPoint: endpoints)
+ {
+ if(endPoint.equals(StorageService.getLocalStorageEndPoint()))
+ {
+ foundLocal = true;
+ break;
+ }
+ }
+ if(!foundLocal && consistencyLevel == StorageService.ConsistencyLevel.WEAK)
+ {
+ ReadMessage readMessage = null;
+ readMessage = new ReadMessage(tablename, key, columnFamily, start, count);
+ return doReadProtocol(key, readMessage);
+ }
+ else
+ {
+ switch ( consistencyLevel )
+ {
+ case WEAK:
+ row = weakReadProtocol(tablename, key, columnFamily, start, count);
+ break;
+
+ case STRONG:
+ row = strongReadProtocol(tablename, key, columnFamily, start, count);
+ break;
+
+ default:
+ row = weakReadProtocol(tablename, key, columnFamily, start, count);
+ break;
+ }
+ }
+ return row;
+ }
+
+ public static Row readProtocol(String tablename, String key, String columnFamily, long sinceTimestamp, StorageService.ConsistencyLevel consistencyLevel) throws Exception
+ {
+ Row row = null;
+ boolean foundLocal = false;
+ EndPoint[] endpoints = StorageService.instance().getNStorageEndPoint(key);
+ for(EndPoint endPoint: endpoints)
+ {
+ if(endPoint.equals(StorageService.getLocalStorageEndPoint()))
+ {
+ foundLocal = true;
+ break;
+ }
+ }
+ if(!foundLocal && consistencyLevel == StorageService.ConsistencyLevel.WEAK)
+ {
+ ReadMessage readMessage = null;
+ readMessage = new ReadMessage(tablename, key, columnFamily, sinceTimestamp);
+ return doReadProtocol(key, readMessage);
+ }
+ else
+ {
+ switch ( consistencyLevel )
+ {
+ case WEAK:
+ row = weakReadProtocol(tablename, key, columnFamily, sinceTimestamp);
+ break;
+
+ case STRONG:
+ row = strongReadProtocol(tablename, key, columnFamily, sinceTimestamp);
+ break;
+
+ default:
+ row = weakReadProtocol(tablename, key, columnFamily, sinceTimestamp);
+ break;
+ }
+ }
+ return row;
+ }
+
+ public static Row strongReadProtocol(String tablename, String key, String columnFamily, List<String> columns) throws Exception
+ {
+ long startTime = System.currentTimeMillis();
+ // TODO: throw a thrift exception if we do not have N nodes
+ ReadMessage readMessage = new ReadMessage(tablename, key, columnFamily, columns);
+
+ ReadMessage readMessageDigestOnly = new ReadMessage(tablename, key, columnFamily, columns);
+ readMessageDigestOnly.setIsDigestQuery(true);
+
+ Row row = StorageProxy.doStrongReadProtocol(key, readMessage, readMessageDigestOnly);
+ logger_.debug("readProtocol: " + (System.currentTimeMillis() - startTime) + " ms.");
+ return row;
+ }
+
+ /*
+ * This function executes the read protocol.
+ // 1. Get the N nodes from storage service where the data needs to be
+ // replicated
+ // 2. Construct a message for read\write
+ * 3. Set one of teh messages to get teh data and teh rest to get teh digest
+ // 4. SendRR ( to all the nodes above )
+ // 5. Wait for a response from atleast X nodes where X <= N and teh data node
+ * 6. If the digest matches return teh data.
+ * 7. else carry out read repair by getting data from all the nodes.
+ // 5. return success
+ *
+ */
+ public static Row strongReadProtocol(String tablename, String key, String columnFamily, int start, int count) throws IOException, TimeoutException
+ {
+ long startTime = System.currentTimeMillis();
+ // TODO: throw a thrift exception if we do not have N nodes
+ ReadMessage readMessage = null;
+ ReadMessage readMessageDigestOnly = null;
+ if( start >= 0 && count < Integer.MAX_VALUE)
+ {
+ readMessage = new ReadMessage(tablename, key, columnFamily, start, count);
+ }
+ else
+ {
+ readMessage = new ReadMessage(tablename, key, columnFamily);
+ }
+ Message message = ReadMessage.makeReadMessage(readMessage);
+ if( start >= 0 && count < Integer.MAX_VALUE)
+ {
+ readMessageDigestOnly = new ReadMessage(tablename, key, columnFamily, start, count);
+ }
+ else
+ {
+ readMessageDigestOnly = new ReadMessage(tablename, key, columnFamily);
+ }
+ readMessageDigestOnly.setIsDigestQuery(true);
+ Row row = doStrongReadProtocol(key, readMessage, readMessageDigestOnly);
+ logger_.debug("readProtocol: " + (System.currentTimeMillis() - startTime) + " ms.");
+ return row;
+ }
+
+ public static Row strongReadProtocol(String tablename, String key, String columnFamily, long sinceTimestamp) throws IOException, TimeoutException
+ {
+ long startTime = System.currentTimeMillis();
+ // TODO: throw a thrift exception if we do not have N nodes
+ ReadMessage readMessage = null;
+ ReadMessage readMessageDigestOnly = null;
+ readMessage = new ReadMessage(tablename, key, columnFamily, sinceTimestamp);
+ Message message = ReadMessage.makeReadMessage(readMessage);
+ readMessageDigestOnly = new ReadMessage(tablename, key, columnFamily, sinceTimestamp);
+ readMessageDigestOnly.setIsDigestQuery(true);
+ Row row = doStrongReadProtocol(key, readMessage, readMessageDigestOnly);
+ logger_.debug("readProtocol: " + (System.currentTimeMillis() - startTime) + " ms.");
+ return row;
+ }
+
+ /*
+ * This method performs the actual read from the replicas.
+ * param @ key - key for which the data is required.
+ * param @ readMessage - the read message to get the actual data
+ * param @ readMessageDigest - the read message to get the digest.
+ */
+ private static Row doStrongReadProtocol(String key, ReadMessage readMessage, ReadMessage readMessageDigest) throws IOException, TimeoutException
+ {
+ Row row = null;
+ Message message = ReadMessage.makeReadMessage(readMessage);
+ Message messageDigestOnly = ReadMessage.makeReadMessage(readMessageDigest);
+
+ IResponseResolver<Row> readResponseResolver = new ReadResponseResolver();
+ QuorumResponseHandler<Row> quorumResponseHandler = new QuorumResponseHandler<Row>(
+ DatabaseDescriptor.getReplicationFactor(),
+ readResponseResolver);
+ EndPoint dataPoint = StorageService.instance().findSuitableEndPoint(key);
+ List<EndPoint> endpointList = new ArrayList<EndPoint>( Arrays.asList( StorageService.instance().getNStorageEndPoint(key) ) );
+ /* Remove the local storage endpoint from the list. */
+ endpointList.remove( dataPoint );
+ EndPoint[] endPoints = new EndPoint[endpointList.size() + 1];
+ Message messages[] = new Message[endpointList.size() + 1];
+
+ // first message is the data Point
+ endPoints[0] = dataPoint;
+ messages[0] = message;
+
+ for(int i=1; i < endPoints.length ; i++)
+ {
+ endPoints[i] = endpointList.get(i-1);
+ messages[i] = messageDigestOnly;
+ }
+
+ try
+ {
+ MessagingService.getMessagingInstance().sendRR(messages, endPoints, quorumResponseHandler);
+
+ long startTime2 = System.currentTimeMillis();
+ row = quorumResponseHandler.get();
+ logger_.debug("quorumResponseHandler: " + (System.currentTimeMillis() - startTime2)
+ + " ms.");
+ if (row == null)
+ {
+ logger_.info("ERROR No row for this key .....: " + key);
+ // TODO: throw a thrift exception
+ return row;
+ }
+ }
+ catch (DigestMismatchException ex)
+ {
+ if ( DatabaseDescriptor.getConsistencyCheck())
+ {
+ IResponseResolver<Row> readResponseResolverRepair = new ReadResponseResolver();
+ QuorumResponseHandler<Row> quorumResponseHandlerRepair = new QuorumResponseHandler<Row>(
+ DatabaseDescriptor.getReplicationFactor(),
+ readResponseResolverRepair);
+ readMessage.setIsDigestQuery(false);
+ logger_.info("DigestMismatchException: " + key);
+ Message messageRepair = ReadMessage.makeReadMessage(readMessage);
+ MessagingService.getMessagingInstance().sendRR(messageRepair, endPoints,
+ quorumResponseHandlerRepair);
+ try
+ {
+ row = quorumResponseHandlerRepair.get();
+ }
+ catch(DigestMismatchException dex)
+ {
+ logger_.warn(LogUtil.throwableToString(dex));
+ }
+ if (row == null)
+ {
+ logger_.info("ERROR No row for this key .....: " + key);
+ }
+ }
+ }
+ return row;
+ }
+
+ public static Row weakReadProtocol(String tablename, String key, String columnFamily, List<String> columns) throws Exception
+ {
+ long startTime = System.currentTimeMillis();
+ List<EndPoint> endpoints = StorageService.instance().getNLiveStorageEndPoint(key);
+ /* Remove the local storage endpoint from the list. */
+ endpoints.remove( StorageService.getLocalStorageEndPoint() );
+ // TODO: throw a thrift exception if we do not have N nodes
+
+ Table table = Table.open( DatabaseDescriptor.getTables().get(0) );
+ Row row = table.getRow(key, columnFamily, columns);
+
+ logger_.debug("Local Read Protocol: " + (System.currentTimeMillis() - startTime) + " ms.");
+ /*
+ * Do the consistency checks in the background and return the
+ * non NULL row.
+ */
+ if ( endpoints.size() > 0 && DatabaseDescriptor.getConsistencyCheck())
+ StorageService.instance().doConsistencyCheck(row, endpoints, columnFamily, columns);
+ return row;
+ }
+
+ /*
+ * This function executes the read protocol locally and should be used only if consistency is not a concern.
+ * Read the data from the local disk and return if the row is NOT NULL. If the data is NULL do the read from
+ * one of the other replicas (in the same data center if possible) till we get the data. In the event we get
+ * the data we perform consistency checks and figure out if any repairs need to be done to the replicas.
+ */
+ public static Row weakReadProtocol(String tablename, String key, String columnFamily, int start, int count) throws Exception
+ {
+ Row row = null;
+ long startTime = System.currentTimeMillis();
+ List<EndPoint> endpoints = StorageService.instance().getNLiveStorageEndPoint(key);
+ /* Remove the local storage endpoint from the list. */
+ endpoints.remove( StorageService.getLocalStorageEndPoint() );
+ // TODO: throw a thrift exception if we do not have N nodes
+
+ Table table = Table.open( DatabaseDescriptor.getTables().get(0) );
+ if( start >= 0 && count < Integer.MAX_VALUE)
+ {
+ row = table.getRow(key, columnFamily, start, count);
+ }
+ else
+ {
+ row = table.getRow(key, columnFamily);
+ }
+
+ logger_.debug("Local Read Protocol: " + (System.currentTimeMillis() - startTime) + " ms.");
+ /*
+ * Do the consistency checks in the background and return the
+ * non NULL row.
+ */
+ if ( endpoints.size() > 0 && DatabaseDescriptor.getConsistencyCheck())
+ StorageService.instance().doConsistencyCheck(row, endpoints, columnFamily, start, count);
+ return row;
+ }
+
+ public static Row weakReadProtocol(String tablename, String key, String columnFamily, long sinceTimestamp) throws Exception
+ {
+ Row row = null;
+ long startTime = System.currentTimeMillis();
+ List<EndPoint> endpoints = StorageService.instance().getNLiveStorageEndPoint(key);
+ /* Remove the local storage endpoint from the list. */
+ endpoints.remove( StorageService.getLocalStorageEndPoint() );
+ // TODO: throw a thrift exception if we do not have N nodes
+
+ Table table = Table.open( DatabaseDescriptor.getTables().get(0) );
+ row = table.getRow(key, columnFamily,sinceTimestamp);
+ logger_.debug("Local Read Protocol: " + (System.currentTimeMillis() - startTime) + " ms.");
+ /*
+ * Do the consistency checks in the background and return the
+ * non NULL row.
+ */
+ if ( endpoints.size() > 0 && DatabaseDescriptor.getConsistencyCheck())
+ StorageService.instance().doConsistencyCheck(row, endpoints, columnFamily, sinceTimestamp);
+ return row;
+ }
+
+}