You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/02/15 21:50:22 UTC
svn commit: r1071046 - in /cassandra/branches/cassandra-0.7.2: ./
src/java/org/apache/cassandra/concurrent/ src/java/org/apache/cassandra/db/
src/java/org/apache/cassandra/dht/ src/java/org/apache/cassandra/net/
src/java/org/apache/cassandra/service/ t...
Author: jbellis
Date: Tue Feb 15 20:50:21 2011
New Revision: 1071046
URL: http://svn.apache.org/viewvc?rev=1071046&view=rev
Log:
revert #2069
Added:
cassandra/branches/cassandra-0.7.2/test/unit/org/apache/cassandra/service/ReadResponseResolverTest.java
- copied, changed from r1071027, cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/ReadResponseResolverTest.java
Removed:
cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/service/AbstractRowResolver.java
cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/service/AsyncRepairCallback.java
cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/service/IReadCommand.java
cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/service/RowDigestResolver.java
cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/service/RowRepairResolver.java
cassandra/branches/cassandra-0.7.2/test/unit/org/apache/cassandra/service/RowResolverTest.java
Modified:
cassandra/branches/cassandra-0.7.2/CHANGES.txt
cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/concurrent/Stage.java
cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/concurrent/StageManager.java
cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/db/RangeSliceCommand.java
cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/db/ReadCommand.java
cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/dht/BootStrapper.java
cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/net/AsyncResult.java
cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/net/IMessageCallback.java
cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/net/MessagingService.java
cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/service/DatacenterReadCallback.java
cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/service/ReadCallback.java
cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/service/ReadResponseResolver.java
cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/service/RepairCallback.java
cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/service/StorageProxy.java
cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/service/TruncateResponseHandler.java
cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/service/WriteResponseHandler.java
cassandra/branches/cassandra-0.7.2/test/unit/org/apache/cassandra/service/ConsistencyLevelTest.java
Modified: cassandra/branches/cassandra-0.7.2/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7.2/CHANGES.txt?rev=1071046&r1=1071045&r2=1071046&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7.2/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.7.2/CHANGES.txt Tue Feb 15 20:50:21 2011
@@ -2,7 +2,6 @@
* copy DecoratedKey.key when inserting into caches to avoid retaining
a reference to the underlying buffer (CASSANDRA-2102)
* format subcolumn names with subcomparator (CASSANDRA-2136)
- * lower-latency read repair (CASSANDRA-2069)
* fix column bloom filter deserialization (CASSANDRA-2165)
Modified: cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/concurrent/Stage.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/concurrent/Stage.java?rev=1071046&r1=1071045&r2=1071046&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/concurrent/Stage.java (original)
+++ cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/concurrent/Stage.java Tue Feb 15 20:50:21 2011
@@ -31,8 +31,7 @@ public enum Stage
ANTI_ENTROPY,
MIGRATION,
MISC,
- INTERNAL_RESPONSE,
- READ_REPAIR;
+ INTERNAL_RESPONSE;
public String getJmxType()
{
@@ -48,7 +47,6 @@ public enum Stage
case MUTATION:
case READ:
case REQUEST_RESPONSE:
- case READ_REPAIR:
return "request";
default:
throw new AssertionError("Unknown stage " + this);
Modified: cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/concurrent/StageManager.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/concurrent/StageManager.java?rev=1071046&r1=1071045&r2=1071046&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/concurrent/StageManager.java (original)
+++ cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/concurrent/StageManager.java Tue Feb 15 20:50:21 2011
@@ -50,7 +50,6 @@ public class StageManager
stages.put(Stage.ANTI_ENTROPY, new JMXEnabledThreadPoolExecutor(Stage.ANTI_ENTROPY));
stages.put(Stage.MIGRATION, new JMXEnabledThreadPoolExecutor(Stage.MIGRATION));
stages.put(Stage.MISC, new JMXEnabledThreadPoolExecutor(Stage.MISC));
- stages.put(Stage.READ_REPAIR, multiThreadedStage(Stage.READ_REPAIR, Runtime.getRuntime().availableProcessors()));
}
private static ThreadPoolExecutor multiThreadedStage(Stage stage, int numThreads)
Modified: cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/db/RangeSliceCommand.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/db/RangeSliceCommand.java?rev=1071046&r1=1071045&r2=1071046&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/db/RangeSliceCommand.java (original)
+++ cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/db/RangeSliceCommand.java Tue Feb 15 20:50:21 2011
@@ -47,7 +47,6 @@ import org.apache.cassandra.dht.Abstract
import org.apache.cassandra.io.ICompactSerializer;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.net.Message;
-import org.apache.cassandra.service.IReadCommand;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.thrift.ColumnParent;
import org.apache.cassandra.thrift.SlicePredicate;
@@ -57,7 +56,7 @@ import org.apache.thrift.TDeserializer;
import org.apache.thrift.TSerializer;
import org.apache.cassandra.thrift.TBinaryProtocol;
-public class RangeSliceCommand implements IReadCommand
+public class RangeSliceCommand
{
private static final RangeSliceCommandSerializer serializer = new RangeSliceCommandSerializer();
@@ -114,11 +113,6 @@ public class RangeSliceCommand implement
ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
return serializer.deserialize(new DataInputStream(bis));
}
-
- public String getKeyspace()
- {
- return keyspace;
- }
}
class RangeSliceCommandSerializer implements ICompactSerializer<RangeSliceCommand>
Modified: cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/db/ReadCommand.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/db/ReadCommand.java?rev=1071046&r1=1071045&r2=1071046&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/db/ReadCommand.java (original)
+++ cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/db/ReadCommand.java Tue Feb 15 20:50:21 2011
@@ -30,12 +30,11 @@ import org.apache.cassandra.db.filter.Qu
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.io.ICompactSerializer;
import org.apache.cassandra.net.Message;
-import org.apache.cassandra.service.IReadCommand;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
-public abstract class ReadCommand implements IReadCommand
+public abstract class ReadCommand
{
public static final byte CMD_TYPE_GET_SLICE_BY_NAMES = 1;
public static final byte CMD_TYPE_GET_SLICE = 2;
@@ -92,11 +91,6 @@ public abstract class ReadCommand implem
{
return ColumnFamily.getComparatorFor(table, getColumnFamilyName(), queryPath.superColumnName);
}
-
- public String getKeyspace()
- {
- return table;
- }
}
class ReadCommandSerializer implements ICompactSerializer<ReadCommand>
Modified: cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/dht/BootStrapper.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/dht/BootStrapper.java?rev=1071046&r1=1071045&r2=1071046&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/dht/BootStrapper.java (original)
+++ cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/dht/BootStrapper.java Tue Feb 15 20:50:21 2011
@@ -282,10 +282,5 @@ public class BootStrapper
token = StorageService.getPartitioner().getTokenFactory().fromString(new String(msg.getMessageBody(), Charsets.UTF_8));
condition.signalAll();
}
-
- public boolean isLatencyForSnitch()
- {
- return false;
- }
}
}
Modified: cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/net/AsyncResult.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/net/AsyncResult.java?rev=1071046&r1=1071045&r2=1071046&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/net/AsyncResult.java (original)
+++ cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/net/AsyncResult.java Tue Feb 15 20:50:21 2011
@@ -96,11 +96,6 @@ class AsyncResult implements IAsyncResul
}
}
- public boolean isLatencyForSnitch()
- {
- return false;
- }
-
public InetAddress getFrom()
{
return from;
Modified: cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/net/IMessageCallback.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/net/IMessageCallback.java?rev=1071046&r1=1071045&r2=1071046&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/net/IMessageCallback.java (original)
+++ cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/net/IMessageCallback.java Tue Feb 15 20:50:21 2011
@@ -23,9 +23,4 @@ package org.apache.cassandra.net;
public interface IMessageCallback
{
- /**
- * @return true if this callback is on the read path and its latency should be
- * given as input to the dynamic snitch.
- */
- public boolean isLatencyForSnitch();
}
Modified: cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/net/MessagingService.java?rev=1071046&r1=1071045&r2=1071046&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/net/MessagingService.java Tue Feb 15 20:50:21 2011
@@ -138,7 +138,7 @@ public final class MessagingService impl
*/
public void maybeAddLatency(IMessageCallback cb, InetAddress address, double latency)
{
- if (cb.isLatencyForSnitch())
+ if (cb instanceof ReadCallback || cb instanceof AsyncResult)
addLatency(address, latency);
}
Modified: cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/service/DatacenterReadCallback.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/service/DatacenterReadCallback.java?rev=1071046&r1=1071045&r2=1071046&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/service/DatacenterReadCallback.java (original)
+++ cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/service/DatacenterReadCallback.java Tue Feb 15 20:50:21 2011
@@ -22,12 +22,12 @@ package org.apache.cassandra.service;
import java.net.InetAddress;
-import java.util.List;
+import java.util.Collection;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.ReadResponse;
import org.apache.cassandra.db.Table;
+import org.apache.cassandra.db.ReadResponse;
import org.apache.cassandra.locator.IEndpointSnitch;
import org.apache.cassandra.locator.NetworkTopologyStrategy;
import org.apache.cassandra.net.Message;
@@ -44,12 +44,12 @@ public class DatacenterReadCallback<T> e
private static final String localdc = snitch.getDatacenter(FBUtilities.getLocalAddress());
private AtomicInteger localResponses;
- public DatacenterReadCallback(IResponseResolver resolver, ConsistencyLevel consistencyLevel, IReadCommand command, List<InetAddress> endpoints)
+ public DatacenterReadCallback(IResponseResolver<T> resolver, ConsistencyLevel consistencyLevel, String table)
{
- super(resolver, consistencyLevel, command, endpoints);
+ super(resolver, consistencyLevel, table);
localResponses = new AtomicInteger(blockfor);
}
-
+
@Override
public void response(Message message)
{
@@ -68,15 +68,14 @@ public class DatacenterReadCallback<T> e
@Override
public void response(ReadResponse result)
{
- ((RowDigestResolver) resolver).injectPreProcessed(result);
+ ((ReadResponseResolver) resolver).injectPreProcessed(result);
int n = localResponses.decrementAndGet();
+
if (n == 0 && resolver.isDataPresent())
{
condition.signal();
}
-
- maybeResolveForRepair();
}
@Override
@@ -87,7 +86,7 @@ public class DatacenterReadCallback<T> e
}
@Override
- public void assureSufficientLiveNodes() throws UnavailableException
+ public void assureSufficientLiveNodes(Collection<InetAddress> endpoints) throws UnavailableException
{
int localEndpoints = 0;
for (InetAddress endpoint : endpoints)
Modified: cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java?rev=1071046&r1=1071045&r2=1071046&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java (original)
+++ cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java Tue Feb 15 20:50:21 2011
@@ -115,9 +115,4 @@ public class DatacenterSyncWriteResponse
throw new UnavailableException();
}
}
-
- public boolean isLatencyForSnitch()
- {
- return false;
- }
}
Modified: cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java?rev=1071046&r1=1071045&r2=1071046&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java (original)
+++ cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java Tue Feb 15 20:50:21 2011
@@ -49,6 +49,7 @@ public class RangeSliceResponseResolver
public RangeSliceResponseResolver(String table, List<InetAddress> sources)
{
+ assert sources.size() > 0;
this.sources = sources;
this.table = table;
}
@@ -102,8 +103,8 @@ public class RangeSliceResponseResolver
protected Row getReduced()
{
- ColumnFamily resolved = RowRepairResolver.resolveSuperset(versions);
- RowRepairResolver.maybeScheduleRepairs(resolved, table, key, versions, versionSources);
+ ColumnFamily resolved = ReadResponseResolver.resolveSuperset(versions);
+ ReadResponseResolver.maybeScheduleRepairs(resolved, table, key, versions, versionSources);
versions.clear();
versionSources.clear();
return new Row(key, resolved);
Modified: cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/service/ReadCallback.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/service/ReadCallback.java?rev=1071046&r1=1071045&r2=1071046&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/service/ReadCallback.java (original)
+++ cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/service/ReadCallback.java Tue Feb 15 20:50:21 2011
@@ -20,20 +20,14 @@ package org.apache.cassandra.service;
import java.io.IOException;
import java.net.InetAddress;
-import java.util.List;
-import java.util.Random;
+import java.util.Collection;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.concurrent.Stage;
-import org.apache.cassandra.concurrent.StageManager;
-import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.ReadCommand;
import org.apache.cassandra.db.ReadResponse;
import org.apache.cassandra.db.Table;
import org.apache.cassandra.net.IAsyncCallback;
@@ -42,61 +36,28 @@ import org.apache.cassandra.net.Messagin
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.thrift.UnavailableException;
import org.apache.cassandra.utils.SimpleCondition;
-import org.apache.cassandra.utils.WrappedRunnable;
public class ReadCallback<T> implements IAsyncCallback
{
protected static final Logger logger = LoggerFactory.getLogger( ReadCallback.class );
- private static final ThreadLocal<Random> random = new ThreadLocal<Random>()
- {
- @Override
- protected Random initialValue()
- {
- return new Random();
- }
- };
-
public final IResponseResolver<T> resolver;
protected final SimpleCondition condition = new SimpleCondition();
private final long startTime;
protected final int blockfor;
- final List<InetAddress> endpoints;
- private final IReadCommand command;
-
+
/**
* Constructor when response count has to be calculated and blocked for.
*/
- public ReadCallback(IResponseResolver<T> resolver, ConsistencyLevel consistencyLevel, IReadCommand command, List<InetAddress> endpoints)
+ public ReadCallback(IResponseResolver<T> resolver, ConsistencyLevel consistencyLevel, String table)
{
- this.command = command;
- this.blockfor = determineBlockFor(consistencyLevel, command.getKeyspace());
+ this.blockfor = determineBlockFor(consistencyLevel, table);
this.resolver = resolver;
this.startTime = System.currentTimeMillis();
- boolean repair = randomlyReadRepair();
- this.endpoints = repair || resolver instanceof RowRepairResolver
- ? endpoints
- : endpoints.subList(0, Math.min(endpoints.size(), blockfor)); // min so as to not throw exception until assureSufficient is called
-
- if (logger.isDebugEnabled())
- logger.debug(String.format("Blockfor/repair is %s/%s; setting up requests to %s",
- blockfor, repair, StringUtils.join(this.endpoints, ",")));
+
+ logger.debug("ReadCallback blocking for {} responses", blockfor);
}
- private boolean randomlyReadRepair()
- {
- if (resolver instanceof RowDigestResolver)
- {
- assert command instanceof ReadCommand : command;
- String table = ((RowDigestResolver) resolver).table;
- String columnFamily = ((ReadCommand) command).getColumnFamilyName();
- CFMetaData cfmd = DatabaseDescriptor.getTableMetaData(table).get(columnFamily);
- return cfmd.getReadRepairChance() > random.get().nextDouble();
- }
- // we don't read repair on range scans
- return false;
- }
-
public T get() throws TimeoutException, DigestMismatchException, IOException
{
long timeout = DatabaseDescriptor.getRpcTimeout() - (System.currentTimeMillis() - startTime);
@@ -124,42 +85,21 @@ public class ReadCallback<T> implements
public void response(Message message)
{
resolver.preprocess(message);
- assert resolver.getMessageCount() <= endpoints.size();
if (resolver.getMessageCount() < blockfor)
return;
if (resolver.isDataPresent())
- {
condition.signal();
- maybeResolveForRepair();
- }
}
public void response(ReadResponse result)
{
- ((RowDigestResolver) resolver).injectPreProcessed(result);
- assert resolver.getMessageCount() <= endpoints.size();
+ ((ReadResponseResolver) resolver).injectPreProcessed(result);
if (resolver.getMessageCount() < blockfor)
return;
if (resolver.isDataPresent())
- {
condition.signal();
- maybeResolveForRepair();
- }
}
-
- /**
- * Check digests in the background on the Repair stage if we've received replies
- * too all the requests we sent.
- */
- protected void maybeResolveForRepair()
- {
- if (blockfor < endpoints.size() && resolver.getMessageCount() == endpoints.size())
- {
- assert resolver.isDataPresent();
- StageManager.getStage(Stage.READ_REPAIR).execute(new AsyncRepairRunner());
- }
- }
-
+
public int determineBlockFor(ConsistencyLevel consistencyLevel, String table)
{
switch (consistencyLevel)
@@ -176,38 +116,9 @@ public class ReadCallback<T> implements
}
}
- public void assureSufficientLiveNodes() throws UnavailableException
+ public void assureSufficientLiveNodes(Collection<InetAddress> endpoints) throws UnavailableException
{
if (endpoints.size() < blockfor)
throw new UnavailableException();
}
-
- public boolean isLatencyForSnitch()
- {
- return true;
- }
-
- private class AsyncRepairRunner extends WrappedRunnable
- {
- protected void runMayThrow() throws IOException
- {
- try
- {
- resolver.resolve();
- }
- catch (DigestMismatchException e)
- {
- if (logger.isDebugEnabled())
- logger.debug("Digest mismatch:", e);
-
- ReadCommand readCommand = (ReadCommand) command;
- final RowRepairResolver repairResolver = new RowRepairResolver(readCommand.table, readCommand.key);
- IAsyncCallback repairHandler = new AsyncRepairCallback(repairResolver, endpoints.size());
-
- Message messageRepair = readCommand.makeReadMessage();
- for (InetAddress endpoint : endpoints)
- MessagingService.instance().sendRR(messageRepair, endpoint, repairHandler);
- }
- }
- }
}
Modified: cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/service/ReadResponseResolver.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/service/ReadResponseResolver.java?rev=1071046&r1=1071045&r2=1071046&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/service/ReadResponseResolver.java (original)
+++ cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/service/ReadResponseResolver.java Tue Feb 15 20:50:21 2011
@@ -0,0 +1,257 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOError;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.commons.lang.ArrayUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.FBUtilities;
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
+
+/**
+ * Turns ReadResponse messages into Row objects, resolving to the most recent
+ * version and setting up read repairs as necessary.
+ */
+public class ReadResponseResolver implements IResponseResolver<Row>
+{
+ private static Logger logger_ = LoggerFactory.getLogger(ReadResponseResolver.class);
+ private final String table;
+ private final ConcurrentMap<Message, ReadResponse> results = new NonBlockingHashMap<Message, ReadResponse>();
+ private DecoratedKey key;
+ private ByteBuffer digest;
+ private static final Message FAKE_MESSAGE = new Message(FBUtilities.getLocalAddress(), StorageService.Verb.INTERNAL_RESPONSE, ArrayUtils.EMPTY_BYTE_ARRAY);;
+
+ public ReadResponseResolver(String table, ByteBuffer key)
+ {
+ this.table = table;
+ this.key = StorageService.getPartitioner().decorateKey(key);
+ }
+
+ public Row getData() throws IOException
+ {
+ for (Map.Entry<Message, ReadResponse> entry : results.entrySet())
+ {
+ ReadResponse result = entry.getValue();
+ if (!result.isDigestQuery())
+ return result.row();
+ }
+
+ throw new AssertionError("getData should not be invoked when no data is present");
+ }
+
+ /*
+ * This method handles three different scenarios:
+ *
+ * 1a)we're handling the initial read, of data from the closest replica + digests
+ * from the rest. In this case we check the digests against each other,
+ * throw an exception if there is a mismatch, otherwise return the data row.
+ *
+ * 1b)we're checking additional digests that arrived after the minimum to handle
+ * the requested ConsistencyLevel, i.e. asynchronouse read repair check
+ *
+ * 2) there was a mismatch on the initial read (1a or 1b), so we redid the digest requests
+ * as full data reads. In this case we need to compute the most recent version
+ * of each column, and send diffs to out-of-date replicas.
+ */
+ public Row resolve() throws DigestMismatchException, IOException
+ {
+ if (logger_.isDebugEnabled())
+ logger_.debug("resolving " + results.size() + " responses");
+
+ long startTime = System.currentTimeMillis();
+ List<ColumnFamily> versions = new ArrayList<ColumnFamily>();
+ List<InetAddress> endpoints = new ArrayList<InetAddress>();
+
+ // case 1: validate digests against each other; throw immediately on mismatch.
+ // also, collects data results into versions/endpoints lists.
+ //
+ // results are cleared as we process them, to avoid unnecessary duplication of work
+ // when resolve() is called a second time for read repair on responses that were not
+ // necessary to satisfy ConsistencyLevel.
+ for (Map.Entry<Message, ReadResponse> entry : results.entrySet())
+ {
+ ReadResponse result = entry.getValue();
+ Message message = entry.getKey();
+ if (result.isDigestQuery())
+ {
+ if (digest == null)
+ {
+ digest = result.digest();
+ }
+ else
+ {
+ ByteBuffer digest2 = result.digest();
+ if (!digest.equals(digest2))
+ throw new DigestMismatchException(key, digest, digest2);
+ }
+ }
+ else
+ {
+ versions.add(result.row().cf);
+ endpoints.add(message.getFrom());
+ }
+
+ results.remove(message);
+ }
+
+ // If there was a digest query compare it with all the data digests
+ // If there is a mismatch then throw an exception so that read repair can happen.
+ //
+ // It's important to note that we do not compare the digests of multiple data responses --
+ // if we are in that situation we know there was a previous mismatch and now we're doing a repair,
+ // so our job is now case 2: figure out what the most recent version is and update everyone to that version.
+ if (digest != null)
+ {
+ for (ColumnFamily cf : versions)
+ {
+ ByteBuffer digest2 = ColumnFamily.digest(cf);
+ if (!digest.equals(digest2))
+ throw new DigestMismatchException(key, digest, digest2);
+ }
+ if (logger_.isDebugEnabled())
+ logger_.debug("digests verified");
+ }
+
+ ColumnFamily resolved;
+ if (versions.size() > 1)
+ {
+ resolved = resolveSuperset(versions);
+ if (logger_.isDebugEnabled())
+ logger_.debug("versions merged");
+ maybeScheduleRepairs(resolved, table, key, versions, endpoints);
+ }
+ else
+ {
+ resolved = versions.get(0);
+ }
+
+ if (logger_.isDebugEnabled())
+ logger_.debug("resolve: " + (System.currentTimeMillis() - startTime) + " ms.");
+ return new Row(key, resolved);
+ }
+
+ /**
+ * For each row version, compare with resolved (the superset of all row versions);
+ * if it is missing anything, send a mutation to the endpoint it come from.
+ */
+ public static void maybeScheduleRepairs(ColumnFamily resolved, String table, DecoratedKey key, List<ColumnFamily> versions, List<InetAddress> endpoints)
+ {
+ for (int i = 0; i < versions.size(); i++)
+ {
+ ColumnFamily diffCf = ColumnFamily.diff(versions.get(i), resolved);
+ if (diffCf == null) // no repair needs to happen
+ continue;
+
+ // create and send the row mutation message based on the diff
+ RowMutation rowMutation = new RowMutation(table, key.key);
+ rowMutation.add(diffCf);
+ Message repairMessage;
+ try
+ {
+ repairMessage = rowMutation.makeRowMutationMessage(StorageService.Verb.READ_REPAIR);
+ }
+ catch (IOException e)
+ {
+ throw new IOError(e);
+ }
+ MessagingService.instance().sendOneWay(repairMessage, endpoints.get(i));
+ }
+ }
+
+ static ColumnFamily resolveSuperset(List<ColumnFamily> versions)
+ {
+ assert versions.size() > 0;
+
+ ColumnFamily resolved = null;
+ for (ColumnFamily cf : versions)
+ {
+ if (cf != null)
+ {
+ resolved = cf.cloneMe();
+ break;
+ }
+ }
+ if (resolved == null)
+ return null;
+
+ for (ColumnFamily cf : versions)
+ resolved.resolve(cf);
+
+ return resolved;
+ }
+
+ public void preprocess(Message message)
+ {
+ byte[] body = message.getMessageBody();
+ ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
+ try
+ {
+ ReadResponse result = ReadResponse.serializer().deserialize(new DataInputStream(bufIn));
+ if (logger_.isDebugEnabled())
+ logger_.debug("Preprocessed {} response", result.isDigestQuery() ? "digest" : "data");
+ results.put(message, result);
+ }
+ catch (IOException e)
+ {
+ throw new IOError(e);
+ }
+ }
+
+ /** hack so local reads don't force de/serialization of an extra real Message */
+ public void injectPreProcessed(ReadResponse result)
+ {
+ assert results.get(FAKE_MESSAGE) == null; // should only be one local reply
+ results.put(FAKE_MESSAGE, result);
+ }
+
+ public boolean isDataPresent()
+ {
+ for (ReadResponse result : results.values())
+ {
+ if (!result.isDigestQuery())
+ return true;
+ }
+ return false;
+ }
+
+ public Iterable<Message> getMessages()
+ {
+ return results.keySet();
+ }
+
+ public int getMessageCount()
+ {
+ return results.size();
+ }
+}
Modified: cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/service/RepairCallback.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/service/RepairCallback.java?rev=1071046&r1=1071045&r2=1071046&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/service/RepairCallback.java (original)
+++ cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/service/RepairCallback.java Tue Feb 15 20:50:21 2011
@@ -39,14 +39,6 @@ public class RepairCallback<T> implement
private final SimpleCondition condition = new SimpleCondition();
private final long startTime;
- /**
- * The main difference between this and ReadCallback is, ReadCallback has a ConsistencyLevel
- * it needs to achieve. Repair on the other hand is happy to repair whoever replies within the timeout.
- *
- * (The other main difference of course is, this is only created once we know we have a digest
- * mismatch, and we're going to do full-data reads from everyone -- that is, this is the final
- * stage in the read process.)
- */
public RepairCallback(IResponseResolver<T> resolver, List<InetAddress> endpoints)
{
this.resolver = resolver;
@@ -54,6 +46,10 @@ public class RepairCallback<T> implement
this.startTime = System.currentTimeMillis();
}
+ /**
+ * The main difference between this and ReadCallback is, ReadCallback has a ConsistencyLevel
+ * it needs to achieve. Repair on the other hand is happy to repair whoever replies within the timeout.
+ */
public T get() throws TimeoutException, DigestMismatchException, IOException
{
long timeout = DatabaseDescriptor.getRpcTimeout() - (System.currentTimeMillis() - startTime);
@@ -75,9 +71,4 @@ public class RepairCallback<T> implement
if (resolver.getMessageCount() == endpoints.size())
condition.signal();
}
-
- public boolean isLatencyForSnitch()
- {
- return true;
- }
}
Modified: cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1071046&r1=1071045&r2=1071046&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/service/StorageProxy.java Tue Feb 15 20:50:21 2011
@@ -36,6 +36,7 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.filter.QueryFilter;
@@ -58,6 +59,17 @@ public class StorageProxy implements Sto
{
private static final Logger logger = LoggerFactory.getLogger(StorageProxy.class);
+ private static ScheduledExecutorService repairExecutor = new ScheduledThreadPoolExecutor(1); // TODO JMX-enable this
+
+ private static final ThreadLocal<Random> random = new ThreadLocal<Random>()
+ {
+ @Override
+ protected Random initialValue()
+ {
+ return new Random();
+ }
+ };
+
// mbean stuff
private static final LatencyTracker readStats = new LatencyTracker();
private static final LatencyTracker rangeStats = new LatencyTracker();
@@ -66,8 +78,6 @@ public class StorageProxy implements Sto
private static int maxHintWindow = DatabaseDescriptor.getMaxHintWindow();
private static final String UNREACHABLE = "UNREACHABLE";
- public static final StorageProxy instance = new StorageProxy();
-
private StorageProxy() {}
static
{
@@ -313,55 +323,66 @@ public class StorageProxy implements Sto
private static List<Row> fetchRows(List<ReadCommand> commands, ConsistencyLevel consistency_level) throws IOException, UnavailableException, TimeoutException
{
List<ReadCallback<Row>> readCallbacks = new ArrayList<ReadCallback<Row>>();
+ List<List<InetAddress>> commandEndpoints = new ArrayList<List<InetAddress>>();
List<Row> rows = new ArrayList<Row>();
+ Set<ReadCommand> repairs = new HashSet<ReadCommand>();
// send out read requests
for (ReadCommand command: commands)
{
assert !command.isDigestQuery();
- logger.debug("Command/ConsistencyLevel is {}/{}", command, consistency_level);
List<InetAddress> endpoints = StorageService.instance.getLiveNaturalEndpoints(command.table, command.key);
DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getLocalAddress(), endpoints);
- RowDigestResolver resolver = new RowDigestResolver(command.table, command.key);
- ReadCallback<Row> handler = getReadCallback(resolver, command, consistency_level, endpoints);
- handler.assureSufficientLiveNodes();
- assert !handler.endpoints.isEmpty();
+ ReadResponseResolver resolver = new ReadResponseResolver(command.table, command.key);
+ ReadCallback<Row> handler = getReadCallback(resolver, command.table, consistency_level);
+ handler.assureSufficientLiveNodes(endpoints);
+ // if we're not going to read repair, cut the endpoints list down to the ones required to satisfy ConsistencyLevel
+ if (randomlyReadRepair(command))
+ {
+ if (endpoints.size() > handler.blockfor)
+ repairs.add(command);
+ }
+ else
+ {
+ endpoints = endpoints.subList(0, handler.blockfor);
+ }
+
// The data-request message is sent to dataPoint, the node that will actually get
// the data for us. The other replicas are only sent a digest query.
ReadCommand digestCommand = null;
- if (handler.endpoints.size() > 1)
+ if (endpoints.size() > 1)
{
digestCommand = command.copy();
digestCommand.setDigestQuery(true);
}
- InetAddress dataPoint = handler.endpoints.get(0);
+ InetAddress dataPoint = endpoints.get(0);
if (dataPoint.equals(FBUtilities.getLocalAddress()))
{
if (logger.isDebugEnabled())
- logger.debug("reading data locally");
+ logger.debug("reading data for " + command + " locally");
StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(command, handler));
}
else
{
Message message = command.makeReadMessage();
if (logger.isDebugEnabled())
- logger.debug("reading data from " + dataPoint);
+ logger.debug("reading data for " + command + " from " + dataPoint);
MessagingService.instance().sendRR(message, dataPoint, handler);
}
// We lazy-construct the digest Message object since it may not be necessary if we
// are doing a local digest read, or no digest reads at all.
Message digestMessage = null;
- for (InetAddress digestPoint : handler.endpoints.subList(1, handler.endpoints.size()))
+ for (InetAddress digestPoint : endpoints.subList(1, endpoints.size()))
{
if (digestPoint.equals(FBUtilities.getLocalAddress()))
{
if (logger.isDebugEnabled())
- logger.debug("reading digest locally");
+ logger.debug("reading digest for " + command + " locally");
StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(digestCommand, handler));
}
else
@@ -369,45 +390,44 @@ public class StorageProxy implements Sto
if (digestMessage == null)
digestMessage = digestCommand.makeReadMessage();
if (logger.isDebugEnabled())
- logger.debug("reading digest for from " + digestPoint);
+ logger.debug("reading digest for " + command + " from " + digestPoint);
MessagingService.instance().sendRR(digestMessage, digestPoint, handler);
}
}
readCallbacks.add(handler);
+ commandEndpoints.add(endpoints);
}
// read results and make a second pass for any digest mismatches
List<RepairCallback<Row>> repairResponseHandlers = null;
for (int i = 0; i < commands.size(); i++)
{
- ReadCallback<Row> handler = readCallbacks.get(i);
+ ReadCallback<Row> readCallback = readCallbacks.get(i);
Row row;
ReadCommand command = commands.get(i);
+ List<InetAddress> endpoints = commandEndpoints.get(i);
try
{
long startTime2 = System.currentTimeMillis();
- row = handler.get(); // CL.ONE is special cased here to ignore digests even if some have arrived
+ row = readCallback.get(); // CL.ONE is special cased here to ignore digests even if some have arrived
if (row != null)
rows.add(row);
if (logger.isDebugEnabled())
logger.debug("Read: " + (System.currentTimeMillis() - startTime2) + " ms.");
+
+ if (repairs.contains(command))
+ repairExecutor.schedule(new RepairRunner(readCallback.resolver, command, endpoints), DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
}
catch (DigestMismatchException ex)
{
if (logger.isDebugEnabled())
logger.debug("Digest mismatch:", ex);
-
- RowRepairResolver resolver = new RowRepairResolver(command.table, command.key);
- RepairCallback<Row> repairHandler = new RepairCallback<Row>(resolver, handler.endpoints);
- Message messageRepair = command.makeReadMessage();
- for (InetAddress endpoint : handler.endpoints)
- MessagingService.instance().sendRR(messageRepair, endpoint, repairHandler);
-
+ RepairCallback<Row> handler = repair(command, endpoints);
if (repairResponseHandlers == null)
repairResponseHandlers = new ArrayList<RepairCallback<Row>>();
- repairResponseHandlers.add(repairHandler);
+ repairResponseHandlers.add(handler);
}
}
@@ -456,13 +476,24 @@ public class StorageProxy implements Sto
}
}
- static <T> ReadCallback<T> getReadCallback(IResponseResolver<T> resolver, IReadCommand command, ConsistencyLevel consistencyLevel, List<InetAddress> endpoints)
+ static <T> ReadCallback<T> getReadCallback(IResponseResolver<T> resolver, String table, ConsistencyLevel consistencyLevel)
{
if (consistencyLevel.equals(ConsistencyLevel.LOCAL_QUORUM) || consistencyLevel.equals(ConsistencyLevel.EACH_QUORUM))
{
- return new DatacenterReadCallback(resolver, consistencyLevel, command, endpoints);
+ return new DatacenterReadCallback(resolver, consistencyLevel, table);
}
- return new ReadCallback(resolver, consistencyLevel, command, endpoints);
+ return new ReadCallback(resolver, consistencyLevel, table);
+ }
+
+ private static RepairCallback<Row> repair(ReadCommand command, List<InetAddress> endpoints)
+ throws IOException
+ {
+ ReadResponseResolver resolver = new ReadResponseResolver(command.table, command.key);
+ RepairCallback<Row> handler = new RepairCallback<Row>(resolver, endpoints);
+ Message messageRepair = command.makeReadMessage();
+ for (InetAddress endpoint : endpoints)
+ MessagingService.instance().sendRR(messageRepair, endpoint, handler);
+ return handler;
}
/*
@@ -514,14 +545,16 @@ public class StorageProxy implements Sto
// collect replies and resolve according to consistency level
RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(command.keyspace, liveEndpoints);
- ReadCallback<List<Row>> handler = getReadCallback(resolver, command, consistency_level, liveEndpoints);
- handler.assureSufficientLiveNodes();
+ AbstractReplicationStrategy rs = Table.open(command.keyspace).getReplicationStrategy();
+ ReadCallback<List<Row>> handler = getReadCallback(resolver, command.keyspace, consistency_level);
+ // TODO bail early if live endpoints can't satisfy requested consistency level
for (InetAddress endpoint : liveEndpoints)
{
MessagingService.instance().sendRR(message, endpoint, handler);
if (logger.isDebugEnabled())
logger.debug("reading " + c2 + " from " + endpoint);
}
+ // TODO read repair on remaining replicas?
// if we're done, great, otherwise, move to the next range
try
@@ -574,11 +607,6 @@ public class StorageProxy implements Sto
versions.put(message.getFrom(), theirVersion);
latch.countDown();
}
-
- public boolean isLatencyForSnitch()
- {
- return false;
- }
};
// an empty message acts as a request to the SchemaCheckVerbHandler.
for (InetAddress endpoint : liveHosts)
@@ -671,6 +699,12 @@ public class StorageProxy implements Sto
return ranges;
}
+
+ private static boolean randomlyReadRepair(ReadCommand command)
+ {
+ CFMetaData cfmd = DatabaseDescriptor.getTableMetaData(command.table).get(command.getColumnFamilyName());
+ return cfmd.getReadRepairChance() > random.get().nextDouble();
+ }
public long getReadOperations()
{
@@ -747,7 +781,7 @@ public class StorageProxy implements Sto
return writeStats.getRecentLatencyHistogramMicros();
}
- public static List<Row> scan(final String keyspace, String column_family, IndexClause index_clause, SlicePredicate column_predicate, ConsistencyLevel consistency_level)
+ public static List<Row> scan(String keyspace, String column_family, IndexClause index_clause, SlicePredicate column_predicate, ConsistencyLevel consistency_level)
throws IOException, TimeoutException, UnavailableException
{
IPartitioner p = StorageService.getPartitioner();
@@ -765,16 +799,12 @@ public class StorageProxy implements Sto
// collect replies and resolve according to consistency level
RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(keyspace, liveEndpoints);
- IReadCommand iCommand = new IReadCommand()
- {
- public String getKeyspace()
- {
- return keyspace;
- }
- };
- ReadCallback<List<Row>> handler = getReadCallback(resolver, iCommand, consistency_level, liveEndpoints);
- handler.assureSufficientLiveNodes();
-
+ ReadCallback<List<Row>> handler = getReadCallback(resolver, keyspace, consistency_level);
+
+ // bail early if live endpoints can't satisfy requested consistency level
+ if(handler.blockfor > liveEndpoints.size())
+ throw new UnavailableException();
+
IndexScanCommand command = new IndexScanCommand(keyspace, column_family, index_clause, column_predicate, range);
Message message = command.getMessage();
for (InetAddress endpoint : liveEndpoints)
@@ -882,4 +912,40 @@ public class StorageProxy implements Sto
{
return !Gossiper.instance.getUnreachableMembers().isEmpty();
}
+
+ private static class RepairRunner extends WrappedRunnable
+ {
+ private final IResponseResolver<Row> resolver;
+ private final ReadCommand command;
+ private final List<InetAddress> endpoints;
+
+ public RepairRunner(IResponseResolver<Row> resolver, ReadCommand command, List<InetAddress> endpoints)
+ {
+ this.resolver = resolver;
+ this.command = command;
+ this.endpoints = endpoints;
+ }
+
+ protected void runMayThrow() throws IOException
+ {
+ try
+ {
+ resolver.resolve();
+ }
+ catch (DigestMismatchException e)
+ {
+ if (logger.isDebugEnabled())
+ logger.debug("Digest mismatch:", e);
+ final RepairCallback<Row> callback = repair(command, endpoints);
+ Runnable runnable = new WrappedRunnable()
+ {
+ public void runMayThrow() throws DigestMismatchException, IOException, TimeoutException
+ {
+ callback.get();
+ }
+ };
+ repairExecutor.schedule(runnable, DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
+ }
+ }
+ }
}
Modified: cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/service/TruncateResponseHandler.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/service/TruncateResponseHandler.java?rev=1071046&r1=1071045&r2=1071046&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/service/TruncateResponseHandler.java (original)
+++ cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/service/TruncateResponseHandler.java Tue Feb 15 20:50:21 2011
@@ -73,9 +73,4 @@ public class TruncateResponseHandler imp
if (responses.get() >= responseCount)
condition.signal();
}
-
- public boolean isLatencyForSnitch()
- {
- return false;
- }
}
Modified: cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/service/WriteResponseHandler.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/service/WriteResponseHandler.java?rev=1071046&r1=1071045&r2=1071046&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/service/WriteResponseHandler.java (original)
+++ cassandra/branches/cassandra-0.7.2/src/java/org/apache/cassandra/service/WriteResponseHandler.java Tue Feb 15 20:50:21 2011
@@ -121,9 +121,4 @@ public class WriteResponseHandler extend
throw new UnavailableException();
}
}
-
- public boolean isLatencyForSnitch()
- {
- return false;
- }
}
Modified: cassandra/branches/cassandra-0.7.2/test/unit/org/apache/cassandra/service/ConsistencyLevelTest.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7.2/test/unit/org/apache/cassandra/service/ConsistencyLevelTest.java?rev=1071046&r1=1071045&r2=1071046&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7.2/test/unit/org/apache/cassandra/service/ConsistencyLevelTest.java (original)
+++ cassandra/branches/cassandra-0.7.2/test/unit/org/apache/cassandra/service/ConsistencyLevelTest.java Tue Feb 15 20:50:21 2011
@@ -71,7 +71,7 @@ public class ConsistencyLevelTest extend
AbstractReplicationStrategy strategy;
- for (final String table : DatabaseDescriptor.getNonSystemTables())
+ for (String table : DatabaseDescriptor.getNonSystemTables())
{
strategy = getStrategy(table, tmd);
StorageService.calculatePendingRanges(strategy, table);
@@ -96,15 +96,7 @@ public class ConsistencyLevelTest extend
IWriteResponseHandler writeHandler = strategy.getWriteResponseHandler(hosts, hintedNodes, c);
- IReadCommand command = new IReadCommand()
- {
- public String getKeyspace()
- {
- return table;
- }
- };
- RowRepairResolver resolver = new RowRepairResolver(table, ByteBufferUtil.bytes("foo"));
- ReadCallback<Row> readHandler = StorageProxy.getReadCallback(resolver, command, c, new ArrayList<InetAddress>(hintedNodes.keySet()));
+ ReadCallback<Row> readHandler = StorageProxy.getReadCallback(new ReadResponseResolver(table, ByteBufferUtil.bytes("foo")), table, c);
boolean isWriteUnavailable = false;
boolean isReadUnavailable = false;
@@ -119,7 +111,7 @@ public class ConsistencyLevelTest extend
try
{
- readHandler.assureSufficientLiveNodes();
+ readHandler.assureSufficientLiveNodes(hintedNodes.asMap().keySet());
}
catch (UnavailableException e)
{
Copied: cassandra/branches/cassandra-0.7.2/test/unit/org/apache/cassandra/service/ReadResponseResolverTest.java (from r1071027, cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/ReadResponseResolverTest.java)
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7.2/test/unit/org/apache/cassandra/service/ReadResponseResolverTest.java?p2=cassandra/branches/cassandra-0.7.2/test/unit/org/apache/cassandra/service/ReadResponseResolverTest.java&p1=cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/ReadResponseResolverTest.java&r1=1071027&r2=1071046&rev=1071046&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/ReadResponseResolverTest.java (original)
+++ cassandra/branches/cassandra-0.7.2/test/unit/org/apache/cassandra/service/ReadResponseResolverTest.java Tue Feb 15 20:50:21 2011
@@ -0,0 +1,96 @@
+package org.apache.cassandra.service;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+import java.util.Arrays;
+
+import org.apache.cassandra.SchemaLoader;
+import org.junit.Test;
+
+import org.apache.cassandra.db.ColumnFamily;
+
+import static org.apache.cassandra.db.TableTest.assertColumns;
+import static org.apache.cassandra.Util.column;
+import static junit.framework.Assert.assertNull;
+
+public class ReadResponseResolverTest extends SchemaLoader
+{
+ @Test
+ public void testResolveSupersetNewer()
+ {
+ ColumnFamily cf1 = ColumnFamily.create("Keyspace1", "Standard1");
+ cf1.addColumn(column("c1", "v1", 0));
+
+ ColumnFamily cf2 = ColumnFamily.create("Keyspace1", "Standard1");
+ cf2.addColumn(column("c1", "v2", 1));
+
+ ColumnFamily resolved = ReadResponseResolver.resolveSuperset(Arrays.asList(cf1, cf2));
+ assertColumns(resolved, "c1");
+ assertColumns(ColumnFamily.diff(cf1, resolved), "c1");
+ assertNull(ColumnFamily.diff(cf2, resolved));
+ }
+
+ @Test
+ public void testResolveSupersetDisjoint()
+ {
+ ColumnFamily cf1 = ColumnFamily.create("Keyspace1", "Standard1");
+ cf1.addColumn(column("c1", "v1", 0));
+
+ ColumnFamily cf2 = ColumnFamily.create("Keyspace1", "Standard1");
+ cf2.addColumn(column("c2", "v2", 1));
+
+ ColumnFamily resolved = ReadResponseResolver.resolveSuperset(Arrays.asList(cf1, cf2));
+ assertColumns(resolved, "c1", "c2");
+ assertColumns(ColumnFamily.diff(cf1, resolved), "c2");
+ assertColumns(ColumnFamily.diff(cf2, resolved), "c1");
+ }
+
+ @Test
+ public void testResolveSupersetNullOne()
+ {
+ ColumnFamily cf2 = ColumnFamily.create("Keyspace1", "Standard1");
+ cf2.addColumn(column("c2", "v2", 1));
+
+ ColumnFamily resolved = ReadResponseResolver.resolveSuperset(Arrays.asList(null, cf2));
+ assertColumns(resolved, "c2");
+ assertColumns(ColumnFamily.diff(null, resolved), "c2");
+ assertNull(ColumnFamily.diff(cf2, resolved));
+ }
+
+ @Test
+ public void testResolveSupersetNullTwo()
+ {
+ ColumnFamily cf1 = ColumnFamily.create("Keyspace1", "Standard1");
+ cf1.addColumn(column("c1", "v1", 0));
+
+ ColumnFamily resolved = ReadResponseResolver.resolveSuperset(Arrays.asList(cf1, null));
+ assertColumns(resolved, "c1");
+ assertNull(ColumnFamily.diff(cf1, resolved));
+ assertColumns(ColumnFamily.diff(null, resolved), "c1");
+ }
+
+ @Test
+ public void testResolveSupersetNullBoth()
+ {
+ assertNull(ReadResponseResolver.resolveSuperset(Arrays.<ColumnFamily>asList(null, null)));
+ }
+}