You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2017/08/29 11:36:00 UTC
[1/6] cassandra git commit: Fix AssertionError in short read protection
Repository: cassandra
Updated Branches:
refs/heads/cassandra-3.0 dfbe3fabd -> a7cb009f8
refs/heads/cassandra-3.11 809f3b30e -> 826ae9c91
refs/heads/trunk 326f3a7c7 -> 278906c6c
Fix AssertionError in short read protection
patch by Aleksey Yeschenko; reviewed by Benedict Elliott Smith for CASSANDRA-13747
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a7cb009f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a7cb009f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a7cb009f
Branch: refs/heads/cassandra-3.0
Commit: a7cb009f8a3f4d0e0293111bfcfff3d404a37a89
Parents: dfbe3fa
Author: Aleksey Yeschenko <al...@yeschenko.com>
Authored: Sun Aug 6 19:42:47 2017 +0100
Committer: Aleksey Yeschenko <al...@yeschenko.com>
Committed: Tue Aug 29 12:22:39 2017 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../UnfilteredPartitionIterators.java | 6 ---
.../db/transform/EmptyPartitionsDiscarder.java | 35 +++++++++++++++
.../apache/cassandra/db/transform/Filter.java | 28 +++---------
.../db/transform/FilteredPartitions.java | 18 +++++---
.../cassandra/db/transform/FilteredRows.java | 2 +-
.../apache/cassandra/metrics/TableMetrics.java | 4 ++
.../apache/cassandra/service/DataResolver.java | 47 ++++++++++++++------
8 files changed, 94 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7cb009f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5ccd5cd..6609b05 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.15
+ * Fix AssertionError in short read protection (CASSANDRA-13747)
* Don't skip corrupted sstables on startup (CASSANDRA-13620)
* Fix the merging of cells with different user type versions (CASSANDRA-13776)
* Copy session properties on cqlsh.py do_login (CASSANDRA-13640)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7cb009f/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
index 1abbb19..4e0ac1b 100644
--- a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
+++ b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
@@ -77,12 +77,6 @@ public abstract class UnfilteredPartitionIterators
return Transformation.apply(toReturn, new Close());
}
- public static PartitionIterator mergeAndFilter(List<UnfilteredPartitionIterator> iterators, int nowInSec, MergeListener listener)
- {
- // TODO: we could have a somewhat faster version if we were to merge the UnfilteredRowIterators directly as RowIterators
- return filter(merge(iterators, nowInSec, listener), nowInSec);
- }
-
public static PartitionIterator filter(final UnfilteredPartitionIterator iterator, final int nowInSec)
{
return FilteredPartitions.filter(iterator, nowInSec);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7cb009f/src/java/org/apache/cassandra/db/transform/EmptyPartitionsDiscarder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/transform/EmptyPartitionsDiscarder.java b/src/java/org/apache/cassandra/db/transform/EmptyPartitionsDiscarder.java
new file mode 100644
index 0000000..5e41cec
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/transform/EmptyPartitionsDiscarder.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.transform;
+
+import org.apache.cassandra.db.rows.BaseRowIterator;
+
+public final class EmptyPartitionsDiscarder extends Transformation<BaseRowIterator<?>>
+{
+ @Override
+ protected BaseRowIterator applyToPartition(BaseRowIterator iterator)
+ {
+ if (iterator.isEmpty())
+ {
+ iterator.close();
+ return null;
+ }
+
+ return iterator;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7cb009f/src/java/org/apache/cassandra/db/transform/Filter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/transform/Filter.java b/src/java/org/apache/cassandra/db/transform/Filter.java
index 48c8b1a..747983f 100644
--- a/src/java/org/apache/cassandra/db/transform/Filter.java
+++ b/src/java/org/apache/cassandra/db/transform/Filter.java
@@ -23,27 +23,21 @@ package org.apache.cassandra.db.transform;
import org.apache.cassandra.db.DeletionPurger;
import org.apache.cassandra.db.rows.*;
-final class Filter extends Transformation
+public final class Filter extends Transformation
{
- private final boolean filterEmpty; // generally maps to !isForThrift, but also false for direct row filtration
private final int nowInSec;
- public Filter(boolean filterEmpty, int nowInSec)
+
+ public Filter(int nowInSec)
{
- this.filterEmpty = filterEmpty;
this.nowInSec = nowInSec;
}
@Override
protected RowIterator applyToPartition(BaseRowIterator iterator)
{
- RowIterator filtered = iterator instanceof UnfilteredRows
- ? new FilteredRows(this, (UnfilteredRows) iterator)
- : new FilteredRows((UnfilteredRowIterator) iterator, this);
-
- if (filterEmpty && closeIfEmpty(filtered))
- return null;
-
- return filtered;
+ return iterator instanceof UnfilteredRows
+ ? new FilteredRows(this, (UnfilteredRows) iterator)
+ : new FilteredRows((UnfilteredRowIterator) iterator, this);
}
@Override
@@ -67,14 +61,4 @@ final class Filter extends Transformation
{
return null;
}
-
- private static boolean closeIfEmpty(BaseRowIterator<?> iter)
- {
- if (iter.isEmpty())
- {
- iter.close();
- return true;
- }
- return false;
- }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7cb009f/src/java/org/apache/cassandra/db/transform/FilteredPartitions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/transform/FilteredPartitions.java b/src/java/org/apache/cassandra/db/transform/FilteredPartitions.java
index 09e36b4..ad9446d 100644
--- a/src/java/org/apache/cassandra/db/transform/FilteredPartitions.java
+++ b/src/java/org/apache/cassandra/db/transform/FilteredPartitions.java
@@ -50,11 +50,19 @@ public final class FilteredPartitions extends BasePartitions<RowIterator, BasePa
/**
* Filter any RangeTombstoneMarker from the iterator's iterators, transforming it into a PartitionIterator.
*/
- public static PartitionIterator filter(UnfilteredPartitionIterator iterator, int nowInSecs)
+ public static FilteredPartitions filter(UnfilteredPartitionIterator iterator, int nowInSecs)
{
- Filter filter = new Filter(!iterator.isForThrift(), nowInSecs);
- if (iterator instanceof UnfilteredPartitions)
- return new FilteredPartitions(filter, (UnfilteredPartitions) iterator);
- return new FilteredPartitions(iterator, filter);
+ FilteredPartitions filtered = filter(iterator, new Filter(nowInSecs));
+
+ return iterator.isForThrift()
+ ? filtered
+ : (FilteredPartitions) Transformation.apply(filtered, new EmptyPartitionsDiscarder());
+ }
+
+ public static FilteredPartitions filter(UnfilteredPartitionIterator iterator, Filter filter)
+ {
+ return iterator instanceof UnfilteredPartitions
+ ? new FilteredPartitions(filter, (UnfilteredPartitions) iterator)
+ : new FilteredPartitions(iterator, filter);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7cb009f/src/java/org/apache/cassandra/db/transform/FilteredRows.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/transform/FilteredRows.java b/src/java/org/apache/cassandra/db/transform/FilteredRows.java
index 818d3bb..5b635eb 100644
--- a/src/java/org/apache/cassandra/db/transform/FilteredRows.java
+++ b/src/java/org/apache/cassandra/db/transform/FilteredRows.java
@@ -55,6 +55,6 @@ public final class FilteredRows extends BaseRows<Row, BaseRowIterator<?>> implem
*/
public static RowIterator filter(UnfilteredRowIterator iterator, int nowInSecs)
{
- return new Filter(false, nowInSecs).applyToPartition(iterator);
+ return new Filter(nowInSecs).applyToPartition(iterator);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7cb009f/src/java/org/apache/cassandra/metrics/TableMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/TableMetrics.java b/src/java/org/apache/cassandra/metrics/TableMetrics.java
index a493836..fe88a63 100644
--- a/src/java/org/apache/cassandra/metrics/TableMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java
@@ -151,6 +151,8 @@ public class TableMetrics
public final static LatencyMetrics globalWriteLatency = new LatencyMetrics(globalFactory, globalAliasFactory, "Write");
public final static LatencyMetrics globalRangeLatency = new LatencyMetrics(globalFactory, globalAliasFactory, "Range");
+ public final Meter shortReadProtectionRequests;
+
public final Map<Sampler, TopKSampler<ByteBuffer>> samplers;
/**
* stores metrics that will be rolled into a single global metric
@@ -645,6 +647,8 @@ public class TableMetrics
casPrepare = new LatencyMetrics(factory, "CasPrepare", cfs.keyspace.metric.casPrepare);
casPropose = new LatencyMetrics(factory, "CasPropose", cfs.keyspace.metric.casPropose);
casCommit = new LatencyMetrics(factory, "CasCommit", cfs.keyspace.metric.casCommit);
+
+ shortReadProtectionRequests = Metrics.meter(factory.createMetricName("ShortReadProtectionRequests"));
}
public void updateSSTableIterated(int count)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7cb009f/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DataResolver.java b/src/java/org/apache/cassandra/service/DataResolver.java
index 26b1b2a..72c4950 100644
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@ -27,16 +27,13 @@ import com.google.common.collect.Iterables;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.*;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.filter.ClusteringIndexFilter;
import org.apache.cassandra.db.filter.DataLimits;
import org.apache.cassandra.db.partitions.*;
import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.db.transform.MoreRows;
-import org.apache.cassandra.db.transform.Transformation;
+import org.apache.cassandra.db.transform.*;
import org.apache.cassandra.exceptions.ReadTimeoutException;
import org.apache.cassandra.net.*;
import org.apache.cassandra.tracing.Tracing;
@@ -72,10 +69,29 @@ public class DataResolver extends ResponseResolver
sources[i] = msg.from;
}
- // Even though every responses should honor the limit, we might have more than requested post reconciliation,
- // so ensure we're respecting the limit.
+ /*
+ * Even though every response, individually, will honor the limit, it is possible that we will, after the merge,
+ * have more rows than the client requested. To make sure that we still conform to the original limit,
+ * we apply a top-level post-reconciliation counter to the merged partition iterator.
+ *
+ * Short read protection logic (ShortReadRowProtection.moreContents()) relies on this counter to be applied
+ * to the current partition to work. For this reason we have to apply the counter transformation before
+ * empty partition discard logic kicks in - for it will eagerly consume the iterator.
+ *
+ * That's why the order here is: 1) merge; 2) filter rows; 3) count; 4) discard empty partitions
+ *
+ * See CASSANDRA-13747 for more details.
+ */
+
DataLimits.Counter counter = command.limits().newCounter(command.nowInSec(), true, command.selectsFullPartition());
- return counter.applyTo(mergeWithShortReadProtection(iters, sources, counter));
+
+ UnfilteredPartitionIterator merged = mergeWithShortReadProtection(iters, sources, counter);
+ FilteredPartitions filtered = FilteredPartitions.filter(merged, new Filter(command.nowInSec()));
+ PartitionIterator counted = counter.applyTo(filtered);
+
+ return command.isForThrift()
+ ? counted
+ : Transformation.apply(counted, new EmptyPartitionsDiscarder());
}
public void compareResponses()
@@ -87,11 +103,13 @@ public class DataResolver extends ResponseResolver
}
}
- private PartitionIterator mergeWithShortReadProtection(List<UnfilteredPartitionIterator> results, InetAddress[] sources, DataLimits.Counter resultCounter)
+ private UnfilteredPartitionIterator mergeWithShortReadProtection(List<UnfilteredPartitionIterator> results,
+ InetAddress[] sources,
+ DataLimits.Counter resultCounter)
{
// If we have only one results, there is no read repair to do and we can't get short reads
if (results.size() == 1)
- return UnfilteredPartitionIterators.filter(results.get(0), command.nowInSec());
+ return results.get(0);
UnfilteredPartitionIterators.MergeListener listener = new RepairMergeListener(sources);
@@ -103,7 +121,7 @@ public class DataResolver extends ResponseResolver
results.set(i, Transformation.apply(results.get(i), new ShortReadProtection(sources[i], resultCounter)));
}
- return UnfilteredPartitionIterators.mergeAndFilter(results, command.nowInSec(), listener);
+ return UnfilteredPartitionIterators.merge(results, command.nowInSec(), listener);
}
private class RepairMergeListener implements UnfilteredPartitionIterators.MergeListener
@@ -510,7 +528,7 @@ public class DataResolver extends ResponseResolver
// counting iterator.
int n = postReconciliationCounter.countedInCurrentPartition();
int x = counter.countedInCurrentPartition();
- int toQuery = Math.max(((n * n) / x) - n, 1);
+ int toQuery = Math.max(((n * n) / Math.max(x, 1)) - n, 1);
DataLimits retryLimits = command.limits().forShortReadRetry(toQuery);
ClusteringIndexFilter filter = command.clusteringIndexFilter(partitionKey);
@@ -523,6 +541,9 @@ public class DataResolver extends ResponseResolver
partitionKey,
retryFilter);
+ Tracing.trace("Requesting {} extra rows from {} for short read protection", toQuery, source);
+ Schema.instance.getColumnFamilyStoreInstance(cmd.metadata().cfId).metric.shortReadProtectionRequests.mark();
+
return doShortReadRetry(cmd);
}
@@ -531,7 +552,7 @@ public class DataResolver extends ResponseResolver
DataResolver resolver = new DataResolver(keyspace, retryCommand, ConsistencyLevel.ONE, 1);
ReadCallback handler = new ReadCallback(resolver, ConsistencyLevel.ONE, retryCommand, Collections.singletonList(source));
if (StorageProxy.canDoLocalRequest(source))
- StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(retryCommand, handler));
+ StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(retryCommand, handler));
else
MessagingService.instance().sendRRWithFailure(retryCommand.createMessage(MessagingService.current_version), source, handler);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org
[6/6] cassandra git commit: Merge branch 'cassandra-3.11' into trunk
Posted by al...@apache.org.
Merge branch 'cassandra-3.11' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/278906c6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/278906c6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/278906c6
Branch: refs/heads/trunk
Commit: 278906c6c0424c1ce0d922c24747c97978b0aa14
Parents: 326f3a7 826ae9c
Author: Aleksey Yeschenko <al...@yeschenko.com>
Authored: Tue Aug 29 12:33:33 2017 +0100
Committer: Aleksey Yeschenko <al...@yeschenko.com>
Committed: Tue Aug 29 12:33:50 2017 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../UnfilteredPartitionIterators.java | 7 ---
.../db/transform/EmptyPartitionsDiscarder.java | 35 +++++++++++++++
.../apache/cassandra/db/transform/Filter.java | 28 +++---------
.../db/transform/FilteredPartitions.java | 15 ++++---
.../cassandra/db/transform/FilteredRows.java | 2 +-
.../apache/cassandra/metrics/TableMetrics.java | 4 ++
.../apache/cassandra/service/DataResolver.java | 45 ++++++++++++++------
.../apache/cassandra/db/ReadCommandTest.java | 23 +++++-----
9 files changed, 101 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/278906c6/CHANGES.txt
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/278906c6/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/278906c6/src/java/org/apache/cassandra/db/transform/FilteredPartitions.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/transform/FilteredPartitions.java
index ed643bb,ad9446d..fa12c9c
--- a/src/java/org/apache/cassandra/db/transform/FilteredPartitions.java
+++ b/src/java/org/apache/cassandra/db/transform/FilteredPartitions.java
@@@ -50,11 -50,19 +50,16 @@@ public final class FilteredPartitions e
/**
* Filter any RangeTombstoneMarker from the iterator's iterators, transforming it into a PartitionIterator.
*/
- public static PartitionIterator filter(UnfilteredPartitionIterator iterator, int nowInSecs)
+ public static FilteredPartitions filter(UnfilteredPartitionIterator iterator, int nowInSecs)
{
- Filter filter = new Filter(true, nowInSecs);
- if (iterator instanceof UnfilteredPartitions)
- return new FilteredPartitions(filter, (UnfilteredPartitions) iterator);
- return new FilteredPartitions(iterator, filter);
+ FilteredPartitions filtered = filter(iterator, new Filter(nowInSecs));
-
- return iterator.isForThrift()
- ? filtered
- : (FilteredPartitions) Transformation.apply(filtered, new EmptyPartitionsDiscarder());
++ return (FilteredPartitions) Transformation.apply(filtered, new EmptyPartitionsDiscarder());
+ }
+
+ public static FilteredPartitions filter(UnfilteredPartitionIterator iterator, Filter filter)
+ {
+ return iterator instanceof UnfilteredPartitions
+ ? new FilteredPartitions(filter, (UnfilteredPartitions) iterator)
+ : new FilteredPartitions(iterator, filter);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/278906c6/src/java/org/apache/cassandra/metrics/TableMetrics.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/metrics/TableMetrics.java
index 58b017e,b0f667c..7e6ca25
--- a/src/java/org/apache/cassandra/metrics/TableMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java
@@@ -240,33 -201,8 +240,35 @@@ public class TableMetric
}
});
+ public static final Gauge<Long> globalBytesRepaired = Metrics.register(globalFactory.createMetricName("BytesRepaired"),
+ new Gauge<Long>()
+ {
+ public Long getValue()
+ {
+ return totalNonSystemTablesSize(SSTableReader::isRepaired).left;
+ }
+ });
+
+ public static final Gauge<Long> globalBytesUnrepaired = Metrics.register(globalFactory.createMetricName("BytesUnrepaired"),
+ new Gauge<Long>()
+ {
+ public Long getValue()
+ {
+ return totalNonSystemTablesSize(s -> !s.isRepaired() && !s.isPendingRepair()).left;
+ }
+ });
+
+ public static final Gauge<Long> globalBytesPendingRepair = Metrics.register(globalFactory.createMetricName("BytesPendingRepair"),
+ new Gauge<Long>()
+ {
+ public Long getValue()
+ {
+ return totalNonSystemTablesSize(SSTableReader::isPendingRepair).left;
+ }
+ });
+
+ public final Meter shortReadProtectionRequests;
+
public final Map<Sampler, TopKSampler<ByteBuffer>> samplers;
/**
* stores metrics that will be rolled into a single global metric
@@@ -810,25 -697,7 +812,27 @@@
casPropose = new LatencyMetrics(factory, "CasPropose", cfs.keyspace.metric.casPropose);
casCommit = new LatencyMetrics(factory, "CasCommit", cfs.keyspace.metric.casCommit);
+ repairsStarted = createTableCounter("RepairJobsStarted");
+ repairsCompleted = createTableCounter("RepairJobsCompleted");
+
+ anticompactionTime = createTableTimer("AnticompactionTime", cfs.keyspace.metric.anticompactionTime);
+ validationTime = createTableTimer("ValidationTime", cfs.keyspace.metric.validationTime);
+ syncTime = createTableTimer("SyncTime", cfs.keyspace.metric.repairSyncTime);
+
+ bytesValidated = createTableHistogram("BytesValidated", cfs.keyspace.metric.bytesValidated, false);
+ partitionsValidated = createTableHistogram("PartitionsValidated", cfs.keyspace.metric.partitionsValidated, false);
+ bytesAnticompacted = createTableCounter("BytesAnticompacted");
+ bytesMutatedAnticompaction = createTableCounter("BytesMutatedAnticompaction");
+ mutatedAnticompactionGauge = createTableGauge("MutatedAnticompactionGauge", () ->
+ {
+ double bytesMutated = bytesMutatedAnticompaction.getCount();
+ double bytesAnticomp = bytesAnticompacted.getCount();
+ if (bytesAnticomp + bytesMutated > 0)
+ return bytesMutated / (bytesAnticomp + bytesMutated);
+ return 0.0;
+ });
++
+ shortReadProtectionRequests = Metrics.meter(factory.createMetricName("ShortReadProtectionRequests"));
}
public void updateSSTableIterated(int count)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/278906c6/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/DataResolver.java
index 78bbe16,32b6d79..f4a472d
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@@ -27,13 -27,9 +27,12 @@@ import com.google.common.collect.Iterab
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
-import org.apache.cassandra.config.*;
+import org.apache.cassandra.schema.ColumnMetadata;
++import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.*;
- import org.apache.cassandra.db.filter.ClusteringIndexFilter;
- import org.apache.cassandra.db.filter.ColumnFilter;
- import org.apache.cassandra.db.filter.DataLimits;
+ import org.apache.cassandra.db.filter.*;
import org.apache.cassandra.db.filter.DataLimits.Counter;
import org.apache.cassandra.db.partitions.*;
import org.apache.cassandra.db.rows.*;
@@@ -76,10 -71,29 +74,26 @@@ public class DataResolver extends Respo
sources[i] = msg.from;
}
- // Even though every responses should honor the limit, we might have more than requested post reconciliation,
- // so ensure we're respecting the limit.
+ /*
+ * Even though every response, individually, will honor the limit, it is possible that we will, after the merge,
+ * have more rows than the client requested. To make sure that we still conform to the original limit,
+ * we apply a top-level post-reconciliation counter to the merged partition iterator.
+ *
+ * Short read protection logic (ShortReadRowProtection.moreContents()) relies on this counter to be applied
+ * to the current partition to work. For this reason we have to apply the counter transformation before
+ * empty partition discard logic kicks in - for it will eagerly consume the iterator.
+ *
+ * That's why the order here is: 1) merge; 2) filter rows; 3) count; 4) discard empty partitions
+ *
+ * See CASSANDRA-13747 for more details.
+ */
+
DataLimits.Counter counter = command.limits().newCounter(command.nowInSec(), true, command.selectsFullPartition());
- return counter.applyTo(mergeWithShortReadProtection(iters, sources, counter));
+
+ UnfilteredPartitionIterator merged = mergeWithShortReadProtection(iters, sources, counter);
+ FilteredPartitions filtered = FilteredPartitions.filter(merged, new Filter(command.nowInSec()));
+ PartitionIterator counted = counter.applyTo(filtered);
-
- return command.isForThrift()
- ? counted
- : Transformation.apply(counted, new EmptyPartitionsDiscarder());
++ return Transformation.apply(counted, new EmptyPartitionsDiscarder());
}
public void compareResponses()
@@@ -541,6 -557,9 +557,9 @@@
partitionKey,
retryFilter);
+ Tracing.trace("Requesting {} extra rows from {} for short read protection", toQuery, source);
- Schema.instance.getColumnFamilyStoreInstance(cmd.metadata().cfId).metric.shortReadProtectionRequests.mark();
++ Schema.instance.getColumnFamilyStoreInstance(cmd.metadata().id).metric.shortReadProtectionRequests.mark();
+
return doShortReadRetry(cmd);
}
@@@ -581,9 -600,9 +600,9 @@@
DataResolver resolver = new DataResolver(keyspace, retryCommand, ConsistencyLevel.ONE, 1, queryStartNanoTime);
ReadCallback handler = new ReadCallback(resolver, ConsistencyLevel.ONE, retryCommand, Collections.singletonList(source), queryStartNanoTime);
if (StorageProxy.canDoLocalRequest(source))
- StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(retryCommand, handler));
+ StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(retryCommand, handler));
else
- MessagingService.instance().sendRRWithFailure(retryCommand.createMessage(MessagingService.current_version), source, handler);
+ MessagingService.instance().sendRRWithFailure(retryCommand.createMessage(), source, handler);
// We don't call handler.get() because we want to preserve tombstones since we're still in the middle of merging node results.
handler.awaitResults();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/278906c6/test/unit/org/apache/cassandra/db/ReadCommandTest.java
----------------------------------------------------------------------
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org
[3/6] cassandra git commit: Fix AssertionError in short read protection
Posted by al...@apache.org.
Fix AssertionError in short read protection
patch by Aleksey Yeschenko; reviewed by Benedict Elliott Smith for CASSANDRA-13747
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a7cb009f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a7cb009f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a7cb009f
Branch: refs/heads/trunk
Commit: a7cb009f8a3f4d0e0293111bfcfff3d404a37a89
Parents: dfbe3fa
Author: Aleksey Yeschenko <al...@yeschenko.com>
Authored: Sun Aug 6 19:42:47 2017 +0100
Committer: Aleksey Yeschenko <al...@yeschenko.com>
Committed: Tue Aug 29 12:22:39 2017 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../UnfilteredPartitionIterators.java | 6 ---
.../db/transform/EmptyPartitionsDiscarder.java | 35 +++++++++++++++
.../apache/cassandra/db/transform/Filter.java | 28 +++---------
.../db/transform/FilteredPartitions.java | 18 +++++---
.../cassandra/db/transform/FilteredRows.java | 2 +-
.../apache/cassandra/metrics/TableMetrics.java | 4 ++
.../apache/cassandra/service/DataResolver.java | 47 ++++++++++++++------
8 files changed, 94 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7cb009f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5ccd5cd..6609b05 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.15
+ * Fix AssertionError in short read protection (CASSANDRA-13747)
* Don't skip corrupted sstables on startup (CASSANDRA-13620)
* Fix the merging of cells with different user type versions (CASSANDRA-13776)
* Copy session properties on cqlsh.py do_login (CASSANDRA-13640)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7cb009f/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
index 1abbb19..4e0ac1b 100644
--- a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
+++ b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
@@ -77,12 +77,6 @@ public abstract class UnfilteredPartitionIterators
return Transformation.apply(toReturn, new Close());
}
- public static PartitionIterator mergeAndFilter(List<UnfilteredPartitionIterator> iterators, int nowInSec, MergeListener listener)
- {
- // TODO: we could have a somewhat faster version if we were to merge the UnfilteredRowIterators directly as RowIterators
- return filter(merge(iterators, nowInSec, listener), nowInSec);
- }
-
public static PartitionIterator filter(final UnfilteredPartitionIterator iterator, final int nowInSec)
{
return FilteredPartitions.filter(iterator, nowInSec);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7cb009f/src/java/org/apache/cassandra/db/transform/EmptyPartitionsDiscarder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/transform/EmptyPartitionsDiscarder.java b/src/java/org/apache/cassandra/db/transform/EmptyPartitionsDiscarder.java
new file mode 100644
index 0000000..5e41cec
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/transform/EmptyPartitionsDiscarder.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.transform;
+
+import org.apache.cassandra.db.rows.BaseRowIterator;
+
+public final class EmptyPartitionsDiscarder extends Transformation<BaseRowIterator<?>>
+{
+ @Override
+ protected BaseRowIterator applyToPartition(BaseRowIterator iterator)
+ {
+ if (iterator.isEmpty())
+ {
+ iterator.close();
+ return null;
+ }
+
+ return iterator;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7cb009f/src/java/org/apache/cassandra/db/transform/Filter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/transform/Filter.java b/src/java/org/apache/cassandra/db/transform/Filter.java
index 48c8b1a..747983f 100644
--- a/src/java/org/apache/cassandra/db/transform/Filter.java
+++ b/src/java/org/apache/cassandra/db/transform/Filter.java
@@ -23,27 +23,21 @@ package org.apache.cassandra.db.transform;
import org.apache.cassandra.db.DeletionPurger;
import org.apache.cassandra.db.rows.*;
-final class Filter extends Transformation
+public final class Filter extends Transformation
{
- private final boolean filterEmpty; // generally maps to !isForThrift, but also false for direct row filtration
private final int nowInSec;
- public Filter(boolean filterEmpty, int nowInSec)
+
+ public Filter(int nowInSec)
{
- this.filterEmpty = filterEmpty;
this.nowInSec = nowInSec;
}
@Override
protected RowIterator applyToPartition(BaseRowIterator iterator)
{
- RowIterator filtered = iterator instanceof UnfilteredRows
- ? new FilteredRows(this, (UnfilteredRows) iterator)
- : new FilteredRows((UnfilteredRowIterator) iterator, this);
-
- if (filterEmpty && closeIfEmpty(filtered))
- return null;
-
- return filtered;
+ return iterator instanceof UnfilteredRows
+ ? new FilteredRows(this, (UnfilteredRows) iterator)
+ : new FilteredRows((UnfilteredRowIterator) iterator, this);
}
@Override
@@ -67,14 +61,4 @@ final class Filter extends Transformation
{
return null;
}
-
- private static boolean closeIfEmpty(BaseRowIterator<?> iter)
- {
- if (iter.isEmpty())
- {
- iter.close();
- return true;
- }
- return false;
- }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7cb009f/src/java/org/apache/cassandra/db/transform/FilteredPartitions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/transform/FilteredPartitions.java b/src/java/org/apache/cassandra/db/transform/FilteredPartitions.java
index 09e36b4..ad9446d 100644
--- a/src/java/org/apache/cassandra/db/transform/FilteredPartitions.java
+++ b/src/java/org/apache/cassandra/db/transform/FilteredPartitions.java
@@ -50,11 +50,19 @@ public final class FilteredPartitions extends BasePartitions<RowIterator, BasePa
/**
* Filter any RangeTombstoneMarker from the iterator's iterators, transforming it into a PartitionIterator.
*/
- public static PartitionIterator filter(UnfilteredPartitionIterator iterator, int nowInSecs)
+ public static FilteredPartitions filter(UnfilteredPartitionIterator iterator, int nowInSecs)
{
- Filter filter = new Filter(!iterator.isForThrift(), nowInSecs);
- if (iterator instanceof UnfilteredPartitions)
- return new FilteredPartitions(filter, (UnfilteredPartitions) iterator);
- return new FilteredPartitions(iterator, filter);
+ FilteredPartitions filtered = filter(iterator, new Filter(nowInSecs));
+
+ return iterator.isForThrift()
+ ? filtered
+ : (FilteredPartitions) Transformation.apply(filtered, new EmptyPartitionsDiscarder());
+ }
+
+ public static FilteredPartitions filter(UnfilteredPartitionIterator iterator, Filter filter)
+ {
+ return iterator instanceof UnfilteredPartitions
+ ? new FilteredPartitions(filter, (UnfilteredPartitions) iterator)
+ : new FilteredPartitions(iterator, filter);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7cb009f/src/java/org/apache/cassandra/db/transform/FilteredRows.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/transform/FilteredRows.java b/src/java/org/apache/cassandra/db/transform/FilteredRows.java
index 818d3bb..5b635eb 100644
--- a/src/java/org/apache/cassandra/db/transform/FilteredRows.java
+++ b/src/java/org/apache/cassandra/db/transform/FilteredRows.java
@@ -55,6 +55,6 @@ public final class FilteredRows extends BaseRows<Row, BaseRowIterator<?>> implem
*/
public static RowIterator filter(UnfilteredRowIterator iterator, int nowInSecs)
{
- return new Filter(false, nowInSecs).applyToPartition(iterator);
+ return new Filter(nowInSecs).applyToPartition(iterator);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7cb009f/src/java/org/apache/cassandra/metrics/TableMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/TableMetrics.java b/src/java/org/apache/cassandra/metrics/TableMetrics.java
index a493836..fe88a63 100644
--- a/src/java/org/apache/cassandra/metrics/TableMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java
@@ -151,6 +151,8 @@ public class TableMetrics
public final static LatencyMetrics globalWriteLatency = new LatencyMetrics(globalFactory, globalAliasFactory, "Write");
public final static LatencyMetrics globalRangeLatency = new LatencyMetrics(globalFactory, globalAliasFactory, "Range");
+ public final Meter shortReadProtectionRequests;
+
public final Map<Sampler, TopKSampler<ByteBuffer>> samplers;
/**
* stores metrics that will be rolled into a single global metric
@@ -645,6 +647,8 @@ public class TableMetrics
casPrepare = new LatencyMetrics(factory, "CasPrepare", cfs.keyspace.metric.casPrepare);
casPropose = new LatencyMetrics(factory, "CasPropose", cfs.keyspace.metric.casPropose);
casCommit = new LatencyMetrics(factory, "CasCommit", cfs.keyspace.metric.casCommit);
+
+ shortReadProtectionRequests = Metrics.meter(factory.createMetricName("ShortReadProtectionRequests"));
}
public void updateSSTableIterated(int count)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7cb009f/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DataResolver.java b/src/java/org/apache/cassandra/service/DataResolver.java
index 26b1b2a..72c4950 100644
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@ -27,16 +27,13 @@ import com.google.common.collect.Iterables;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.*;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.filter.ClusteringIndexFilter;
import org.apache.cassandra.db.filter.DataLimits;
import org.apache.cassandra.db.partitions.*;
import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.db.transform.MoreRows;
-import org.apache.cassandra.db.transform.Transformation;
+import org.apache.cassandra.db.transform.*;
import org.apache.cassandra.exceptions.ReadTimeoutException;
import org.apache.cassandra.net.*;
import org.apache.cassandra.tracing.Tracing;
@@ -72,10 +69,29 @@ public class DataResolver extends ResponseResolver
sources[i] = msg.from;
}
- // Even though every responses should honor the limit, we might have more than requested post reconciliation,
- // so ensure we're respecting the limit.
+ /*
+ * Even though every response, individually, will honor the limit, it is possible that we will, after the merge,
+ * have more rows than the client requested. To make sure that we still conform to the original limit,
+ * we apply a top-level post-reconciliation counter to the merged partition iterator.
+ *
+ * Short read protection logic (ShortReadRowProtection.moreContents()) relies on this counter to be applied
+ * to the current partition to work. For this reason we have to apply the counter transformation before
+ * empty partition discard logic kicks in - for it will eagerly consume the iterator.
+ *
+ * That's why the order here is: 1) merge; 2) filter rows; 3) count; 4) discard empty partitions
+ *
+ * See CASSANDRA-13747 for more details.
+ */
+
DataLimits.Counter counter = command.limits().newCounter(command.nowInSec(), true, command.selectsFullPartition());
- return counter.applyTo(mergeWithShortReadProtection(iters, sources, counter));
+
+ UnfilteredPartitionIterator merged = mergeWithShortReadProtection(iters, sources, counter);
+ FilteredPartitions filtered = FilteredPartitions.filter(merged, new Filter(command.nowInSec()));
+ PartitionIterator counted = counter.applyTo(filtered);
+
+ return command.isForThrift()
+ ? counted
+ : Transformation.apply(counted, new EmptyPartitionsDiscarder());
}
public void compareResponses()
@@ -87,11 +103,13 @@ public class DataResolver extends ResponseResolver
}
}
- private PartitionIterator mergeWithShortReadProtection(List<UnfilteredPartitionIterator> results, InetAddress[] sources, DataLimits.Counter resultCounter)
+ private UnfilteredPartitionIterator mergeWithShortReadProtection(List<UnfilteredPartitionIterator> results,
+ InetAddress[] sources,
+ DataLimits.Counter resultCounter)
{
// If we have only one results, there is no read repair to do and we can't get short reads
if (results.size() == 1)
- return UnfilteredPartitionIterators.filter(results.get(0), command.nowInSec());
+ return results.get(0);
UnfilteredPartitionIterators.MergeListener listener = new RepairMergeListener(sources);
@@ -103,7 +121,7 @@ public class DataResolver extends ResponseResolver
results.set(i, Transformation.apply(results.get(i), new ShortReadProtection(sources[i], resultCounter)));
}
- return UnfilteredPartitionIterators.mergeAndFilter(results, command.nowInSec(), listener);
+ return UnfilteredPartitionIterators.merge(results, command.nowInSec(), listener);
}
private class RepairMergeListener implements UnfilteredPartitionIterators.MergeListener
@@ -510,7 +528,7 @@ public class DataResolver extends ResponseResolver
// counting iterator.
int n = postReconciliationCounter.countedInCurrentPartition();
int x = counter.countedInCurrentPartition();
- int toQuery = Math.max(((n * n) / x) - n, 1);
+ int toQuery = Math.max(((n * n) / Math.max(x, 1)) - n, 1);
DataLimits retryLimits = command.limits().forShortReadRetry(toQuery);
ClusteringIndexFilter filter = command.clusteringIndexFilter(partitionKey);
@@ -523,6 +541,9 @@ public class DataResolver extends ResponseResolver
partitionKey,
retryFilter);
+ Tracing.trace("Requesting {} extra rows from {} for short read protection", toQuery, source);
+ Schema.instance.getColumnFamilyStoreInstance(cmd.metadata().cfId).metric.shortReadProtectionRequests.mark();
+
return doShortReadRetry(cmd);
}
@@ -531,7 +552,7 @@ public class DataResolver extends ResponseResolver
DataResolver resolver = new DataResolver(keyspace, retryCommand, ConsistencyLevel.ONE, 1);
ReadCallback handler = new ReadCallback(resolver, ConsistencyLevel.ONE, retryCommand, Collections.singletonList(source));
if (StorageProxy.canDoLocalRequest(source))
- StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(retryCommand, handler));
+ StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(retryCommand, handler));
else
MessagingService.instance().sendRRWithFailure(retryCommand.createMessage(MessagingService.current_version), source, handler);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org
[5/6] cassandra git commit: Merge branch 'cassandra-3.0' into
cassandra-3.11
Posted by al...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.11
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/826ae9c9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/826ae9c9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/826ae9c9
Branch: refs/heads/cassandra-3.11
Commit: 826ae9c91e11ebb889b3f1788b9357c2c717f9a0
Parents: 809f3b3 a7cb009
Author: Aleksey Yeschenko <al...@yeschenko.com>
Authored: Tue Aug 29 12:30:40 2017 +0100
Committer: Aleksey Yeschenko <al...@yeschenko.com>
Committed: Tue Aug 29 12:31:27 2017 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../UnfilteredPartitionIterators.java | 7 ---
.../db/transform/EmptyPartitionsDiscarder.java | 35 ++++++++++++++
.../apache/cassandra/db/transform/Filter.java | 28 +++--------
.../db/transform/FilteredPartitions.java | 18 +++++--
.../cassandra/db/transform/FilteredRows.java | 2 +-
.../apache/cassandra/metrics/TableMetrics.java | 4 ++
.../apache/cassandra/service/DataResolver.java | 51 ++++++++++++++------
.../apache/cassandra/db/ReadCommandTest.java | 23 ++++-----
9 files changed, 107 insertions(+), 62 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/826ae9c9/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index b0dbd60,6609b05..c4aee3a
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,11 -1,5 +1,12 @@@
-3.0.15
+3.11.1
+ * Fix cassandra-stress hang issues when an error during cluster connection happens (CASSANDRA-12938)
+ * Better bootstrap failure message when blocked by (potential) range movement (CASSANDRA-13744)
+ * "ignore" option is ignored in sstableloader (CASSANDRA-13721)
+ * Deadlock in AbstractCommitLogSegmentManager (CASSANDRA-13652)
+ * Duplicate the buffer before passing it to analyser in SASI operation (CASSANDRA-13512)
+ * Properly evict pstmts from prepared statements cache (CASSANDRA-13641)
+Merged from 3.0:
+ * Fix AssertionError in short read protection (CASSANDRA-13747)
* Don't skip corrupted sstables on startup (CASSANDRA-13620)
* Fix the merging of cells with different user type versions (CASSANDRA-13776)
* Copy session properties on cqlsh.py do_login (CASSANDRA-13640)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/826ae9c9/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
index fc225e8,4e0ac1b..778c71d
--- a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
+++ b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
@@@ -78,31 -77,6 +78,24 @@@ public abstract class UnfilteredPartiti
return Transformation.apply(toReturn, new Close());
}
+ public static UnfilteredPartitionIterator concat(final List<UnfilteredPartitionIterator> iterators)
+ {
+ if (iterators.size() == 1)
+ return iterators.get(0);
+
+ class Extend implements MorePartitions<UnfilteredPartitionIterator>
+ {
+ int i = 1;
+ public UnfilteredPartitionIterator moreContents()
+ {
+ if (i >= iterators.size())
+ return null;
+ return iterators.get(i++);
+ }
+ }
+ return MorePartitions.extend(iterators.get(0), new Extend());
+ }
+
-
- public static PartitionIterator mergeAndFilter(List<UnfilteredPartitionIterator> iterators, int nowInSec, MergeListener listener)
- {
- // TODO: we could have a somewhat faster version if we were to merge the UnfilteredRowIterators directly as RowIterators
- return filter(merge(iterators, nowInSec, listener), nowInSec);
- }
-
public static PartitionIterator filter(final UnfilteredPartitionIterator iterator, final int nowInSec)
{
return FilteredPartitions.filter(iterator, nowInSec);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/826ae9c9/src/java/org/apache/cassandra/metrics/TableMetrics.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/metrics/TableMetrics.java
index 7a84eca,fe88a63..b0f667c
--- a/src/java/org/apache/cassandra/metrics/TableMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java
@@@ -167,40 -151,8 +167,42 @@@ public class TableMetric
public final static LatencyMetrics globalWriteLatency = new LatencyMetrics(globalFactory, globalAliasFactory, "Write");
public final static LatencyMetrics globalRangeLatency = new LatencyMetrics(globalFactory, globalAliasFactory, "Range");
+ public final static Gauge<Double> globalPercentRepaired = Metrics.register(globalFactory.createMetricName("PercentRepaired"),
+ new Gauge<Double>()
+ {
+ public Double getValue()
+ {
+ double repaired = 0;
+ double total = 0;
+ for (String keyspace : Schema.instance.getNonSystemKeyspaces())
+ {
+ Keyspace k = Schema.instance.getKeyspaceInstance(keyspace);
+ if (SchemaConstants.DISTRIBUTED_KEYSPACE_NAME.equals(k.getName()))
+ continue;
+ if (k.getReplicationStrategy().getReplicationFactor() < 2)
+ continue;
+
+ for (ColumnFamilyStore cf : k.getColumnFamilyStores())
+ {
+ if (!SecondaryIndexManager.isIndexColumnFamily(cf.name))
+ {
+ for (SSTableReader sstable : cf.getSSTables(SSTableSet.CANONICAL))
+ {
+ if (sstable.isRepaired())
+ {
+ repaired += sstable.uncompressedLength();
+ }
+ total += sstable.uncompressedLength();
+ }
+ }
+ }
+ }
+ return total > 0 ? (repaired / total) * 100 : 100.0;
+ }
+ });
+
+ public final Meter shortReadProtectionRequests;
+
public final Map<Sampler, TopKSampler<ByteBuffer>> samplers;
/**
* stores metrics that will be rolled into a single global metric
http://git-wip-us.apache.org/repos/asf/cassandra/blob/826ae9c9/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/DataResolver.java
index 116dadd,72c4950..32b6d79
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@@ -27,18 -27,13 +27,13 @@@ import com.google.common.collect.Iterab
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
- import org.apache.cassandra.config.CFMetaData;
- import org.apache.cassandra.config.ColumnDefinition;
- import org.apache.cassandra.config.DatabaseDescriptor;
+ import org.apache.cassandra.config.*;
import org.apache.cassandra.db.*;
--import org.apache.cassandra.db.filter.ClusteringIndexFilter;
- import org.apache.cassandra.db.filter.ColumnFilter;
--import org.apache.cassandra.db.filter.DataLimits;
++import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.db.filter.DataLimits.Counter;
import org.apache.cassandra.db.partitions.*;
import org.apache.cassandra.db.rows.*;
- import org.apache.cassandra.db.transform.MoreRows;
- import org.apache.cassandra.db.transform.Transformation;
+ import org.apache.cassandra.db.transform.*;
import org.apache.cassandra.exceptions.ReadTimeoutException;
import org.apache.cassandra.net.*;
import org.apache.cassandra.tracing.Tracing;
@@@ -104,10 -118,10 +120,10 @@@ public class DataResolver extends Respo
if (!command.limits().isUnlimited())
{
for (int i = 0; i < results.size(); i++)
- results.set(i, Transformation.apply(results.get(i), new ShortReadProtection(sources[i], resultCounter)));
+ results.set(i, Transformation.apply(results.get(i), new ShortReadProtection(sources[i], resultCounter, queryStartNanoTime)));
}
- return UnfilteredPartitionIterators.mergeAndFilter(results, command.nowInSec(), listener);
+ return UnfilteredPartitionIterators.merge(results, command.nowInSec(), listener);
}
private class RepairMergeListener implements UnfilteredPartitionIterators.MergeListener
@@@ -526,9 -526,9 +542,9 @@@
// we should request m rows so that m * x/n = n-x, that is m = (n^2/x) - n.
// Also note that it's ok if we retrieve more results that necessary since our top level iterator is a
// counting iterator.
- int n = postReconciliationCounter.countedInCurrentPartition();
- int x = counter.countedInCurrentPartition();
+ int n = countedInCurrentPartition(postReconciliationCounter);
+ int x = countedInCurrentPartition(counter);
- int toQuery = Math.max(((n * n) / x) - n, 1);
+ int toQuery = Math.max(((n * n) / Math.max(x, 1)) - n, 1);
DataLimits retryLimits = command.limits().forShortReadRetry(toQuery);
ClusteringIndexFilter filter = command.clusteringIndexFilter(partitionKey);
@@@ -544,44 -547,12 +563,44 @@@
return doShortReadRetry(cmd);
}
+ /**
+ * Returns the number of results counted by the counter.
+ *
+ * @param counter the counter.
+ * @return the number of results counted by the counter
+ */
+ private int counted(Counter counter)
+ {
+ // We are interested by the number of rows but for GROUP BY queries 'counted' returns the number of
+ // groups.
+ if (command.limits().isGroupByLimit())
+ return counter.rowCounted();
+
+ return counter.counted();
+ }
+
+ /**
+ * Returns the number of results counted in the partition by the counter.
+ *
+ * @param counter the counter.
+ * @return the number of results counted in the partition by the counter
+ */
+ private int countedInCurrentPartition(Counter counter)
+ {
+ // We are interested by the number of rows but for GROUP BY queries 'countedInCurrentPartition' returns
+ // the number of groups in the current partition.
+ if (command.limits().isGroupByLimit())
+ return counter.rowCountedInCurrentPartition();
+
+ return counter.countedInCurrentPartition();
+ }
+
private UnfilteredRowIterator doShortReadRetry(SinglePartitionReadCommand retryCommand)
{
- DataResolver resolver = new DataResolver(keyspace, retryCommand, ConsistencyLevel.ONE, 1);
- ReadCallback handler = new ReadCallback(resolver, ConsistencyLevel.ONE, retryCommand, Collections.singletonList(source));
+ DataResolver resolver = new DataResolver(keyspace, retryCommand, ConsistencyLevel.ONE, 1, queryStartNanoTime);
+ ReadCallback handler = new ReadCallback(resolver, ConsistencyLevel.ONE, retryCommand, Collections.singletonList(source), queryStartNanoTime);
if (StorageProxy.canDoLocalRequest(source))
- StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(retryCommand, handler));
+ StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(retryCommand, handler));
else
MessagingService.instance().sendRRWithFailure(retryCommand.createMessage(MessagingService.current_version), source, handler);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/826ae9c9/test/unit/org/apache/cassandra/db/ReadCommandTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/ReadCommandTest.java
index 2aef2a7,0000000..9264297
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/db/ReadCommandTest.java
+++ b/test/unit/org/apache/cassandra/db/ReadCommandTest.java
@@@ -1,311 -1,0 +1,312 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.filter.ClusteringIndexSliceFilter;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.filter.RowFilter;
+import org.apache.cassandra.db.marshal.AsciiType;
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.db.partitions.FilteredPartition;
+import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.RowIterator;
+import org.apache.cassandra.db.rows.SerializationHelper;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.db.rows.UnfilteredRowIterators;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.junit.Assert.assertEquals;
+
+public class ReadCommandTest
+{
+ private static final String KEYSPACE = "ReadCommandTest";
+ private static final String CF1 = "Standard1";
+ private static final String CF2 = "Standard2";
+ private static final String CF3 = "Standard3";
+
+ @BeforeClass
+ public static void defineSchema() throws ConfigurationException
+ {
+ DatabaseDescriptor.daemonInitialization();
+
+ CFMetaData metadata1 = SchemaLoader.standardCFMD(KEYSPACE, CF1);
+
+ CFMetaData metadata2 = CFMetaData.Builder.create(KEYSPACE, CF2)
+ .addPartitionKey("key", BytesType.instance)
+ .addClusteringColumn("col", AsciiType.instance)
+ .addRegularColumn("a", AsciiType.instance)
+ .addRegularColumn("b", AsciiType.instance).build();
+
+ CFMetaData metadata3 = CFMetaData.Builder.create(KEYSPACE, CF3)
+ .addPartitionKey("key", BytesType.instance)
+ .addClusteringColumn("col", AsciiType.instance)
+ .addRegularColumn("a", AsciiType.instance)
+ .addRegularColumn("b", AsciiType.instance)
+ .addRegularColumn("c", AsciiType.instance)
+ .addRegularColumn("d", AsciiType.instance)
+ .addRegularColumn("e", AsciiType.instance)
+ .addRegularColumn("f", AsciiType.instance).build();
+
+ SchemaLoader.prepareServer();
+ SchemaLoader.createKeyspace(KEYSPACE,
+ KeyspaceParams.simple(1),
+ metadata1,
+ metadata2,
+ metadata3);
+ }
+
+ @Test
+ public void testPartitionRangeAbort() throws Exception
+ {
+ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF1);
+
+ new RowUpdateBuilder(cfs.metadata, 0, ByteBufferUtil.bytes("key1"))
+ .clustering("Column1")
+ .add("val", ByteBufferUtil.bytes("abcd"))
+ .build()
+ .apply();
+
+ cfs.forceBlockingFlush();
+
+ new RowUpdateBuilder(cfs.metadata, 0, ByteBufferUtil.bytes("key2"))
+ .clustering("Column1")
+ .add("val", ByteBufferUtil.bytes("abcd"))
+ .build()
+ .apply();
+
+ ReadCommand readCommand = Util.cmd(cfs).build();
+ assertEquals(2, Util.getAll(readCommand).size());
+
+ readCommand.abort();
+ assertEquals(0, Util.getAll(readCommand).size());
+ }
+
+ @Test
+ public void testSinglePartitionSliceAbort() throws Exception
+ {
+ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF2);
+
+ cfs.truncateBlocking();
+
+ new RowUpdateBuilder(cfs.metadata, 0, ByteBufferUtil.bytes("key"))
+ .clustering("cc")
+ .add("a", ByteBufferUtil.bytes("abcd"))
+ .build()
+ .apply();
+
+ cfs.forceBlockingFlush();
+
+ new RowUpdateBuilder(cfs.metadata, 0, ByteBufferUtil.bytes("key"))
+ .clustering("dd")
+ .add("a", ByteBufferUtil.bytes("abcd"))
+ .build()
+ .apply();
+
+ ReadCommand readCommand = Util.cmd(cfs, Util.dk("key")).build();
+
+ List<FilteredPartition> partitions = Util.getAll(readCommand);
+ assertEquals(1, partitions.size());
+ assertEquals(2, partitions.get(0).rowCount());
+
+ readCommand.abort();
+ assertEquals(0, Util.getAll(readCommand).size());
+ }
+
+ @Test
+ public void testSinglePartitionNamesAbort() throws Exception
+ {
+ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF2);
+
+ cfs.truncateBlocking();
+
+ new RowUpdateBuilder(cfs.metadata, 0, ByteBufferUtil.bytes("key"))
+ .clustering("cc")
+ .add("a", ByteBufferUtil.bytes("abcd"))
+ .build()
+ .apply();
+
+ cfs.forceBlockingFlush();
+
+ new RowUpdateBuilder(cfs.metadata, 0, ByteBufferUtil.bytes("key"))
+ .clustering("dd")
+ .add("a", ByteBufferUtil.bytes("abcd"))
+ .build()
+ .apply();
+
+ ReadCommand readCommand = Util.cmd(cfs, Util.dk("key")).includeRow("cc").includeRow("dd").build();
+
+ List<FilteredPartition> partitions = Util.getAll(readCommand);
+ assertEquals(1, partitions.size());
+ assertEquals(2, partitions.get(0).rowCount());
+
+ readCommand.abort();
+ assertEquals(0, Util.getAll(readCommand).size());
+ }
+
+ @Test
+ public void testSinglePartitionGroupMerge() throws Exception
+ {
+ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF3);
+
+ String[][][] groups = new String[][][] {
+ new String[][] {
+ new String[] { "1", "key1", "aa", "a" }, // "1" indicates to create the data, "-1" to delete the row
+ new String[] { "1", "key2", "bb", "b" },
+ new String[] { "1", "key3", "cc", "c" }
+ },
+ new String[][] {
+ new String[] { "1", "key3", "dd", "d" },
+ new String[] { "1", "key2", "ee", "e" },
+ new String[] { "1", "key1", "ff", "f" }
+ },
+ new String[][] {
+ new String[] { "1", "key6", "aa", "a" },
+ new String[] { "1", "key5", "bb", "b" },
+ new String[] { "1", "key4", "cc", "c" }
+ },
+ new String[][] {
+ new String[] { "-1", "key6", "aa", "a" },
+ new String[] { "-1", "key2", "bb", "b" }
+ }
+ };
+
+ // Given the data above, when the keys are sorted and the deletions removed, we should
+ // get these clustering rows in this order
+ String[] expectedRows = new String[] { "aa", "ff", "ee", "cc", "dd", "cc", "bb"};
+
+ List<ByteBuffer> buffers = new ArrayList<>(groups.length);
+ int nowInSeconds = FBUtilities.nowInSeconds();
+ ColumnFilter columnFilter = ColumnFilter.allColumnsBuilder(cfs.metadata).build();
+ RowFilter rowFilter = RowFilter.create();
+ Slice slice = Slice.make(ClusteringBound.BOTTOM, ClusteringBound.TOP);
+ ClusteringIndexSliceFilter sliceFilter = new ClusteringIndexSliceFilter(Slices.with(cfs.metadata.comparator, slice), false);
+
+ for (String[][] group : groups)
+ {
+ cfs.truncateBlocking();
+
+ List<SinglePartitionReadCommand> commands = new ArrayList<>(group.length);
+
+ for (String[] data : group)
+ {
+ if (data[0].equals("1"))
+ {
+ new RowUpdateBuilder(cfs.metadata, 0, ByteBufferUtil.bytes(data[1]))
+ .clustering(data[2])
+ .add(data[3], ByteBufferUtil.bytes("blah"))
+ .build()
+ .apply();
+ }
+ else
+ {
+ RowUpdateBuilder.deleteRow(cfs.metadata, FBUtilities.timestampMicros(), ByteBufferUtil.bytes(data[1]), data[2]).apply();
+ }
+ commands.add(SinglePartitionReadCommand.create(cfs.metadata, nowInSeconds, columnFilter, rowFilter, DataLimits.NONE, Util.dk(data[1]), sliceFilter));
+ }
+
+ cfs.forceBlockingFlush();
+
+ ReadQuery query = new SinglePartitionReadCommand.Group(commands, DataLimits.NONE);
+
+ try (ReadExecutionController executionController = query.executionController();
+ UnfilteredPartitionIterator iter = query.executeLocally(executionController);
+ DataOutputBuffer buffer = new DataOutputBuffer())
+ {
+ UnfilteredPartitionIterators.serializerForIntraNode().serialize(iter,
+ columnFilter,
+ buffer,
+ MessagingService.current_version);
+ buffers.add(buffer.buffer());
+ }
+ }
+
+ // deserialize, merge and check the results are all there
+ List<UnfilteredPartitionIterator> iterators = new ArrayList<>();
+
+ for (ByteBuffer buffer : buffers)
+ {
+ try (DataInputBuffer in = new DataInputBuffer(buffer, true))
+ {
+ iterators.add(UnfilteredPartitionIterators.serializerForIntraNode().deserialize(in,
+ MessagingService.current_version,
+ cfs.metadata,
+ columnFilter,
+ SerializationHelper.Flag.LOCAL));
+ }
+ }
+
- try(PartitionIterator partitionIterator = UnfilteredPartitionIterators.mergeAndFilter(iterators,
- nowInSeconds,
- new UnfilteredPartitionIterators.MergeListener()
- {
- public UnfilteredRowIterators.MergeListener getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> versions)
++ UnfilteredPartitionIterators.MergeListener listener =
++ new UnfilteredPartitionIterators.MergeListener()
+ {
- return null;
- }
++ public UnfilteredRowIterators.MergeListener getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> versions)
++ {
++ return null;
++ }
+
- public void close()
- {
++ public void close()
++ {
+
- }
- }))
++ }
++ };
++
++ try (PartitionIterator partitionIterator = UnfilteredPartitionIterators.filter(UnfilteredPartitionIterators.merge(iterators, nowInSeconds, listener), nowInSeconds))
+ {
+
+ int i = 0;
+ int numPartitions = 0;
+ while (partitionIterator.hasNext())
+ {
+ numPartitions++;
+ try(RowIterator rowIterator = partitionIterator.next())
+ {
+ while (rowIterator.hasNext())
+ {
+ Row row = rowIterator.next();
+ assertEquals("col=" + expectedRows[i++], row.clustering().toString(cfs.metadata));
+ //System.out.print(row.toString(cfs.metadata, true));
+ }
+ }
+ }
+
+ assertEquals(5, numPartitions);
+ assertEquals(expectedRows.length, i);
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org
[4/6] cassandra git commit: Merge branch 'cassandra-3.0' into
cassandra-3.11
Posted by al...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.11
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/826ae9c9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/826ae9c9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/826ae9c9
Branch: refs/heads/trunk
Commit: 826ae9c91e11ebb889b3f1788b9357c2c717f9a0
Parents: 809f3b3 a7cb009
Author: Aleksey Yeschenko <al...@yeschenko.com>
Authored: Tue Aug 29 12:30:40 2017 +0100
Committer: Aleksey Yeschenko <al...@yeschenko.com>
Committed: Tue Aug 29 12:31:27 2017 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../UnfilteredPartitionIterators.java | 7 ---
.../db/transform/EmptyPartitionsDiscarder.java | 35 ++++++++++++++
.../apache/cassandra/db/transform/Filter.java | 28 +++--------
.../db/transform/FilteredPartitions.java | 18 +++++--
.../cassandra/db/transform/FilteredRows.java | 2 +-
.../apache/cassandra/metrics/TableMetrics.java | 4 ++
.../apache/cassandra/service/DataResolver.java | 51 ++++++++++++++------
.../apache/cassandra/db/ReadCommandTest.java | 23 ++++-----
9 files changed, 107 insertions(+), 62 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/826ae9c9/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index b0dbd60,6609b05..c4aee3a
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,11 -1,5 +1,12 @@@
-3.0.15
+3.11.1
+ * Fix cassandra-stress hang issues when an error during cluster connection happens (CASSANDRA-12938)
+ * Better bootstrap failure message when blocked by (potential) range movement (CASSANDRA-13744)
+ * "ignore" option is ignored in sstableloader (CASSANDRA-13721)
+ * Deadlock in AbstractCommitLogSegmentManager (CASSANDRA-13652)
+ * Duplicate the buffer before passing it to analyser in SASI operation (CASSANDRA-13512)
+ * Properly evict pstmts from prepared statements cache (CASSANDRA-13641)
+Merged from 3.0:
+ * Fix AssertionError in short read protection (CASSANDRA-13747)
* Don't skip corrupted sstables on startup (CASSANDRA-13620)
* Fix the merging of cells with different user type versions (CASSANDRA-13776)
* Copy session properties on cqlsh.py do_login (CASSANDRA-13640)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/826ae9c9/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
index fc225e8,4e0ac1b..778c71d
--- a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
+++ b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
@@@ -78,31 -77,6 +78,24 @@@ public abstract class UnfilteredPartiti
return Transformation.apply(toReturn, new Close());
}
+ public static UnfilteredPartitionIterator concat(final List<UnfilteredPartitionIterator> iterators)
+ {
+ if (iterators.size() == 1)
+ return iterators.get(0);
+
+ class Extend implements MorePartitions<UnfilteredPartitionIterator>
+ {
+ int i = 1;
+ public UnfilteredPartitionIterator moreContents()
+ {
+ if (i >= iterators.size())
+ return null;
+ return iterators.get(i++);
+ }
+ }
+ return MorePartitions.extend(iterators.get(0), new Extend());
+ }
+
-
- public static PartitionIterator mergeAndFilter(List<UnfilteredPartitionIterator> iterators, int nowInSec, MergeListener listener)
- {
- // TODO: we could have a somewhat faster version if we were to merge the UnfilteredRowIterators directly as RowIterators
- return filter(merge(iterators, nowInSec, listener), nowInSec);
- }
-
public static PartitionIterator filter(final UnfilteredPartitionIterator iterator, final int nowInSec)
{
return FilteredPartitions.filter(iterator, nowInSec);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/826ae9c9/src/java/org/apache/cassandra/metrics/TableMetrics.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/metrics/TableMetrics.java
index 7a84eca,fe88a63..b0f667c
--- a/src/java/org/apache/cassandra/metrics/TableMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java
@@@ -167,40 -151,8 +167,42 @@@ public class TableMetric
public final static LatencyMetrics globalWriteLatency = new LatencyMetrics(globalFactory, globalAliasFactory, "Write");
public final static LatencyMetrics globalRangeLatency = new LatencyMetrics(globalFactory, globalAliasFactory, "Range");
+ public final static Gauge<Double> globalPercentRepaired = Metrics.register(globalFactory.createMetricName("PercentRepaired"),
+ new Gauge<Double>()
+ {
+ public Double getValue()
+ {
+ double repaired = 0;
+ double total = 0;
+ for (String keyspace : Schema.instance.getNonSystemKeyspaces())
+ {
+ Keyspace k = Schema.instance.getKeyspaceInstance(keyspace);
+ if (SchemaConstants.DISTRIBUTED_KEYSPACE_NAME.equals(k.getName()))
+ continue;
+ if (k.getReplicationStrategy().getReplicationFactor() < 2)
+ continue;
+
+ for (ColumnFamilyStore cf : k.getColumnFamilyStores())
+ {
+ if (!SecondaryIndexManager.isIndexColumnFamily(cf.name))
+ {
+ for (SSTableReader sstable : cf.getSSTables(SSTableSet.CANONICAL))
+ {
+ if (sstable.isRepaired())
+ {
+ repaired += sstable.uncompressedLength();
+ }
+ total += sstable.uncompressedLength();
+ }
+ }
+ }
+ }
+ return total > 0 ? (repaired / total) * 100 : 100.0;
+ }
+ });
+
+ public final Meter shortReadProtectionRequests;
+
public final Map<Sampler, TopKSampler<ByteBuffer>> samplers;
/**
* stores metrics that will be rolled into a single global metric
http://git-wip-us.apache.org/repos/asf/cassandra/blob/826ae9c9/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/DataResolver.java
index 116dadd,72c4950..32b6d79
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@@ -27,18 -27,13 +27,13 @@@ import com.google.common.collect.Iterab
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
- import org.apache.cassandra.config.CFMetaData;
- import org.apache.cassandra.config.ColumnDefinition;
- import org.apache.cassandra.config.DatabaseDescriptor;
+ import org.apache.cassandra.config.*;
import org.apache.cassandra.db.*;
--import org.apache.cassandra.db.filter.ClusteringIndexFilter;
- import org.apache.cassandra.db.filter.ColumnFilter;
--import org.apache.cassandra.db.filter.DataLimits;
++import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.db.filter.DataLimits.Counter;
import org.apache.cassandra.db.partitions.*;
import org.apache.cassandra.db.rows.*;
- import org.apache.cassandra.db.transform.MoreRows;
- import org.apache.cassandra.db.transform.Transformation;
+ import org.apache.cassandra.db.transform.*;
import org.apache.cassandra.exceptions.ReadTimeoutException;
import org.apache.cassandra.net.*;
import org.apache.cassandra.tracing.Tracing;
@@@ -104,10 -118,10 +120,10 @@@ public class DataResolver extends Respo
if (!command.limits().isUnlimited())
{
for (int i = 0; i < results.size(); i++)
- results.set(i, Transformation.apply(results.get(i), new ShortReadProtection(sources[i], resultCounter)));
+ results.set(i, Transformation.apply(results.get(i), new ShortReadProtection(sources[i], resultCounter, queryStartNanoTime)));
}
- return UnfilteredPartitionIterators.mergeAndFilter(results, command.nowInSec(), listener);
+ return UnfilteredPartitionIterators.merge(results, command.nowInSec(), listener);
}
private class RepairMergeListener implements UnfilteredPartitionIterators.MergeListener
@@@ -526,9 -526,9 +542,9 @@@
// we should request m rows so that m * x/n = n-x, that is m = (n^2/x) - n.
// Also note that it's ok if we retrieve more results that necessary since our top level iterator is a
// counting iterator.
- int n = postReconciliationCounter.countedInCurrentPartition();
- int x = counter.countedInCurrentPartition();
+ int n = countedInCurrentPartition(postReconciliationCounter);
+ int x = countedInCurrentPartition(counter);
- int toQuery = Math.max(((n * n) / x) - n, 1);
+ int toQuery = Math.max(((n * n) / Math.max(x, 1)) - n, 1);
DataLimits retryLimits = command.limits().forShortReadRetry(toQuery);
ClusteringIndexFilter filter = command.clusteringIndexFilter(partitionKey);
@@@ -544,44 -547,12 +563,44 @@@
return doShortReadRetry(cmd);
}
+ /**
+ * Returns the number of results counted by the counter.
+ *
+ * @param counter the counter.
+ * @return the number of results counted by the counter
+ */
+ private int counted(Counter counter)
+ {
+ // We are interested by the number of rows but for GROUP BY queries 'counted' returns the number of
+ // groups.
+ if (command.limits().isGroupByLimit())
+ return counter.rowCounted();
+
+ return counter.counted();
+ }
+
+ /**
+ * Returns the number of results counted in the partition by the counter.
+ *
+ * @param counter the counter.
+ * @return the number of results counted in the partition by the counter
+ */
+ private int countedInCurrentPartition(Counter counter)
+ {
+ // We are interested by the number of rows but for GROUP BY queries 'countedInCurrentPartition' returns
+ // the number of groups in the current partition.
+ if (command.limits().isGroupByLimit())
+ return counter.rowCountedInCurrentPartition();
+
+ return counter.countedInCurrentPartition();
+ }
+
private UnfilteredRowIterator doShortReadRetry(SinglePartitionReadCommand retryCommand)
{
- DataResolver resolver = new DataResolver(keyspace, retryCommand, ConsistencyLevel.ONE, 1);
- ReadCallback handler = new ReadCallback(resolver, ConsistencyLevel.ONE, retryCommand, Collections.singletonList(source));
+ DataResolver resolver = new DataResolver(keyspace, retryCommand, ConsistencyLevel.ONE, 1, queryStartNanoTime);
+ ReadCallback handler = new ReadCallback(resolver, ConsistencyLevel.ONE, retryCommand, Collections.singletonList(source), queryStartNanoTime);
if (StorageProxy.canDoLocalRequest(source))
- StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(retryCommand, handler));
+ StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(retryCommand, handler));
else
MessagingService.instance().sendRRWithFailure(retryCommand.createMessage(MessagingService.current_version), source, handler);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/826ae9c9/test/unit/org/apache/cassandra/db/ReadCommandTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/ReadCommandTest.java
index 2aef2a7,0000000..9264297
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/db/ReadCommandTest.java
+++ b/test/unit/org/apache/cassandra/db/ReadCommandTest.java
@@@ -1,311 -1,0 +1,312 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.filter.ClusteringIndexSliceFilter;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.filter.RowFilter;
+import org.apache.cassandra.db.marshal.AsciiType;
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.db.partitions.FilteredPartition;
+import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.RowIterator;
+import org.apache.cassandra.db.rows.SerializationHelper;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.db.rows.UnfilteredRowIterators;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.junit.Assert.assertEquals;
+
+public class ReadCommandTest
+{
+ private static final String KEYSPACE = "ReadCommandTest";
+ private static final String CF1 = "Standard1";
+ private static final String CF2 = "Standard2";
+ private static final String CF3 = "Standard3";
+
+ @BeforeClass
+ public static void defineSchema() throws ConfigurationException
+ {
+ DatabaseDescriptor.daemonInitialization();
+
+ CFMetaData metadata1 = SchemaLoader.standardCFMD(KEYSPACE, CF1);
+
+ CFMetaData metadata2 = CFMetaData.Builder.create(KEYSPACE, CF2)
+ .addPartitionKey("key", BytesType.instance)
+ .addClusteringColumn("col", AsciiType.instance)
+ .addRegularColumn("a", AsciiType.instance)
+ .addRegularColumn("b", AsciiType.instance).build();
+
+ CFMetaData metadata3 = CFMetaData.Builder.create(KEYSPACE, CF3)
+ .addPartitionKey("key", BytesType.instance)
+ .addClusteringColumn("col", AsciiType.instance)
+ .addRegularColumn("a", AsciiType.instance)
+ .addRegularColumn("b", AsciiType.instance)
+ .addRegularColumn("c", AsciiType.instance)
+ .addRegularColumn("d", AsciiType.instance)
+ .addRegularColumn("e", AsciiType.instance)
+ .addRegularColumn("f", AsciiType.instance).build();
+
+ SchemaLoader.prepareServer();
+ SchemaLoader.createKeyspace(KEYSPACE,
+ KeyspaceParams.simple(1),
+ metadata1,
+ metadata2,
+ metadata3);
+ }
+
+ @Test
+ public void testPartitionRangeAbort() throws Exception
+ {
+ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF1);
+
+ new RowUpdateBuilder(cfs.metadata, 0, ByteBufferUtil.bytes("key1"))
+ .clustering("Column1")
+ .add("val", ByteBufferUtil.bytes("abcd"))
+ .build()
+ .apply();
+
+ cfs.forceBlockingFlush();
+
+ new RowUpdateBuilder(cfs.metadata, 0, ByteBufferUtil.bytes("key2"))
+ .clustering("Column1")
+ .add("val", ByteBufferUtil.bytes("abcd"))
+ .build()
+ .apply();
+
+ ReadCommand readCommand = Util.cmd(cfs).build();
+ assertEquals(2, Util.getAll(readCommand).size());
+
+ readCommand.abort();
+ assertEquals(0, Util.getAll(readCommand).size());
+ }
+
+ @Test
+ public void testSinglePartitionSliceAbort() throws Exception
+ {
+ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF2);
+
+ cfs.truncateBlocking();
+
+ new RowUpdateBuilder(cfs.metadata, 0, ByteBufferUtil.bytes("key"))
+ .clustering("cc")
+ .add("a", ByteBufferUtil.bytes("abcd"))
+ .build()
+ .apply();
+
+ cfs.forceBlockingFlush();
+
+ new RowUpdateBuilder(cfs.metadata, 0, ByteBufferUtil.bytes("key"))
+ .clustering("dd")
+ .add("a", ByteBufferUtil.bytes("abcd"))
+ .build()
+ .apply();
+
+ ReadCommand readCommand = Util.cmd(cfs, Util.dk("key")).build();
+
+ List<FilteredPartition> partitions = Util.getAll(readCommand);
+ assertEquals(1, partitions.size());
+ assertEquals(2, partitions.get(0).rowCount());
+
+ readCommand.abort();
+ assertEquals(0, Util.getAll(readCommand).size());
+ }
+
+ @Test
+ public void testSinglePartitionNamesAbort() throws Exception
+ {
+ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF2);
+
+ cfs.truncateBlocking();
+
+ new RowUpdateBuilder(cfs.metadata, 0, ByteBufferUtil.bytes("key"))
+ .clustering("cc")
+ .add("a", ByteBufferUtil.bytes("abcd"))
+ .build()
+ .apply();
+
+ cfs.forceBlockingFlush();
+
+ new RowUpdateBuilder(cfs.metadata, 0, ByteBufferUtil.bytes("key"))
+ .clustering("dd")
+ .add("a", ByteBufferUtil.bytes("abcd"))
+ .build()
+ .apply();
+
+ ReadCommand readCommand = Util.cmd(cfs, Util.dk("key")).includeRow("cc").includeRow("dd").build();
+
+ List<FilteredPartition> partitions = Util.getAll(readCommand);
+ assertEquals(1, partitions.size());
+ assertEquals(2, partitions.get(0).rowCount());
+
+ readCommand.abort();
+ assertEquals(0, Util.getAll(readCommand).size());
+ }
+
+ @Test
+ public void testSinglePartitionGroupMerge() throws Exception
+ {
+ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF3);
+
+ String[][][] groups = new String[][][] {
+ new String[][] {
+ new String[] { "1", "key1", "aa", "a" }, // "1" indicates to create the data, "-1" to delete the row
+ new String[] { "1", "key2", "bb", "b" },
+ new String[] { "1", "key3", "cc", "c" }
+ },
+ new String[][] {
+ new String[] { "1", "key3", "dd", "d" },
+ new String[] { "1", "key2", "ee", "e" },
+ new String[] { "1", "key1", "ff", "f" }
+ },
+ new String[][] {
+ new String[] { "1", "key6", "aa", "a" },
+ new String[] { "1", "key5", "bb", "b" },
+ new String[] { "1", "key4", "cc", "c" }
+ },
+ new String[][] {
+ new String[] { "-1", "key6", "aa", "a" },
+ new String[] { "-1", "key2", "bb", "b" }
+ }
+ };
+
+ // Given the data above, when the keys are sorted and the deletions removed, we should
+ // get these clustering rows in this order
+ String[] expectedRows = new String[] { "aa", "ff", "ee", "cc", "dd", "cc", "bb"};
+
+ List<ByteBuffer> buffers = new ArrayList<>(groups.length);
+ int nowInSeconds = FBUtilities.nowInSeconds();
+ ColumnFilter columnFilter = ColumnFilter.allColumnsBuilder(cfs.metadata).build();
+ RowFilter rowFilter = RowFilter.create();
+ Slice slice = Slice.make(ClusteringBound.BOTTOM, ClusteringBound.TOP);
+ ClusteringIndexSliceFilter sliceFilter = new ClusteringIndexSliceFilter(Slices.with(cfs.metadata.comparator, slice), false);
+
+ for (String[][] group : groups)
+ {
+ cfs.truncateBlocking();
+
+ List<SinglePartitionReadCommand> commands = new ArrayList<>(group.length);
+
+ for (String[] data : group)
+ {
+ if (data[0].equals("1"))
+ {
+ new RowUpdateBuilder(cfs.metadata, 0, ByteBufferUtil.bytes(data[1]))
+ .clustering(data[2])
+ .add(data[3], ByteBufferUtil.bytes("blah"))
+ .build()
+ .apply();
+ }
+ else
+ {
+ RowUpdateBuilder.deleteRow(cfs.metadata, FBUtilities.timestampMicros(), ByteBufferUtil.bytes(data[1]), data[2]).apply();
+ }
+ commands.add(SinglePartitionReadCommand.create(cfs.metadata, nowInSeconds, columnFilter, rowFilter, DataLimits.NONE, Util.dk(data[1]), sliceFilter));
+ }
+
+ cfs.forceBlockingFlush();
+
+ ReadQuery query = new SinglePartitionReadCommand.Group(commands, DataLimits.NONE);
+
+ try (ReadExecutionController executionController = query.executionController();
+ UnfilteredPartitionIterator iter = query.executeLocally(executionController);
+ DataOutputBuffer buffer = new DataOutputBuffer())
+ {
+ UnfilteredPartitionIterators.serializerForIntraNode().serialize(iter,
+ columnFilter,
+ buffer,
+ MessagingService.current_version);
+ buffers.add(buffer.buffer());
+ }
+ }
+
+ // deserialize, merge and check the results are all there
+ List<UnfilteredPartitionIterator> iterators = new ArrayList<>();
+
+ for (ByteBuffer buffer : buffers)
+ {
+ try (DataInputBuffer in = new DataInputBuffer(buffer, true))
+ {
+ iterators.add(UnfilteredPartitionIterators.serializerForIntraNode().deserialize(in,
+ MessagingService.current_version,
+ cfs.metadata,
+ columnFilter,
+ SerializationHelper.Flag.LOCAL));
+ }
+ }
+
- try(PartitionIterator partitionIterator = UnfilteredPartitionIterators.mergeAndFilter(iterators,
- nowInSeconds,
- new UnfilteredPartitionIterators.MergeListener()
- {
- public UnfilteredRowIterators.MergeListener getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> versions)
++ UnfilteredPartitionIterators.MergeListener listener =
++ new UnfilteredPartitionIterators.MergeListener()
+ {
- return null;
- }
++ public UnfilteredRowIterators.MergeListener getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> versions)
++ {
++ return null;
++ }
+
- public void close()
- {
++ public void close()
++ {
+
- }
- }))
++ }
++ };
++
++ try (PartitionIterator partitionIterator = UnfilteredPartitionIterators.filter(UnfilteredPartitionIterators.merge(iterators, nowInSeconds, listener), nowInSeconds))
+ {
+
+ int i = 0;
+ int numPartitions = 0;
+ while (partitionIterator.hasNext())
+ {
+ numPartitions++;
+ try(RowIterator rowIterator = partitionIterator.next())
+ {
+ while (rowIterator.hasNext())
+ {
+ Row row = rowIterator.next();
+ assertEquals("col=" + expectedRows[i++], row.clustering().toString(cfs.metadata));
+ //System.out.print(row.toString(cfs.metadata, true));
+ }
+ }
+ }
+
+ assertEquals(5, numPartitions);
+ assertEquals(expectedRows.length, i);
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org
[2/6] cassandra git commit: Fix AssertionError in short read
protection
Posted by al...@apache.org.
Fix AssertionError in short read protection
patch by Aleksey Yeschenko; reviewed by Benedict Elliott Smith for
CASSANDRA-13747
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a7cb009f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a7cb009f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a7cb009f
Branch: refs/heads/cassandra-3.11
Commit: a7cb009f8a3f4d0e0293111bfcfff3d404a37a89
Parents: dfbe3fa
Author: Aleksey Yeschenko <al...@yeschenko.com>
Authored: Sun Aug 6 19:42:47 2017 +0100
Committer: Aleksey Yeschenko <al...@yeschenko.com>
Committed: Tue Aug 29 12:22:39 2017 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../UnfilteredPartitionIterators.java | 6 ---
.../db/transform/EmptyPartitionsDiscarder.java | 35 +++++++++++++++
.../apache/cassandra/db/transform/Filter.java | 28 +++---------
.../db/transform/FilteredPartitions.java | 18 +++++---
.../cassandra/db/transform/FilteredRows.java | 2 +-
.../apache/cassandra/metrics/TableMetrics.java | 4 ++
.../apache/cassandra/service/DataResolver.java | 47 ++++++++++++++------
8 files changed, 94 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7cb009f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5ccd5cd..6609b05 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.15
+ * Fix AssertionError in short read protection (CASSANDRA-13747)
* Don't skip corrupted sstables on startup (CASSANDRA-13620)
* Fix the merging of cells with different user type versions (CASSANDRA-13776)
* Copy session properties on cqlsh.py do_login (CASSANDRA-13640)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7cb009f/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
index 1abbb19..4e0ac1b 100644
--- a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
+++ b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
@@ -77,12 +77,6 @@ public abstract class UnfilteredPartitionIterators
return Transformation.apply(toReturn, new Close());
}
- public static PartitionIterator mergeAndFilter(List<UnfilteredPartitionIterator> iterators, int nowInSec, MergeListener listener)
- {
- // TODO: we could have a somewhat faster version if we were to merge the UnfilteredRowIterators directly as RowIterators
- return filter(merge(iterators, nowInSec, listener), nowInSec);
- }
-
public static PartitionIterator filter(final UnfilteredPartitionIterator iterator, final int nowInSec)
{
return FilteredPartitions.filter(iterator, nowInSec);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7cb009f/src/java/org/apache/cassandra/db/transform/EmptyPartitionsDiscarder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/transform/EmptyPartitionsDiscarder.java b/src/java/org/apache/cassandra/db/transform/EmptyPartitionsDiscarder.java
new file mode 100644
index 0000000..5e41cec
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/transform/EmptyPartitionsDiscarder.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.transform;
+
+import org.apache.cassandra.db.rows.BaseRowIterator;
+
+public final class EmptyPartitionsDiscarder extends Transformation<BaseRowIterator<?>>
+{
+ @Override
+ protected BaseRowIterator applyToPartition(BaseRowIterator iterator)
+ {
+ if (iterator.isEmpty())
+ {
+ iterator.close();
+ return null;
+ }
+
+ return iterator;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7cb009f/src/java/org/apache/cassandra/db/transform/Filter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/transform/Filter.java b/src/java/org/apache/cassandra/db/transform/Filter.java
index 48c8b1a..747983f 100644
--- a/src/java/org/apache/cassandra/db/transform/Filter.java
+++ b/src/java/org/apache/cassandra/db/transform/Filter.java
@@ -23,27 +23,21 @@ package org.apache.cassandra.db.transform;
import org.apache.cassandra.db.DeletionPurger;
import org.apache.cassandra.db.rows.*;
-final class Filter extends Transformation
+public final class Filter extends Transformation
{
- private final boolean filterEmpty; // generally maps to !isForThrift, but also false for direct row filtration
private final int nowInSec;
- public Filter(boolean filterEmpty, int nowInSec)
+
+ public Filter(int nowInSec)
{
- this.filterEmpty = filterEmpty;
this.nowInSec = nowInSec;
}
@Override
protected RowIterator applyToPartition(BaseRowIterator iterator)
{
- RowIterator filtered = iterator instanceof UnfilteredRows
- ? new FilteredRows(this, (UnfilteredRows) iterator)
- : new FilteredRows((UnfilteredRowIterator) iterator, this);
-
- if (filterEmpty && closeIfEmpty(filtered))
- return null;
-
- return filtered;
+ return iterator instanceof UnfilteredRows
+ ? new FilteredRows(this, (UnfilteredRows) iterator)
+ : new FilteredRows((UnfilteredRowIterator) iterator, this);
}
@Override
@@ -67,14 +61,4 @@ final class Filter extends Transformation
{
return null;
}
-
- private static boolean closeIfEmpty(BaseRowIterator<?> iter)
- {
- if (iter.isEmpty())
- {
- iter.close();
- return true;
- }
- return false;
- }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7cb009f/src/java/org/apache/cassandra/db/transform/FilteredPartitions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/transform/FilteredPartitions.java b/src/java/org/apache/cassandra/db/transform/FilteredPartitions.java
index 09e36b4..ad9446d 100644
--- a/src/java/org/apache/cassandra/db/transform/FilteredPartitions.java
+++ b/src/java/org/apache/cassandra/db/transform/FilteredPartitions.java
@@ -50,11 +50,19 @@ public final class FilteredPartitions extends BasePartitions<RowIterator, BasePa
/**
* Filter any RangeTombstoneMarker from the iterator's iterators, transforming it into a PartitionIterator.
*/
- public static PartitionIterator filter(UnfilteredPartitionIterator iterator, int nowInSecs)
+ public static FilteredPartitions filter(UnfilteredPartitionIterator iterator, int nowInSecs)
{
- Filter filter = new Filter(!iterator.isForThrift(), nowInSecs);
- if (iterator instanceof UnfilteredPartitions)
- return new FilteredPartitions(filter, (UnfilteredPartitions) iterator);
- return new FilteredPartitions(iterator, filter);
+ FilteredPartitions filtered = filter(iterator, new Filter(nowInSecs));
+
+ return iterator.isForThrift()
+ ? filtered
+ : (FilteredPartitions) Transformation.apply(filtered, new EmptyPartitionsDiscarder());
+ }
+
+ public static FilteredPartitions filter(UnfilteredPartitionIterator iterator, Filter filter)
+ {
+ return iterator instanceof UnfilteredPartitions
+ ? new FilteredPartitions(filter, (UnfilteredPartitions) iterator)
+ : new FilteredPartitions(iterator, filter);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7cb009f/src/java/org/apache/cassandra/db/transform/FilteredRows.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/transform/FilteredRows.java b/src/java/org/apache/cassandra/db/transform/FilteredRows.java
index 818d3bb..5b635eb 100644
--- a/src/java/org/apache/cassandra/db/transform/FilteredRows.java
+++ b/src/java/org/apache/cassandra/db/transform/FilteredRows.java
@@ -55,6 +55,6 @@ public final class FilteredRows extends BaseRows<Row, BaseRowIterator<?>> implem
*/
public static RowIterator filter(UnfilteredRowIterator iterator, int nowInSecs)
{
- return new Filter(false, nowInSecs).applyToPartition(iterator);
+ return new Filter(nowInSecs).applyToPartition(iterator);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7cb009f/src/java/org/apache/cassandra/metrics/TableMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/TableMetrics.java b/src/java/org/apache/cassandra/metrics/TableMetrics.java
index a493836..fe88a63 100644
--- a/src/java/org/apache/cassandra/metrics/TableMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java
@@ -151,6 +151,8 @@ public class TableMetrics
public final static LatencyMetrics globalWriteLatency = new LatencyMetrics(globalFactory, globalAliasFactory, "Write");
public final static LatencyMetrics globalRangeLatency = new LatencyMetrics(globalFactory, globalAliasFactory, "Range");
+ public final Meter shortReadProtectionRequests;
+
public final Map<Sampler, TopKSampler<ByteBuffer>> samplers;
/**
* stores metrics that will be rolled into a single global metric
@@ -645,6 +647,8 @@ public class TableMetrics
casPrepare = new LatencyMetrics(factory, "CasPrepare", cfs.keyspace.metric.casPrepare);
casPropose = new LatencyMetrics(factory, "CasPropose", cfs.keyspace.metric.casPropose);
casCommit = new LatencyMetrics(factory, "CasCommit", cfs.keyspace.metric.casCommit);
+
+ shortReadProtectionRequests = Metrics.meter(factory.createMetricName("ShortReadProtectionRequests"));
}
public void updateSSTableIterated(int count)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7cb009f/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DataResolver.java b/src/java/org/apache/cassandra/service/DataResolver.java
index 26b1b2a..72c4950 100644
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@ -27,16 +27,13 @@ import com.google.common.collect.Iterables;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.*;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.filter.ClusteringIndexFilter;
import org.apache.cassandra.db.filter.DataLimits;
import org.apache.cassandra.db.partitions.*;
import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.db.transform.MoreRows;
-import org.apache.cassandra.db.transform.Transformation;
+import org.apache.cassandra.db.transform.*;
import org.apache.cassandra.exceptions.ReadTimeoutException;
import org.apache.cassandra.net.*;
import org.apache.cassandra.tracing.Tracing;
@@ -72,10 +69,29 @@ public class DataResolver extends ResponseResolver
sources[i] = msg.from;
}
- // Even though every responses should honor the limit, we might have more than requested post reconciliation,
- // so ensure we're respecting the limit.
+ /*
+ * Even though every response, individually, will honor the limit, it is possible that we will, after the merge,
+ * have more rows than the client requested. To make sure that we still conform to the original limit,
+ * we apply a top-level post-reconciliation counter to the merged partition iterator.
+ *
+ * Short read protection logic (ShortReadRowProtection.moreContents()) relies on this counter to be applied
+ * to the current partition to work. For this reason we have to apply the counter transformation before
+ * empty partition discard logic kicks in - for it will eagerly consume the iterator.
+ *
+ * That's why the order here is: 1) merge; 2) filter rows; 3) count; 4) discard empty partitions
+ *
+ * See CASSANDRA-13747 for more details.
+ */
+
DataLimits.Counter counter = command.limits().newCounter(command.nowInSec(), true, command.selectsFullPartition());
- return counter.applyTo(mergeWithShortReadProtection(iters, sources, counter));
+
+ UnfilteredPartitionIterator merged = mergeWithShortReadProtection(iters, sources, counter);
+ FilteredPartitions filtered = FilteredPartitions.filter(merged, new Filter(command.nowInSec()));
+ PartitionIterator counted = counter.applyTo(filtered);
+
+ return command.isForThrift()
+ ? counted
+ : Transformation.apply(counted, new EmptyPartitionsDiscarder());
}
public void compareResponses()
@@ -87,11 +103,13 @@ public class DataResolver extends ResponseResolver
}
}
- private PartitionIterator mergeWithShortReadProtection(List<UnfilteredPartitionIterator> results, InetAddress[] sources, DataLimits.Counter resultCounter)
+ private UnfilteredPartitionIterator mergeWithShortReadProtection(List<UnfilteredPartitionIterator> results,
+ InetAddress[] sources,
+ DataLimits.Counter resultCounter)
{
// If we have only one results, there is no read repair to do and we can't get short reads
if (results.size() == 1)
- return UnfilteredPartitionIterators.filter(results.get(0), command.nowInSec());
+ return results.get(0);
UnfilteredPartitionIterators.MergeListener listener = new RepairMergeListener(sources);
@@ -103,7 +121,7 @@ public class DataResolver extends ResponseResolver
results.set(i, Transformation.apply(results.get(i), new ShortReadProtection(sources[i], resultCounter)));
}
- return UnfilteredPartitionIterators.mergeAndFilter(results, command.nowInSec(), listener);
+ return UnfilteredPartitionIterators.merge(results, command.nowInSec(), listener);
}
private class RepairMergeListener implements UnfilteredPartitionIterators.MergeListener
@@ -510,7 +528,7 @@ public class DataResolver extends ResponseResolver
// counting iterator.
int n = postReconciliationCounter.countedInCurrentPartition();
int x = counter.countedInCurrentPartition();
- int toQuery = Math.max(((n * n) / x) - n, 1);
+ int toQuery = Math.max(((n * n) / Math.max(x, 1)) - n, 1);
DataLimits retryLimits = command.limits().forShortReadRetry(toQuery);
ClusteringIndexFilter filter = command.clusteringIndexFilter(partitionKey);
@@ -523,6 +541,9 @@ public class DataResolver extends ResponseResolver
partitionKey,
retryFilter);
+ Tracing.trace("Requesting {} extra rows from {} for short read protection", toQuery, source);
+ Schema.instance.getColumnFamilyStoreInstance(cmd.metadata().cfId).metric.shortReadProtectionRequests.mark();
+
return doShortReadRetry(cmd);
}
@@ -531,7 +552,7 @@ public class DataResolver extends ResponseResolver
DataResolver resolver = new DataResolver(keyspace, retryCommand, ConsistencyLevel.ONE, 1);
ReadCallback handler = new ReadCallback(resolver, ConsistencyLevel.ONE, retryCommand, Collections.singletonList(source));
if (StorageProxy.canDoLocalRequest(source))
- StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(retryCommand, handler));
+ StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(retryCommand, handler));
else
MessagingService.instance().sendRRWithFailure(retryCommand.createMessage(MessagingService.current_version), source, handler);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org