You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/12/05 01:32:33 UTC
svn commit: r887468 - in /incubator/cassandra/trunk: ./
src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/service/
src/java/org/apache/cassandra/utils/
Author: jbellis
Date: Sat Dec 5 00:32:32 2009
New Revision: 887468
URL: http://svn.apache.org/viewvc?rev=887468&view=rev
Log:
add strong reads to range slicing
patch by jbellis; reviewed by Stu Hood for CASSANDRA-568
Added:
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java (with props)
Modified:
incubator/cassandra/trunk/CHANGES.txt
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/Pair.java
Modified: incubator/cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/CHANGES.txt?rev=887468&r1=887467&r2=887468&view=diff
==============================================================================
--- incubator/cassandra/trunk/CHANGES.txt (original)
+++ incubator/cassandra/trunk/CHANGES.txt Sat Dec 5 00:32:32 2009
@@ -11,6 +11,7 @@
* respect JAVA_HOME in bin/ scripts (several tickets)
* add StorageService.initClient for fat clients on the JVM (CASSANDRA-535)
(see contrib/client_only for an example of use)
+ * make consistency_level functional in get_range_slice (CASSANDRA-568)
0.5.0 beta
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java?rev=887468&r1=887467&r2=887468&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java Sat Dec 5 00:32:32 2009
@@ -81,6 +81,31 @@
this.max_keys = max_keys;
}
+    /**
+     * Creates a command from keys that are already DecoratedKey instances.
+     *
+     * @param keyspace      keyspace name; stored as given
+     * @param column_family column family name; stored as given
+     * @param super_column  super column name bytes; stored as given
+     * @param predicate     slice predicate; stored as given
+     * @param startKey      start of the key range
+     * @param finishKey     end of the key range
+     * @param max_keys      upper bound on the number of keys requested
+     */
+    public RangeSliceCommand(String keyspace, String column_family, byte[] super_column, SlicePredicate predicate, DecoratedKey startKey, DecoratedKey finishKey, int max_keys)
+    {
+        // key range and size limit
+        this.startKey = startKey;
+        this.finishKey = finishKey;
+        this.max_keys = max_keys;
+
+        // where to read and which columns to select
+        this.keyspace = keyspace;
+        this.column_family = column_family;
+        this.super_column = super_column;
+        this.predicate = predicate;
+    }
+
public Message getMessage() throws IOException
{
DataOutputBuffer dob = new DataOutputBuffer();
Added: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java?rev=887468&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java (added)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java Sat Dec 5 00:32:32 2009
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.*;
+
+import org.apache.log4j.Logger;
+
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.RangeSliceReply;
+import org.apache.cassandra.db.Row;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.net.Message;
+
+/**
+ * Turns RangeSliceReply objects into row (string -> CF) maps, resolving
+ * to the most recent ColumnFamily and setting up read repairs as necessary.
+ */
+public class RangeSliceResponseResolver implements IResponseResolver<Map<String, ColumnFamily>>
+{
+ private static final Logger logger_ = Logger.getLogger(RangeSliceResponseResolver.class);
+ private final String table;
+ private final Range range;
+ private final List<InetAddress> sources;
+ private boolean isCompleted;
+
+ public RangeSliceResponseResolver(String table, Range range, List<InetAddress> sources)
+ {
+ assert sources.size() > 0;
+ this.sources = sources;
+ this.range = range;
+ this.table = table;
+ }
+
+ public Map<String, ColumnFamily> resolve(List<Message> responses) throws DigestMismatchException, IOException
+ {
+ Map<InetAddress, Map<String, ColumnFamily>> replies = new HashMap<InetAddress, Map<String, ColumnFamily>>(responses.size());
+ Set<String> allKeys = new HashSet<String>();
+ for (Message response : responses)
+ {
+ RangeSliceReply reply = RangeSliceReply.read(response.getMessageBody());
+ isCompleted &= reply.rangeCompletedLocally;
+ Map<String, ColumnFamily> rows = new HashMap<String, ColumnFamily>(reply.rows.size());
+ for (Row row : reply.rows)
+ {
+ rows.put(row.key, row.cf);
+ allKeys.add(row.key);
+ }
+ replies.put(response.getFrom(), rows);
+ }
+
+ // for each row, compute the combination of all different versions seen, and repair incomplete versions
+ // TODO since the rows all arrive in sorted order, we should be able to do this more efficiently w/o all the Map conversion
+ Map<String, ColumnFamily> resolvedRows = new HashMap<String, ColumnFamily>(allKeys.size());
+ for (String key : allKeys)
+ {
+ List<ColumnFamily> versions = new ArrayList<ColumnFamily>(sources.size());
+ for (InetAddress endpoint : sources)
+ {
+ versions.add(replies.get(endpoint).get(key));
+ }
+ ColumnFamily resolved = ReadResponseResolver.resolveSuperset(versions);
+ ReadResponseResolver.maybeScheduleRepairs(resolved, table, key, versions, sources);
+ resolvedRows.put(key, resolved);
+ }
+ return resolvedRows;
+ }
+
+ public boolean isDataPresent(List<Message> responses)
+ {
+ return responses.size() >= sources.size();
+ }
+
+ /**
+ * only valid after resolve has been called (typically via QRH.get)
+ */
+ public boolean completed()
+ {
+ return isCompleted;
+ }
+}
\ No newline at end of file
Propchange: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java?rev=887468&r1=887467&r2=887468&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java Sat Dec 5 00:32:32 2009
@@ -37,6 +37,10 @@
import org.apache.log4j.Logger;
+/**
+ * Turns ReadResponse messages into Row objects, resolving to the most recent
+ * version and setting up read repairs as necessary.
+ */
public class ReadResponseResolver implements IResponseResolver<Row>
{
private static Logger logger_ = Logger.getLogger(ReadResponseResolver.class);
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=887468&r1=887467&r2=887468&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Sat Dec 5 00:32:32 2009
@@ -40,6 +40,8 @@
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.concurrent.StageManager;
@@ -533,39 +535,64 @@
long startTime = System.currentTimeMillis();
TokenMetadata tokenMetadata = StorageService.instance().getTokenMetadata();
- InetAddress endPoint = StorageService.instance().findSuitableEndPoint(command.startKey.key);
+ InetAddress endPoint = StorageService.instance().getPrimary(command.startKey.token);
InetAddress startEndpoint = endPoint;
+ int responseCount = determineBlockFor(DatabaseDescriptor.getReplicationFactor(), DatabaseDescriptor.getReplicationFactor(), consistency_level);
Map<String, ColumnFamily> rows = new HashMap<String, ColumnFamily>(command.max_keys);
do
{
- Message message = command.getMessage();
+ Range primaryRange = StorageService.instance().getPrimaryRangeForEndPoint(endPoint);
+ List<InetAddress> endpoints = StorageService.instance().getLiveNaturalEndpoints(primaryRange.right());
+ if (endpoints.size() < responseCount)
+ throw new UnavailableException();
+
+ // to make comparing the results from each node easy, we restrict each command to the data in the primary range for this iteration
+ IPartitioner<?> p = StorageService.getPartitioner();
+ DecoratedKey startKey;
+ DecoratedKey finishKey;
+ if (primaryRange.left().equals(primaryRange.right()))
+ {
+ startKey = command.startKey;
+ finishKey = command.finishKey;
+ }
+ else
+ {
+ startKey = Collections.max(Arrays.asList(command.startKey, new DecoratedKey(primaryRange.left(), null)));
+ finishKey = command.finishKey.isEmpty()
+ ? new DecoratedKey(primaryRange.right(), null)
+ : Collections.min(Arrays.asList(command.finishKey, new DecoratedKey(primaryRange.right(), null)));
+ }
+ RangeSliceCommand c2 = new RangeSliceCommand(command.keyspace, command.column_family, command.super_column, command.predicate, startKey, finishKey, command.max_keys);
+ Message message = c2.getMessage();
+
+ // collect replies and resolve according to consistency level
+ RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(command.keyspace, primaryRange, endpoints);
+ QuorumResponseHandler<Map<String, ColumnFamily>> handler = new QuorumResponseHandler<Map<String, ColumnFamily>>(responseCount, resolver);
if (logger.isDebugEnabled())
- logger.debug("reading " + command + " from " + message.getMessageId() + "@" + endPoint);
- IAsyncResult iar = MessagingService.instance().sendRR(message, endPoint);
- byte[] responseBody;
+ logger.debug("reading " + command + " for " + primaryRange + " from " + message.getMessageId() + "@" + endPoint);
+ for (InetAddress replicaEndpoint : endpoints)
+ {
+ MessagingService.instance().sendRR(message, replicaEndpoint, handler);
+ }
+
+ // if we're done, great, otherwise, move to the next range
try
{
- responseBody = iar.get(DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
+ rows.putAll(handler.get());
}
- catch (TimeoutException ex)
+ catch (TimeoutException e)
{
throw new TimedOutException();
}
- RangeSliceReply reply = RangeSliceReply.read(responseBody);
- for (Row row : reply.rows)
+ catch (DigestMismatchException e)
{
- rows.put(row.key, ColumnFamily.resolve(row.cf, rows.get(row.key)));
+ throw new AssertionError(e); // no digests in range slices yet
}
-
- if (rows.size() >= command.max_keys || reply.rangeCompletedLocally)
+ if (rows.size() >= command.max_keys || resolver.completed())
break;
- do
- {
- endPoint = tokenMetadata.getSuccessor(endPoint); // TODO move this into the Strategies & modify for RackAwareStrategy
- }
- while (!FailureDetector.instance().isAlive(endPoint));
+ endPoint = tokenMetadata.getSuccessor(endPoint);
}
while (!endPoint.equals(startEndpoint));
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=887468&r1=887467&r2=887468&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Sat Dec 5 00:32:32 2009
@@ -801,8 +801,12 @@
*/
public InetAddress getPrimary(String key)
{
+ return getPrimary(partitioner_.getToken(key));
+ }
+
+ public InetAddress getPrimary(Token token)
+ {
InetAddress endpoint = FBUtilities.getLocalAddress();
- Token token = partitioner_.getToken(key);
List tokens = new ArrayList<Token>(tokenMetadata_.sortedTokens());
if (tokens.size() > 0)
{
@@ -849,8 +853,8 @@
public List<InetAddress> getNaturalEndpoints(String key)
{
return replicationStrategy_.getNaturalEndpoints(partitioner_.getToken(key));
- }
-
+ }
+
/**
* This method attempts to return N endpoints that are responsible for storing the
* specified key i.e for replication.
@@ -860,15 +864,20 @@
*/
public List<InetAddress> getLiveNaturalEndpoints(String key)
{
+ return getLiveNaturalEndpoints(partitioner_.getToken(key));
+ }
+
+ public List<InetAddress> getLiveNaturalEndpoints(Token token)
+ {
List<InetAddress> liveEps = new ArrayList<InetAddress>();
- List<InetAddress> endpoints = getNaturalEndpoints(key);
-
- for ( InetAddress endpoint : endpoints )
+ List<InetAddress> endpoints = replicationStrategy_.getNaturalEndpoints(token);
+
+ for (InetAddress endpoint : endpoints)
{
- if ( FailureDetector.instance().isAlive(endpoint) )
+ if (FailureDetector.instance().isAlive(endpoint))
liveEps.add(endpoint);
}
-
+
return liveEps;
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/Pair.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/Pair.java?rev=887468&r1=887467&r2=887468&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/Pair.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/Pair.java Sat Dec 5 00:32:32 2009
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.cassandra.utils;
public class Pair<T1, T2>