You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/01/05 07:58:17 UTC

svn commit: r1055319 - /cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/

Author: jbellis
Date: Wed Jan  5 06:58:16 2011
New Revision: 1055319

URL: http://svn.apache.org/viewvc?rev=1055319&view=rev
Log:
Remove StorageProxy.weakRead and rename strongRead to fetchRows.  Implement read repair as a second resolve after the initial one for the data
patch by jbellis; reviewed by Daniel Doubleday for CASSANDRA-982

Removed:
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ConsistencyChecker.java
Modified:
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/IResponseResolver.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadResponseResolver.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/IResponseResolver.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/IResponseResolver.java?rev=1055319&r1=1055318&r2=1055319&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/IResponseResolver.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/IResponseResolver.java Wed Jan  5 06:58:16 2011
@@ -24,7 +24,7 @@ import org.apache.cassandra.net.Message;
 
 public interface IResponseResolver<T> {
 
-	/*
+	/**
 	 * This Method resolves the responses that are passed in . for example : if
 	 * its write response then all we get is true or false return values which
 	 * implies if the writes were successful but for reads its more complicated
@@ -33,8 +33,14 @@ public interface IResponseResolver<T> {
 	 * needs from this interface.
 	 */
 	public T resolve() throws DigestMismatchException, IOException;
+
 	public boolean isDataPresent();
 
+    /**
+     * returns the data response without comparing with any digests
+     */
+    public T getData() throws IOException;
+
     public void preprocess(Message message);
     public Iterable<Message> getMessages();
     public int getMessageCount();

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/QuorumResponseHandler.java?rev=1055319&r1=1055318&r2=1055319&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/QuorumResponseHandler.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/QuorumResponseHandler.java Wed Jan  5 06:58:16 2011
@@ -39,8 +39,9 @@ import org.apache.cassandra.utils.Simple
 public class QuorumResponseHandler<T> implements IAsyncCallback
 {
     protected static final Logger logger = LoggerFactory.getLogger( QuorumResponseHandler.class );
+
+    public final IResponseResolver<T> resolver;
     protected final SimpleCondition condition = new SimpleCondition();
-    protected final IResponseResolver<T> resolver;
     private final long startTime;
     protected final int blockfor;
     
@@ -58,38 +59,34 @@ public class QuorumResponseHandler<T> im
     
     public T get() throws TimeoutException, DigestMismatchException, IOException
     {
+        long timeout = DatabaseDescriptor.getRpcTimeout() - (System.currentTimeMillis() - startTime);
+        boolean success;
         try
         {
-            long timeout = DatabaseDescriptor.getRpcTimeout() - (System.currentTimeMillis() - startTime);
-            boolean success;
-            try
-            {
-                success = condition.await(timeout, TimeUnit.MILLISECONDS);
-            }
-            catch (InterruptedException ex)
-            {
-                throw new AssertionError(ex);
-            }
-
-            if (!success)
-            {
-                StringBuilder sb = new StringBuilder("");
-                for (Message message : resolver.getMessages())
-                {
-                    sb.append(message.getFrom());
-                }
-                throw new TimeoutException("Operation timed out - received only " + resolver.getMessageCount() + " responses from " + sb.toString() + " .");
-            }
+            success = condition.await(timeout, TimeUnit.MILLISECONDS);
         }
-        finally
+        catch (InterruptedException ex)
         {
-            for (Message response : resolver.getMessages())
-            {
-                MessagingService.removeRegisteredCallback(response.getMessageId());
-            }
+            throw new AssertionError(ex);
         }
 
-        return resolver.resolve();
+        if (!success)
+        {
+            StringBuilder sb = new StringBuilder("");
+            for (Message message : resolver.getMessages())
+                sb.append(message.getFrom()).append(", ");
+            throw new TimeoutException("Operation timed out - received only " + resolver.getMessageCount() + " responses from " + sb.toString() + " .");
+        }
+
+        return blockfor == 1 ? resolver.getData() : resolver.resolve();
+    }
+
+    public void close()
+    {
+        for (Message response : resolver.getMessages())
+        {
+            MessagingService.removeRegisteredCallback(response.getMessageId());
+        }
     }
     
     public void response(Message message)

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java?rev=1055319&r1=1055318&r2=1055319&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java Wed Jan  5 06:58:16 2011
@@ -54,6 +54,15 @@ public class RangeSliceResponseResolver 
         this.table = table;
     }
 
+    public List<Row> getData() throws IOException
+    {
+        Message response = responses.iterator().next();
+        RangeSliceReply reply = RangeSliceReply.read(response.getMessageBody());
+        return reply.rows;
+    }
+
+    // Note: this deserializes the response a 2nd time if getData was called first
+    // (this is not currently an issue since we don't do read repair for range queries.)
     public List<Row> resolve() throws IOException
     {
         CollatingIterator collator = new CollatingIterator(new Comparator<Pair<Row,InetAddress>>()

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadResponseResolver.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadResponseResolver.java?rev=1055319&r1=1055318&r2=1055319&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadResponseResolver.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadResponseResolver.java Wed Jan  5 06:58:16 2011
@@ -46,6 +46,7 @@ public class ReadResponseResolver implem
     private final String table;
     private final Map<Message, ReadResponse> results = new NonBlockingHashMap<Message, ReadResponse>();
     private DecoratedKey key;
+    private ByteBuffer digest;
 
     public ReadResponseResolver(String table, ByteBuffer key)
     {
@@ -53,14 +54,29 @@ public class ReadResponseResolver implem
         this.key = StorageService.getPartitioner().decorateKey(key);
     }
     
+    public Row getData() throws IOException
+    {
+        for (Map.Entry<Message, ReadResponse> entry : results.entrySet())
+        {
+            ReadResponse result = entry.getValue();
+            if (!result.isDigestQuery())
+                return result.row();
+        }
+
+        throw new AssertionError("getData should not be invoked when no data is present");
+    }
+
     /*
-     * This method handles two different scenarios:
+     * This method handles three different scenarios:
      *
-     * 1) we're handling the initial read, of data from the closest replica + digests
+     * 1a)we're handling the initial read, of data from the closest replica + digests
      *    from the rest.  In this case we check the digests against each other,
      *    throw an exception if there is a mismatch, otherwise return the data row.
      *
-     * 2) there was a mismatch on the initial read, so we redid the digest requests
+     * 1b)we're checking additional digests that arrived after the minimum to handle
+     *    the requested ConsistencyLevel, i.e. asynchronous read repair check
+     *
+     * 2) there was a mismatch on the initial read (1a or 1b), so we redid the digest requests
      *    as full data reads.  In this case we need to compute the most recent version
      *    of each column, and send diffs to out-of-date replicas.
      */
@@ -72,7 +88,6 @@ public class ReadResponseResolver implem
         long startTime = System.currentTimeMillis();
 		List<ColumnFamily> versions = new ArrayList<ColumnFamily>();
 		List<InetAddress> endpoints = new ArrayList<InetAddress>();
-		ByteBuffer digest = null;
 
         // validate digests against each other; throw immediately on mismatch.
         // also, collects data results into versions/endpoints lists.

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1055319&r1=1055318&r2=1055319&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java Wed Jan  5 06:58:16 2011
@@ -48,7 +48,6 @@ import org.apache.cassandra.gms.Gossiper
 import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.locator.TokenMetadata;
 import org.apache.cassandra.net.IAsyncCallback;
-import org.apache.cassandra.net.IAsyncResult;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.thrift.*;
@@ -63,6 +62,8 @@ public class StorageProxy implements Sto
 {
     private static final Logger logger = LoggerFactory.getLogger(StorageProxy.class);
 
+    private static ScheduledExecutorService repairExecutor = new ScheduledThreadPoolExecutor(1); // TODO JMX-enable this
+
     private static final Random random = new Random();
     // mbean stuff
     private static final LatencyTracker readStats = new LatencyTracker();
@@ -305,15 +306,7 @@ public class StorageProxy implements Sto
         List<Row> rows;
         try
         {
-            if (consistency_level == ConsistencyLevel.ONE)
-            {
-                rows = weakRead(commands);
-            }
-            else
-            {
-                assert consistency_level.getValue() >= ConsistencyLevel.QUORUM.getValue();
-                rows = strongRead(commands, consistency_level);
-            }
+            rows = fetchRows(commands, consistency_level);
         }
         finally
         {
@@ -322,91 +315,23 @@ public class StorageProxy implements Sto
         return rows;
     }
 
-    private static List<Row> weakRead(List<ReadCommand> commands) throws IOException, UnavailableException, TimeoutException
-    {
-        List<Row> rows = new ArrayList<Row>();
-
-        // send off all the commands asynchronously
-        List<Future<Object>> localFutures = null;
-        HashMap<ReadCommand, IAsyncResult> remoteResults = null;
-        for (ReadCommand command: commands)
-        {
-            InetAddress endPoint = StorageService.instance.findSuitableEndpoint(command.table, command.key);
-            if (endPoint.equals(FBUtilities.getLocalAddress()))
-            {
-                if (logger.isDebugEnabled())
-                    logger.debug("weakread reading " + command + " locally");
-
-                if (localFutures == null)
-                    localFutures = new ArrayList<Future<Object>>();
-                Callable<Object> callable = new weakReadLocalCallable(command);
-                localFutures.add(StageManager.getStage(Stage.READ).submit(callable));
-            }
-            else
-            {
-                if (remoteResults == null)
-                    remoteResults = new HashMap<ReadCommand, IAsyncResult>();
-                Message message = command.makeReadMessage();
-                if (logger.isDebugEnabled())
-                    logger.debug("weakread reading " + command + " from " + message.getMessageId() + "@" + endPoint);
-                remoteResults.put(command, MessagingService.instance().sendRR(message, endPoint));
-            }
-        }
-
-        // wait for results
-        if (localFutures != null)
-        {
-            for (Future<Object> future : localFutures)
-            {
-                Row row;
-                try
-                {
-                    row = (Row) future.get();
-                }
-                catch (Exception e)
-                {
-                    throw new RuntimeException(e);
-                }
-                rows.add(row);
-            }
-        }
-        if (remoteResults != null)
-        {
-            for (Map.Entry<ReadCommand, IAsyncResult> entry : remoteResults.entrySet())
-            {
-                ReadCommand command = entry.getKey();
-                IAsyncResult iar = entry.getValue();
-                byte[] body;
-                body = iar.get(DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
-                ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
-                ReadResponse response = ReadResponse.serializer().deserialize(new DataInputStream(bufIn));
-                assert response.row() != null;
-                rows.add(response.row());
-                if (randomlyReadRepair(command))
-                    StorageService.instance.doConsistencyCheck(response.row(), command, iar.getFrom());
-            }
-        }
-
-        return rows;
-    }
-
-    /*
-     * This function executes the read protocol.
-        // 1. Get the N nodes from storage service where the data needs to be
-        // replicated
-        // 2. Construct a message for read\write
-         * 3. Set one of the messages to get the data and the rest to get the digest
-        // 4. SendRR ( to all the nodes above )
-        // 5. Wait for a response from at least X nodes where X <= N and the data node
-         * 6. If the digest matches return the data.
-         * 7. else carry out read repair by getting data from all the nodes.
-        // 5. return success
+    /**
+     * This function executes local and remote reads, and blocks for the results:
+     *
+     * 1. Get the replica locations, sorted by response time according to the snitch
+     * 2. Send a data request to the closest replica, and digest requests to either
+     *    a) all the replicas, if read repair is enabled
+     *    b) the closest R-1 replicas, where R is the number required to satisfy the ConsistencyLevel
+     * 3. Wait for a response from R replicas
+     * 4. If the digests (if any) match the data return the data
+     * 5. else carry out read repair by getting data from all the nodes.
      */
-    private static List<Row> strongRead(List<ReadCommand> commands, ConsistencyLevel consistency_level) throws IOException, UnavailableException, TimeoutException
+    private static List<Row> fetchRows(List<ReadCommand> commands, ConsistencyLevel consistency_level) throws IOException, UnavailableException, TimeoutException
     {
         List<QuorumResponseHandler<Row>> quorumResponseHandlers = new ArrayList<QuorumResponseHandler<Row>>();
         List<List<InetAddress>> commandEndpoints = new ArrayList<List<InetAddress>>();
         List<Row> rows = new ArrayList<Row>();
+        Set<ReadCommand> repairs = new HashSet<ReadCommand>();
 
         // send out read requests
         for (ReadCommand command: commands)
@@ -425,16 +350,28 @@ public class StorageProxy implements Sto
             QuorumResponseHandler<Row> handler = rs.getQuorumResponseHandler(resolver, consistency_level);
             handler.assureSufficientLiveNodes(endpoints);
 
-            Message messages[] = new Message[endpoints.size()];
+            int targets;
+            if (randomlyReadRepair(command))
+            {
+                targets = endpoints.size();
+                if (targets > handler.blockfor)
+                    repairs.add(command);
+            }
+            else
+            {
+                targets = handler.blockfor;
+            }
+            Message[] messages = new Message[targets];
+
             // data-request message is sent to dataPoint, the node that will actually get
             // the data for us. The other replicas are only sent a digest query.
-            int n = 0;
-            for (InetAddress endpoint : endpoints)
+            for (int i = 0; i < messages.length; i++)
             {
+                InetAddress endpoint = endpoints.get(i);
                 Message m = endpoint.equals(dataPoint) ? message : messageDigestOnly;
-                messages[n++] = m;
+                messages[i] = m;
                 if (logger.isDebugEnabled())
-                    logger.debug("strongread reading " + (m == message ? "data" : "digest") + " for " + command + " from " + m.getMessageId() + "@" + endpoint);
+                    logger.debug("reading " + (m == message ? "data" : "digest") + " for " + command + " from " + m.getMessageId() + "@" + endpoint);
             }
             MessagingService.instance().sendRR(messages, endpoints, handler);
             quorumResponseHandlers.add(handler);
@@ -442,12 +379,13 @@ public class StorageProxy implements Sto
         }
 
         // read results and make a second pass for any digest mismatches
-        List<QuorumResponseHandler<Row>> repairResponseHandlers = null;
+        List<RepairCallback<Row>> repairResponseHandlers = null;
         for (int i = 0; i < commands.size(); i++)
         {
             QuorumResponseHandler<Row> quorumResponseHandler = quorumResponseHandlers.get(i);
             Row row;
             ReadCommand command = commands.get(i);
+            List<InetAddress> endpoints = commandEndpoints.get(i);
             try
             {
                 long startTime2 = System.currentTimeMillis();
@@ -457,18 +395,17 @@ public class StorageProxy implements Sto
 
                 if (logger.isDebugEnabled())
                     logger.debug("quorumResponseHandler: " + (System.currentTimeMillis() - startTime2) + " ms.");
+
+                if (repairs.contains(command))
+                    repairExecutor.schedule(new RepairRunner(quorumResponseHandler.resolver, command, endpoints), DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
             }
             catch (DigestMismatchException ex)
             {
-                AbstractReplicationStrategy rs = Table.open(command.table).getReplicationStrategy();
-                ReadResponseResolver resolver = new ReadResponseResolver(command.table, command.key);
-                QuorumResponseHandler<Row> handler = rs.getQuorumResponseHandler(resolver, consistency_level);
                 if (logger.isDebugEnabled())
                     logger.debug("Digest mismatch:", ex);
-                Message messageRepair = command.makeReadMessage();
-                MessagingService.instance().sendRR(messageRepair, commandEndpoints.get(i), handler);
+                RepairCallback<Row> handler = repair(command, endpoints);
                 if (repairResponseHandlers == null)
-                    repairResponseHandlers = new ArrayList<QuorumResponseHandler<Row>>();
+                    repairResponseHandlers = new ArrayList<RepairCallback<Row>>();
                 repairResponseHandlers.add(handler);
             }
         }
@@ -476,7 +413,7 @@ public class StorageProxy implements Sto
         // read the results for the digest mismatch retries
         if (repairResponseHandlers != null)
         {
-            for (QuorumResponseHandler<Row> handler : repairResponseHandlers)
+            for (RepairCallback<Row> handler : repairResponseHandlers)
             {
                 try
                 {
@@ -494,6 +431,17 @@ public class StorageProxy implements Sto
         return rows;
     }
 
+    // TODO repair resolver shouldn't take consistencylevel (it should repair exactly as many as it receives replies for)
+    private static RepairCallback<Row> repair(ReadCommand command, List<InetAddress> endpoints)
+    throws IOException
+    {
+        ReadResponseResolver resolver = new ReadResponseResolver(command.table, command.key);
+        RepairCallback<Row> handler = new RepairCallback<Row>(resolver, endpoints);
+        Message messageRepair = command.makeReadMessage();
+        MessagingService.instance().sendRR(messageRepair, endpoints, handler);
+        return handler;
+    }
+
     /*
     * This function executes the read protocol locally.  Consistency checks are performed in the background.
     */
@@ -845,31 +793,6 @@ public class StorageProxy implements Sto
         return hintedHandoffEnabled;
     }
 
-    static class weakReadLocalCallable implements Callable<Object>
-    {
-        private ReadCommand command;
-
-        weakReadLocalCallable(ReadCommand command)
-        {
-            this.command = command;
-        }
-
-        public Object call() throws IOException
-        {
-            if (logger.isDebugEnabled())
-                logger.debug("weakreadlocal reading " + command);
-
-            Table table = Table.open(command.table);
-            Row row = command.getRow(table);
-
-            // Do the consistency checks in the background
-            if (randomlyReadRepair(command))
-                StorageService.instance.doConsistencyCheck(row, command, FBUtilities.getLocalAddress());
-
-            return row;
-        }
-    }
-
     /**
      * Performs the truncate operatoin, which effectively deletes all data from
      * the column family cfname
@@ -915,4 +838,32 @@ public class StorageProxy implements Sto
     {
         return !Gossiper.instance.getUnreachableMembers().isEmpty();
     }
+
+    private static class RepairRunner extends WrappedRunnable
+    {
+        private final IResponseResolver<Row> resolver;
+        private final ReadCommand command;
+        private final List<InetAddress> endpoints;
+
+        public RepairRunner(IResponseResolver<Row> resolver, ReadCommand command, List<InetAddress> endpoints)
+        {
+            this.resolver = resolver;
+            this.command = command;
+            this.endpoints = endpoints;
+        }
+
+        protected void runMayThrow() throws IOException
+        {
+            try
+            {
+                resolver.resolve();
+            }
+            catch (DigestMismatchException e)
+            {
+                if (logger.isDebugEnabled())
+                    logger.debug("Digest mismatch:", e);
+                repair(command, endpoints);
+            }
+        }
+    }
 }

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java?rev=1055319&r1=1055318&r2=1055319&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java Wed Jan  5 06:58:16 2011
@@ -449,18 +449,6 @@ public class StorageService implements I
     }
 
     /**
-     * This method performs the requisite operations to make
-     * sure that the N replicas are in sync. We do this in the
-     * background when we do not care much about consistency.
-     */
-    public void doConsistencyCheck(Row row, ReadCommand command, InetAddress dataSource)
-    {
-        List<InetAddress> endpoints = StorageService.instance.getLiveNaturalEndpoints(command.table, command.key);
-        if (endpoints.size() > 1)
-            consistencyManager_.submit(new ConsistencyChecker(command, row, endpoints, dataSource));
-    }
-
-    /**
      * for a keyspace, return the ranges and corresponding hosts for a given keyspace.
      * @param keyspace
      * @return