You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2009/03/27 17:08:53 UTC

svn commit: r759189 - /incubator/cassandra/trunk/src/org/apache/cassandra/service/MultiQuorumResponseHandler.java

Author: alakshman
Date: Fri Mar 27 16:08:53 2009
New Revision: 759189

URL: http://svn.apache.org/viewvc?rev=759189&view=rev
Log:
Forgot to add this as part of the multiget().

Added:
    incubator/cassandra/trunk/src/org/apache/cassandra/service/MultiQuorumResponseHandler.java

Added: incubator/cassandra/trunk/src/org/apache/cassandra/service/MultiQuorumResponseHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/MultiQuorumResponseHandler.java?rev=759189&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/MultiQuorumResponseHandler.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/MultiQuorumResponseHandler.java Fri Mar 27 16:08:53 2009
@@ -0,0 +1,253 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.*;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ReadMessage;
+import org.apache.cassandra.db.ReadResponseMessage;
+import org.apache.cassandra.db.Row;
+import org.apache.cassandra.db.WriteResponseMessage;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.IAsyncCallback;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+import org.apache.cassandra.utils.*;
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class MultiQuorumResponseHandler implements IAsyncCallback
+{
+    /* Log under this class's own category. */
+    private static final Logger logger_ = Logger.getLogger( MultiQuorumResponseHandler.class );
+    /* Guards handlers_ and responses_; condition_ is signalled once every key has a row. */
+    private final Lock lock_ = new ReentrantLock();
+    private final Condition condition_ = lock_.newCondition();
+    /* This maps the keys to the original data read messages */
+    private final Map<String, ReadMessage> readMessages_;
+    /* This maps the key to its set of replicas */
+    private final Map<String, EndPoint[]> endpoints_;
+    /* This maps the groupId to the individual callback for the set of messages */
+    private final Map<String, SingleQuorumResponseHandler> handlers_ = new HashMap<String, SingleQuorumResponseHandler>();
+    /* This should hold all the responses for the keys */
+    private final List<Row> responses_ = new ArrayList<Row>();
+    private final AtomicBoolean done_ = new AtomicBoolean(false);
+
+    /**
+     * Collects the replica responses for a single message group and hands
+     * the resolved row to the enclosing handler once a majority of the
+     * replication factor has answered and the resolver reports that data
+     * is present.
+     */
+    private class SingleQuorumResponseHandler implements IAsyncCallback
+    {
+        private final Lock lock_ = new ReentrantLock();
+        private final IResponseResolver<Row> responseResolver_;
+        private final List<Message> responses_ = new ArrayList<Message>();
+        /* Set once a row has been delivered so late replicas cannot deliver it again. */
+        private boolean resolved_ = false;
+
+        SingleQuorumResponseHandler(IResponseResolver<Row> responseResolver)
+        {
+            responseResolver_ = responseResolver;
+        }
+
+        public void attachContext(Object o)
+        {
+            throw new UnsupportedOperationException("This operation is not supported in this implementation");
+        }
+
+        /**
+         * Records one replica response and attempts resolution once a
+         * majority of the replication factor has responded.
+         *
+         * @param response the message received from a replica.
+         */
+        public void response(Message response)
+        {
+            lock_.lock();
+            try
+            {
+                if ( resolved_ )
+                {
+                    /* Row already delivered; a straggler must not add a duplicate. */
+                    return;
+                }
+                responses_.add(response);
+                int majority = (DatabaseDescriptor.getReplicationFactor() >> 1) + 1;
+                if ( responses_.size() >= majority && responseResolver_.isDataPresent(responses_))
+                {
+                    onCompletion();
+                }
+            }
+            catch ( IOException ex )
+            {
+                logger_.info( LogUtil.throwableToString(ex) );
+            }
+            finally
+            {
+                lock_.unlock();
+            }
+        }
+
+        /**
+         * Resolves the collected responses into a row and reports it to the
+         * enclosing handler; a digest mismatch triggers a repair read instead.
+         */
+        private void onCompletion() throws IOException
+        {
+            try
+            {
+                Row row = responseResolver_.resolve(responses_);
+                /* Only a successful resolve closes this group; a mismatch leaves it open. */
+                resolved_ = true;
+                MultiQuorumResponseHandler.this.onCompleteResponse(row);
+            }
+            catch ( DigestMismatchException ex )
+            {
+                /*
+                 * The DigestMismatchException has the key for which the mismatch
+                 * occurred bundled in it as context
+                */
+                String key = ex.getMessage();
+                onDigestMismatch(key);
+            }
+        }
+
+        /**
+         * This method is invoked on a digest mismatch. We pass in the key
+         * in order to retrieve the appropriate data message that needs
+         * to be sent out to the replicas.
+         *
+         * @param key for which the mismatch occurred.
+        */
+        private void onDigestMismatch(String key) throws IOException
+        {
+            if ( !DatabaseDescriptor.getConsistencyCheck() )
+            {
+                return;
+            }
+            ReadMessage readMessage = readMessages_.get(key);
+            if ( readMessage == null )
+            {
+                /* Nothing to re-send; fail soft instead of throwing a NullPointerException. */
+                logger_.warn("No read message registered for key " + key + "; skipping read repair.");
+                return;
+            }
+            readMessage.setIsDigestQuery(false);
+            Message messageRepair = ReadMessage.makeReadMessage(readMessage);
+            EndPoint[] endpoints = MultiQuorumResponseHandler.this.endpoints_.get( readMessage.key() );
+            /* NOTE(review): assumes exactly three replicas per key - confirm against sendRR(). */
+            Message[][] messages = new Message[][]{ {messageRepair, messageRepair, messageRepair} };
+            EndPoint[][] epList = new EndPoint[][]{ endpoints };
+            MessagingService.getMessagingInstance().sendRR(messages, epList, MultiQuorumResponseHandler.this);
+        }
+    }
+
+    /**
+     * @param readMessages the original read message for each key.
+     * @param endpoints the replicas for each key.
+     */
+    public MultiQuorumResponseHandler(Map<String, ReadMessage> readMessages, Map<String, EndPoint[]> endpoints)
+    {
+        readMessages_ = readMessages;
+        endpoints_ = endpoints;
+    }
+
+    /**
+     * Blocks until a row has been resolved for every key or the RPC
+     * timeout elapses. If the waiting thread is interrupted, its interrupt
+     * status is restored and the rows gathered so far are returned.
+     *
+     * @return the rows resolved so far, one per completed key.
+     * @throws TimeoutException if the timeout elapses before all keys complete.
+     */
+    public Row[] get() throws TimeoutException
+    {
+        long startTime = System.currentTimeMillis();
+        Row[] rows;
+        lock_.lock();
+        try
+        {
+            boolean interrupted = false;
+            long nanosLeft = TimeUnit.MILLISECONDS.toNanos(DatabaseDescriptor.getRpcTimeout());
+            try
+            {
+                /* Loop so a spurious wakeup is not mistaken for completion. */
+                while ( !done_.get() && nanosLeft > 0 )
+                {
+                    nanosLeft = condition_.awaitNanos(nanosLeft);
+                }
+            }
+            catch ( InterruptedException ex )
+            {
+                logger_.debug( LogUtil.throwableToString(ex) );
+                interrupted = true;
+                /* Preserve the interrupt for callers further up the stack. */
+                Thread.currentThread().interrupt();
+            }
+
+            if ( !interrupted && !done_.get() )
+            {
+                StringBuilder sb = new StringBuilder("");
+                for ( Row row : responses_ )
+                {
+                    sb.append(row.key());
+                    sb.append(":");
+                }
+                throw new TimeoutException("Operation timed out - received only " +  responses_.size() + " responses from " + sb.toString() + " .");
+            }
+            /* Snapshot while still holding the lock that guards responses_. */
+            rows = responses_.toArray( new Row[0] );
+        }
+        finally
+        {
+            lock_.unlock();
+        }
+
+        logger_.info("MultiQuorumResponseHandler: " + (System.currentTimeMillis() - startTime) + " ms.");
+        return rows;
+    }
+
+    /**
+     * Invoked when a complete response has been obtained
+     * for one of the sub-groups, i.e. one of the keys of the query.
+     *
+     * @param row obtained as a result of the response.
+     */
+    void onCompleteResponse(Row row)
+    {
+        /* Signalling requires the lock; it is reentrant for callers that already hold it. */
+        lock_.lock();
+        try
+        {
+            if ( !done_.get() )
+            {
+                responses_.add(row);
+                if ( responses_.size() == readMessages_.size() )
+                {
+                    done_.set(true);
+                    condition_.signalAll();
+                }
+            }
+        }
+        finally
+        {
+            lock_.unlock();
+        }
+    }
+
+    /**
+     * The handler of the response message that has been
+     * sent by one of the replicas for one of the keys.
+     *
+     * @param message the response message for one of the
+     *        messages that we sent out.
+     */
+    public void response(Message message)
+    {
+        lock_.lock();
+        try
+        {
+            SingleQuorumResponseHandler handler = handlers_.get(message.getMessageId());
+            if ( handler == null )
+            {
+                /* Unknown group id: ignore rather than throw a NullPointerException. */
+                logger_.warn("No handler registered for message id " + message.getMessageId());
+                return;
+            }
+            handler.response(message);
+        }
+        finally
+        {
+            lock_.unlock();
+        }
+    }
+
+    /**
+     * The context that is passed in for the query of
+     * multiple keys in the system. For each message
+     * id in the context register a callback handler
+     * for the same. This is done so that all responses
+     * for a given key use the same callback handler.
+     *
+     * @param o the context which is an array of strings
+     *        corresponding to the message id's for each
+     *        key.
+     */
+    public void attachContext(Object o)
+    {
+        String[] gids = (String[])o;
+        /* handlers_ is read under lock_ in response(), so write it under the same lock. */
+        lock_.lock();
+        try
+        {
+            for ( String gid : gids )
+            {
+                handlers_.put(gid, new SingleQuorumResponseHandler(new ReadResponseResolver()));
+            }
+        }
+        finally
+        {
+            lock_.unlock();
+        }
+    }
+}