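You are viewing a plain text version of this content. The canonical version is in the Apache mailing-list archive for commits@cassandra.apache.org (the original hyperlink was lost in this plain-text rendering).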
Posted to commits@cassandra.apache.org by al...@apache.org on 2017/08/29 11:36:02 UTC

[3/6] cassandra git commit: Fix AssertionError in short read protection

Fix AssertionError in short read protection

patch by Aleksey Yeschenko; reviewed by Benedict Elliott Smith for
CASSANDRA-13747


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a7cb009f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a7cb009f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a7cb009f

Branch: refs/heads/trunk
Commit: a7cb009f8a3f4d0e0293111bfcfff3d404a37a89
Parents: dfbe3fa
Author: Aleksey Yeschenko <al...@yeschenko.com>
Authored: Sun Aug 6 19:42:47 2017 +0100
Committer: Aleksey Yeschenko <al...@yeschenko.com>
Committed: Tue Aug 29 12:22:39 2017 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../UnfilteredPartitionIterators.java           |  6 ---
 .../db/transform/EmptyPartitionsDiscarder.java  | 35 +++++++++++++++
 .../apache/cassandra/db/transform/Filter.java   | 28 +++---------
 .../db/transform/FilteredPartitions.java        | 18 +++++---
 .../cassandra/db/transform/FilteredRows.java    |  2 +-
 .../apache/cassandra/metrics/TableMetrics.java  |  4 ++
 .../apache/cassandra/service/DataResolver.java  | 47 ++++++++++++++------
 8 files changed, 94 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7cb009f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5ccd5cd..6609b05 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.15
+ * Fix AssertionError in short read protection (CASSANDRA-13747)
  * Don't skip corrupted sstables on startup (CASSANDRA-13620)
  * Fix the merging of cells with different user type versions (CASSANDRA-13776)
  * Copy session properties on cqlsh.py do_login (CASSANDRA-13640)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7cb009f/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
index 1abbb19..4e0ac1b 100644
--- a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
+++ b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
@@ -77,12 +77,6 @@ public abstract class UnfilteredPartitionIterators
         return Transformation.apply(toReturn, new Close());
     }
 
-    public static PartitionIterator mergeAndFilter(List<UnfilteredPartitionIterator> iterators, int nowInSec, MergeListener listener)
-    {
-        // TODO: we could have a somewhat faster version if we were to merge the UnfilteredRowIterators directly as RowIterators
-        return filter(merge(iterators, nowInSec, listener), nowInSec);
-    }
-
     public static PartitionIterator filter(final UnfilteredPartitionIterator iterator, final int nowInSec)
     {
         return FilteredPartitions.filter(iterator, nowInSec);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7cb009f/src/java/org/apache/cassandra/db/transform/EmptyPartitionsDiscarder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/transform/EmptyPartitionsDiscarder.java b/src/java/org/apache/cassandra/db/transform/EmptyPartitionsDiscarder.java
new file mode 100644
index 0000000..5e41cec
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/transform/EmptyPartitionsDiscarder.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.transform;
+
+import org.apache.cassandra.db.rows.BaseRowIterator;
+
+public final class EmptyPartitionsDiscarder extends Transformation<BaseRowIterator<?>>
+{
+    @Override
+    protected BaseRowIterator applyToPartition(BaseRowIterator iterator)
+    {
+        if (iterator.isEmpty())
+        {
+            iterator.close();
+            return null;
+        }
+
+        return iterator;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7cb009f/src/java/org/apache/cassandra/db/transform/Filter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/transform/Filter.java b/src/java/org/apache/cassandra/db/transform/Filter.java
index 48c8b1a..747983f 100644
--- a/src/java/org/apache/cassandra/db/transform/Filter.java
+++ b/src/java/org/apache/cassandra/db/transform/Filter.java
@@ -23,27 +23,21 @@ package org.apache.cassandra.db.transform;
 import org.apache.cassandra.db.DeletionPurger;
 import org.apache.cassandra.db.rows.*;
 
-final class Filter extends Transformation
+public final class Filter extends Transformation
 {
-    private final boolean filterEmpty; // generally maps to !isForThrift, but also false for direct row filtration
     private final int nowInSec;
-    public Filter(boolean filterEmpty, int nowInSec)
+
+    public Filter(int nowInSec)
     {
-        this.filterEmpty = filterEmpty;
         this.nowInSec = nowInSec;
     }
 
     @Override
     protected RowIterator applyToPartition(BaseRowIterator iterator)
     {
-        RowIterator filtered = iterator instanceof UnfilteredRows
-                               ? new FilteredRows(this, (UnfilteredRows) iterator)
-                               : new FilteredRows((UnfilteredRowIterator) iterator, this);
-
-        if (filterEmpty && closeIfEmpty(filtered))
-            return null;
-
-        return filtered;
+        return iterator instanceof UnfilteredRows
+             ? new FilteredRows(this, (UnfilteredRows) iterator)
+             : new FilteredRows((UnfilteredRowIterator) iterator, this);
     }
 
     @Override
@@ -67,14 +61,4 @@ final class Filter extends Transformation
     {
         return null;
     }
-
-    private static boolean closeIfEmpty(BaseRowIterator<?> iter)
-    {
-        if (iter.isEmpty())
-        {
-            iter.close();
-            return true;
-        }
-        return false;
-    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7cb009f/src/java/org/apache/cassandra/db/transform/FilteredPartitions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/transform/FilteredPartitions.java b/src/java/org/apache/cassandra/db/transform/FilteredPartitions.java
index 09e36b4..ad9446d 100644
--- a/src/java/org/apache/cassandra/db/transform/FilteredPartitions.java
+++ b/src/java/org/apache/cassandra/db/transform/FilteredPartitions.java
@@ -50,11 +50,19 @@ public final class FilteredPartitions extends BasePartitions<RowIterator, BasePa
     /**
      * Filter any RangeTombstoneMarker from the iterator's iterators, transforming it into a PartitionIterator.
      */
-    public static PartitionIterator filter(UnfilteredPartitionIterator iterator, int nowInSecs)
+    public static FilteredPartitions filter(UnfilteredPartitionIterator iterator, int nowInSecs)
     {
-        Filter filter = new Filter(!iterator.isForThrift(), nowInSecs);
-        if (iterator instanceof UnfilteredPartitions)
-            return new FilteredPartitions(filter, (UnfilteredPartitions) iterator);
-        return new FilteredPartitions(iterator, filter);
+        FilteredPartitions filtered = filter(iterator, new Filter(nowInSecs));
+
+        return iterator.isForThrift()
+             ? filtered
+             : (FilteredPartitions) Transformation.apply(filtered, new EmptyPartitionsDiscarder());
+    }
+
+    public static FilteredPartitions filter(UnfilteredPartitionIterator iterator, Filter filter)
+    {
+        return iterator instanceof UnfilteredPartitions
+             ? new FilteredPartitions(filter, (UnfilteredPartitions) iterator)
+             : new FilteredPartitions(iterator, filter);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7cb009f/src/java/org/apache/cassandra/db/transform/FilteredRows.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/transform/FilteredRows.java b/src/java/org/apache/cassandra/db/transform/FilteredRows.java
index 818d3bb..5b635eb 100644
--- a/src/java/org/apache/cassandra/db/transform/FilteredRows.java
+++ b/src/java/org/apache/cassandra/db/transform/FilteredRows.java
@@ -55,6 +55,6 @@ public final class FilteredRows extends BaseRows<Row, BaseRowIterator<?>> implem
      */
     public static RowIterator filter(UnfilteredRowIterator iterator, int nowInSecs)
     {
-        return new Filter(false, nowInSecs).applyToPartition(iterator);
+        return new Filter(nowInSecs).applyToPartition(iterator);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7cb009f/src/java/org/apache/cassandra/metrics/TableMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/TableMetrics.java b/src/java/org/apache/cassandra/metrics/TableMetrics.java
index a493836..fe88a63 100644
--- a/src/java/org/apache/cassandra/metrics/TableMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java
@@ -151,6 +151,8 @@ public class TableMetrics
     public final static LatencyMetrics globalWriteLatency = new LatencyMetrics(globalFactory, globalAliasFactory, "Write");
     public final static LatencyMetrics globalRangeLatency = new LatencyMetrics(globalFactory, globalAliasFactory, "Range");
 
+    public final Meter shortReadProtectionRequests;
+
     public final Map<Sampler, TopKSampler<ByteBuffer>> samplers;
     /**
      * stores metrics that will be rolled into a single global metric
@@ -645,6 +647,8 @@ public class TableMetrics
         casPrepare = new LatencyMetrics(factory, "CasPrepare", cfs.keyspace.metric.casPrepare);
         casPropose = new LatencyMetrics(factory, "CasPropose", cfs.keyspace.metric.casPropose);
         casCommit = new LatencyMetrics(factory, "CasCommit", cfs.keyspace.metric.casCommit);
+
+        shortReadProtectionRequests = Metrics.meter(factory.createMetricName("ShortReadProtectionRequests"));
     }
 
     public void updateSSTableIterated(int count)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7cb009f/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DataResolver.java b/src/java/org/apache/cassandra/service/DataResolver.java
index 26b1b2a..72c4950 100644
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@ -27,16 +27,13 @@ import com.google.common.collect.Iterables;
 
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.*;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.ClusteringIndexFilter;
 import org.apache.cassandra.db.filter.DataLimits;
 import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.db.transform.MoreRows;
-import org.apache.cassandra.db.transform.Transformation;
+import org.apache.cassandra.db.transform.*;
 import org.apache.cassandra.exceptions.ReadTimeoutException;
 import org.apache.cassandra.net.*;
 import org.apache.cassandra.tracing.Tracing;
@@ -72,10 +69,29 @@ public class DataResolver extends ResponseResolver
             sources[i] = msg.from;
         }
 
-        // Even though every responses should honor the limit, we might have more than requested post reconciliation,
-        // so ensure we're respecting the limit.
+        /*
+         * Even though every response, individually, will honor the limit, it is possible that we will, after the merge,
+         * have more rows than the client requested. To make sure that we still conform to the original limit,
+         * we apply a top-level post-reconciliation counter to the merged partition iterator.
+         *
+         * Short read protection logic (ShortReadRowProtection.moreContents()) relies on this counter to be applied
+         * to the current partition to work. For this reason we have to apply the counter transformation before
+         * empty partition discard logic kicks in - for it will eagerly consume the iterator.
+         *
+         * That's why the order here is: 1) merge; 2) filter rows; 3) count; 4) discard empty partitions
+         *
+         * See CASSANDRA-13747 for more details.
+         */
+
         DataLimits.Counter counter = command.limits().newCounter(command.nowInSec(), true, command.selectsFullPartition());
-        return counter.applyTo(mergeWithShortReadProtection(iters, sources, counter));
+
+        UnfilteredPartitionIterator merged = mergeWithShortReadProtection(iters, sources, counter);
+        FilteredPartitions filtered = FilteredPartitions.filter(merged, new Filter(command.nowInSec()));
+        PartitionIterator counted = counter.applyTo(filtered);
+
+        return command.isForThrift()
+             ? counted
+             : Transformation.apply(counted, new EmptyPartitionsDiscarder());
     }
 
     public void compareResponses()
@@ -87,11 +103,13 @@ public class DataResolver extends ResponseResolver
         }
     }
 
-    private PartitionIterator mergeWithShortReadProtection(List<UnfilteredPartitionIterator> results, InetAddress[] sources, DataLimits.Counter resultCounter)
+    private UnfilteredPartitionIterator mergeWithShortReadProtection(List<UnfilteredPartitionIterator> results,
+                                                                     InetAddress[] sources,
+                                                                     DataLimits.Counter resultCounter)
     {
         // If we have only one results, there is no read repair to do and we can't get short reads
         if (results.size() == 1)
-            return UnfilteredPartitionIterators.filter(results.get(0), command.nowInSec());
+            return results.get(0);
 
         UnfilteredPartitionIterators.MergeListener listener = new RepairMergeListener(sources);
 
@@ -103,7 +121,7 @@ public class DataResolver extends ResponseResolver
                 results.set(i, Transformation.apply(results.get(i), new ShortReadProtection(sources[i], resultCounter)));
         }
 
-        return UnfilteredPartitionIterators.mergeAndFilter(results, command.nowInSec(), listener);
+        return UnfilteredPartitionIterators.merge(results, command.nowInSec(), listener);
     }
 
     private class RepairMergeListener implements UnfilteredPartitionIterators.MergeListener
@@ -510,7 +528,7 @@ public class DataResolver extends ResponseResolver
                 // counting iterator.
                 int n = postReconciliationCounter.countedInCurrentPartition();
                 int x = counter.countedInCurrentPartition();
-                int toQuery = Math.max(((n * n) / x) - n, 1);
+                int toQuery = Math.max(((n * n) / Math.max(x, 1)) - n, 1);
 
                 DataLimits retryLimits = command.limits().forShortReadRetry(toQuery);
                 ClusteringIndexFilter filter = command.clusteringIndexFilter(partitionKey);
@@ -523,6 +541,9 @@ public class DataResolver extends ResponseResolver
                                                                                    partitionKey,
                                                                                    retryFilter);
 
+                Tracing.trace("Requesting {} extra rows from {} for short read protection", toQuery, source);
+                Schema.instance.getColumnFamilyStoreInstance(cmd.metadata().cfId).metric.shortReadProtectionRequests.mark();
+
                 return doShortReadRetry(cmd);
             }
 
@@ -531,7 +552,7 @@ public class DataResolver extends ResponseResolver
                 DataResolver resolver = new DataResolver(keyspace, retryCommand, ConsistencyLevel.ONE, 1);
                 ReadCallback handler = new ReadCallback(resolver, ConsistencyLevel.ONE, retryCommand, Collections.singletonList(source));
                 if (StorageProxy.canDoLocalRequest(source))
-                      StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(retryCommand, handler));
+                    StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(retryCommand, handler));
                 else
                     MessagingService.instance().sendRRWithFailure(retryCommand.createMessage(MessagingService.current_version), source, handler);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org