You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2017/08/29 11:36:02 UTC
[3/6] cassandra git commit: Fix AssertionError in short read protection
Fix AssertionError in short read protection
patch by Aleksey Yeschenko; reviewed by Benedict Elliott Smith for
CASSANDRA-13747
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a7cb009f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a7cb009f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a7cb009f
Branch: refs/heads/trunk
Commit: a7cb009f8a3f4d0e0293111bfcfff3d404a37a89
Parents: dfbe3fa
Author: Aleksey Yeschenko <al...@yeschenko.com>
Authored: Sun Aug 6 19:42:47 2017 +0100
Committer: Aleksey Yeschenko <al...@yeschenko.com>
Committed: Tue Aug 29 12:22:39 2017 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../UnfilteredPartitionIterators.java | 6 ---
.../db/transform/EmptyPartitionsDiscarder.java | 35 +++++++++++++++
.../apache/cassandra/db/transform/Filter.java | 28 +++---------
.../db/transform/FilteredPartitions.java | 18 +++++---
.../cassandra/db/transform/FilteredRows.java | 2 +-
.../apache/cassandra/metrics/TableMetrics.java | 4 ++
.../apache/cassandra/service/DataResolver.java | 47 ++++++++++++++------
8 files changed, 94 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7cb009f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5ccd5cd..6609b05 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.15
+ * Fix AssertionError in short read protection (CASSANDRA-13747)
* Don't skip corrupted sstables on startup (CASSANDRA-13620)
* Fix the merging of cells with different user type versions (CASSANDRA-13776)
* Copy session properties on cqlsh.py do_login (CASSANDRA-13640)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7cb009f/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
index 1abbb19..4e0ac1b 100644
--- a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
+++ b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
@@ -77,12 +77,6 @@ public abstract class UnfilteredPartitionIterators
return Transformation.apply(toReturn, new Close());
}
- public static PartitionIterator mergeAndFilter(List<UnfilteredPartitionIterator> iterators, int nowInSec, MergeListener listener)
- {
- // TODO: we could have a somewhat faster version if we were to merge the UnfilteredRowIterators directly as RowIterators
- return filter(merge(iterators, nowInSec, listener), nowInSec);
- }
-
public static PartitionIterator filter(final UnfilteredPartitionIterator iterator, final int nowInSec)
{
return FilteredPartitions.filter(iterator, nowInSec);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7cb009f/src/java/org/apache/cassandra/db/transform/EmptyPartitionsDiscarder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/transform/EmptyPartitionsDiscarder.java b/src/java/org/apache/cassandra/db/transform/EmptyPartitionsDiscarder.java
new file mode 100644
index 0000000..5e41cec
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/transform/EmptyPartitionsDiscarder.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.transform;
+
+import org.apache.cassandra.db.rows.BaseRowIterator;
+
+public final class EmptyPartitionsDiscarder extends Transformation<BaseRowIterator<?>>
+{
+ @Override
+ protected BaseRowIterator applyToPartition(BaseRowIterator iterator)
+ {
+ if (iterator.isEmpty())
+ {
+ iterator.close();
+ return null;
+ }
+
+ return iterator;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7cb009f/src/java/org/apache/cassandra/db/transform/Filter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/transform/Filter.java b/src/java/org/apache/cassandra/db/transform/Filter.java
index 48c8b1a..747983f 100644
--- a/src/java/org/apache/cassandra/db/transform/Filter.java
+++ b/src/java/org/apache/cassandra/db/transform/Filter.java
@@ -23,27 +23,21 @@ package org.apache.cassandra.db.transform;
import org.apache.cassandra.db.DeletionPurger;
import org.apache.cassandra.db.rows.*;
-final class Filter extends Transformation
+public final class Filter extends Transformation
{
- private final boolean filterEmpty; // generally maps to !isForThrift, but also false for direct row filtration
private final int nowInSec;
- public Filter(boolean filterEmpty, int nowInSec)
+
+ public Filter(int nowInSec)
{
- this.filterEmpty = filterEmpty;
this.nowInSec = nowInSec;
}
@Override
protected RowIterator applyToPartition(BaseRowIterator iterator)
{
- RowIterator filtered = iterator instanceof UnfilteredRows
- ? new FilteredRows(this, (UnfilteredRows) iterator)
- : new FilteredRows((UnfilteredRowIterator) iterator, this);
-
- if (filterEmpty && closeIfEmpty(filtered))
- return null;
-
- return filtered;
+ return iterator instanceof UnfilteredRows
+ ? new FilteredRows(this, (UnfilteredRows) iterator)
+ : new FilteredRows((UnfilteredRowIterator) iterator, this);
}
@Override
@@ -67,14 +61,4 @@ final class Filter extends Transformation
{
return null;
}
-
- private static boolean closeIfEmpty(BaseRowIterator<?> iter)
- {
- if (iter.isEmpty())
- {
- iter.close();
- return true;
- }
- return false;
- }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7cb009f/src/java/org/apache/cassandra/db/transform/FilteredPartitions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/transform/FilteredPartitions.java b/src/java/org/apache/cassandra/db/transform/FilteredPartitions.java
index 09e36b4..ad9446d 100644
--- a/src/java/org/apache/cassandra/db/transform/FilteredPartitions.java
+++ b/src/java/org/apache/cassandra/db/transform/FilteredPartitions.java
@@ -50,11 +50,19 @@ public final class FilteredPartitions extends BasePartitions<RowIterator, BasePa
/**
* Filter any RangeTombstoneMarker from the iterator's iterators, transforming it into a PartitionIterator.
*/
- public static PartitionIterator filter(UnfilteredPartitionIterator iterator, int nowInSecs)
+ public static FilteredPartitions filter(UnfilteredPartitionIterator iterator, int nowInSecs)
{
- Filter filter = new Filter(!iterator.isForThrift(), nowInSecs);
- if (iterator instanceof UnfilteredPartitions)
- return new FilteredPartitions(filter, (UnfilteredPartitions) iterator);
- return new FilteredPartitions(iterator, filter);
+ FilteredPartitions filtered = filter(iterator, new Filter(nowInSecs));
+
+ return iterator.isForThrift()
+ ? filtered
+ : (FilteredPartitions) Transformation.apply(filtered, new EmptyPartitionsDiscarder());
+ }
+
+ public static FilteredPartitions filter(UnfilteredPartitionIterator iterator, Filter filter)
+ {
+ return iterator instanceof UnfilteredPartitions
+ ? new FilteredPartitions(filter, (UnfilteredPartitions) iterator)
+ : new FilteredPartitions(iterator, filter);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7cb009f/src/java/org/apache/cassandra/db/transform/FilteredRows.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/transform/FilteredRows.java b/src/java/org/apache/cassandra/db/transform/FilteredRows.java
index 818d3bb..5b635eb 100644
--- a/src/java/org/apache/cassandra/db/transform/FilteredRows.java
+++ b/src/java/org/apache/cassandra/db/transform/FilteredRows.java
@@ -55,6 +55,6 @@ public final class FilteredRows extends BaseRows<Row, BaseRowIterator<?>> implem
*/
public static RowIterator filter(UnfilteredRowIterator iterator, int nowInSecs)
{
- return new Filter(false, nowInSecs).applyToPartition(iterator);
+ return new Filter(nowInSecs).applyToPartition(iterator);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7cb009f/src/java/org/apache/cassandra/metrics/TableMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/TableMetrics.java b/src/java/org/apache/cassandra/metrics/TableMetrics.java
index a493836..fe88a63 100644
--- a/src/java/org/apache/cassandra/metrics/TableMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java
@@ -151,6 +151,8 @@ public class TableMetrics
public final static LatencyMetrics globalWriteLatency = new LatencyMetrics(globalFactory, globalAliasFactory, "Write");
public final static LatencyMetrics globalRangeLatency = new LatencyMetrics(globalFactory, globalAliasFactory, "Range");
+ public final Meter shortReadProtectionRequests;
+
public final Map<Sampler, TopKSampler<ByteBuffer>> samplers;
/**
* stores metrics that will be rolled into a single global metric
@@ -645,6 +647,8 @@ public class TableMetrics
casPrepare = new LatencyMetrics(factory, "CasPrepare", cfs.keyspace.metric.casPrepare);
casPropose = new LatencyMetrics(factory, "CasPropose", cfs.keyspace.metric.casPropose);
casCommit = new LatencyMetrics(factory, "CasCommit", cfs.keyspace.metric.casCommit);
+
+ shortReadProtectionRequests = Metrics.meter(factory.createMetricName("ShortReadProtectionRequests"));
}
public void updateSSTableIterated(int count)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7cb009f/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DataResolver.java b/src/java/org/apache/cassandra/service/DataResolver.java
index 26b1b2a..72c4950 100644
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@ -27,16 +27,13 @@ import com.google.common.collect.Iterables;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.*;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.filter.ClusteringIndexFilter;
import org.apache.cassandra.db.filter.DataLimits;
import org.apache.cassandra.db.partitions.*;
import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.db.transform.MoreRows;
-import org.apache.cassandra.db.transform.Transformation;
+import org.apache.cassandra.db.transform.*;
import org.apache.cassandra.exceptions.ReadTimeoutException;
import org.apache.cassandra.net.*;
import org.apache.cassandra.tracing.Tracing;
@@ -72,10 +69,29 @@ public class DataResolver extends ResponseResolver
sources[i] = msg.from;
}
- // Even though every responses should honor the limit, we might have more than requested post reconciliation,
- // so ensure we're respecting the limit.
+ /*
+ * Even though every response, individually, will honor the limit, it is possible that we will, after the merge,
+ * have more rows than the client requested. To make sure that we still conform to the original limit,
+ * we apply a top-level post-reconciliation counter to the merged partition iterator.
+ *
+ * Short read protection logic (ShortReadRowProtection.moreContents()) relies on this counter to be applied
+ * to the current partition to work. For this reason we have to apply the counter transformation before
+ * empty partition discard logic kicks in - for it will eagerly consume the iterator.
+ *
+ * That's why the order here is: 1) merge; 2) filter rows; 3) count; 4) discard empty partitions
+ *
+ * See CASSANDRA-13747 for more details.
+ */
+
DataLimits.Counter counter = command.limits().newCounter(command.nowInSec(), true, command.selectsFullPartition());
- return counter.applyTo(mergeWithShortReadProtection(iters, sources, counter));
+
+ UnfilteredPartitionIterator merged = mergeWithShortReadProtection(iters, sources, counter);
+ FilteredPartitions filtered = FilteredPartitions.filter(merged, new Filter(command.nowInSec()));
+ PartitionIterator counted = counter.applyTo(filtered);
+
+ return command.isForThrift()
+ ? counted
+ : Transformation.apply(counted, new EmptyPartitionsDiscarder());
}
public void compareResponses()
@@ -87,11 +103,13 @@ public class DataResolver extends ResponseResolver
}
}
- private PartitionIterator mergeWithShortReadProtection(List<UnfilteredPartitionIterator> results, InetAddress[] sources, DataLimits.Counter resultCounter)
+ private UnfilteredPartitionIterator mergeWithShortReadProtection(List<UnfilteredPartitionIterator> results,
+ InetAddress[] sources,
+ DataLimits.Counter resultCounter)
{
// If we have only one results, there is no read repair to do and we can't get short reads
if (results.size() == 1)
- return UnfilteredPartitionIterators.filter(results.get(0), command.nowInSec());
+ return results.get(0);
UnfilteredPartitionIterators.MergeListener listener = new RepairMergeListener(sources);
@@ -103,7 +121,7 @@ public class DataResolver extends ResponseResolver
results.set(i, Transformation.apply(results.get(i), new ShortReadProtection(sources[i], resultCounter)));
}
- return UnfilteredPartitionIterators.mergeAndFilter(results, command.nowInSec(), listener);
+ return UnfilteredPartitionIterators.merge(results, command.nowInSec(), listener);
}
private class RepairMergeListener implements UnfilteredPartitionIterators.MergeListener
@@ -510,7 +528,7 @@ public class DataResolver extends ResponseResolver
// counting iterator.
int n = postReconciliationCounter.countedInCurrentPartition();
int x = counter.countedInCurrentPartition();
- int toQuery = Math.max(((n * n) / x) - n, 1);
+ int toQuery = Math.max(((n * n) / Math.max(x, 1)) - n, 1);
DataLimits retryLimits = command.limits().forShortReadRetry(toQuery);
ClusteringIndexFilter filter = command.clusteringIndexFilter(partitionKey);
@@ -523,6 +541,9 @@ public class DataResolver extends ResponseResolver
partitionKey,
retryFilter);
+ Tracing.trace("Requesting {} extra rows from {} for short read protection", toQuery, source);
+ Schema.instance.getColumnFamilyStoreInstance(cmd.metadata().cfId).metric.shortReadProtectionRequests.mark();
+
return doShortReadRetry(cmd);
}
@@ -531,7 +552,7 @@ public class DataResolver extends ResponseResolver
DataResolver resolver = new DataResolver(keyspace, retryCommand, ConsistencyLevel.ONE, 1);
ReadCallback handler = new ReadCallback(resolver, ConsistencyLevel.ONE, retryCommand, Collections.singletonList(source));
if (StorageProxy.canDoLocalRequest(source))
- StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(retryCommand, handler));
+ StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(retryCommand, handler));
else
MessagingService.instance().sendRRWithFailure(retryCommand.createMessage(MessagingService.current_version), source, handler);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org