You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2012/10/31 18:49:41 UTC
git commit: Fix short read protection for CQL3
Updated Branches:
refs/heads/trunk dc37dea74 -> 353309f01
Fix short read protection for CQL3
patch by slebresne; reviewed by jbellis for CASSANDRA-4882
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/353309f0
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/353309f0
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/353309f0
Branch: refs/heads/trunk
Commit: 353309f01313c2f7fd7ce882444b175dcd62baef
Parents: dc37dea
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Wed Oct 31 18:47:35 2012 +0100
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Wed Oct 31 18:49:34 2012 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
src/java/org/apache/cassandra/db/ReadCommand.java | 3 +
.../cassandra/db/SliceByNamesReadCommand.java | 5 +
.../apache/cassandra/db/SliceFromReadCommand.java | 46 +++-------
.../cassandra/db/filter/SliceQueryFilter.java | 70 ++++++++++++---
.../cassandra/service/AbstractRowResolver.java | 5 -
.../cassandra/service/IResponseResolver.java | 2 -
.../service/RangeSliceResponseResolver.java | 5 -
.../org/apache/cassandra/service/ReadCallback.java | 2 +-
.../apache/cassandra/service/RepairCallback.java | 4 +-
.../cassandra/service/RowRepairResolver.java | 20 +++--
.../org/apache/cassandra/service/StorageProxy.java | 2 +-
12 files changed, 94 insertions(+), 71 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/353309f0/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 7f4a728..0bb732b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -46,6 +46,7 @@
* Remove system tables accounting from schema (CASSANDRA-4850)
* Force provided columns in clustering key order in 'CLUSTERING ORDER BY' (CASSANDRA-4881)
* Fix composite index bug (CASSANDRA-4884)
+ * Fix short read protection for CQL3 (CASSANDRA-4882)
Merged from 1.1:
* add get[Row|Key]CacheEntries to CacheServiceMBean (CASSANDRA-4859)
* fix get_paged_slice to wrap to next row correctly (CASSANDRA-4816)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/353309f0/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java
index 74d8fba..0a73909 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -25,6 +25,7 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.filter.IFilter;
import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.io.IVersionedSerializer;
@@ -79,6 +80,8 @@ public abstract class ReadCommand implements IReadCommand
public abstract Row getRow(Table table) throws IOException;
+ public abstract IFilter filter();
+
protected AbstractType<?> getComparator()
{
return ColumnFamily.getComparatorFor(table, getColumnFamilyName(), queryPath.superColumnName);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/353309f0/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java b/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java
index 4ca62c8..ba149b7 100644
--- a/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java
@@ -74,6 +74,11 @@ public class SliceByNamesReadCommand extends ReadCommand
", filter=" + filter +
')';
}
+
+    /**
+     * Exposes this command's {@code filter} field through the generic
+     * {@code IFilter} accessor declared abstract on {@code ReadCommand}.
+     *
+     * @return the {@code filter} field of this command
+     */
+    @Override
+    public IFilter filter()
+    {
+        return filter;
+    }
}
class SliceByNamesReadCommandSerializer implements IVersionedSerializer<ReadCommand>
http://git-wip-us.apache.org/repos/asf/cassandra/blob/353309f0/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SliceFromReadCommand.java b/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
index 2c8444f..3fa0b51 100644
--- a/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
@@ -71,20 +71,23 @@ public class SliceFromReadCommand extends ReadCommand
@Override
public ReadCommand maybeGenerateRetryCommand(RepairCallback handler, Row row)
{
- int maxLiveColumns = handler.getMaxLiveColumns();
- int liveColumnsInRow = row != null ? row.getLiveColumnCount() : 0;
+ int maxLiveColumns = handler.getMaxLiveCount();
int count = filter.count;
assert maxLiveColumns <= count;
// We generate a retry if at least one node reply with count live columns but after merge we have less
// than the total number of column we are interested in (which may be < count on a retry)
- if ((maxLiveColumns == count) && (liveColumnsInRow < getOriginalRequestedCount()))
+ if (maxLiveColumns != count)
+ return null;
+
+ int liveCountInRow = row == null || row.cf == null ? 0 : filter.getLiveCount(row.cf);
+ if (liveCountInRow < getOriginalRequestedCount())
{
- // We asked t (= count) live columns and got l (=liveColumnsInRow) ones.
+ // We asked t (= count) live columns and got l (=liveCountInRow) ones.
// From that, we can estimate that on this row, for x requested
// columns, only l/t end up live after reconciliation. So for next
// round we want to ask x column so that x * (l/t) == t, i.e. x = t^2/l.
- int retryCount = liveColumnsInRow == 0 ? count + 1 : ((count * count) / liveColumnsInRow) + 1;
+ int retryCount = liveCountInRow == 0 ? count + 1 : ((count * count) / liveCountInRow) + 1;
SliceQueryFilter newFilter = filter.withUpdatedCount(retryCount);
return new RetriedSliceFromReadCommand(table, key, queryPath, newFilter, getOriginalRequestedCount());
}
@@ -98,35 +101,12 @@ public class SliceFromReadCommand extends ReadCommand
if ((row == null) || (row.cf == null))
return;
- int liveColumnsInRow = row.cf.getLiveColumnCount();
+ filter.trim(row.cf, getOriginalRequestedCount());
+ }
- if (liveColumnsInRow > getOriginalRequestedCount())
- {
- int columnsToTrim = liveColumnsInRow - getOriginalRequestedCount();
-
- logger.debug("trimming {} live columns to the originally requested {}", row.cf.getLiveColumnCount(), getOriginalRequestedCount());
-
- Collection<IColumn> columns;
- if (filter.reversed)
- columns = row.cf.getSortedColumns();
- else
- columns = row.cf.getReverseSortedColumns();
-
- Collection<ByteBuffer> toRemove = new HashSet<ByteBuffer>();
-
- Iterator<IColumn> columnIterator = columns.iterator();
- while (columnIterator.hasNext() && (toRemove.size() < columnsToTrim))
- {
- IColumn column = columnIterator.next();
- if (column.isLive())
- toRemove.add(column.name());
- }
-
- for (ByteBuffer columnName : toRemove)
- {
- row.cf.remove(columnName);
- }
- }
+ public IFilter filter()
+ {
+ return filter;
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/353309f0/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
index e5ab58f..e045bb5 100644
--- a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
@@ -19,11 +19,7 @@ package org.apache.cassandra.db.filter;
import java.io.*;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.List;
+import java.util.*;
import com.google.common.collect.AbstractIterator;
import com.google.common.collect.Lists;
@@ -145,21 +141,14 @@ public class SliceQueryFilter implements IFilter
public void collectReducedColumns(IColumnContainer container, Iterator<IColumn> reducedColumns, int gcBefore)
{
- AbstractType<?> comparator = container.getComparator();
-
- if (compositesToGroup < 0)
- columnCounter = new ColumnCounter();
- else if (compositesToGroup == 0)
- columnCounter = new ColumnCounter.GroupByPrefix(null, 0);
- else
- columnCounter = new ColumnCounter.GroupByPrefix((CompositeType)comparator, compositesToGroup);
+ columnCounter = getColumnCounter(container);
while (reducedColumns.hasNext())
{
IColumn column = reducedColumns.next();
if (logger.isTraceEnabled())
logger.trace(String.format("collecting %s of %s: %s",
- columnCounter.live(), count, column.getString(comparator)));
+ columnCounter.live(), count, column.getString(container.getComparator())));
columnCounter.count(column, container);
@@ -175,6 +164,59 @@ public class SliceQueryFilter implements IFilter
}
}
+    /**
+     * Feeds every column of {@code cf} to a fresh counter obtained from
+     * {@code getColumnCounter(cf)} and reports that counter's live total.
+     *
+     * @param cf the column family whose columns are counted; it is also passed
+     *           to the counter as the container of each column
+     * @return the value of {@code live()} once all columns have been counted
+     */
+    public int getLiveCount(ColumnFamily cf)
+    {
+        ColumnCounter liveCounter = getColumnCounter(cf);
+        Iterator<IColumn> remaining = cf.iterator();
+        while (remaining.hasNext())
+            liveCounter.count(remaining.next(), cf);
+        return liveCounter.live();
+    }
+
+    /**
+     * Builds a new counter chosen from the value of {@code compositesToGroup}:
+     * a positive value yields a {@code GroupByPrefix} counter over the
+     * container's comparator (cast to {@code CompositeType}) with that many
+     * components, zero yields {@code GroupByPrefix(null, 0)}, and a negative
+     * value yields a plain {@code ColumnCounter}.
+     *
+     * @param container the container whose comparator is used for prefix grouping
+     * @return a new counter instance; never shared between calls
+     */
+    private ColumnCounter getColumnCounter(IColumnContainer container)
+    {
+        AbstractType<?> containerComparator = container.getComparator();
+        if (compositesToGroup > 0)
+            return new ColumnCounter.GroupByPrefix((CompositeType) containerComparator, compositesToGroup);
+
+        return compositesToGroup == 0
+             ? new ColumnCounter.GroupByPrefix(null, 0)
+             : new ColumnCounter();
+    }
+
+    /**
+     * Trims {@code cf} so that its live count, as computed by the counter from
+     * {@code getColumnCounter(cf)}, does not exceed {@code trimTo}.
+     *
+     * Columns are walked in reverse sorted order when {@code reversed} is set,
+     * in sorted order otherwise. The column whose counting first pushes the
+     * live count above {@code trimTo}, and every column after it in that walk,
+     * is removed from {@code cf}. If the live count never exceeds
+     * {@code trimTo}, {@code cf} is left untouched.
+     *
+     * @param cf     the column family to trim in place
+     * @param trimTo the maximum live count to keep
+     */
+    public void trim(ColumnFamily cf, int trimTo)
+    {
+        ColumnCounter counter = getColumnCounter(cf);
+
+        // Allocated lazily: stays null when nothing has to be trimmed.
+        Collection<ByteBuffer> toRemove = null;
+        boolean trimRemaining = false;
+
+        Collection<IColumn> columns = reversed
+                                    ? cf.getReverseSortedColumns()
+                                    : cf.getSortedColumns();
+
+        for (IColumn column : columns)
+        {
+            if (trimRemaining)
+            {
+                // Already past the limit: everything that follows is dropped without counting.
+                toRemove.add(column.name());
+                continue;
+            }
+
+            counter.count(column, cf);
+            if (counter.live() > trimTo)
+            {
+                toRemove = new HashSet<ByteBuffer>();
+                toRemove.add(column.name());
+                trimRemaining = true;
+            }
+        }
+
+        // The live count never exceeded trimTo, so there is nothing to remove.
+        // Without this guard the loop below would dereference a null collection.
+        if (toRemove == null)
+            return;
+
+        // Names are collected during the walk and only removed once it has finished.
+        for (ByteBuffer columnName : toRemove)
+        {
+            cf.remove(columnName);
+        }
+    }
+
public ByteBuffer start()
{
return this.slices[0].start;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/353309f0/src/java/org/apache/cassandra/service/AbstractRowResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AbstractRowResolver.java b/src/java/org/apache/cassandra/service/AbstractRowResolver.java
index 472a4dc..bdffc0b 100644
--- a/src/java/org/apache/cassandra/service/AbstractRowResolver.java
+++ b/src/java/org/apache/cassandra/service/AbstractRowResolver.java
@@ -52,9 +52,4 @@ public abstract class AbstractRowResolver implements IResponseResolver<ReadRespo
{
return replies;
}
-
- public int getMaxLiveColumns()
- {
- throw new UnsupportedOperationException();
- }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/353309f0/src/java/org/apache/cassandra/service/IResponseResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/IResponseResolver.java b/src/java/org/apache/cassandra/service/IResponseResolver.java
index 2f6a67e..4ac226f 100644
--- a/src/java/org/apache/cassandra/service/IResponseResolver.java
+++ b/src/java/org/apache/cassandra/service/IResponseResolver.java
@@ -42,6 +42,4 @@ public interface IResponseResolver<TMessage, TResolved> {
public void preprocess(MessageIn<TMessage> message);
public Iterable<MessageIn<TMessage>> getMessages();
-
- public int getMaxLiveColumns();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/353309f0/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java b/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
index 456dff7..0d24fbf 100644
--- a/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
+++ b/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
@@ -122,11 +122,6 @@ public class RangeSliceResponseResolver implements IResponseResolver<RangeSliceR
return responses;
}
- public int getMaxLiveColumns()
- {
- throw new UnsupportedOperationException();
- }
-
private class Reducer extends MergeIterator.Reducer<Pair<Row,InetAddress>, Row>
{
List<ColumnFamily> versions = new ArrayList<ColumnFamily>(sources.size());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/353309f0/src/java/org/apache/cassandra/service/ReadCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ReadCallback.java b/src/java/org/apache/cassandra/service/ReadCallback.java
index 1bfff2e..8df2e10 100644
--- a/src/java/org/apache/cassandra/service/ReadCallback.java
+++ b/src/java/org/apache/cassandra/service/ReadCallback.java
@@ -219,7 +219,7 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallback<TMessag
logger.debug("Digest mismatch:", e);
ReadCommand readCommand = (ReadCommand) command;
- final RowRepairResolver repairResolver = new RowRepairResolver(readCommand.table, readCommand.key);
+ final RowRepairResolver repairResolver = new RowRepairResolver(readCommand.table, readCommand.key, readCommand.filter());
IAsyncCallback repairHandler = new AsyncRepairCallback(repairResolver, endpoints.size());
MessageOut<ReadCommand> message = ((ReadCommand) command).createMessage();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/353309f0/src/java/org/apache/cassandra/service/RepairCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/RepairCallback.java b/src/java/org/apache/cassandra/service/RepairCallback.java
index b078d26..9388328 100644
--- a/src/java/org/apache/cassandra/service/RepairCallback.java
+++ b/src/java/org/apache/cassandra/service/RepairCallback.java
@@ -79,8 +79,8 @@ public class RepairCallback implements IAsyncCallback
return true;
}
- public int getMaxLiveColumns()
+ public int getMaxLiveCount()
{
- return resolver.getMaxLiveColumns();
+ return resolver.getMaxLiveCount();
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/353309f0/src/java/org/apache/cassandra/service/RowRepairResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/RowRepairResolver.java b/src/java/org/apache/cassandra/service/RowRepairResolver.java
index 5618cf0..2c6fe1e 100644
--- a/src/java/org/apache/cassandra/service/RowRepairResolver.java
+++ b/src/java/org/apache/cassandra/service/RowRepairResolver.java
@@ -28,8 +28,10 @@ import com.google.common.collect.Iterables;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
+import org.apache.cassandra.db.filter.IFilter;
import org.apache.cassandra.db.filter.QueryFilter;
import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.db.filter.SliceQueryFilter;
import org.apache.cassandra.net.IAsyncResult;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessageOut;
@@ -39,12 +41,14 @@ import org.apache.cassandra.utils.FBUtilities;
public class RowRepairResolver extends AbstractRowResolver
{
- protected int maxLiveColumns = 0;
+ private int maxLiveCount = 0;
public List<IAsyncResult> repairResults = Collections.emptyList();
+ private final SliceQueryFilter filter; // can be null if names query
- public RowRepairResolver(String table, ByteBuffer key)
+ public RowRepairResolver(String table, ByteBuffer key, IFilter qFilter)
{
super(key, table);
+ this.filter = qFilter instanceof SliceQueryFilter ? (SliceQueryFilter)qFilter : null;
}
/*
@@ -74,10 +78,10 @@ public class RowRepairResolver extends AbstractRowResolver
versions.add(cf);
endpoints.add(message.from);
- // compute maxLiveColumns to prevent short reads -- see https://issues.apache.org/jira/browse/CASSANDRA-2643
- int liveColumns = cf == null ? 0 : cf.getLiveColumnCount();
- if (liveColumns > maxLiveColumns)
- maxLiveColumns = liveColumns;
+ // compute maxLiveCount to prevent short reads -- see https://issues.apache.org/jira/browse/CASSANDRA-2643
+ int liveCount = cf == null ? 0 : (filter == null ? cf.getLiveColumnCount() : filter.getLiveCount(cf));
+ if (liveCount > maxLiveCount)
+ maxLiveCount = liveCount;
}
resolved = resolveSuperset(versions);
@@ -170,8 +174,8 @@ public class RowRepairResolver extends AbstractRowResolver
throw new UnsupportedOperationException();
}
- public int getMaxLiveColumns()
+ public int getMaxLiveCount()
{
- return maxLiveColumns;
+ return maxLiveCount;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/353309f0/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index b08f5b8..4b700be 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -943,7 +943,7 @@ public class StorageProxy implements StorageProxyMBean
catch (DigestMismatchException ex)
{
logger.debug("Digest mismatch: {}", ex.toString());
- RowRepairResolver resolver = new RowRepairResolver(command.table, command.key);
+ RowRepairResolver resolver = new RowRepairResolver(command.table, command.key, command.filter());
RepairCallback repairHandler = new RepairCallback(resolver, handler.endpoints);
if (repairCommands == null)