You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/12/05 01:32:33 UTC

svn commit: r887468 - in /incubator/cassandra/trunk: ./ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/service/ src/java/org/apache/cassandra/utils/

Author: jbellis
Date: Sat Dec  5 00:32:32 2009
New Revision: 887468

URL: http://svn.apache.org/viewvc?rev=887468&view=rev
Log:
add strong reads to range slicing
patch by jbellis; reviewed by Stu Hood for CASSANDRA-568

Added:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java   (with props)
Modified:
    incubator/cassandra/trunk/CHANGES.txt
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/Pair.java

Modified: incubator/cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/CHANGES.txt?rev=887468&r1=887467&r2=887468&view=diff
==============================================================================
--- incubator/cassandra/trunk/CHANGES.txt (original)
+++ incubator/cassandra/trunk/CHANGES.txt Sat Dec  5 00:32:32 2009
@@ -11,6 +11,7 @@
  * respect JAVA_HOME in bin/ scripts (several tickets)
  * add StorageService.initClient for fat clients on the JVM (CASSANDRA-535)
    (see contrib/client_only for an example of use)
+ * make consistency_level functional in get_range_slice (CASSANDRA-568)
 
 
 0.5.0 beta

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java?rev=887468&r1=887467&r2=887468&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java Sat Dec  5 00:32:32 2009
@@ -81,6 +81,17 @@
         this.max_keys = max_keys;
     }
 
+    public RangeSliceCommand(String keyspace, String column_family, byte[] super_column, SlicePredicate predicate, DecoratedKey startKey, DecoratedKey finishKey, int max_keys)
+    { // constructor taking DecoratedKey bounds; each argument is stored unchanged in the same-named field
+        this.keyspace = keyspace;
+        this.column_family = column_family;
+        this.super_column = super_column;
+        this.predicate = predicate;
+        this.startKey = startKey;
+        this.finishKey = finishKey;
+        this.max_keys = max_keys;
+    }
+
     public Message getMessage() throws IOException
     {
         DataOutputBuffer dob = new DataOutputBuffer();

Added: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java?rev=887468&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java (added)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java Sat Dec  5 00:32:32 2009
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.*;
+
+import org.apache.log4j.Logger;
+
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.RangeSliceReply;
+import org.apache.cassandra.db.Row;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.net.Message;
+
+/**
+ * Turns RangeSliceReply objects into row (string -> CF) maps, resolving
+ * to the most recent ColumnFamily and setting up read repairs as necessary.
+ */
+public class RangeSliceResponseResolver implements IResponseResolver<Map<String, ColumnFamily>>
+{
+    private static final Logger logger_ = Logger.getLogger(RangeSliceResponseResolver.class);
+    private final String table;
+    private final Range range;
+    private final List<InetAddress> sources;
+    private boolean isCompleted;
+
+    public RangeSliceResponseResolver(String table, Range range, List<InetAddress> sources)
+    {
+        assert sources.size() > 0;
+        this.sources = sources;
+        this.range = range;
+        this.table = table;
+    }
+
+    public Map<String, ColumnFamily> resolve(List<Message> responses) throws DigestMismatchException, IOException
+    {
+        Map<InetAddress, Map<String, ColumnFamily>> replies = new HashMap<InetAddress, Map<String, ColumnFamily>>(responses.size());
+        Set<String> allKeys = new HashSet<String>();
+        for (Message response : responses)
+        {
+            RangeSliceReply reply = RangeSliceReply.read(response.getMessageBody());
+            isCompleted &= reply.rangeCompletedLocally;
+            Map<String, ColumnFamily> rows = new HashMap<String, ColumnFamily>(reply.rows.size());
+            for (Row row : reply.rows)
+            {
+                rows.put(row.key, row.cf);
+                allKeys.add(row.key);
+            }
+            replies.put(response.getFrom(), rows);
+        }
+
+        // for each row, compute the combination of all different versions seen, and repair incomplete versions
+        // TODO since the rows all arrive in sorted order, we should be able to do this more efficiently w/o all the Map conversion
+        Map<String, ColumnFamily> resolvedRows = new HashMap<String, ColumnFamily>(allKeys.size());
+        for (String key : allKeys)
+        {
+            List<ColumnFamily> versions = new ArrayList<ColumnFamily>(sources.size());
+            for (InetAddress endpoint : sources)
+            {
+                versions.add(replies.get(endpoint).get(key));
+            }
+            ColumnFamily resolved = ReadResponseResolver.resolveSuperset(versions);
+            ReadResponseResolver.maybeScheduleRepairs(resolved, table, key, versions, sources);
+            resolvedRows.put(key, resolved);
+        }
+        return resolvedRows;
+    }
+
+    public boolean isDataPresent(List<Message> responses)
+    {
+        return responses.size() >= sources.size();
+    }
+
+    /**
+     * only valid after resolve has been called (typically via QRH.get)
+     */
+    public boolean completed()
+    {
+        return isCompleted;
+    }
+}
\ No newline at end of file

Propchange: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java?rev=887468&r1=887467&r2=887468&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java Sat Dec  5 00:32:32 2009
@@ -37,6 +37,10 @@
 
 import org.apache.log4j.Logger;
 
+/**
+ * Turns ReadResponse messages into Row objects, resolving to the most recent
+ * version and setting up read repairs as necessary.
+ */
 public class ReadResponseResolver implements IResponseResolver<Row>
 {
 	private static Logger logger_ = Logger.getLogger(ReadResponseResolver.class);

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=887468&r1=887467&r2=887468&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Sat Dec  5 00:32:32 2009
@@ -40,6 +40,8 @@
 import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.locator.TokenMetadata;
 import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.concurrent.StageManager;
 
@@ -533,39 +535,64 @@
         long startTime = System.currentTimeMillis();
         TokenMetadata tokenMetadata = StorageService.instance().getTokenMetadata();
 
-        InetAddress endPoint = StorageService.instance().findSuitableEndPoint(command.startKey.key);
+        InetAddress endPoint = StorageService.instance().getPrimary(command.startKey.token);
         InetAddress startEndpoint = endPoint;
+        int responseCount = determineBlockFor(DatabaseDescriptor.getReplicationFactor(), DatabaseDescriptor.getReplicationFactor(), consistency_level);
 
         Map<String, ColumnFamily> rows = new HashMap<String, ColumnFamily>(command.max_keys);
         do
         {
-            Message message = command.getMessage();
+            Range primaryRange = StorageService.instance().getPrimaryRangeForEndPoint(endPoint);
+            List<InetAddress> endpoints = StorageService.instance().getLiveNaturalEndpoints(primaryRange.right());
+            if (endpoints.size() < responseCount)
+                throw new UnavailableException();
+
+            // to make comparing the results from each node easy, we restrict each command to the data in the primary range for this iteration
+            IPartitioner<?> p = StorageService.getPartitioner();
+            DecoratedKey startKey;
+            DecoratedKey finishKey;
+            if (primaryRange.left().equals(primaryRange.right()))
+            {
+                startKey = command.startKey;
+                finishKey = command.finishKey;
+            }
+            else
+            {
+                startKey = Collections.max(Arrays.asList(command.startKey, new DecoratedKey(primaryRange.left(), null)));
+                finishKey = command.finishKey.isEmpty()
+                                       ? new DecoratedKey(primaryRange.right(), null)
+                                       : Collections.min(Arrays.asList(command.finishKey, new DecoratedKey(primaryRange.right(), null)));
+            }
+            RangeSliceCommand c2 = new RangeSliceCommand(command.keyspace, command.column_family, command.super_column, command.predicate, startKey, finishKey, command.max_keys);
+            Message message = c2.getMessage();
+
+            // collect replies and resolve according to consistency level
+            RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(command.keyspace, primaryRange, endpoints);
+            QuorumResponseHandler<Map<String, ColumnFamily>> handler = new QuorumResponseHandler<Map<String, ColumnFamily>>(responseCount, resolver);
             if (logger.isDebugEnabled())
-                logger.debug("reading " + command + " from " + message.getMessageId() + "@" + endPoint);
-            IAsyncResult iar = MessagingService.instance().sendRR(message, endPoint);
-            byte[] responseBody;
+                logger.debug("reading " + command + " for " + primaryRange + " from " + message.getMessageId() + "@" + endPoint);
+            for (InetAddress replicaEndpoint : endpoints)
+            {
+                MessagingService.instance().sendRR(message, replicaEndpoint, handler);
+            }
+
+            // if we're done, great, otherwise, move to the next range
             try
             {
-                responseBody = iar.get(DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
+                rows.putAll(handler.get());
             }
-            catch (TimeoutException ex)
+            catch (TimeoutException e)
             {
                 throw new TimedOutException();
             }
-            RangeSliceReply reply = RangeSliceReply.read(responseBody);
-            for (Row row : reply.rows)
+            catch (DigestMismatchException e)
             {
-                rows.put(row.key, ColumnFamily.resolve(row.cf, rows.get(row.key)));
+                throw new AssertionError(e); // no digests in range slices yet
             }
-
-            if (rows.size() >= command.max_keys || reply.rangeCompletedLocally)
+            if (rows.size() >= command.max_keys || resolver.completed())
                 break;
 
-            do
-            {
-                endPoint = tokenMetadata.getSuccessor(endPoint); // TODO move this into the Strategies & modify for RackAwareStrategy
-            }
-            while (!FailureDetector.instance().isAlive(endPoint));
+            endPoint = tokenMetadata.getSuccessor(endPoint);
         }
         while (!endPoint.equals(startEndpoint));
 

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=887468&r1=887467&r2=887468&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Sat Dec  5 00:32:32 2009
@@ -801,8 +801,12 @@
      */
     public InetAddress getPrimary(String key)
     {
+        return getPrimary(partitioner_.getToken(key));
+    }
+
+    public InetAddress getPrimary(Token token)
+    {
         InetAddress endpoint = FBUtilities.getLocalAddress();
-        Token token = partitioner_.getToken(key);
         List tokens = new ArrayList<Token>(tokenMetadata_.sortedTokens());
         if (tokens.size() > 0)
         {
@@ -849,8 +853,8 @@
     public List<InetAddress> getNaturalEndpoints(String key)
     {
         return replicationStrategy_.getNaturalEndpoints(partitioner_.getToken(key));
-    }    
-    
+    }
+
     /**
      * This method attempts to return N endpoints that are responsible for storing the
      * specified key i.e for replication.
@@ -860,15 +864,20 @@
      */
     public List<InetAddress> getLiveNaturalEndpoints(String key)
     {
+        return getLiveNaturalEndpoints(partitioner_.getToken(key));
+    }
+
+    public List<InetAddress> getLiveNaturalEndpoints(Token token)
+    {
         List<InetAddress> liveEps = new ArrayList<InetAddress>();
-        List<InetAddress> endpoints = getNaturalEndpoints(key);
-        
-        for ( InetAddress endpoint : endpoints )
+        List<InetAddress> endpoints = replicationStrategy_.getNaturalEndpoints(token);
+
+        for (InetAddress endpoint : endpoints)
         {
-            if ( FailureDetector.instance().isAlive(endpoint) )
+            if (FailureDetector.instance().isAlive(endpoint))
                 liveEps.add(endpoint);
         }
-        
+
         return liveEps;
     }
 

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/Pair.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/Pair.java?rev=887468&r1=887467&r2=887468&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/Pair.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/Pair.java Sat Dec  5 00:32:32 2009
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.cassandra.utils;
 
 public class Pair<T1, T2>