You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2013/01/07 11:12:35 UTC
git commit: Ensure CL guarantees on digest mismatch
Updated Branches:
refs/heads/cassandra-1.2 af8a477cc -> 3d787b78c
Ensure CL guarantees on digest mismatch
patch by slebresne; reviewed by jbellis for CASSANDRA-5113
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3d787b78
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3d787b78
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3d787b78
Branch: refs/heads/cassandra-1.2
Commit: 3d787b78c155773edcf29af8290ef1bea62a4206
Parents: af8a477
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Mon Jan 7 11:11:45 2013 +0100
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Mon Jan 7 11:11:45 2013 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
src/java/org/apache/cassandra/db/ReadCommand.java | 4 +-
.../apache/cassandra/db/SliceFromReadCommand.java | 6 +-
.../cassandra/service/AsyncRepairCallback.java | 4 +-
.../cassandra/service/DatacenterReadCallback.java | 13 +-
.../service/RangeSliceResponseResolver.java | 4 +-
.../org/apache/cassandra/service/ReadCallback.java | 26 ++-
.../apache/cassandra/service/RepairCallback.java | 86 -------
.../apache/cassandra/service/RowDataResolver.java | 182 +++++++++++++++
.../cassandra/service/RowDigestResolver.java | 2 +-
.../cassandra/service/RowRepairResolver.java | 182 ---------------
.../org/apache/cassandra/service/StorageProxy.java | 16 +-
.../apache/cassandra/service/RowResolverTest.java | 20 +-
13 files changed, 247 insertions(+), 299 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3d787b78/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 2ec66c9..95aad22 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -20,6 +20,7 @@
* cqlsh: Add default limit to SELECT statements (CASSANDRA-4972)
* cqlsh: fix DESCRIBE for 1.1 cfs in CQL3 (CASSANDRA-5101)
* Correctly gossip with nodes >= 1.1.7 (CASSANDRA-5102)
+ * Ensure CL guarantees on digest mismatch (CASSANDRA-5113)
Merged from 1.1:
* Pig: correctly decode row keys in widerow mode (CASSANDRA-5098)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3d787b78/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java
index f3494e5..6c364cb 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -32,7 +32,7 @@ import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.IReadCommand;
-import org.apache.cassandra.service.RepairCallback;
+import org.apache.cassandra.service.RowDataResolver;
import org.apache.cassandra.utils.IFilter;
@@ -94,7 +94,7 @@ public abstract class ReadCommand implements IReadCommand
}
// maybeGenerateRetryCommand is used to generate a retry for short reads
- public ReadCommand maybeGenerateRetryCommand(RepairCallback handler, Row row)
+ public ReadCommand maybeGenerateRetryCommand(RowDataResolver resolver, Row row)
{
return null;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3d787b78/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SliceFromReadCommand.java b/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
index d52826b..8a08a42 100644
--- a/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
@@ -30,7 +30,7 @@ import org.apache.cassandra.db.filter.QueryFilter;
import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.db.filter.SliceQueryFilter;
import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.service.RepairCallback;
+import org.apache.cassandra.service.RowDataResolver;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.thrift.ColumnParent;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -71,9 +71,9 @@ public class SliceFromReadCommand extends ReadCommand
}
@Override
- public ReadCommand maybeGenerateRetryCommand(RepairCallback handler, Row row)
+ public ReadCommand maybeGenerateRetryCommand(RowDataResolver resolver, Row row)
{
- int maxLiveColumns = handler.getMaxLiveCount();
+ int maxLiveColumns = resolver.getMaxLiveCount();
int count = filter.count;
assert maxLiveColumns <= count;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3d787b78/src/java/org/apache/cassandra/service/AsyncRepairCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AsyncRepairCallback.java b/src/java/org/apache/cassandra/service/AsyncRepairCallback.java
index 675f61c..63b7df3 100644
--- a/src/java/org/apache/cassandra/service/AsyncRepairCallback.java
+++ b/src/java/org/apache/cassandra/service/AsyncRepairCallback.java
@@ -28,11 +28,11 @@ import org.apache.cassandra.utils.WrappedRunnable;
public class AsyncRepairCallback implements IAsyncCallback
{
- private final RowRepairResolver repairResolver;
+ private final RowDataResolver repairResolver;
private final int blockfor;
protected final AtomicInteger received = new AtomicInteger(0);
- public AsyncRepairCallback(RowRepairResolver repairResolver, int blockfor)
+ public AsyncRepairCallback(RowDataResolver repairResolver, int blockfor)
{
this.repairResolver = repairResolver;
this.blockfor = blockfor;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3d787b78/src/java/org/apache/cassandra/service/DatacenterReadCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DatacenterReadCallback.java b/src/java/org/apache/cassandra/service/DatacenterReadCallback.java
index d125553..e1ae652 100644
--- a/src/java/org/apache/cassandra/service/DatacenterReadCallback.java
+++ b/src/java/org/apache/cassandra/service/DatacenterReadCallback.java
@@ -46,11 +46,22 @@ public class DatacenterReadCallback<TMessage, TResolved> extends ReadCallback<TM
}
};
- public DatacenterReadCallback(IResponseResolver resolver, ConsistencyLevel consistencyLevel, IReadCommand command, List<InetAddress> endpoints)
+ public DatacenterReadCallback(IResponseResolver<TMessage, TResolved> resolver, ConsistencyLevel consistencyLevel, IReadCommand command, List<InetAddress> endpoints)
{
super(resolver, consistencyLevel, command, endpoints);
}
+ protected DatacenterReadCallback(IResponseResolver<TMessage, TResolved> resolver, ConsistencyLevel consistencyLevel, int blockfor, IReadCommand command, List<InetAddress> endpoints)
+ {
+ super(resolver, consistencyLevel, blockfor, command, endpoints);
+ }
+
+ @Override
+ public ReadCallback<TMessage, TResolved> withNewResolver(IResponseResolver<TMessage, TResolved> newResolver)
+ {
+ return new DatacenterReadCallback(newResolver, consistencyLevel, blockfor, command, endpoints);
+ }
+
@Override
protected void sortForConsistencyLevel(List<InetAddress> endpoints)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3d787b78/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java b/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
index 0d24fbf..1dfd01e 100644
--- a/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
+++ b/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
@@ -138,7 +138,7 @@ public class RangeSliceResponseResolver implements IResponseResolver<RangeSliceR
protected Row getReduced()
{
ColumnFamily resolved = versions.size() > 1
- ? RowRepairResolver.resolveSuperset(versions)
+ ? RowDataResolver.resolveSuperset(versions)
: versions.get(0);
if (versions.size() < sources.size())
{
@@ -154,7 +154,7 @@ public class RangeSliceResponseResolver implements IResponseResolver<RangeSliceR
}
// resolved can be null even if versions doesn't have all nulls because of the call to removeDeleted in resolveSuperSet
if (resolved != null)
- repairResults.addAll(RowRepairResolver.scheduleRepairs(resolved, table, key, versions, versionSources));
+ repairResults.addAll(RowDataResolver.scheduleRepairs(resolved, table, key, versions, versionSources));
versions.clear();
versionSources.clear();
return new Row(key, resolved);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3d787b78/src/java/org/apache/cassandra/service/ReadCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ReadCallback.java b/src/java/org/apache/cassandra/service/ReadCallback.java
index 8df2e10..e12859a 100644
--- a/src/java/org/apache/cassandra/service/ReadCallback.java
+++ b/src/java/org/apache/cassandra/service/ReadCallback.java
@@ -60,7 +60,7 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallback<TMessag
private final long startTime;
protected final int blockfor;
final List<InetAddress> endpoints;
- private final IReadCommand command;
+ protected final IReadCommand command;
protected final ConsistencyLevel consistencyLevel;
protected final AtomicInteger received = new AtomicInteger(0);
@@ -75,11 +75,26 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallback<TMessag
this.startTime = System.currentTimeMillis();
this.consistencyLevel = consistencyLevel;
sortForConsistencyLevel(endpoints);
- this.endpoints = resolver instanceof RowRepairResolver ? endpoints : filterEndpoints(endpoints);
+ this.endpoints = filterEndpoints(endpoints);
if (logger.isTraceEnabled())
logger.trace(String.format("Blockfor is %s; setting up requests to %s", blockfor, StringUtils.join(this.endpoints, ",")));
}
+ protected ReadCallback(IResponseResolver<TMessage, TResolved> resolver, ConsistencyLevel consistencyLevel, int blockfor, IReadCommand command, List<InetAddress> endpoints)
+ {
+ this.command = command;
+ this.blockfor = blockfor;
+ this.consistencyLevel = consistencyLevel;
+ this.resolver = resolver;
+ this.startTime = System.currentTimeMillis();
+ this.endpoints = endpoints;
+ }
+
+ public ReadCallback<TMessage, TResolved> withNewResolver(IResponseResolver<TMessage, TResolved> newResolver)
+ {
+ return new ReadCallback(newResolver, consistencyLevel, blockfor, command, endpoints);
+ }
+
/**
* Endpoints is already restricted to live replicas, sorted by snitch preference. This is a hook for
* DatacenterReadCallback to move local-DC replicas to the front of the list. We need this both
@@ -209,17 +224,22 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallback<TMessag
{
protected void runMayThrow() throws IOException
{
+ // If the resolver is a RowDigestResolver, we need to do a full data read if there is a mismatch.
+ // Otherwise, resolve will send the repairs directly if needs be (and in that case we should never
+ // get a digest mismatch)
try
{
resolver.resolve();
}
catch (DigestMismatchException e)
{
+ assert resolver instanceof RowDigestResolver;
+
if (logger.isDebugEnabled())
logger.debug("Digest mismatch:", e);
ReadCommand readCommand = (ReadCommand) command;
- final RowRepairResolver repairResolver = new RowRepairResolver(readCommand.table, readCommand.key, readCommand.filter());
+ final RowDataResolver repairResolver = new RowDataResolver(readCommand.table, readCommand.key, readCommand.filter());
IAsyncCallback repairHandler = new AsyncRepairCallback(repairResolver, endpoints.size());
MessageOut<ReadCommand> message = ((ReadCommand) command).createMessage();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3d787b78/src/java/org/apache/cassandra/service/RepairCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/RepairCallback.java b/src/java/org/apache/cassandra/service/RepairCallback.java
deleted file mode 100644
index 9388328..0000000
--- a/src/java/org/apache/cassandra/service/RepairCallback.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.service;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.Row;
-import org.apache.cassandra.net.IAsyncCallback;
-import org.apache.cassandra.net.MessageIn;
-import org.apache.cassandra.utils.SimpleCondition;
-
-public class RepairCallback implements IAsyncCallback
-{
- public final RowRepairResolver resolver;
- private final List<InetAddress> endpoints;
- private final SimpleCondition condition = new SimpleCondition();
- private final long startTime;
- protected final AtomicInteger received = new AtomicInteger(0);
-
- /**
- * The main difference between this and ReadCallback is, ReadCallback has a ConsistencyLevel
- * it needs to achieve. Repair on the other hand is happy to repair whoever replies within the timeout.
- *
- * (The other main difference of course is, this is only created once we know we have a digest
- * mismatch, and we're going to do full-data reads from everyone -- that is, this is the final
- * stage in the read process.)
- */
- public RepairCallback(RowRepairResolver resolver, List<InetAddress> endpoints)
- {
- this.resolver = resolver;
- this.endpoints = endpoints;
- this.startTime = System.currentTimeMillis();
- }
-
- public Row get() throws DigestMismatchException, IOException
- {
- long timeout = DatabaseDescriptor.getWriteRpcTimeout() - (System.currentTimeMillis() - startTime);
- try
- {
- condition.await(timeout, TimeUnit.MILLISECONDS);
- }
- catch (InterruptedException ex)
- {
- throw new AssertionError(ex);
- }
-
- return received.get() > 1 ? resolver.resolve() : null;
- }
-
- public void response(MessageIn message)
- {
- resolver.preprocess(message);
- if (received.incrementAndGet() == endpoints.size())
- condition.signal();
- }
-
- public boolean isLatencyForSnitch()
- {
- return true;
- }
-
- public int getMaxLiveCount()
- {
- return resolver.getMaxLiveCount();
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3d787b78/src/java/org/apache/cassandra/service/RowDataResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/RowDataResolver.java b/src/java/org/apache/cassandra/service/RowDataResolver.java
new file mode 100644
index 0000000..5545293
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/RowDataResolver.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import com.google.common.collect.Iterables;
+
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
+import org.apache.cassandra.db.filter.IDiskAtomFilter;
+import org.apache.cassandra.db.filter.QueryFilter;
+import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.db.filter.SliceQueryFilter;
+import org.apache.cassandra.net.IAsyncResult;
+import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.net.MessageOut;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.CloseableIterator;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.IFilter;
+
+public class RowDataResolver extends AbstractRowResolver
+{
+ private int maxLiveCount = 0;
+ public List<IAsyncResult> repairResults = Collections.emptyList();
+ private final IDiskAtomFilter filter;
+
+ public RowDataResolver(String table, ByteBuffer key, IDiskAtomFilter qFilter)
+ {
+ super(key, table);
+ this.filter = qFilter;
+ }
+
+ /*
+ * This method handles the following scenario:
+ *
+ * there was a mismatch on the initial read, so we redid the digest requests
+ * as full data reads. In this case we need to compute the most recent version
+ * of each column, and send diffs to out-of-date replicas.
+ */
+ public Row resolve() throws DigestMismatchException, IOException
+ {
+ if (logger.isDebugEnabled())
+ logger.debug("resolving " + replies.size() + " responses");
+ long startTime = System.currentTimeMillis();
+
+ ColumnFamily resolved;
+ if (replies.size() > 1)
+ {
+ List<ColumnFamily> versions = new ArrayList<ColumnFamily>(replies.size());
+ List<InetAddress> endpoints = new ArrayList<InetAddress>(replies.size());
+
+ for (MessageIn<ReadResponse> message : replies)
+ {
+ ReadResponse response = message.payload;
+ ColumnFamily cf = response.row().cf;
+ assert !response.isDigestQuery() : "Received digest response to repair read from " + message.from;
+ versions.add(cf);
+ endpoints.add(message.from);
+
+ // compute maxLiveCount to prevent short reads -- see https://issues.apache.org/jira/browse/CASSANDRA-2643
+ int liveCount = cf == null ? 0 : filter.getLiveCount(cf);
+ if (liveCount > maxLiveCount)
+ maxLiveCount = liveCount;
+ }
+
+ resolved = resolveSuperset(versions);
+ if (logger.isDebugEnabled())
+ logger.debug("versions merged");
+
+ // send updates to any replica that was missing part of the full row
+ // (resolved can be null even if versions doesn't have all nulls because of the call to removeDeleted in resolveSuperSet)
+ if (resolved != null)
+ repairResults = scheduleRepairs(resolved, table, key, versions, endpoints);
+ }
+ else
+ {
+ resolved = replies.iterator().next().payload.row().cf;
+ }
+
+ if (logger.isDebugEnabled())
+ logger.debug("resolve: " + (System.currentTimeMillis() - startTime) + " ms.");
+
+ return new Row(key, resolved);
+ }
+
+ /**
+ * For each row version, compare with resolved (the superset of all row versions);
+ * if it is missing anything, send a mutation to the endpoint it came from.
+ */
+ public static List<IAsyncResult> scheduleRepairs(ColumnFamily resolved, String table, DecoratedKey key, List<ColumnFamily> versions, List<InetAddress> endpoints)
+ {
+ List<IAsyncResult> results = new ArrayList<IAsyncResult>(versions.size());
+
+ for (int i = 0; i < versions.size(); i++)
+ {
+ ColumnFamily diffCf = ColumnFamily.diff(versions.get(i), resolved);
+ if (diffCf == null) // no repair needs to happen
+ continue;
+
+ // create and send the row mutation message based on the diff
+ RowMutation rowMutation = new RowMutation(table, key.key);
+ rowMutation.add(diffCf);
+ MessageOut repairMessage;
+ // use a separate verb here because we don't want these to get the white glove hint-
+ // on-timeout behavior that a "real" mutation gets
+ repairMessage = rowMutation.createMessage(MessagingService.Verb.READ_REPAIR);
+ results.add(MessagingService.instance().sendRR(repairMessage, endpoints.get(i)));
+ }
+
+ return results;
+ }
+
+ static ColumnFamily resolveSuperset(Iterable<ColumnFamily> versions)
+ {
+ assert Iterables.size(versions) > 0;
+
+ ColumnFamily resolved = null;
+ for (ColumnFamily cf : versions)
+ {
+ if (cf == null)
+ continue;
+
+ if (resolved == null)
+ resolved = cf.cloneMeShallow();
+ else
+ resolved.delete(cf);
+ }
+ if (resolved == null)
+ return null;
+
+ // mimic the collectCollatedColumn + removeDeleted path that getColumnFamily takes.
+ // this will handle removing columns and subcolumns that are suppressed by a row or
+ // supercolumn tombstone.
+ QueryFilter filter = new QueryFilter(null, new QueryPath(resolved.metadata().cfName), new IdentityQueryFilter());
+ List<CloseableIterator<IColumn>> iters = new ArrayList<CloseableIterator<IColumn>>();
+ for (ColumnFamily version : versions)
+ {
+ if (version == null)
+ continue;
+ iters.add(FBUtilities.closeableIterator(version.iterator()));
+ }
+ filter.collateColumns(resolved, iters, Integer.MIN_VALUE);
+ return ColumnFamilyStore.removeDeleted(resolved, Integer.MIN_VALUE);
+ }
+
+ public Row getData() throws IOException
+ {
+ return replies.iterator().next().payload.row();
+ }
+
+ public boolean isDataPresent()
+ {
+ return !replies.isEmpty();
+ }
+
+ public int getMaxLiveCount()
+ {
+ return maxLiveCount;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3d787b78/src/java/org/apache/cassandra/service/RowDigestResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/RowDigestResolver.java b/src/java/org/apache/cassandra/service/RowDigestResolver.java
index e0e262b..eeccbeb 100644
--- a/src/java/org/apache/cassandra/service/RowDigestResolver.java
+++ b/src/java/org/apache/cassandra/service/RowDigestResolver.java
@@ -95,7 +95,7 @@ public class RowDigestResolver extends AbstractRowResolver
// with the data response. If there is a mismatch then throw an exception so that read repair can happen.
//
// It's important to note that we do not consider the possibility of multiple data responses --
- // that can only happen when we're doing the repair post-mismatch, and will be handled by RowRepairResolver.
+ // that can only happen when we're doing the repair post-mismatch, and will be handled by RowDataResolver.
if (digest != null)
{
ByteBuffer digest2 = ColumnFamily.digest(data);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3d787b78/src/java/org/apache/cassandra/service/RowRepairResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/RowRepairResolver.java b/src/java/org/apache/cassandra/service/RowRepairResolver.java
deleted file mode 100644
index 21cf5ab..0000000
--- a/src/java/org/apache/cassandra/service/RowRepairResolver.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.service;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import com.google.common.collect.Iterables;
-
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
-import org.apache.cassandra.db.filter.IDiskAtomFilter;
-import org.apache.cassandra.db.filter.QueryFilter;
-import org.apache.cassandra.db.filter.QueryPath;
-import org.apache.cassandra.db.filter.SliceQueryFilter;
-import org.apache.cassandra.net.IAsyncResult;
-import org.apache.cassandra.net.MessageIn;
-import org.apache.cassandra.net.MessageOut;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.utils.CloseableIterator;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.IFilter;
-
-public class RowRepairResolver extends AbstractRowResolver
-{
- private int maxLiveCount = 0;
- public List<IAsyncResult> repairResults = Collections.emptyList();
- private final IDiskAtomFilter filter;
-
- public RowRepairResolver(String table, ByteBuffer key, IDiskAtomFilter qFilter)
- {
- super(key, table);
- this.filter = qFilter;
- }
-
- /*
- * This method handles the following scenario:
- *
- * there was a mismatch on the initial read, so we redid the digest requests
- * as full data reads. In this case we need to compute the most recent version
- * of each column, and send diffs to out-of-date replicas.
- */
- public Row resolve() throws DigestMismatchException, IOException
- {
- if (logger.isDebugEnabled())
- logger.debug("resolving " + replies.size() + " responses");
- long startTime = System.currentTimeMillis();
-
- ColumnFamily resolved;
- if (replies.size() > 1)
- {
- List<ColumnFamily> versions = new ArrayList<ColumnFamily>(replies.size());
- List<InetAddress> endpoints = new ArrayList<InetAddress>(replies.size());
-
- for (MessageIn<ReadResponse> message : replies)
- {
- ReadResponse response = message.payload;
- ColumnFamily cf = response.row().cf;
- assert !response.isDigestQuery() : "Received digest response to repair read from " + message.from;
- versions.add(cf);
- endpoints.add(message.from);
-
- // compute maxLiveCount to prevent short reads -- see https://issues.apache.org/jira/browse/CASSANDRA-2643
- int liveCount = cf == null ? 0 : filter.getLiveCount(cf);
- if (liveCount > maxLiveCount)
- maxLiveCount = liveCount;
- }
-
- resolved = resolveSuperset(versions);
- if (logger.isDebugEnabled())
- logger.debug("versions merged");
-
- // send updates to any replica that was missing part of the full row
- // (resolved can be null even if versions doesn't have all nulls because of the call to removeDeleted in resolveSuperSet)
- if (resolved != null)
- repairResults = scheduleRepairs(resolved, table, key, versions, endpoints);
- }
- else
- {
- resolved = replies.iterator().next().payload.row().cf;
- }
-
- if (logger.isDebugEnabled())
- logger.debug("resolve: " + (System.currentTimeMillis() - startTime) + " ms.");
-
- return new Row(key, resolved);
- }
-
- /**
- * For each row version, compare with resolved (the superset of all row versions);
- * if it is missing anything, send a mutation to the endpoint it come from.
- */
- public static List<IAsyncResult> scheduleRepairs(ColumnFamily resolved, String table, DecoratedKey key, List<ColumnFamily> versions, List<InetAddress> endpoints)
- {
- List<IAsyncResult> results = new ArrayList<IAsyncResult>(versions.size());
-
- for (int i = 0; i < versions.size(); i++)
- {
- ColumnFamily diffCf = ColumnFamily.diff(versions.get(i), resolved);
- if (diffCf == null) // no repair needs to happen
- continue;
-
- // create and send the row mutation message based on the diff
- RowMutation rowMutation = new RowMutation(table, key.key);
- rowMutation.add(diffCf);
- MessageOut repairMessage;
- // use a separate verb here because we don't want these to be get the white glove hint-
- // on-timeout behavior that a "real" mutation gets
- repairMessage = rowMutation.createMessage(MessagingService.Verb.READ_REPAIR);
- results.add(MessagingService.instance().sendRR(repairMessage, endpoints.get(i)));
- }
-
- return results;
- }
-
- static ColumnFamily resolveSuperset(Iterable<ColumnFamily> versions)
- {
- assert Iterables.size(versions) > 0;
-
- ColumnFamily resolved = null;
- for (ColumnFamily cf : versions)
- {
- if (cf == null)
- continue;
-
- if (resolved == null)
- resolved = cf.cloneMeShallow();
- else
- resolved.delete(cf);
- }
- if (resolved == null)
- return null;
-
- // mimic the collectCollatedColumn + removeDeleted path that getColumnFamily takes.
- // this will handle removing columns and subcolumns that are supressed by a row or
- // supercolumn tombstone.
- QueryFilter filter = new QueryFilter(null, new QueryPath(resolved.metadata().cfName), new IdentityQueryFilter());
- List<CloseableIterator<IColumn>> iters = new ArrayList<CloseableIterator<IColumn>>();
- for (ColumnFamily version : versions)
- {
- if (version == null)
- continue;
- iters.add(FBUtilities.closeableIterator(version.iterator()));
- }
- filter.collateColumns(resolved, iters, Integer.MIN_VALUE);
- return ColumnFamilyStore.removeDeleted(resolved, Integer.MIN_VALUE);
- }
-
- public Row getData() throws IOException
- {
- throw new UnsupportedOperationException();
- }
-
- public boolean isDataPresent()
- {
- throw new UnsupportedOperationException();
- }
-
- public int getMaxLiveCount()
- {
- return maxLiveCount;
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3d787b78/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index fe427af..0fb7ec4 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -923,7 +923,7 @@ public class StorageProxy implements StorageProxyMBean
// read results and make a second pass for any digest mismatches
List<ReadCommand> repairCommands = null;
- List<RepairCallback> repairResponseHandlers = null;
+ List<ReadCallback<ReadResponse, Row>> repairResponseHandlers = null;
for (int i = 0; i < commands.size(); i++)
{
ReadCallback<ReadResponse, Row> handler = readCallbacks[i];
@@ -946,13 +946,14 @@ public class StorageProxy implements StorageProxyMBean
catch (DigestMismatchException ex)
{
logger.debug("Digest mismatch: {}", ex.toString());
- RowRepairResolver resolver = new RowRepairResolver(command.table, command.key, command.filter());
- RepairCallback repairHandler = new RepairCallback(resolver, handler.endpoints);
+ // Do a full data read to resolve the correct response (and repair node that need be)
+ RowDataResolver resolver = new RowDataResolver(command.table, command.key, command.filter());
+ ReadCallback<ReadResponse, Row> repairHandler = handler.withNewResolver(resolver);
if (repairCommands == null)
{
repairCommands = new ArrayList<ReadCommand>();
- repairResponseHandlers = new ArrayList<RepairCallback>();
+ repairResponseHandlers = new ArrayList<ReadCallback<ReadResponse, Row>>();
}
repairCommands.add(command);
repairResponseHandlers.add(repairHandler);
@@ -974,7 +975,7 @@ public class StorageProxy implements StorageProxyMBean
for (int i = 0; i < repairCommands.size(); i++)
{
ReadCommand command = repairCommands.get(i);
- RepairCallback handler = repairResponseHandlers.get(i);
+ ReadCallback<ReadResponse, Row> handler = repairResponseHandlers.get(i);
Row row;
try
@@ -986,11 +987,12 @@ public class StorageProxy implements StorageProxyMBean
throw new AssertionError(e); // full data requested from each node here, no digests should be sent
}
+ RowDataResolver resolver = (RowDataResolver)handler.resolver;
try
{
// wait for the repair writes to be acknowledged, to minimize impact on any replica that's
// behind on writes in case the out-of-sync row is read multiple times in quick succession
- FBUtilities.waitOnFutures(handler.resolver.repairResults, DatabaseDescriptor.getWriteRpcTimeout());
+ FBUtilities.waitOnFutures(resolver.repairResults, DatabaseDescriptor.getWriteRpcTimeout());
}
catch (TimeoutException e)
{
@@ -999,7 +1001,7 @@ public class StorageProxy implements StorageProxyMBean
}
// retry any potential short reads
- ReadCommand retryCommand = command.maybeGenerateRetryCommand(handler, row);
+ ReadCommand retryCommand = command.maybeGenerateRetryCommand(resolver, row);
if (retryCommand != null)
{
logger.debug("Issuing retry for read command");
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3d787b78/test/unit/org/apache/cassandra/service/RowResolverTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/RowResolverTest.java b/test/unit/org/apache/cassandra/service/RowResolverTest.java
index 3c530f1..2cc7860 100644
--- a/test/unit/org/apache/cassandra/service/RowResolverTest.java
+++ b/test/unit/org/apache/cassandra/service/RowResolverTest.java
@@ -46,7 +46,7 @@ public class RowResolverTest extends SchemaLoader
ColumnFamily cf2 = ColumnFamily.create("Keyspace1", "Standard1");
cf2.addColumn(column("c1", "v2", 1));
- ColumnFamily resolved = RowRepairResolver.resolveSuperset(Arrays.asList(cf1, cf2));
+ ColumnFamily resolved = RowDataResolver.resolveSuperset(Arrays.asList(cf1, cf2));
assertColumns(resolved, "c1");
assertColumns(ColumnFamily.diff(cf1, resolved), "c1");
assertNull(ColumnFamily.diff(cf2, resolved));
@@ -61,7 +61,7 @@ public class RowResolverTest extends SchemaLoader
ColumnFamily cf2 = ColumnFamily.create("Keyspace1", "Standard1");
cf2.addColumn(column("c2", "v2", 1));
- ColumnFamily resolved = RowRepairResolver.resolveSuperset(Arrays.asList(cf1, cf2));
+ ColumnFamily resolved = RowDataResolver.resolveSuperset(Arrays.asList(cf1, cf2));
assertColumns(resolved, "c1", "c2");
assertColumns(ColumnFamily.diff(cf1, resolved), "c2");
assertColumns(ColumnFamily.diff(cf2, resolved), "c1");
@@ -73,7 +73,7 @@ public class RowResolverTest extends SchemaLoader
ColumnFamily cf2 = ColumnFamily.create("Keyspace1", "Standard1");
cf2.addColumn(column("c2", "v2", 1));
- ColumnFamily resolved = RowRepairResolver.resolveSuperset(Arrays.asList(null, cf2));
+ ColumnFamily resolved = RowDataResolver.resolveSuperset(Arrays.asList(null, cf2));
assertColumns(resolved, "c2");
assertColumns(ColumnFamily.diff(null, resolved), "c2");
assertNull(ColumnFamily.diff(cf2, resolved));
@@ -85,7 +85,7 @@ public class RowResolverTest extends SchemaLoader
ColumnFamily cf1 = ColumnFamily.create("Keyspace1", "Standard1");
cf1.addColumn(column("c1", "v1", 0));
- ColumnFamily resolved = RowRepairResolver.resolveSuperset(Arrays.asList(cf1, null));
+ ColumnFamily resolved = RowDataResolver.resolveSuperset(Arrays.asList(cf1, null));
assertColumns(resolved, "c1");
assertNull(ColumnFamily.diff(cf1, resolved));
assertColumns(ColumnFamily.diff(null, resolved), "c1");
@@ -94,7 +94,7 @@ public class RowResolverTest extends SchemaLoader
@Test
public void testResolveSupersetNullBoth()
{
- assertNull(RowRepairResolver.resolveSuperset(Arrays.<ColumnFamily>asList(null, null)));
+ assertNull(RowDataResolver.resolveSuperset(Arrays.<ColumnFamily>asList(null, null)));
}
@Test
@@ -107,7 +107,7 @@ public class RowResolverTest extends SchemaLoader
ColumnFamily cf2 = ColumnFamily.create("Keyspace1", "Standard1");
cf2.delete(new DeletionInfo(1L, (int) (System.currentTimeMillis() / 1000)));
- ColumnFamily resolved = RowRepairResolver.resolveSuperset(Arrays.asList(cf1, cf2));
+ ColumnFamily resolved = RowDataResolver.resolveSuperset(Arrays.asList(cf1, cf2));
// no columns in the cf
assertColumns(resolved);
assertTrue(resolved.isMarkedForDelete());
@@ -119,7 +119,7 @@ public class RowResolverTest extends SchemaLoader
ColumnFamily scf2 = ColumnFamily.create("Keyspace1", "Super1");
scf2.delete(new DeletionInfo(1L, (int) (System.currentTimeMillis() / 1000)));
- ColumnFamily superResolved = RowRepairResolver.resolveSuperset(Arrays.asList(scf1, scf2));
+ ColumnFamily superResolved = RowDataResolver.resolveSuperset(Arrays.asList(scf1, scf2));
// no columns in the cf
assertColumns(superResolved);
assertTrue(superResolved.isMarkedForDelete());
@@ -138,7 +138,7 @@ public class RowResolverTest extends SchemaLoader
ColumnFamily scf2 = ColumnFamily.create("Keyspace1", "Super1");
scf2.delete(new DeletionInfo(2L, (int) (System.currentTimeMillis() / 1000)));
- ColumnFamily superResolved = RowRepairResolver.resolveSuperset(Arrays.asList(scf1, scf2));
+ ColumnFamily superResolved = RowDataResolver.resolveSuperset(Arrays.asList(scf1, scf2));
// no columns in the cf
assertColumns(superResolved);
assertTrue(superResolved.isMarkedForDelete());
@@ -165,7 +165,7 @@ public class RowResolverTest extends SchemaLoader
ColumnFamily cf4 = ColumnFamily.create("Keyspace1", "Standard1");
cf4.delete(new DeletionInfo(2L, (int) (System.currentTimeMillis() / 1000)));
- ColumnFamily resolved = RowRepairResolver.resolveSuperset(Arrays.asList(cf1, cf2, cf3, cf4));
+ ColumnFamily resolved = RowDataResolver.resolveSuperset(Arrays.asList(cf1, cf2, cf3, cf4));
// will have deleted marker and one column
assertColumns(resolved, "two");
assertColumn(resolved, "two", "B", 3);
@@ -188,7 +188,7 @@ public class RowResolverTest extends SchemaLoader
ColumnFamily scf4 = ColumnFamily.create("Keyspace1", "Super1");
scf4.delete(new DeletionInfo(2L, (int) (System.currentTimeMillis() / 1000)));
- ColumnFamily superResolved = RowRepairResolver.resolveSuperset(Arrays.asList(scf1, scf2, scf3, scf4));
+ ColumnFamily superResolved = RowDataResolver.resolveSuperset(Arrays.asList(scf1, scf2, scf3, scf4));
// will have deleted marker and two super cols
assertColumns(superResolved, "super1", "super2");