You are viewing a plain text version of this content. (The link to the canonical version, originally attached to "here", did not survive the plain-text conversion.)
Posted to commits@cassandra.apache.org by sl...@apache.org on 2012/10/31 18:49:41 UTC

git commit: Fix short read protection for CQL3

Updated Branches:
  refs/heads/trunk dc37dea74 -> 353309f01


Fix short read protection for CQL3

patch by slebresne; reviewed by jbellis for CASSANDRA-4882


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/353309f0
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/353309f0
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/353309f0

Branch: refs/heads/trunk
Commit: 353309f01313c2f7fd7ce882444b175dcd62baef
Parents: dc37dea
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Wed Oct 31 18:47:35 2012 +0100
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Wed Oct 31 18:49:34 2012 +0100

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 src/java/org/apache/cassandra/db/ReadCommand.java  |    3 +
 .../cassandra/db/SliceByNamesReadCommand.java      |    5 +
 .../apache/cassandra/db/SliceFromReadCommand.java  |   46 +++-------
 .../cassandra/db/filter/SliceQueryFilter.java      |   70 ++++++++++++---
 .../cassandra/service/AbstractRowResolver.java     |    5 -
 .../cassandra/service/IResponseResolver.java       |    2 -
 .../service/RangeSliceResponseResolver.java        |    5 -
 .../org/apache/cassandra/service/ReadCallback.java |    2 +-
 .../apache/cassandra/service/RepairCallback.java   |    4 +-
 .../cassandra/service/RowRepairResolver.java       |   20 +++--
 .../org/apache/cassandra/service/StorageProxy.java |    2 +-
 12 files changed, 94 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/353309f0/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 7f4a728..0bb732b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -46,6 +46,7 @@
  * Remove system tables accounting from schema (CASSANDRA-4850)
  * Force provided columns in clustering key order in 'CLUSTERING ORDER BY' (CASSANDRA-4881)
  * Fix composite index bug (CASSANDRA-4884)
+ * Fix short read protection for CQL3 (CASSANDRA-4882)
 Merged from 1.1:
  * add get[Row|Key]CacheEntries to CacheServiceMBean (CASSANDRA-4859)
  * fix get_paged_slice to wrap to next row correctly (CASSANDRA-4816)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/353309f0/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java
index 74d8fba..0a73909 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -25,6 +25,7 @@ import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.filter.IFilter;
 import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.io.IVersionedSerializer;
@@ -79,6 +80,8 @@ public abstract class ReadCommand implements IReadCommand
 
     public abstract Row getRow(Table table) throws IOException;
 
+    public abstract IFilter filter();
+
     protected AbstractType<?> getComparator()
     {
         return ColumnFamily.getComparatorFor(table, getColumnFamilyName(), queryPath.superColumnName);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/353309f0/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java b/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java
index 4ca62c8..ba149b7 100644
--- a/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java
@@ -74,6 +74,11 @@ public class SliceByNamesReadCommand extends ReadCommand
                ", filter=" + filter +
                ')';
     }
+
+    public IFilter filter() // implements the abstract ReadCommand.filter()
+    {
+        return this.filter;
+    }
 }
 
 class SliceByNamesReadCommandSerializer implements IVersionedSerializer<ReadCommand>

http://git-wip-us.apache.org/repos/asf/cassandra/blob/353309f0/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SliceFromReadCommand.java b/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
index 2c8444f..3fa0b51 100644
--- a/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
@@ -71,20 +71,23 @@ public class SliceFromReadCommand extends ReadCommand
     @Override
     public ReadCommand maybeGenerateRetryCommand(RepairCallback handler, Row row)
     {
-        int maxLiveColumns = handler.getMaxLiveColumns();
-        int liveColumnsInRow = row != null ? row.getLiveColumnCount() : 0;
+        int maxLiveColumns = handler.getMaxLiveCount();
 
         int count = filter.count;
         assert maxLiveColumns <= count;
         // We generate a retry if at least one node reply with count live columns but after merge we have less
         // than the total number of column we are interested in (which may be < count on a retry)
-        if ((maxLiveColumns == count) && (liveColumnsInRow < getOriginalRequestedCount()))
+        if (maxLiveColumns != count)
+            return null;
+
+        int liveCountInRow = row == null || row.cf == null ? 0 : filter.getLiveCount(row.cf);
+        if (liveCountInRow < getOriginalRequestedCount())
         {
-            // We asked t (= count) live columns and got l (=liveColumnsInRow) ones.
+            // We asked t (= count) live columns and got l (=liveCountInRow) ones.
             // From that, we can estimate that on this row, for x requested
             // columns, only l/t end up live after reconciliation. So for next
             // round we want to ask x column so that x * (l/t) == t, i.e. x = t^2/l.
-            int retryCount = liveColumnsInRow == 0 ? count + 1 : ((count * count) / liveColumnsInRow) + 1;
+            int retryCount = liveCountInRow == 0 ? count + 1 : ((count * count) / liveCountInRow) + 1;
             SliceQueryFilter newFilter = filter.withUpdatedCount(retryCount);
             return new RetriedSliceFromReadCommand(table, key, queryPath, newFilter, getOriginalRequestedCount());
         }
@@ -98,35 +101,12 @@ public class SliceFromReadCommand extends ReadCommand
         if ((row == null) || (row.cf == null))
             return;
 
-        int liveColumnsInRow = row.cf.getLiveColumnCount();
+        filter.trim(row.cf, getOriginalRequestedCount());
+    }
 
-        if (liveColumnsInRow > getOriginalRequestedCount())
-        {
-            int columnsToTrim = liveColumnsInRow - getOriginalRequestedCount();
-
-            logger.debug("trimming {} live columns to the originally requested {}", row.cf.getLiveColumnCount(), getOriginalRequestedCount());
-
-            Collection<IColumn> columns;
-            if (filter.reversed)
-                columns = row.cf.getSortedColumns();
-            else
-                columns = row.cf.getReverseSortedColumns();
-
-            Collection<ByteBuffer> toRemove = new HashSet<ByteBuffer>();
-
-            Iterator<IColumn> columnIterator = columns.iterator();
-            while (columnIterator.hasNext() && (toRemove.size() < columnsToTrim))
-            {
-                IColumn column = columnIterator.next();
-                if (column.isLive())
-                    toRemove.add(column.name());
-            }
-
-            for (ByteBuffer columnName : toRemove)
-            {
-                row.cf.remove(columnName);
-            }
-        }
+    public IFilter filter() // implements the abstract ReadCommand.filter()
+    {
+        return this.filter;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/353309f0/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
index e5ab58f..e045bb5 100644
--- a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
@@ -19,11 +19,7 @@ package org.apache.cassandra.db.filter;
 
 import java.io.*;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.List;
+import java.util.*;
 
 import com.google.common.collect.AbstractIterator;
 import com.google.common.collect.Lists;
@@ -145,21 +141,14 @@ public class SliceQueryFilter implements IFilter
 
     public void collectReducedColumns(IColumnContainer container, Iterator<IColumn> reducedColumns, int gcBefore)
     {
-        AbstractType<?> comparator = container.getComparator();
-
-        if (compositesToGroup < 0)
-            columnCounter = new ColumnCounter();
-        else if (compositesToGroup == 0)
-            columnCounter = new ColumnCounter.GroupByPrefix(null, 0);
-        else
-            columnCounter = new ColumnCounter.GroupByPrefix((CompositeType)comparator, compositesToGroup);
+        columnCounter = getColumnCounter(container);
 
         while (reducedColumns.hasNext())
         {
             IColumn column = reducedColumns.next();
             if (logger.isTraceEnabled())
                 logger.trace(String.format("collecting %s of %s: %s",
-                                           columnCounter.live(), count, column.getString(comparator)));
+                                           columnCounter.live(), count, column.getString(container.getComparator())));
 
             columnCounter.count(column, container);
 
@@ -175,6 +164,59 @@ public class SliceQueryFilter implements IFilter
         }
     }
 
+    /** Returns the live count of cf as measured by this filter's ColumnCounter (see getColumnCounter). */
+    public int getLiveCount(ColumnFamily cf)
+    {
+        ColumnCounter liveCounter = getColumnCounter(cf);
+        for (IColumn c : cf) liveCounter.count(c, cf);
+        return liveCounter.live();
+    }
+
+    /** Builds a fresh counter: plain ColumnCounter if compositesToGroup < 0, otherwise a GroupByPrefix. */
+    private ColumnCounter getColumnCounter(IColumnContainer container)
+    {
+        AbstractType<?> cmp = container.getComparator();
+        if (compositesToGroup > 0)
+            return new ColumnCounter.GroupByPrefix((CompositeType)cmp, compositesToGroup);
+        return compositesToGroup == 0
+             ? new ColumnCounter.GroupByPrefix(null, 0)
+             : new ColumnCounter();
+    }
+
+    /** Removes, in query order, the column that takes this filter's live count past trimTo and all columns after it. */
+    public void trim(ColumnFamily cf, int trimTo)
+    {
+        ColumnCounter counter = getColumnCounter(cf);
+        // Stays null until the limit is exceeded; once non-null, every remaining column is dropped.
+        Collection<ByteBuffer> toRemove = null;
+
+        Collection<IColumn> columns = reversed
+                                    ? cf.getReverseSortedColumns()
+                                    : cf.getSortedColumns();
+
+        for (IColumn column : columns)
+        {
+            if (toRemove != null)
+            {
+                toRemove.add(column.name());
+                continue;
+            }
+
+            counter.count(column, cf);
+            if (counter.live() > trimTo)
+            {
+                toRemove = new HashSet<ByteBuffer>();
+                toRemove.add(column.name());
+            }
+        }
+
+        // Limit never exceeded: nothing to trim (iterating the null set here would throw an NPE).
+        if (toRemove == null)
+            return;
+        for (ByteBuffer columnName : toRemove)
+            cf.remove(columnName);
+    }
+
     public ByteBuffer start()
     {
         return this.slices[0].start;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/353309f0/src/java/org/apache/cassandra/service/AbstractRowResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AbstractRowResolver.java b/src/java/org/apache/cassandra/service/AbstractRowResolver.java
index 472a4dc..bdffc0b 100644
--- a/src/java/org/apache/cassandra/service/AbstractRowResolver.java
+++ b/src/java/org/apache/cassandra/service/AbstractRowResolver.java
@@ -52,9 +52,4 @@ public abstract class AbstractRowResolver implements IResponseResolver<ReadRespo
     {
         return replies;
     }
-
-    public int getMaxLiveColumns()
-    {
-        throw new UnsupportedOperationException();
-    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/353309f0/src/java/org/apache/cassandra/service/IResponseResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/IResponseResolver.java b/src/java/org/apache/cassandra/service/IResponseResolver.java
index 2f6a67e..4ac226f 100644
--- a/src/java/org/apache/cassandra/service/IResponseResolver.java
+++ b/src/java/org/apache/cassandra/service/IResponseResolver.java
@@ -42,6 +42,4 @@ public interface IResponseResolver<TMessage, TResolved> {
 
     public void preprocess(MessageIn<TMessage> message);
     public Iterable<MessageIn<TMessage>> getMessages();
-
-    public int getMaxLiveColumns();
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/353309f0/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java b/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
index 456dff7..0d24fbf 100644
--- a/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
+++ b/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
@@ -122,11 +122,6 @@ public class RangeSliceResponseResolver implements IResponseResolver<RangeSliceR
         return responses;
     }
 
-    public int getMaxLiveColumns()
-    {
-        throw new UnsupportedOperationException();
-    }
-
     private class Reducer extends MergeIterator.Reducer<Pair<Row,InetAddress>, Row>
     {
         List<ColumnFamily> versions = new ArrayList<ColumnFamily>(sources.size());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/353309f0/src/java/org/apache/cassandra/service/ReadCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ReadCallback.java b/src/java/org/apache/cassandra/service/ReadCallback.java
index 1bfff2e..8df2e10 100644
--- a/src/java/org/apache/cassandra/service/ReadCallback.java
+++ b/src/java/org/apache/cassandra/service/ReadCallback.java
@@ -219,7 +219,7 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallback<TMessag
                     logger.debug("Digest mismatch:", e);
 
                 ReadCommand readCommand = (ReadCommand) command;
-                final RowRepairResolver repairResolver = new RowRepairResolver(readCommand.table, readCommand.key);
+                final RowRepairResolver repairResolver = new RowRepairResolver(readCommand.table, readCommand.key, readCommand.filter());
                 IAsyncCallback repairHandler = new AsyncRepairCallback(repairResolver, endpoints.size());
 
                 MessageOut<ReadCommand> message = ((ReadCommand) command).createMessage();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/353309f0/src/java/org/apache/cassandra/service/RepairCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/RepairCallback.java b/src/java/org/apache/cassandra/service/RepairCallback.java
index b078d26..9388328 100644
--- a/src/java/org/apache/cassandra/service/RepairCallback.java
+++ b/src/java/org/apache/cassandra/service/RepairCallback.java
@@ -79,8 +79,8 @@ public class RepairCallback implements IAsyncCallback
         return true;
     }
 
-    public int getMaxLiveColumns()
+    public int getMaxLiveCount()
     {
-        return resolver.getMaxLiveColumns();
+        return resolver.getMaxLiveCount();
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/353309f0/src/java/org/apache/cassandra/service/RowRepairResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/RowRepairResolver.java b/src/java/org/apache/cassandra/service/RowRepairResolver.java
index 5618cf0..2c6fe1e 100644
--- a/src/java/org/apache/cassandra/service/RowRepairResolver.java
+++ b/src/java/org/apache/cassandra/service/RowRepairResolver.java
@@ -28,8 +28,10 @@ import com.google.common.collect.Iterables;
 
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
+import org.apache.cassandra.db.filter.IFilter;
 import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.db.filter.SliceQueryFilter;
 import org.apache.cassandra.net.IAsyncResult;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessageOut;
@@ -39,12 +41,14 @@ import org.apache.cassandra.utils.FBUtilities;
 
 public class RowRepairResolver extends AbstractRowResolver
 {
-    protected int maxLiveColumns = 0;
+    private int maxLiveCount = 0;
     public List<IAsyncResult> repairResults = Collections.emptyList();
+    private final SliceQueryFilter filter; // can be null if names query
 
-    public RowRepairResolver(String table, ByteBuffer key)
+    public RowRepairResolver(String table, ByteBuffer key, IFilter qFilter)
     {
         super(key, table);
+        this.filter = qFilter instanceof SliceQueryFilter ? (SliceQueryFilter)qFilter : null;
     }
 
     /*
@@ -74,10 +78,10 @@ public class RowRepairResolver extends AbstractRowResolver
                 versions.add(cf);
                 endpoints.add(message.from);
 
-                // compute maxLiveColumns to prevent short reads -- see https://issues.apache.org/jira/browse/CASSANDRA-2643
-                int liveColumns = cf == null ? 0 : cf.getLiveColumnCount();
-                if (liveColumns > maxLiveColumns)
-                    maxLiveColumns = liveColumns;
+                // compute maxLiveCount to prevent short reads -- see https://issues.apache.org/jira/browse/CASSANDRA-2643
+                int liveCount = cf == null ? 0 : (filter == null ? cf.getLiveColumnCount() : filter.getLiveCount(cf));
+                if (liveCount > maxLiveCount)
+                    maxLiveCount = liveCount;
             }
 
             resolved = resolveSuperset(versions);
@@ -170,8 +174,8 @@ public class RowRepairResolver extends AbstractRowResolver
         throw new UnsupportedOperationException();
     }
 
-    public int getMaxLiveColumns()
+    public int getMaxLiveCount()
     {
-        return maxLiveColumns;
+        return maxLiveCount;
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/353309f0/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index b08f5b8..4b700be 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -943,7 +943,7 @@ public class StorageProxy implements StorageProxyMBean
                 catch (DigestMismatchException ex)
                 {
                     logger.debug("Digest mismatch: {}", ex.toString());
-                    RowRepairResolver resolver = new RowRepairResolver(command.table, command.key);
+                    RowRepairResolver resolver = new RowRepairResolver(command.table, command.key, command.filter());
                     RepairCallback repairHandler = new RepairCallback(resolver, handler.endpoints);
 
                     if (repairCommands == null)