You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2017/08/29 11:36:00 UTC

[1/6] cassandra git commit: Fix AssertionError in short read protection

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.0 dfbe3fabd -> a7cb009f8
  refs/heads/cassandra-3.11 809f3b30e -> 826ae9c91
  refs/heads/trunk 326f3a7c7 -> 278906c6c


Fix AssertionError in short read protection

patch by Aleksey Yeschenko; reviewed by Benedict Elliott Smith for
CASSANDRA-13747


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a7cb009f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a7cb009f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a7cb009f

Branch: refs/heads/cassandra-3.0
Commit: a7cb009f8a3f4d0e0293111bfcfff3d404a37a89
Parents: dfbe3fa
Author: Aleksey Yeschenko <al...@yeschenko.com>
Authored: Sun Aug 6 19:42:47 2017 +0100
Committer: Aleksey Yeschenko <al...@yeschenko.com>
Committed: Tue Aug 29 12:22:39 2017 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../UnfilteredPartitionIterators.java           |  6 ---
 .../db/transform/EmptyPartitionsDiscarder.java  | 35 +++++++++++++++
 .../apache/cassandra/db/transform/Filter.java   | 28 +++---------
 .../db/transform/FilteredPartitions.java        | 18 +++++---
 .../cassandra/db/transform/FilteredRows.java    |  2 +-
 .../apache/cassandra/metrics/TableMetrics.java  |  4 ++
 .../apache/cassandra/service/DataResolver.java  | 47 ++++++++++++++------
 8 files changed, 94 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7cb009f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5ccd5cd..6609b05 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.15
+ * Fix AssertionError in short read protection (CASSANDRA-13747)
  * Don't skip corrupted sstables on startup (CASSANDRA-13620)
  * Fix the merging of cells with different user type versions (CASSANDRA-13776)
  * Copy session properties on cqlsh.py do_login (CASSANDRA-13640)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7cb009f/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
index 1abbb19..4e0ac1b 100644
--- a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
+++ b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
@@ -77,12 +77,6 @@ public abstract class UnfilteredPartitionIterators
         return Transformation.apply(toReturn, new Close());
     }
 
-    public static PartitionIterator mergeAndFilter(List<UnfilteredPartitionIterator> iterators, int nowInSec, MergeListener listener)
-    {
-        // TODO: we could have a somewhat faster version if we were to merge the UnfilteredRowIterators directly as RowIterators
-        return filter(merge(iterators, nowInSec, listener), nowInSec);
-    }
-
     public static PartitionIterator filter(final UnfilteredPartitionIterator iterator, final int nowInSec)
     {
         return FilteredPartitions.filter(iterator, nowInSec);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7cb009f/src/java/org/apache/cassandra/db/transform/EmptyPartitionsDiscarder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/transform/EmptyPartitionsDiscarder.java b/src/java/org/apache/cassandra/db/transform/EmptyPartitionsDiscarder.java
new file mode 100644
index 0000000..5e41cec
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/transform/EmptyPartitionsDiscarder.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.transform;
+
+import org.apache.cassandra.db.rows.BaseRowIterator;
+
+public final class EmptyPartitionsDiscarder extends Transformation<BaseRowIterator<?>>
+{
+    @Override
+    protected BaseRowIterator applyToPartition(BaseRowIterator iterator)
+    {
+        if (iterator.isEmpty())
+        {
+            iterator.close();
+            return null;
+        }
+
+        return iterator;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7cb009f/src/java/org/apache/cassandra/db/transform/Filter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/transform/Filter.java b/src/java/org/apache/cassandra/db/transform/Filter.java
index 48c8b1a..747983f 100644
--- a/src/java/org/apache/cassandra/db/transform/Filter.java
+++ b/src/java/org/apache/cassandra/db/transform/Filter.java
@@ -23,27 +23,21 @@ package org.apache.cassandra.db.transform;
 import org.apache.cassandra.db.DeletionPurger;
 import org.apache.cassandra.db.rows.*;
 
-final class Filter extends Transformation
+public final class Filter extends Transformation
 {
-    private final boolean filterEmpty; // generally maps to !isForThrift, but also false for direct row filtration
     private final int nowInSec;
-    public Filter(boolean filterEmpty, int nowInSec)
+
+    public Filter(int nowInSec)
     {
-        this.filterEmpty = filterEmpty;
         this.nowInSec = nowInSec;
     }
 
     @Override
     protected RowIterator applyToPartition(BaseRowIterator iterator)
     {
-        RowIterator filtered = iterator instanceof UnfilteredRows
-                               ? new FilteredRows(this, (UnfilteredRows) iterator)
-                               : new FilteredRows((UnfilteredRowIterator) iterator, this);
-
-        if (filterEmpty && closeIfEmpty(filtered))
-            return null;
-
-        return filtered;
+        return iterator instanceof UnfilteredRows
+             ? new FilteredRows(this, (UnfilteredRows) iterator)
+             : new FilteredRows((UnfilteredRowIterator) iterator, this);
     }
 
     @Override
@@ -67,14 +61,4 @@ final class Filter extends Transformation
     {
         return null;
     }
-
-    private static boolean closeIfEmpty(BaseRowIterator<?> iter)
-    {
-        if (iter.isEmpty())
-        {
-            iter.close();
-            return true;
-        }
-        return false;
-    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7cb009f/src/java/org/apache/cassandra/db/transform/FilteredPartitions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/transform/FilteredPartitions.java b/src/java/org/apache/cassandra/db/transform/FilteredPartitions.java
index 09e36b4..ad9446d 100644
--- a/src/java/org/apache/cassandra/db/transform/FilteredPartitions.java
+++ b/src/java/org/apache/cassandra/db/transform/FilteredPartitions.java
@@ -50,11 +50,19 @@ public final class FilteredPartitions extends BasePartitions<RowIterator, BasePa
     /**
      * Filter any RangeTombstoneMarker from the iterator's iterators, transforming it into a PartitionIterator.
      */
-    public static PartitionIterator filter(UnfilteredPartitionIterator iterator, int nowInSecs)
+    public static FilteredPartitions filter(UnfilteredPartitionIterator iterator, int nowInSecs)
     {
-        Filter filter = new Filter(!iterator.isForThrift(), nowInSecs);
-        if (iterator instanceof UnfilteredPartitions)
-            return new FilteredPartitions(filter, (UnfilteredPartitions) iterator);
-        return new FilteredPartitions(iterator, filter);
+        FilteredPartitions filtered = filter(iterator, new Filter(nowInSecs));
+
+        return iterator.isForThrift()
+             ? filtered
+             : (FilteredPartitions) Transformation.apply(filtered, new EmptyPartitionsDiscarder());
+    }
+
+    public static FilteredPartitions filter(UnfilteredPartitionIterator iterator, Filter filter)
+    {
+        return iterator instanceof UnfilteredPartitions
+             ? new FilteredPartitions(filter, (UnfilteredPartitions) iterator)
+             : new FilteredPartitions(iterator, filter);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7cb009f/src/java/org/apache/cassandra/db/transform/FilteredRows.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/transform/FilteredRows.java b/src/java/org/apache/cassandra/db/transform/FilteredRows.java
index 818d3bb..5b635eb 100644
--- a/src/java/org/apache/cassandra/db/transform/FilteredRows.java
+++ b/src/java/org/apache/cassandra/db/transform/FilteredRows.java
@@ -55,6 +55,6 @@ public final class FilteredRows extends BaseRows<Row, BaseRowIterator<?>> implem
      */
     public static RowIterator filter(UnfilteredRowIterator iterator, int nowInSecs)
     {
-        return new Filter(false, nowInSecs).applyToPartition(iterator);
+        return new Filter(nowInSecs).applyToPartition(iterator);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7cb009f/src/java/org/apache/cassandra/metrics/TableMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/TableMetrics.java b/src/java/org/apache/cassandra/metrics/TableMetrics.java
index a493836..fe88a63 100644
--- a/src/java/org/apache/cassandra/metrics/TableMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java
@@ -151,6 +151,8 @@ public class TableMetrics
     public final static LatencyMetrics globalWriteLatency = new LatencyMetrics(globalFactory, globalAliasFactory, "Write");
     public final static LatencyMetrics globalRangeLatency = new LatencyMetrics(globalFactory, globalAliasFactory, "Range");
 
+    public final Meter shortReadProtectionRequests;
+
     public final Map<Sampler, TopKSampler<ByteBuffer>> samplers;
     /**
      * stores metrics that will be rolled into a single global metric
@@ -645,6 +647,8 @@ public class TableMetrics
         casPrepare = new LatencyMetrics(factory, "CasPrepare", cfs.keyspace.metric.casPrepare);
         casPropose = new LatencyMetrics(factory, "CasPropose", cfs.keyspace.metric.casPropose);
         casCommit = new LatencyMetrics(factory, "CasCommit", cfs.keyspace.metric.casCommit);
+
+        shortReadProtectionRequests = Metrics.meter(factory.createMetricName("ShortReadProtectionRequests"));
     }
 
     public void updateSSTableIterated(int count)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7cb009f/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DataResolver.java b/src/java/org/apache/cassandra/service/DataResolver.java
index 26b1b2a..72c4950 100644
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@ -27,16 +27,13 @@ import com.google.common.collect.Iterables;
 
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.*;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.ClusteringIndexFilter;
 import org.apache.cassandra.db.filter.DataLimits;
 import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.db.transform.MoreRows;
-import org.apache.cassandra.db.transform.Transformation;
+import org.apache.cassandra.db.transform.*;
 import org.apache.cassandra.exceptions.ReadTimeoutException;
 import org.apache.cassandra.net.*;
 import org.apache.cassandra.tracing.Tracing;
@@ -72,10 +69,29 @@ public class DataResolver extends ResponseResolver
             sources[i] = msg.from;
         }
 
-        // Even though every responses should honor the limit, we might have more than requested post reconciliation,
-        // so ensure we're respecting the limit.
+        /*
+         * Even though every response, individually, will honor the limit, it is possible that we will, after the merge,
+         * have more rows than the client requested. To make sure that we still conform to the original limit,
+         * we apply a top-level post-reconciliation counter to the merged partition iterator.
+         *
+         * Short read protection logic (ShortReadRowProtection.moreContents()) relies on this counter to be applied
+         * to the current partition to work. For this reason we have to apply the counter transformation before
+         * empty partition discard logic kicks in - for it will eagerly consume the iterator.
+         *
+         * That's why the order here is: 1) merge; 2) filter rows; 3) count; 4) discard empty partitions
+         *
+         * See CASSANDRA-13747 for more details.
+         */
+
         DataLimits.Counter counter = command.limits().newCounter(command.nowInSec(), true, command.selectsFullPartition());
-        return counter.applyTo(mergeWithShortReadProtection(iters, sources, counter));
+
+        UnfilteredPartitionIterator merged = mergeWithShortReadProtection(iters, sources, counter);
+        FilteredPartitions filtered = FilteredPartitions.filter(merged, new Filter(command.nowInSec()));
+        PartitionIterator counted = counter.applyTo(filtered);
+
+        return command.isForThrift()
+             ? counted
+             : Transformation.apply(counted, new EmptyPartitionsDiscarder());
     }
 
     public void compareResponses()
@@ -87,11 +103,13 @@ public class DataResolver extends ResponseResolver
         }
     }
 
-    private PartitionIterator mergeWithShortReadProtection(List<UnfilteredPartitionIterator> results, InetAddress[] sources, DataLimits.Counter resultCounter)
+    private UnfilteredPartitionIterator mergeWithShortReadProtection(List<UnfilteredPartitionIterator> results,
+                                                                     InetAddress[] sources,
+                                                                     DataLimits.Counter resultCounter)
     {
         // If we have only one results, there is no read repair to do and we can't get short reads
         if (results.size() == 1)
-            return UnfilteredPartitionIterators.filter(results.get(0), command.nowInSec());
+            return results.get(0);
 
         UnfilteredPartitionIterators.MergeListener listener = new RepairMergeListener(sources);
 
@@ -103,7 +121,7 @@ public class DataResolver extends ResponseResolver
                 results.set(i, Transformation.apply(results.get(i), new ShortReadProtection(sources[i], resultCounter)));
         }
 
-        return UnfilteredPartitionIterators.mergeAndFilter(results, command.nowInSec(), listener);
+        return UnfilteredPartitionIterators.merge(results, command.nowInSec(), listener);
     }
 
     private class RepairMergeListener implements UnfilteredPartitionIterators.MergeListener
@@ -510,7 +528,7 @@ public class DataResolver extends ResponseResolver
                 // counting iterator.
                 int n = postReconciliationCounter.countedInCurrentPartition();
                 int x = counter.countedInCurrentPartition();
-                int toQuery = Math.max(((n * n) / x) - n, 1);
+                int toQuery = Math.max(((n * n) / Math.max(x, 1)) - n, 1);
 
                 DataLimits retryLimits = command.limits().forShortReadRetry(toQuery);
                 ClusteringIndexFilter filter = command.clusteringIndexFilter(partitionKey);
@@ -523,6 +541,9 @@ public class DataResolver extends ResponseResolver
                                                                                    partitionKey,
                                                                                    retryFilter);
 
+                Tracing.trace("Requesting {} extra rows from {} for short read protection", toQuery, source);
+                Schema.instance.getColumnFamilyStoreInstance(cmd.metadata().cfId).metric.shortReadProtectionRequests.mark();
+
                 return doShortReadRetry(cmd);
             }
 
@@ -531,7 +552,7 @@ public class DataResolver extends ResponseResolver
                 DataResolver resolver = new DataResolver(keyspace, retryCommand, ConsistencyLevel.ONE, 1);
                 ReadCallback handler = new ReadCallback(resolver, ConsistencyLevel.ONE, retryCommand, Collections.singletonList(source));
                 if (StorageProxy.canDoLocalRequest(source))
-                      StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(retryCommand, handler));
+                    StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(retryCommand, handler));
                 else
                     MessagingService.instance().sendRRWithFailure(retryCommand.createMessage(MessagingService.current_version), source, handler);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[6/6] cassandra git commit: Merge branch 'cassandra-3.11' into trunk

Posted by al...@apache.org.
Merge branch 'cassandra-3.11' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/278906c6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/278906c6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/278906c6

Branch: refs/heads/trunk
Commit: 278906c6c0424c1ce0d922c24747c97978b0aa14
Parents: 326f3a7 826ae9c
Author: Aleksey Yeschenko <al...@yeschenko.com>
Authored: Tue Aug 29 12:33:33 2017 +0100
Committer: Aleksey Yeschenko <al...@yeschenko.com>
Committed: Tue Aug 29 12:33:50 2017 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../UnfilteredPartitionIterators.java           |  7 ---
 .../db/transform/EmptyPartitionsDiscarder.java  | 35 +++++++++++++++
 .../apache/cassandra/db/transform/Filter.java   | 28 +++---------
 .../db/transform/FilteredPartitions.java        | 15 ++++---
 .../cassandra/db/transform/FilteredRows.java    |  2 +-
 .../apache/cassandra/metrics/TableMetrics.java  |  4 ++
 .../apache/cassandra/service/DataResolver.java  | 45 ++++++++++++++------
 .../apache/cassandra/db/ReadCommandTest.java    | 23 +++++-----
 9 files changed, 101 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/278906c6/CHANGES.txt
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/278906c6/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/278906c6/src/java/org/apache/cassandra/db/transform/FilteredPartitions.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/transform/FilteredPartitions.java
index ed643bb,ad9446d..fa12c9c
--- a/src/java/org/apache/cassandra/db/transform/FilteredPartitions.java
+++ b/src/java/org/apache/cassandra/db/transform/FilteredPartitions.java
@@@ -50,11 -50,19 +50,16 @@@ public final class FilteredPartitions e
      /**
       * Filter any RangeTombstoneMarker from the iterator's iterators, transforming it into a PartitionIterator.
       */
-     public static PartitionIterator filter(UnfilteredPartitionIterator iterator, int nowInSecs)
+     public static FilteredPartitions filter(UnfilteredPartitionIterator iterator, int nowInSecs)
      {
-         Filter filter = new Filter(true, nowInSecs);
-         if (iterator instanceof UnfilteredPartitions)
-             return new FilteredPartitions(filter, (UnfilteredPartitions) iterator);
-         return new FilteredPartitions(iterator, filter);
+         FilteredPartitions filtered = filter(iterator, new Filter(nowInSecs));
 -
 -        return iterator.isForThrift()
 -             ? filtered
 -             : (FilteredPartitions) Transformation.apply(filtered, new EmptyPartitionsDiscarder());
++        return (FilteredPartitions) Transformation.apply(filtered, new EmptyPartitionsDiscarder());
+     }
+ 
+     public static FilteredPartitions filter(UnfilteredPartitionIterator iterator, Filter filter)
+     {
+         return iterator instanceof UnfilteredPartitions
+              ? new FilteredPartitions(filter, (UnfilteredPartitions) iterator)
+              : new FilteredPartitions(iterator, filter);
      }
  }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/278906c6/src/java/org/apache/cassandra/metrics/TableMetrics.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/metrics/TableMetrics.java
index 58b017e,b0f667c..7e6ca25
--- a/src/java/org/apache/cassandra/metrics/TableMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java
@@@ -240,33 -201,8 +240,35 @@@ public class TableMetric
          }
      });
  
 +    public static final Gauge<Long> globalBytesRepaired = Metrics.register(globalFactory.createMetricName("BytesRepaired"),
 +                                                                           new Gauge<Long>()
 +    {
 +        public Long getValue()
 +        {
 +            return totalNonSystemTablesSize(SSTableReader::isRepaired).left;
 +        }
 +    });
 +
 +    public static final Gauge<Long> globalBytesUnrepaired = Metrics.register(globalFactory.createMetricName("BytesUnrepaired"),
 +                                                                             new Gauge<Long>()
 +    {
 +        public Long getValue()
 +        {
 +            return totalNonSystemTablesSize(s -> !s.isRepaired() && !s.isPendingRepair()).left;
 +        }
 +    });
 +
 +    public static final Gauge<Long> globalBytesPendingRepair = Metrics.register(globalFactory.createMetricName("BytesPendingRepair"),
 +                                                                                new Gauge<Long>()
 +    {
 +        public Long getValue()
 +        {
 +            return totalNonSystemTablesSize(SSTableReader::isPendingRepair).left;
 +        }
 +    });
 +
+     public final Meter shortReadProtectionRequests;
+ 
      public final Map<Sampler, TopKSampler<ByteBuffer>> samplers;
      /**
       * stores metrics that will be rolled into a single global metric
@@@ -810,25 -697,7 +812,27 @@@
          casPropose = new LatencyMetrics(factory, "CasPropose", cfs.keyspace.metric.casPropose);
          casCommit = new LatencyMetrics(factory, "CasCommit", cfs.keyspace.metric.casCommit);
  
 +        repairsStarted = createTableCounter("RepairJobsStarted");
 +        repairsCompleted = createTableCounter("RepairJobsCompleted");
 +
 +        anticompactionTime = createTableTimer("AnticompactionTime", cfs.keyspace.metric.anticompactionTime);
 +        validationTime = createTableTimer("ValidationTime", cfs.keyspace.metric.validationTime);
 +        syncTime = createTableTimer("SyncTime", cfs.keyspace.metric.repairSyncTime);
 +
 +        bytesValidated = createTableHistogram("BytesValidated", cfs.keyspace.metric.bytesValidated, false);
 +        partitionsValidated = createTableHistogram("PartitionsValidated", cfs.keyspace.metric.partitionsValidated, false);
 +        bytesAnticompacted = createTableCounter("BytesAnticompacted");
 +        bytesMutatedAnticompaction = createTableCounter("BytesMutatedAnticompaction");
 +        mutatedAnticompactionGauge = createTableGauge("MutatedAnticompactionGauge", () ->
 +        {
 +            double bytesMutated = bytesMutatedAnticompaction.getCount();
 +            double bytesAnticomp = bytesAnticompacted.getCount();
 +            if (bytesAnticomp + bytesMutated > 0)
 +                return bytesMutated / (bytesAnticomp + bytesMutated);
 +            return 0.0;
 +        });
++
+         shortReadProtectionRequests = Metrics.meter(factory.createMetricName("ShortReadProtectionRequests"));
      }
  
      public void updateSSTableIterated(int count)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/278906c6/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/DataResolver.java
index 78bbe16,32b6d79..f4a472d
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@@ -27,13 -27,9 +27,12 @@@ import com.google.common.collect.Iterab
  
  import org.apache.cassandra.concurrent.Stage;
  import org.apache.cassandra.concurrent.StageManager;
 -import org.apache.cassandra.config.*;
 +import org.apache.cassandra.schema.ColumnMetadata;
++import org.apache.cassandra.schema.Schema;
 +import org.apache.cassandra.schema.TableMetadata;
 +import org.apache.cassandra.config.DatabaseDescriptor;
  import org.apache.cassandra.db.*;
- import org.apache.cassandra.db.filter.ClusteringIndexFilter;
- import org.apache.cassandra.db.filter.ColumnFilter;
- import org.apache.cassandra.db.filter.DataLimits;
+ import org.apache.cassandra.db.filter.*;
  import org.apache.cassandra.db.filter.DataLimits.Counter;
  import org.apache.cassandra.db.partitions.*;
  import org.apache.cassandra.db.rows.*;
@@@ -76,10 -71,29 +74,26 @@@ public class DataResolver extends Respo
              sources[i] = msg.from;
          }
  
-         // Even though every responses should honor the limit, we might have more than requested post reconciliation,
-         // so ensure we're respecting the limit.
+         /*
+          * Even though every response, individually, will honor the limit, it is possible that we will, after the merge,
+          * have more rows than the client requested. To make sure that we still conform to the original limit,
+          * we apply a top-level post-reconciliation counter to the merged partition iterator.
+          *
+          * Short read protection logic (ShortReadRowProtection.moreContents()) relies on this counter to be applied
+          * to the current partition to work. For this reason we have to apply the counter transformation before
+          * empty partition discard logic kicks in - for it will eagerly consume the iterator.
+          *
+          * That's why the order here is: 1) merge; 2) filter rows; 3) count; 4) discard empty partitions
+          *
+          * See CASSANDRA-13747 for more details.
+          */
+ 
          DataLimits.Counter counter = command.limits().newCounter(command.nowInSec(), true, command.selectsFullPartition());
-         return counter.applyTo(mergeWithShortReadProtection(iters, sources, counter));
+ 
+         UnfilteredPartitionIterator merged = mergeWithShortReadProtection(iters, sources, counter);
+         FilteredPartitions filtered = FilteredPartitions.filter(merged, new Filter(command.nowInSec()));
+         PartitionIterator counted = counter.applyTo(filtered);
 -
 -        return command.isForThrift()
 -             ? counted
 -             : Transformation.apply(counted, new EmptyPartitionsDiscarder());
++        return Transformation.apply(counted, new EmptyPartitionsDiscarder());
      }
  
      public void compareResponses()
@@@ -541,6 -557,9 +557,9 @@@
                                                                                     partitionKey,
                                                                                     retryFilter);
  
+                 Tracing.trace("Requesting {} extra rows from {} for short read protection", toQuery, source);
 -                Schema.instance.getColumnFamilyStoreInstance(cmd.metadata().cfId).metric.shortReadProtectionRequests.mark();
++                Schema.instance.getColumnFamilyStoreInstance(cmd.metadata().id).metric.shortReadProtectionRequests.mark();
+ 
                  return doShortReadRetry(cmd);
              }
  
@@@ -581,9 -600,9 +600,9 @@@
                  DataResolver resolver = new DataResolver(keyspace, retryCommand, ConsistencyLevel.ONE, 1, queryStartNanoTime);
                  ReadCallback handler = new ReadCallback(resolver, ConsistencyLevel.ONE, retryCommand, Collections.singletonList(source), queryStartNanoTime);
                  if (StorageProxy.canDoLocalRequest(source))
-                       StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(retryCommand, handler));
+                     StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(retryCommand, handler));
                  else
 -                    MessagingService.instance().sendRRWithFailure(retryCommand.createMessage(MessagingService.current_version), source, handler);
 +                    MessagingService.instance().sendRRWithFailure(retryCommand.createMessage(), source, handler);
  
                  // We don't call handler.get() because we want to preserve tombstones since we're still in the middle of merging node results.
                  handler.awaitResults();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/278906c6/test/unit/org/apache/cassandra/db/ReadCommandTest.java
----------------------------------------------------------------------


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[3/6] cassandra git commit: Fix AssertionError in short read protection

Posted by al...@apache.org.
Fix AssertionError in short read protection

patch by Aleksey Yeschenko; reviewed by Benedict Elliott Smith for
CASSANDRA-13747


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a7cb009f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a7cb009f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a7cb009f

Branch: refs/heads/trunk
Commit: a7cb009f8a3f4d0e0293111bfcfff3d404a37a89
Parents: dfbe3fa
Author: Aleksey Yeschenko <al...@yeschenko.com>
Authored: Sun Aug 6 19:42:47 2017 +0100
Committer: Aleksey Yeschenko <al...@yeschenko.com>
Committed: Tue Aug 29 12:22:39 2017 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../UnfilteredPartitionIterators.java           |  6 ---
 .../db/transform/EmptyPartitionsDiscarder.java  | 35 +++++++++++++++
 .../apache/cassandra/db/transform/Filter.java   | 28 +++---------
 .../db/transform/FilteredPartitions.java        | 18 +++++---
 .../cassandra/db/transform/FilteredRows.java    |  2 +-
 .../apache/cassandra/metrics/TableMetrics.java  |  4 ++
 .../apache/cassandra/service/DataResolver.java  | 47 ++++++++++++++------
 8 files changed, 94 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7cb009f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5ccd5cd..6609b05 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.15
+ * Fix AssertionError in short read protection (CASSANDRA-13747)
  * Don't skip corrupted sstables on startup (CASSANDRA-13620)
  * Fix the merging of cells with different user type versions (CASSANDRA-13776)
  * Copy session properties on cqlsh.py do_login (CASSANDRA-13640)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7cb009f/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
index 1abbb19..4e0ac1b 100644
--- a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
+++ b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
@@ -77,12 +77,6 @@ public abstract class UnfilteredPartitionIterators
         return Transformation.apply(toReturn, new Close());
     }
 
-    public static PartitionIterator mergeAndFilter(List<UnfilteredPartitionIterator> iterators, int nowInSec, MergeListener listener)
-    {
-        // TODO: we could have a somewhat faster version if we were to merge the UnfilteredRowIterators directly as RowIterators
-        return filter(merge(iterators, nowInSec, listener), nowInSec);
-    }
-
     public static PartitionIterator filter(final UnfilteredPartitionIterator iterator, final int nowInSec)
     {
         return FilteredPartitions.filter(iterator, nowInSec);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7cb009f/src/java/org/apache/cassandra/db/transform/EmptyPartitionsDiscarder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/transform/EmptyPartitionsDiscarder.java b/src/java/org/apache/cassandra/db/transform/EmptyPartitionsDiscarder.java
new file mode 100644
index 0000000..5e41cec
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/transform/EmptyPartitionsDiscarder.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.transform;
+
+import org.apache.cassandra.db.rows.BaseRowIterator;
+
+public final class EmptyPartitionsDiscarder extends Transformation<BaseRowIterator<?>>
+{
+    @Override
+    protected BaseRowIterator applyToPartition(BaseRowIterator iterator)
+    {
+        if (iterator.isEmpty())
+        {
+            iterator.close();
+            return null;
+        }
+
+        return iterator;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7cb009f/src/java/org/apache/cassandra/db/transform/Filter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/transform/Filter.java b/src/java/org/apache/cassandra/db/transform/Filter.java
index 48c8b1a..747983f 100644
--- a/src/java/org/apache/cassandra/db/transform/Filter.java
+++ b/src/java/org/apache/cassandra/db/transform/Filter.java
@@ -23,27 +23,21 @@ package org.apache.cassandra.db.transform;
 import org.apache.cassandra.db.DeletionPurger;
 import org.apache.cassandra.db.rows.*;
 
-final class Filter extends Transformation
+public final class Filter extends Transformation
 {
-    private final boolean filterEmpty; // generally maps to !isForThrift, but also false for direct row filtration
     private final int nowInSec;
-    public Filter(boolean filterEmpty, int nowInSec)
+
+    public Filter(int nowInSec)
     {
-        this.filterEmpty = filterEmpty;
         this.nowInSec = nowInSec;
     }
 
     @Override
     protected RowIterator applyToPartition(BaseRowIterator iterator)
     {
-        RowIterator filtered = iterator instanceof UnfilteredRows
-                               ? new FilteredRows(this, (UnfilteredRows) iterator)
-                               : new FilteredRows((UnfilteredRowIterator) iterator, this);
-
-        if (filterEmpty && closeIfEmpty(filtered))
-            return null;
-
-        return filtered;
+        return iterator instanceof UnfilteredRows
+             ? new FilteredRows(this, (UnfilteredRows) iterator)
+             : new FilteredRows((UnfilteredRowIterator) iterator, this);
     }
 
     @Override
@@ -67,14 +61,4 @@ final class Filter extends Transformation
     {
         return null;
     }
-
-    private static boolean closeIfEmpty(BaseRowIterator<?> iter)
-    {
-        if (iter.isEmpty())
-        {
-            iter.close();
-            return true;
-        }
-        return false;
-    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7cb009f/src/java/org/apache/cassandra/db/transform/FilteredPartitions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/transform/FilteredPartitions.java b/src/java/org/apache/cassandra/db/transform/FilteredPartitions.java
index 09e36b4..ad9446d 100644
--- a/src/java/org/apache/cassandra/db/transform/FilteredPartitions.java
+++ b/src/java/org/apache/cassandra/db/transform/FilteredPartitions.java
@@ -50,11 +50,19 @@ public final class FilteredPartitions extends BasePartitions<RowIterator, BasePa
     /**
      * Filter any RangeTombstoneMarker from the iterator's iterators, transforming it into a PartitionIterator.
      */
-    public static PartitionIterator filter(UnfilteredPartitionIterator iterator, int nowInSecs)
+    public static FilteredPartitions filter(UnfilteredPartitionIterator iterator, int nowInSecs)
     {
-        Filter filter = new Filter(!iterator.isForThrift(), nowInSecs);
-        if (iterator instanceof UnfilteredPartitions)
-            return new FilteredPartitions(filter, (UnfilteredPartitions) iterator);
-        return new FilteredPartitions(iterator, filter);
+        FilteredPartitions filtered = filter(iterator, new Filter(nowInSecs));
+
+        return iterator.isForThrift()
+             ? filtered
+             : (FilteredPartitions) Transformation.apply(filtered, new EmptyPartitionsDiscarder());
+    }
+
+    public static FilteredPartitions filter(UnfilteredPartitionIterator iterator, Filter filter)
+    {
+        return iterator instanceof UnfilteredPartitions
+             ? new FilteredPartitions(filter, (UnfilteredPartitions) iterator)
+             : new FilteredPartitions(iterator, filter);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7cb009f/src/java/org/apache/cassandra/db/transform/FilteredRows.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/transform/FilteredRows.java b/src/java/org/apache/cassandra/db/transform/FilteredRows.java
index 818d3bb..5b635eb 100644
--- a/src/java/org/apache/cassandra/db/transform/FilteredRows.java
+++ b/src/java/org/apache/cassandra/db/transform/FilteredRows.java
@@ -55,6 +55,6 @@ public final class FilteredRows extends BaseRows<Row, BaseRowIterator<?>> implem
      */
     public static RowIterator filter(UnfilteredRowIterator iterator, int nowInSecs)
     {
-        return new Filter(false, nowInSecs).applyToPartition(iterator);
+        return new Filter(nowInSecs).applyToPartition(iterator);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7cb009f/src/java/org/apache/cassandra/metrics/TableMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/TableMetrics.java b/src/java/org/apache/cassandra/metrics/TableMetrics.java
index a493836..fe88a63 100644
--- a/src/java/org/apache/cassandra/metrics/TableMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java
@@ -151,6 +151,8 @@ public class TableMetrics
     public final static LatencyMetrics globalWriteLatency = new LatencyMetrics(globalFactory, globalAliasFactory, "Write");
     public final static LatencyMetrics globalRangeLatency = new LatencyMetrics(globalFactory, globalAliasFactory, "Range");
 
+    public final Meter shortReadProtectionRequests;
+
     public final Map<Sampler, TopKSampler<ByteBuffer>> samplers;
     /**
      * stores metrics that will be rolled into a single global metric
@@ -645,6 +647,8 @@ public class TableMetrics
         casPrepare = new LatencyMetrics(factory, "CasPrepare", cfs.keyspace.metric.casPrepare);
         casPropose = new LatencyMetrics(factory, "CasPropose", cfs.keyspace.metric.casPropose);
         casCommit = new LatencyMetrics(factory, "CasCommit", cfs.keyspace.metric.casCommit);
+
+        shortReadProtectionRequests = Metrics.meter(factory.createMetricName("ShortReadProtectionRequests"));
     }
 
     public void updateSSTableIterated(int count)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7cb009f/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DataResolver.java b/src/java/org/apache/cassandra/service/DataResolver.java
index 26b1b2a..72c4950 100644
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@ -27,16 +27,13 @@ import com.google.common.collect.Iterables;
 
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.*;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.ClusteringIndexFilter;
 import org.apache.cassandra.db.filter.DataLimits;
 import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.db.transform.MoreRows;
-import org.apache.cassandra.db.transform.Transformation;
+import org.apache.cassandra.db.transform.*;
 import org.apache.cassandra.exceptions.ReadTimeoutException;
 import org.apache.cassandra.net.*;
 import org.apache.cassandra.tracing.Tracing;
@@ -72,10 +69,29 @@ public class DataResolver extends ResponseResolver
             sources[i] = msg.from;
         }
 
-        // Even though every responses should honor the limit, we might have more than requested post reconciliation,
-        // so ensure we're respecting the limit.
+        /*
+         * Even though every response, individually, will honor the limit, it is possible that we will, after the merge,
+         * have more rows than the client requested. To make sure that we still conform to the original limit,
+         * we apply a top-level post-reconciliation counter to the merged partition iterator.
+         *
+         * Short read protection logic (ShortReadRowProtection.moreContents()) relies on this counter to be applied
+         * to the current partition to work. For this reason we have to apply the counter transformation before
+         * empty partition discard logic kicks in - for it will eagerly consume the iterator.
+         *
+         * That's why the order here is: 1) merge; 2) filter rows; 3) count; 4) discard empty partitions
+         *
+         * See CASSANDRA-13747 for more details.
+         */
+
         DataLimits.Counter counter = command.limits().newCounter(command.nowInSec(), true, command.selectsFullPartition());
-        return counter.applyTo(mergeWithShortReadProtection(iters, sources, counter));
+
+        UnfilteredPartitionIterator merged = mergeWithShortReadProtection(iters, sources, counter);
+        FilteredPartitions filtered = FilteredPartitions.filter(merged, new Filter(command.nowInSec()));
+        PartitionIterator counted = counter.applyTo(filtered);
+
+        return command.isForThrift()
+             ? counted
+             : Transformation.apply(counted, new EmptyPartitionsDiscarder());
     }
 
     public void compareResponses()
@@ -87,11 +103,13 @@ public class DataResolver extends ResponseResolver
         }
     }
 
-    private PartitionIterator mergeWithShortReadProtection(List<UnfilteredPartitionIterator> results, InetAddress[] sources, DataLimits.Counter resultCounter)
+    private UnfilteredPartitionIterator mergeWithShortReadProtection(List<UnfilteredPartitionIterator> results,
+                                                                     InetAddress[] sources,
+                                                                     DataLimits.Counter resultCounter)
     {
         // If we have only one results, there is no read repair to do and we can't get short reads
         if (results.size() == 1)
-            return UnfilteredPartitionIterators.filter(results.get(0), command.nowInSec());
+            return results.get(0);
 
         UnfilteredPartitionIterators.MergeListener listener = new RepairMergeListener(sources);
 
@@ -103,7 +121,7 @@ public class DataResolver extends ResponseResolver
                 results.set(i, Transformation.apply(results.get(i), new ShortReadProtection(sources[i], resultCounter)));
         }
 
-        return UnfilteredPartitionIterators.mergeAndFilter(results, command.nowInSec(), listener);
+        return UnfilteredPartitionIterators.merge(results, command.nowInSec(), listener);
     }
 
     private class RepairMergeListener implements UnfilteredPartitionIterators.MergeListener
@@ -510,7 +528,7 @@ public class DataResolver extends ResponseResolver
                 // counting iterator.
                 int n = postReconciliationCounter.countedInCurrentPartition();
                 int x = counter.countedInCurrentPartition();
-                int toQuery = Math.max(((n * n) / x) - n, 1);
+                int toQuery = Math.max(((n * n) / Math.max(x, 1)) - n, 1);
 
                 DataLimits retryLimits = command.limits().forShortReadRetry(toQuery);
                 ClusteringIndexFilter filter = command.clusteringIndexFilter(partitionKey);
@@ -523,6 +541,9 @@ public class DataResolver extends ResponseResolver
                                                                                    partitionKey,
                                                                                    retryFilter);
 
+                Tracing.trace("Requesting {} extra rows from {} for short read protection", toQuery, source);
+                Schema.instance.getColumnFamilyStoreInstance(cmd.metadata().cfId).metric.shortReadProtectionRequests.mark();
+
                 return doShortReadRetry(cmd);
             }
 
@@ -531,7 +552,7 @@ public class DataResolver extends ResponseResolver
                 DataResolver resolver = new DataResolver(keyspace, retryCommand, ConsistencyLevel.ONE, 1);
                 ReadCallback handler = new ReadCallback(resolver, ConsistencyLevel.ONE, retryCommand, Collections.singletonList(source));
                 if (StorageProxy.canDoLocalRequest(source))
-                      StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(retryCommand, handler));
+                    StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(retryCommand, handler));
                 else
                     MessagingService.instance().sendRRWithFailure(retryCommand.createMessage(MessagingService.current_version), source, handler);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[5/6] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.11

Posted by al...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.11


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/826ae9c9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/826ae9c9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/826ae9c9

Branch: refs/heads/cassandra-3.11
Commit: 826ae9c91e11ebb889b3f1788b9357c2c717f9a0
Parents: 809f3b3 a7cb009
Author: Aleksey Yeschenko <al...@yeschenko.com>
Authored: Tue Aug 29 12:30:40 2017 +0100
Committer: Aleksey Yeschenko <al...@yeschenko.com>
Committed: Tue Aug 29 12:31:27 2017 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../UnfilteredPartitionIterators.java           |  7 ---
 .../db/transform/EmptyPartitionsDiscarder.java  | 35 ++++++++++++++
 .../apache/cassandra/db/transform/Filter.java   | 28 +++--------
 .../db/transform/FilteredPartitions.java        | 18 +++++--
 .../cassandra/db/transform/FilteredRows.java    |  2 +-
 .../apache/cassandra/metrics/TableMetrics.java  |  4 ++
 .../apache/cassandra/service/DataResolver.java  | 51 ++++++++++++++------
 .../apache/cassandra/db/ReadCommandTest.java    | 23 ++++-----
 9 files changed, 107 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/826ae9c9/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index b0dbd60,6609b05..c4aee3a
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,11 -1,5 +1,12 @@@
 -3.0.15
 +3.11.1
 + * Fix cassandra-stress hang issues when an error during cluster connection happens (CASSANDRA-12938)
 + * Better bootstrap failure message when blocked by (potential) range movement (CASSANDRA-13744)
 + * "ignore" option is ignored in sstableloader (CASSANDRA-13721)
 + * Deadlock in AbstractCommitLogSegmentManager (CASSANDRA-13652)
 + * Duplicate the buffer before passing it to analyser in SASI operation (CASSANDRA-13512)
 + * Properly evict pstmts from prepared statements cache (CASSANDRA-13641)
 +Merged from 3.0:
+  * Fix AssertionError in short read protection (CASSANDRA-13747)
   * Don't skip corrupted sstables on startup (CASSANDRA-13620)
   * Fix the merging of cells with different user type versions (CASSANDRA-13776)
   * Copy session properties on cqlsh.py do_login (CASSANDRA-13640)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/826ae9c9/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
index fc225e8,4e0ac1b..778c71d
--- a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
+++ b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
@@@ -78,31 -77,6 +78,24 @@@ public abstract class UnfilteredPartiti
          return Transformation.apply(toReturn, new Close());
      }
  
 +    public static UnfilteredPartitionIterator concat(final List<UnfilteredPartitionIterator> iterators)
 +    {
 +        if (iterators.size() == 1)
 +            return iterators.get(0);
 +
 +        class Extend implements MorePartitions<UnfilteredPartitionIterator>
 +        {
 +            int i = 1;
 +            public UnfilteredPartitionIterator moreContents()
 +            {
 +                if (i >= iterators.size())
 +                    return null;
 +                return iterators.get(i++);
 +            }
 +        }
 +        return MorePartitions.extend(iterators.get(0), new Extend());
 +    }
 +
- 
-     public static PartitionIterator mergeAndFilter(List<UnfilteredPartitionIterator> iterators, int nowInSec, MergeListener listener)
-     {
-         // TODO: we could have a somewhat faster version if we were to merge the UnfilteredRowIterators directly as RowIterators
-         return filter(merge(iterators, nowInSec, listener), nowInSec);
-     }
- 
      public static PartitionIterator filter(final UnfilteredPartitionIterator iterator, final int nowInSec)
      {
          return FilteredPartitions.filter(iterator, nowInSec);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/826ae9c9/src/java/org/apache/cassandra/metrics/TableMetrics.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/metrics/TableMetrics.java
index 7a84eca,fe88a63..b0f667c
--- a/src/java/org/apache/cassandra/metrics/TableMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java
@@@ -167,40 -151,8 +167,42 @@@ public class TableMetric
      public final static LatencyMetrics globalWriteLatency = new LatencyMetrics(globalFactory, globalAliasFactory, "Write");
      public final static LatencyMetrics globalRangeLatency = new LatencyMetrics(globalFactory, globalAliasFactory, "Range");
  
 +    public final static Gauge<Double> globalPercentRepaired = Metrics.register(globalFactory.createMetricName("PercentRepaired"),
 +            new Gauge<Double>()
 +    {
 +        public Double getValue()
 +        {
 +            double repaired = 0;
 +            double total = 0;
 +            for (String keyspace : Schema.instance.getNonSystemKeyspaces())
 +            {
 +                Keyspace k = Schema.instance.getKeyspaceInstance(keyspace);
 +                if (SchemaConstants.DISTRIBUTED_KEYSPACE_NAME.equals(k.getName()))
 +                    continue;
 +                if (k.getReplicationStrategy().getReplicationFactor() < 2)
 +                    continue;
 +
 +                for (ColumnFamilyStore cf : k.getColumnFamilyStores())
 +                {
 +                    if (!SecondaryIndexManager.isIndexColumnFamily(cf.name))
 +                    {
 +                        for (SSTableReader sstable : cf.getSSTables(SSTableSet.CANONICAL))
 +                        {
 +                            if (sstable.isRepaired())
 +                            {
 +                                repaired += sstable.uncompressedLength();
 +                            }
 +                            total += sstable.uncompressedLength();
 +                        }
 +                    }
 +                }
 +            }
 +            return total > 0 ? (repaired / total) * 100 : 100.0;
 +        }
 +    });
 +
+     public final Meter shortReadProtectionRequests;
+ 
      public final Map<Sampler, TopKSampler<ByteBuffer>> samplers;
      /**
       * stores metrics that will be rolled into a single global metric

http://git-wip-us.apache.org/repos/asf/cassandra/blob/826ae9c9/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/DataResolver.java
index 116dadd,72c4950..32b6d79
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@@ -27,18 -27,13 +27,13 @@@ import com.google.common.collect.Iterab
  
  import org.apache.cassandra.concurrent.Stage;
  import org.apache.cassandra.concurrent.StageManager;
- import org.apache.cassandra.config.CFMetaData;
- import org.apache.cassandra.config.ColumnDefinition;
- import org.apache.cassandra.config.DatabaseDescriptor;
+ import org.apache.cassandra.config.*;
  import org.apache.cassandra.db.*;
--import org.apache.cassandra.db.filter.ClusteringIndexFilter;
- import org.apache.cassandra.db.filter.ColumnFilter;
--import org.apache.cassandra.db.filter.DataLimits;
++import org.apache.cassandra.db.filter.*;
 +import org.apache.cassandra.db.filter.DataLimits.Counter;
  import org.apache.cassandra.db.partitions.*;
  import org.apache.cassandra.db.rows.*;
- import org.apache.cassandra.db.transform.MoreRows;
- import org.apache.cassandra.db.transform.Transformation;
+ import org.apache.cassandra.db.transform.*;
  import org.apache.cassandra.exceptions.ReadTimeoutException;
  import org.apache.cassandra.net.*;
  import org.apache.cassandra.tracing.Tracing;
@@@ -104,10 -118,10 +120,10 @@@ public class DataResolver extends Respo
          if (!command.limits().isUnlimited())
          {
              for (int i = 0; i < results.size(); i++)
 -                results.set(i, Transformation.apply(results.get(i), new ShortReadProtection(sources[i], resultCounter)));
 +                results.set(i, Transformation.apply(results.get(i), new ShortReadProtection(sources[i], resultCounter, queryStartNanoTime)));
          }
  
-         return UnfilteredPartitionIterators.mergeAndFilter(results, command.nowInSec(), listener);
+         return UnfilteredPartitionIterators.merge(results, command.nowInSec(), listener);
      }
  
      private class RepairMergeListener implements UnfilteredPartitionIterators.MergeListener
@@@ -526,9 -526,9 +542,9 @@@
                  // we should request m rows so that m * x/n = n-x, that is m = (n^2/x) - n.
                  // Also note that it's ok if we retrieve more results that necessary since our top level iterator is a
                  // counting iterator.
 -                int n = postReconciliationCounter.countedInCurrentPartition();
 -                int x = counter.countedInCurrentPartition();
 +                int n = countedInCurrentPartition(postReconciliationCounter);
 +                int x = countedInCurrentPartition(counter);
-                 int toQuery = Math.max(((n * n) / x) - n, 1);
+                 int toQuery = Math.max(((n * n) / Math.max(x, 1)) - n, 1);
  
                  DataLimits retryLimits = command.limits().forShortReadRetry(toQuery);
                  ClusteringIndexFilter filter = command.clusteringIndexFilter(partitionKey);
@@@ -544,44 -547,12 +563,44 @@@
                  return doShortReadRetry(cmd);
              }
  
 +            /**
 +             * Returns the number of results counted by the counter.
 +             *
 +             * @param counter the counter.
 +             * @return the number of results counted by the counter
 +             */
 +            private int counted(Counter counter)
 +            {
 +                // We are interested by the number of rows but for GROUP BY queries 'counted' returns the number of
 +                // groups.
 +                if (command.limits().isGroupByLimit())
 +                    return counter.rowCounted();
 +
 +                return counter.counted();
 +            }
 +
 +            /**
 +             * Returns the number of results counted in the partition by the counter.
 +             *
 +             * @param counter the counter.
 +             * @return the number of results counted in the partition by the counter
 +             */
 +            private int countedInCurrentPartition(Counter counter)
 +            {
 +                // We are interested by the number of rows but for GROUP BY queries 'countedInCurrentPartition' returns
 +                // the number of groups in the current partition.
 +                if (command.limits().isGroupByLimit())
 +                    return counter.rowCountedInCurrentPartition();
 +
 +                return counter.countedInCurrentPartition();
 +            }
 +
              private UnfilteredRowIterator doShortReadRetry(SinglePartitionReadCommand retryCommand)
              {
 -                DataResolver resolver = new DataResolver(keyspace, retryCommand, ConsistencyLevel.ONE, 1);
 -                ReadCallback handler = new ReadCallback(resolver, ConsistencyLevel.ONE, retryCommand, Collections.singletonList(source));
 +                DataResolver resolver = new DataResolver(keyspace, retryCommand, ConsistencyLevel.ONE, 1, queryStartNanoTime);
 +                ReadCallback handler = new ReadCallback(resolver, ConsistencyLevel.ONE, retryCommand, Collections.singletonList(source), queryStartNanoTime);
                  if (StorageProxy.canDoLocalRequest(source))
-                       StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(retryCommand, handler));
+                     StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(retryCommand, handler));
                  else
                      MessagingService.instance().sendRRWithFailure(retryCommand.createMessage(MessagingService.current_version), source, handler);
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/826ae9c9/test/unit/org/apache/cassandra/db/ReadCommandTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/ReadCommandTest.java
index 2aef2a7,0000000..9264297
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/db/ReadCommandTest.java
+++ b/test/unit/org/apache/cassandra/db/ReadCommandTest.java
@@@ -1,311 -1,0 +1,312 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.db;
 +
 +import java.nio.ByteBuffer;
 +import java.util.ArrayList;
 +import java.util.List;
 +
 +import org.junit.BeforeClass;
 +import org.junit.Test;
 +
 +import org.apache.cassandra.SchemaLoader;
 +import org.apache.cassandra.Util;
 +import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.db.filter.ClusteringIndexSliceFilter;
 +import org.apache.cassandra.db.filter.ColumnFilter;
 +import org.apache.cassandra.db.filter.DataLimits;
 +import org.apache.cassandra.db.filter.RowFilter;
 +import org.apache.cassandra.db.marshal.AsciiType;
 +import org.apache.cassandra.db.marshal.BytesType;
 +import org.apache.cassandra.db.partitions.FilteredPartition;
 +import org.apache.cassandra.db.partitions.PartitionIterator;
 +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
 +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
 +import org.apache.cassandra.db.rows.Row;
 +import org.apache.cassandra.db.rows.RowIterator;
 +import org.apache.cassandra.db.rows.SerializationHelper;
 +import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 +import org.apache.cassandra.db.rows.UnfilteredRowIterators;
 +import org.apache.cassandra.exceptions.ConfigurationException;
 +import org.apache.cassandra.io.util.DataInputBuffer;
 +import org.apache.cassandra.io.util.DataOutputBuffer;
 +import org.apache.cassandra.net.MessagingService;
 +import org.apache.cassandra.schema.KeyspaceParams;
 +import org.apache.cassandra.utils.ByteBufferUtil;
 +import org.apache.cassandra.utils.FBUtilities;
 +
 +import static org.junit.Assert.assertEquals;
 +
 +public class ReadCommandTest
 +{
 +    private static final String KEYSPACE = "ReadCommandTest";
 +    private static final String CF1 = "Standard1";
 +    private static final String CF2 = "Standard2";
 +    private static final String CF3 = "Standard3";
 +
 +    @BeforeClass
 +    public static void defineSchema() throws ConfigurationException
 +    {
 +        DatabaseDescriptor.daemonInitialization();
 +
 +        CFMetaData metadata1 = SchemaLoader.standardCFMD(KEYSPACE, CF1);
 +
 +        CFMetaData metadata2 = CFMetaData.Builder.create(KEYSPACE, CF2)
 +                                                         .addPartitionKey("key", BytesType.instance)
 +                                                         .addClusteringColumn("col", AsciiType.instance)
 +                                                         .addRegularColumn("a", AsciiType.instance)
 +                                                         .addRegularColumn("b", AsciiType.instance).build();
 +
 +        CFMetaData metadata3 = CFMetaData.Builder.create(KEYSPACE, CF3)
 +                                                 .addPartitionKey("key", BytesType.instance)
 +                                                 .addClusteringColumn("col", AsciiType.instance)
 +                                                 .addRegularColumn("a", AsciiType.instance)
 +                                                 .addRegularColumn("b", AsciiType.instance)
 +                                                 .addRegularColumn("c", AsciiType.instance)
 +                                                 .addRegularColumn("d", AsciiType.instance)
 +                                                 .addRegularColumn("e", AsciiType.instance)
 +                                                 .addRegularColumn("f", AsciiType.instance).build();
 +
 +        SchemaLoader.prepareServer();
 +        SchemaLoader.createKeyspace(KEYSPACE,
 +                                    KeyspaceParams.simple(1),
 +                                    metadata1,
 +                                    metadata2,
 +                                    metadata3);
 +    }
 +
 +    @Test
 +    public void testPartitionRangeAbort() throws Exception
 +    {
 +        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF1);
 +
 +        new RowUpdateBuilder(cfs.metadata, 0, ByteBufferUtil.bytes("key1"))
 +                .clustering("Column1")
 +                .add("val", ByteBufferUtil.bytes("abcd"))
 +                .build()
 +                .apply();
 +
 +        cfs.forceBlockingFlush();
 +
 +        new RowUpdateBuilder(cfs.metadata, 0, ByteBufferUtil.bytes("key2"))
 +                .clustering("Column1")
 +                .add("val", ByteBufferUtil.bytes("abcd"))
 +                .build()
 +                .apply();
 +
 +        ReadCommand readCommand = Util.cmd(cfs).build();
 +        assertEquals(2, Util.getAll(readCommand).size());
 +
 +        readCommand.abort();
 +        assertEquals(0, Util.getAll(readCommand).size());
 +    }
 +
 +    @Test
 +    public void testSinglePartitionSliceAbort() throws Exception
 +    {
 +        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF2);
 +
 +        cfs.truncateBlocking();
 +
 +        new RowUpdateBuilder(cfs.metadata, 0, ByteBufferUtil.bytes("key"))
 +                .clustering("cc")
 +                .add("a", ByteBufferUtil.bytes("abcd"))
 +                .build()
 +                .apply();
 +
 +        cfs.forceBlockingFlush();
 +
 +        new RowUpdateBuilder(cfs.metadata, 0, ByteBufferUtil.bytes("key"))
 +                .clustering("dd")
 +                .add("a", ByteBufferUtil.bytes("abcd"))
 +                .build()
 +                .apply();
 +
 +        ReadCommand readCommand = Util.cmd(cfs, Util.dk("key")).build();
 +
 +        List<FilteredPartition> partitions = Util.getAll(readCommand);
 +        assertEquals(1, partitions.size());
 +        assertEquals(2, partitions.get(0).rowCount());
 +
 +        readCommand.abort();
 +        assertEquals(0, Util.getAll(readCommand).size());
 +    }
 +
 +    @Test
 +    public void testSinglePartitionNamesAbort() throws Exception
 +    {
 +        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF2);
 +
 +        cfs.truncateBlocking();
 +
 +        new RowUpdateBuilder(cfs.metadata, 0, ByteBufferUtil.bytes("key"))
 +                .clustering("cc")
 +                .add("a", ByteBufferUtil.bytes("abcd"))
 +                .build()
 +                .apply();
 +
 +        cfs.forceBlockingFlush();
 +
 +        new RowUpdateBuilder(cfs.metadata, 0, ByteBufferUtil.bytes("key"))
 +                .clustering("dd")
 +                .add("a", ByteBufferUtil.bytes("abcd"))
 +                .build()
 +                .apply();
 +
 +        ReadCommand readCommand = Util.cmd(cfs, Util.dk("key")).includeRow("cc").includeRow("dd").build();
 +
 +        List<FilteredPartition> partitions = Util.getAll(readCommand);
 +        assertEquals(1, partitions.size());
 +        assertEquals(2, partitions.get(0).rowCount());
 +
 +        readCommand.abort();
 +        assertEquals(0, Util.getAll(readCommand).size());
 +    }
 +
 +    @Test
 +    public void testSinglePartitionGroupMerge() throws Exception
 +    {
 +        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF3);
 +
 +        String[][][] groups = new String[][][] {
 +            new String[][] {
 +                new String[] { "1", "key1", "aa", "a" }, // "1" indicates to create the data, "-1" to delete the row
 +                new String[] { "1", "key2", "bb", "b" },
 +                new String[] { "1", "key3", "cc", "c" }
 +            },
 +            new String[][] {
 +                new String[] { "1", "key3", "dd", "d" },
 +                new String[] { "1", "key2", "ee", "e" },
 +                new String[] { "1", "key1", "ff", "f" }
 +            },
 +            new String[][] {
 +                new String[] { "1", "key6", "aa", "a" },
 +                new String[] { "1", "key5", "bb", "b" },
 +                new String[] { "1", "key4", "cc", "c" }
 +            },
 +            new String[][] {
 +                new String[] { "-1", "key6", "aa", "a" },
 +                new String[] { "-1", "key2", "bb", "b" }
 +            }
 +        };
 +
 +        // Given the data above, when the keys are sorted and the deletions removed, we should
 +        // get these clustering rows in this order
 +        String[] expectedRows = new String[] { "aa", "ff", "ee", "cc", "dd", "cc", "bb"};
 +
 +        List<ByteBuffer> buffers = new ArrayList<>(groups.length);
 +        int nowInSeconds = FBUtilities.nowInSeconds();
 +        ColumnFilter columnFilter = ColumnFilter.allColumnsBuilder(cfs.metadata).build();
 +        RowFilter rowFilter = RowFilter.create();
 +        Slice slice = Slice.make(ClusteringBound.BOTTOM, ClusteringBound.TOP);
 +        ClusteringIndexSliceFilter sliceFilter = new ClusteringIndexSliceFilter(Slices.with(cfs.metadata.comparator, slice), false);
 +
 +        for (String[][] group : groups)
 +        {
 +            cfs.truncateBlocking();
 +
 +            List<SinglePartitionReadCommand> commands = new ArrayList<>(group.length);
 +
 +            for (String[] data : group)
 +            {
 +                if (data[0].equals("1"))
 +                {
 +                    new RowUpdateBuilder(cfs.metadata, 0, ByteBufferUtil.bytes(data[1]))
 +                    .clustering(data[2])
 +                    .add(data[3], ByteBufferUtil.bytes("blah"))
 +                    .build()
 +                    .apply();
 +                }
 +                else
 +                {
 +                    RowUpdateBuilder.deleteRow(cfs.metadata, FBUtilities.timestampMicros(), ByteBufferUtil.bytes(data[1]), data[2]).apply();
 +                }
 +                commands.add(SinglePartitionReadCommand.create(cfs.metadata, nowInSeconds, columnFilter, rowFilter, DataLimits.NONE, Util.dk(data[1]), sliceFilter));
 +            }
 +
 +            cfs.forceBlockingFlush();
 +
 +            ReadQuery query = new SinglePartitionReadCommand.Group(commands, DataLimits.NONE);
 +
 +            try (ReadExecutionController executionController = query.executionController();
 +                 UnfilteredPartitionIterator iter = query.executeLocally(executionController);
 +                 DataOutputBuffer buffer = new DataOutputBuffer())
 +            {
 +                UnfilteredPartitionIterators.serializerForIntraNode().serialize(iter,
 +                                                                                columnFilter,
 +                                                                                buffer,
 +                                                                                MessagingService.current_version);
 +                buffers.add(buffer.buffer());
 +            }
 +        }
 +
 +        // deserialize, merge and check the results are all there
 +        List<UnfilteredPartitionIterator> iterators = new ArrayList<>();
 +
 +        for (ByteBuffer buffer : buffers)
 +        {
 +            try (DataInputBuffer in = new DataInputBuffer(buffer, true))
 +            {
 +                iterators.add(UnfilteredPartitionIterators.serializerForIntraNode().deserialize(in,
 +                                                                                                MessagingService.current_version,
 +                                                                                                cfs.metadata,
 +                                                                                                columnFilter,
 +                                                                                                SerializationHelper.Flag.LOCAL));
 +            }
 +        }
 +
-         try(PartitionIterator partitionIterator = UnfilteredPartitionIterators.mergeAndFilter(iterators,
-                                                                                           nowInSeconds,
-                                                                                           new UnfilteredPartitionIterators.MergeListener()
-         {
-             public UnfilteredRowIterators.MergeListener getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> versions)
++        UnfilteredPartitionIterators.MergeListener listener =
++            new UnfilteredPartitionIterators.MergeListener()
 +            {
-                 return null;
-             }
++                public UnfilteredRowIterators.MergeListener getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> versions)
++                {
++                    return null;
++                }
 +
-             public void close()
-             {
++                public void close()
++                {
 +
-             }
-         }))
++                }
++            };
++
++        try (PartitionIterator partitionIterator = UnfilteredPartitionIterators.filter(UnfilteredPartitionIterators.merge(iterators, nowInSeconds, listener), nowInSeconds))
 +        {
 +
 +            int i = 0;
 +            int numPartitions = 0;
 +            while (partitionIterator.hasNext())
 +            {
 +                numPartitions++;
 +                try(RowIterator rowIterator = partitionIterator.next())
 +                {
 +                    while (rowIterator.hasNext())
 +                    {
 +                        Row row = rowIterator.next();
 +                        assertEquals("col=" + expectedRows[i++], row.clustering().toString(cfs.metadata));
 +                        //System.out.print(row.toString(cfs.metadata, true));
 +                    }
 +                }
 +            }
 +
 +            assertEquals(5, numPartitions);
 +            assertEquals(expectedRows.length, i);
 +        }
 +    }
 +}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[4/6] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.11

Posted by al...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.11


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/826ae9c9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/826ae9c9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/826ae9c9

Branch: refs/heads/trunk
Commit: 826ae9c91e11ebb889b3f1788b9357c2c717f9a0
Parents: 809f3b3 a7cb009
Author: Aleksey Yeschenko <al...@yeschenko.com>
Authored: Tue Aug 29 12:30:40 2017 +0100
Committer: Aleksey Yeschenko <al...@yeschenko.com>
Committed: Tue Aug 29 12:31:27 2017 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../UnfilteredPartitionIterators.java           |  7 ---
 .../db/transform/EmptyPartitionsDiscarder.java  | 35 ++++++++++++++
 .../apache/cassandra/db/transform/Filter.java   | 28 +++--------
 .../db/transform/FilteredPartitions.java        | 18 +++++--
 .../cassandra/db/transform/FilteredRows.java    |  2 +-
 .../apache/cassandra/metrics/TableMetrics.java  |  4 ++
 .../apache/cassandra/service/DataResolver.java  | 51 ++++++++++++++------
 .../apache/cassandra/db/ReadCommandTest.java    | 23 ++++-----
 9 files changed, 107 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/826ae9c9/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index b0dbd60,6609b05..c4aee3a
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,11 -1,5 +1,12 @@@
 -3.0.15
 +3.11.1
 + * Fix cassandra-stress hang issues when an error during cluster connection happens (CASSANDRA-12938)
 + * Better bootstrap failure message when blocked by (potential) range movement (CASSANDRA-13744)
 + * "ignore" option is ignored in sstableloader (CASSANDRA-13721)
 + * Deadlock in AbstractCommitLogSegmentManager (CASSANDRA-13652)
 + * Duplicate the buffer before passing it to analyser in SASI operation (CASSANDRA-13512)
 + * Properly evict pstmts from prepared statements cache (CASSANDRA-13641)
 +Merged from 3.0:
+  * Fix AssertionError in short read protection (CASSANDRA-13747)
   * Don't skip corrupted sstables on startup (CASSANDRA-13620)
   * Fix the merging of cells with different user type versions (CASSANDRA-13776)
   * Copy session properties on cqlsh.py do_login (CASSANDRA-13640)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/826ae9c9/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
index fc225e8,4e0ac1b..778c71d
--- a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
+++ b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
@@@ -78,31 -77,6 +78,24 @@@ public abstract class UnfilteredPartiti
          return Transformation.apply(toReturn, new Close());
      }
  
 +    public static UnfilteredPartitionIterator concat(final List<UnfilteredPartitionIterator> iterators)
 +    {
 +        if (iterators.size() == 1)
 +            return iterators.get(0);
 +
 +        class Extend implements MorePartitions<UnfilteredPartitionIterator>
 +        {
 +            int i = 1;
 +            public UnfilteredPartitionIterator moreContents()
 +            {
 +                if (i >= iterators.size())
 +                    return null;
 +                return iterators.get(i++);
 +            }
 +        }
 +        return MorePartitions.extend(iterators.get(0), new Extend());
 +    }
 +
- 
-     public static PartitionIterator mergeAndFilter(List<UnfilteredPartitionIterator> iterators, int nowInSec, MergeListener listener)
-     {
-         // TODO: we could have a somewhat faster version if we were to merge the UnfilteredRowIterators directly as RowIterators
-         return filter(merge(iterators, nowInSec, listener), nowInSec);
-     }
- 
      public static PartitionIterator filter(final UnfilteredPartitionIterator iterator, final int nowInSec)
      {
          return FilteredPartitions.filter(iterator, nowInSec);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/826ae9c9/src/java/org/apache/cassandra/metrics/TableMetrics.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/metrics/TableMetrics.java
index 7a84eca,fe88a63..b0f667c
--- a/src/java/org/apache/cassandra/metrics/TableMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java
@@@ -167,40 -151,8 +167,42 @@@ public class TableMetric
      public final static LatencyMetrics globalWriteLatency = new LatencyMetrics(globalFactory, globalAliasFactory, "Write");
      public final static LatencyMetrics globalRangeLatency = new LatencyMetrics(globalFactory, globalAliasFactory, "Range");
  
 +    public final static Gauge<Double> globalPercentRepaired = Metrics.register(globalFactory.createMetricName("PercentRepaired"),
 +            new Gauge<Double>()
 +    {
 +        public Double getValue()
 +        {
 +            double repaired = 0;
 +            double total = 0;
 +            for (String keyspace : Schema.instance.getNonSystemKeyspaces())
 +            {
 +                Keyspace k = Schema.instance.getKeyspaceInstance(keyspace);
 +                if (SchemaConstants.DISTRIBUTED_KEYSPACE_NAME.equals(k.getName()))
 +                    continue;
 +                if (k.getReplicationStrategy().getReplicationFactor() < 2)
 +                    continue;
 +
 +                for (ColumnFamilyStore cf : k.getColumnFamilyStores())
 +                {
 +                    if (!SecondaryIndexManager.isIndexColumnFamily(cf.name))
 +                    {
 +                        for (SSTableReader sstable : cf.getSSTables(SSTableSet.CANONICAL))
 +                        {
 +                            if (sstable.isRepaired())
 +                            {
 +                                repaired += sstable.uncompressedLength();
 +                            }
 +                            total += sstable.uncompressedLength();
 +                        }
 +                    }
 +                }
 +            }
 +            return total > 0 ? (repaired / total) * 100 : 100.0;
 +        }
 +    });
 +
+     public final Meter shortReadProtectionRequests;
+ 
      public final Map<Sampler, TopKSampler<ByteBuffer>> samplers;
      /**
       * stores metrics that will be rolled into a single global metric

http://git-wip-us.apache.org/repos/asf/cassandra/blob/826ae9c9/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/DataResolver.java
index 116dadd,72c4950..32b6d79
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@@ -27,18 -27,13 +27,13 @@@ import com.google.common.collect.Iterab
  
  import org.apache.cassandra.concurrent.Stage;
  import org.apache.cassandra.concurrent.StageManager;
- import org.apache.cassandra.config.CFMetaData;
- import org.apache.cassandra.config.ColumnDefinition;
- import org.apache.cassandra.config.DatabaseDescriptor;
+ import org.apache.cassandra.config.*;
  import org.apache.cassandra.db.*;
--import org.apache.cassandra.db.filter.ClusteringIndexFilter;
- import org.apache.cassandra.db.filter.ColumnFilter;
--import org.apache.cassandra.db.filter.DataLimits;
++import org.apache.cassandra.db.filter.*;
 +import org.apache.cassandra.db.filter.DataLimits.Counter;
  import org.apache.cassandra.db.partitions.*;
  import org.apache.cassandra.db.rows.*;
- import org.apache.cassandra.db.transform.MoreRows;
- import org.apache.cassandra.db.transform.Transformation;
+ import org.apache.cassandra.db.transform.*;
  import org.apache.cassandra.exceptions.ReadTimeoutException;
  import org.apache.cassandra.net.*;
  import org.apache.cassandra.tracing.Tracing;
@@@ -104,10 -118,10 +120,10 @@@ public class DataResolver extends Respo
          if (!command.limits().isUnlimited())
          {
              for (int i = 0; i < results.size(); i++)
 -                results.set(i, Transformation.apply(results.get(i), new ShortReadProtection(sources[i], resultCounter)));
 +                results.set(i, Transformation.apply(results.get(i), new ShortReadProtection(sources[i], resultCounter, queryStartNanoTime)));
          }
  
-         return UnfilteredPartitionIterators.mergeAndFilter(results, command.nowInSec(), listener);
+         return UnfilteredPartitionIterators.merge(results, command.nowInSec(), listener);
      }
  
      private class RepairMergeListener implements UnfilteredPartitionIterators.MergeListener
@@@ -526,9 -526,9 +542,9 @@@
                  // we should request m rows so that m * x/n = n-x, that is m = (n^2/x) - n.
                  // Also note that it's ok if we retrieve more results that necessary since our top level iterator is a
                  // counting iterator.
 -                int n = postReconciliationCounter.countedInCurrentPartition();
 -                int x = counter.countedInCurrentPartition();
 +                int n = countedInCurrentPartition(postReconciliationCounter);
 +                int x = countedInCurrentPartition(counter);
-                 int toQuery = Math.max(((n * n) / x) - n, 1);
+                 int toQuery = Math.max(((n * n) / Math.max(x, 1)) - n, 1);
  
                  DataLimits retryLimits = command.limits().forShortReadRetry(toQuery);
                  ClusteringIndexFilter filter = command.clusteringIndexFilter(partitionKey);
@@@ -544,44 -547,12 +563,44 @@@
                  return doShortReadRetry(cmd);
              }
  
 +            /**
 +             * Returns the number of results counted by the counter.
 +             *
 +             * @param counter the counter.
 +             * @return the number of results counted by the counter
 +             */
 +            private int counted(Counter counter)
 +            {
 +                // We are interested by the number of rows but for GROUP BY queries 'counted' returns the number of
 +                // groups.
 +                if (command.limits().isGroupByLimit())
 +                    return counter.rowCounted();
 +
 +                return counter.counted();
 +            }
 +
 +            /**
 +             * Returns the number of results counted in the partition by the counter.
 +             *
 +             * @param counter the counter.
 +             * @return the number of results counted in the partition by the counter
 +             */
 +            private int countedInCurrentPartition(Counter counter)
 +            {
 +                // We are interested by the number of rows but for GROUP BY queries 'countedInCurrentPartition' returns
 +                // the number of groups in the current partition.
 +                if (command.limits().isGroupByLimit())
 +                    return counter.rowCountedInCurrentPartition();
 +
 +                return counter.countedInCurrentPartition();
 +            }
 +
              private UnfilteredRowIterator doShortReadRetry(SinglePartitionReadCommand retryCommand)
              {
 -                DataResolver resolver = new DataResolver(keyspace, retryCommand, ConsistencyLevel.ONE, 1);
 -                ReadCallback handler = new ReadCallback(resolver, ConsistencyLevel.ONE, retryCommand, Collections.singletonList(source));
 +                DataResolver resolver = new DataResolver(keyspace, retryCommand, ConsistencyLevel.ONE, 1, queryStartNanoTime);
 +                ReadCallback handler = new ReadCallback(resolver, ConsistencyLevel.ONE, retryCommand, Collections.singletonList(source), queryStartNanoTime);
                  if (StorageProxy.canDoLocalRequest(source))
-                       StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(retryCommand, handler));
+                     StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(retryCommand, handler));
                  else
                      MessagingService.instance().sendRRWithFailure(retryCommand.createMessage(MessagingService.current_version), source, handler);
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/826ae9c9/test/unit/org/apache/cassandra/db/ReadCommandTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/ReadCommandTest.java
index 2aef2a7,0000000..9264297
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/db/ReadCommandTest.java
+++ b/test/unit/org/apache/cassandra/db/ReadCommandTest.java
@@@ -1,311 -1,0 +1,312 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.db;
 +
 +import java.nio.ByteBuffer;
 +import java.util.ArrayList;
 +import java.util.List;
 +
 +import org.junit.BeforeClass;
 +import org.junit.Test;
 +
 +import org.apache.cassandra.SchemaLoader;
 +import org.apache.cassandra.Util;
 +import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.db.filter.ClusteringIndexSliceFilter;
 +import org.apache.cassandra.db.filter.ColumnFilter;
 +import org.apache.cassandra.db.filter.DataLimits;
 +import org.apache.cassandra.db.filter.RowFilter;
 +import org.apache.cassandra.db.marshal.AsciiType;
 +import org.apache.cassandra.db.marshal.BytesType;
 +import org.apache.cassandra.db.partitions.FilteredPartition;
 +import org.apache.cassandra.db.partitions.PartitionIterator;
 +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
 +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
 +import org.apache.cassandra.db.rows.Row;
 +import org.apache.cassandra.db.rows.RowIterator;
 +import org.apache.cassandra.db.rows.SerializationHelper;
 +import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 +import org.apache.cassandra.db.rows.UnfilteredRowIterators;
 +import org.apache.cassandra.exceptions.ConfigurationException;
 +import org.apache.cassandra.io.util.DataInputBuffer;
 +import org.apache.cassandra.io.util.DataOutputBuffer;
 +import org.apache.cassandra.net.MessagingService;
 +import org.apache.cassandra.schema.KeyspaceParams;
 +import org.apache.cassandra.utils.ByteBufferUtil;
 +import org.apache.cassandra.utils.FBUtilities;
 +
 +import static org.junit.Assert.assertEquals;
 +
 +public class ReadCommandTest
 +{
 +    private static final String KEYSPACE = "ReadCommandTest";
 +    private static final String CF1 = "Standard1";
 +    private static final String CF2 = "Standard2";
 +    private static final String CF3 = "Standard3";
 +
 +    @BeforeClass
 +    public static void defineSchema() throws ConfigurationException
 +    {
 +        DatabaseDescriptor.daemonInitialization();
 +
 +        CFMetaData metadata1 = SchemaLoader.standardCFMD(KEYSPACE, CF1);
 +
 +        CFMetaData metadata2 = CFMetaData.Builder.create(KEYSPACE, CF2)
 +                                                         .addPartitionKey("key", BytesType.instance)
 +                                                         .addClusteringColumn("col", AsciiType.instance)
 +                                                         .addRegularColumn("a", AsciiType.instance)
 +                                                         .addRegularColumn("b", AsciiType.instance).build();
 +
 +        CFMetaData metadata3 = CFMetaData.Builder.create(KEYSPACE, CF3)
 +                                                 .addPartitionKey("key", BytesType.instance)
 +                                                 .addClusteringColumn("col", AsciiType.instance)
 +                                                 .addRegularColumn("a", AsciiType.instance)
 +                                                 .addRegularColumn("b", AsciiType.instance)
 +                                                 .addRegularColumn("c", AsciiType.instance)
 +                                                 .addRegularColumn("d", AsciiType.instance)
 +                                                 .addRegularColumn("e", AsciiType.instance)
 +                                                 .addRegularColumn("f", AsciiType.instance).build();
 +
 +        SchemaLoader.prepareServer();
 +        SchemaLoader.createKeyspace(KEYSPACE,
 +                                    KeyspaceParams.simple(1),
 +                                    metadata1,
 +                                    metadata2,
 +                                    metadata3);
 +    }
 +
 +    @Test
 +    public void testPartitionRangeAbort() throws Exception
 +    {
 +        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF1);
 +
 +        new RowUpdateBuilder(cfs.metadata, 0, ByteBufferUtil.bytes("key1"))
 +                .clustering("Column1")
 +                .add("val", ByteBufferUtil.bytes("abcd"))
 +                .build()
 +                .apply();
 +
 +        cfs.forceBlockingFlush();
 +
 +        new RowUpdateBuilder(cfs.metadata, 0, ByteBufferUtil.bytes("key2"))
 +                .clustering("Column1")
 +                .add("val", ByteBufferUtil.bytes("abcd"))
 +                .build()
 +                .apply();
 +
 +        ReadCommand readCommand = Util.cmd(cfs).build();
 +        assertEquals(2, Util.getAll(readCommand).size());
 +
 +        readCommand.abort();
 +        assertEquals(0, Util.getAll(readCommand).size());
 +    }
 +
 +    @Test
 +    public void testSinglePartitionSliceAbort() throws Exception
 +    {
 +        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF2);
 +
 +        cfs.truncateBlocking();
 +
 +        new RowUpdateBuilder(cfs.metadata, 0, ByteBufferUtil.bytes("key"))
 +                .clustering("cc")
 +                .add("a", ByteBufferUtil.bytes("abcd"))
 +                .build()
 +                .apply();
 +
 +        cfs.forceBlockingFlush();
 +
 +        new RowUpdateBuilder(cfs.metadata, 0, ByteBufferUtil.bytes("key"))
 +                .clustering("dd")
 +                .add("a", ByteBufferUtil.bytes("abcd"))
 +                .build()
 +                .apply();
 +
 +        ReadCommand readCommand = Util.cmd(cfs, Util.dk("key")).build();
 +
 +        List<FilteredPartition> partitions = Util.getAll(readCommand);
 +        assertEquals(1, partitions.size());
 +        assertEquals(2, partitions.get(0).rowCount());
 +
 +        readCommand.abort();
 +        assertEquals(0, Util.getAll(readCommand).size());
 +    }
 +
 +    @Test
 +    public void testSinglePartitionNamesAbort() throws Exception
 +    {
 +        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF2);
 +
 +        cfs.truncateBlocking();
 +
 +        new RowUpdateBuilder(cfs.metadata, 0, ByteBufferUtil.bytes("key"))
 +                .clustering("cc")
 +                .add("a", ByteBufferUtil.bytes("abcd"))
 +                .build()
 +                .apply();
 +
 +        cfs.forceBlockingFlush();
 +
 +        new RowUpdateBuilder(cfs.metadata, 0, ByteBufferUtil.bytes("key"))
 +                .clustering("dd")
 +                .add("a", ByteBufferUtil.bytes("abcd"))
 +                .build()
 +                .apply();
 +
 +        ReadCommand readCommand = Util.cmd(cfs, Util.dk("key")).includeRow("cc").includeRow("dd").build();
 +
 +        List<FilteredPartition> partitions = Util.getAll(readCommand);
 +        assertEquals(1, partitions.size());
 +        assertEquals(2, partitions.get(0).rowCount());
 +
 +        readCommand.abort();
 +        assertEquals(0, Util.getAll(readCommand).size());
 +    }
 +
 +    @Test
 +    public void testSinglePartitionGroupMerge() throws Exception
 +    {
 +        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF3);
 +
 +        String[][][] groups = new String[][][] {
 +            new String[][] {
 +                new String[] { "1", "key1", "aa", "a" }, // "1" indicates to create the data, "-1" to delete the row
 +                new String[] { "1", "key2", "bb", "b" },
 +                new String[] { "1", "key3", "cc", "c" }
 +            },
 +            new String[][] {
 +                new String[] { "1", "key3", "dd", "d" },
 +                new String[] { "1", "key2", "ee", "e" },
 +                new String[] { "1", "key1", "ff", "f" }
 +            },
 +            new String[][] {
 +                new String[] { "1", "key6", "aa", "a" },
 +                new String[] { "1", "key5", "bb", "b" },
 +                new String[] { "1", "key4", "cc", "c" }
 +            },
 +            new String[][] {
 +                new String[] { "-1", "key6", "aa", "a" },
 +                new String[] { "-1", "key2", "bb", "b" }
 +            }
 +        };
 +
 +        // Given the data above, when the keys are sorted and the deletions removed, we should
 +        // get these clustering rows in this order
 +        String[] expectedRows = new String[] { "aa", "ff", "ee", "cc", "dd", "cc", "bb"};
 +
 +        List<ByteBuffer> buffers = new ArrayList<>(groups.length);
 +        int nowInSeconds = FBUtilities.nowInSeconds();
 +        ColumnFilter columnFilter = ColumnFilter.allColumnsBuilder(cfs.metadata).build();
 +        RowFilter rowFilter = RowFilter.create();
 +        Slice slice = Slice.make(ClusteringBound.BOTTOM, ClusteringBound.TOP);
 +        ClusteringIndexSliceFilter sliceFilter = new ClusteringIndexSliceFilter(Slices.with(cfs.metadata.comparator, slice), false);
 +
 +        for (String[][] group : groups)
 +        {
 +            cfs.truncateBlocking();
 +
 +            List<SinglePartitionReadCommand> commands = new ArrayList<>(group.length);
 +
 +            for (String[] data : group)
 +            {
 +                if (data[0].equals("1"))
 +                {
 +                    new RowUpdateBuilder(cfs.metadata, 0, ByteBufferUtil.bytes(data[1]))
 +                    .clustering(data[2])
 +                    .add(data[3], ByteBufferUtil.bytes("blah"))
 +                    .build()
 +                    .apply();
 +                }
 +                else
 +                {
 +                    RowUpdateBuilder.deleteRow(cfs.metadata, FBUtilities.timestampMicros(), ByteBufferUtil.bytes(data[1]), data[2]).apply();
 +                }
 +                commands.add(SinglePartitionReadCommand.create(cfs.metadata, nowInSeconds, columnFilter, rowFilter, DataLimits.NONE, Util.dk(data[1]), sliceFilter));
 +            }
 +
 +            cfs.forceBlockingFlush();
 +
 +            ReadQuery query = new SinglePartitionReadCommand.Group(commands, DataLimits.NONE);
 +
 +            try (ReadExecutionController executionController = query.executionController();
 +                 UnfilteredPartitionIterator iter = query.executeLocally(executionController);
 +                 DataOutputBuffer buffer = new DataOutputBuffer())
 +            {
 +                UnfilteredPartitionIterators.serializerForIntraNode().serialize(iter,
 +                                                                                columnFilter,
 +                                                                                buffer,
 +                                                                                MessagingService.current_version);
 +                buffers.add(buffer.buffer());
 +            }
 +        }
 +
 +        // deserialize, merge and check the results are all there
 +        List<UnfilteredPartitionIterator> iterators = new ArrayList<>();
 +
 +        for (ByteBuffer buffer : buffers)
 +        {
 +            try (DataInputBuffer in = new DataInputBuffer(buffer, true))
 +            {
 +                iterators.add(UnfilteredPartitionIterators.serializerForIntraNode().deserialize(in,
 +                                                                                                MessagingService.current_version,
 +                                                                                                cfs.metadata,
 +                                                                                                columnFilter,
 +                                                                                                SerializationHelper.Flag.LOCAL));
 +            }
 +        }
 +
-         try(PartitionIterator partitionIterator = UnfilteredPartitionIterators.mergeAndFilter(iterators,
-                                                                                           nowInSeconds,
-                                                                                           new UnfilteredPartitionIterators.MergeListener()
-         {
-             public UnfilteredRowIterators.MergeListener getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> versions)
++        UnfilteredPartitionIterators.MergeListener listener =
++            new UnfilteredPartitionIterators.MergeListener()
 +            {
-                 return null;
-             }
++                public UnfilteredRowIterators.MergeListener getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> versions)
++                {
++                    return null;
++                }
 +
-             public void close()
-             {
++                public void close()
++                {
 +
-             }
-         }))
++                }
++            };
++
++        try (PartitionIterator partitionIterator = UnfilteredPartitionIterators.filter(UnfilteredPartitionIterators.merge(iterators, nowInSeconds, listener), nowInSeconds))
 +        {
 +
 +            int i = 0;
 +            int numPartitions = 0;
 +            while (partitionIterator.hasNext())
 +            {
 +                numPartitions++;
 +                try(RowIterator rowIterator = partitionIterator.next())
 +                {
 +                    while (rowIterator.hasNext())
 +                    {
 +                        Row row = rowIterator.next();
 +                        assertEquals("col=" + expectedRows[i++], row.clustering().toString(cfs.metadata));
 +                        //System.out.print(row.toString(cfs.metadata, true));
 +                    }
 +                }
 +            }
 +
 +            assertEquals(5, numPartitions);
 +            assertEquals(expectedRows.length, i);
 +        }
 +    }
 +}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[2/6] cassandra git commit: Fix AssertionError in short read protection

Posted by al...@apache.org.
Fix AssertionError in short read protection

patch by Aleksey Yeschenko; reviewed by Benedict Elliott Smith for
CASSANDRA-13747


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a7cb009f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a7cb009f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a7cb009f

Branch: refs/heads/cassandra-3.11
Commit: a7cb009f8a3f4d0e0293111bfcfff3d404a37a89
Parents: dfbe3fa
Author: Aleksey Yeschenko <al...@yeschenko.com>
Authored: Sun Aug 6 19:42:47 2017 +0100
Committer: Aleksey Yeschenko <al...@yeschenko.com>
Committed: Tue Aug 29 12:22:39 2017 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../UnfilteredPartitionIterators.java           |  6 ---
 .../db/transform/EmptyPartitionsDiscarder.java  | 35 +++++++++++++++
 .../apache/cassandra/db/transform/Filter.java   | 28 +++---------
 .../db/transform/FilteredPartitions.java        | 18 +++++---
 .../cassandra/db/transform/FilteredRows.java    |  2 +-
 .../apache/cassandra/metrics/TableMetrics.java  |  4 ++
 .../apache/cassandra/service/DataResolver.java  | 47 ++++++++++++++------
 8 files changed, 94 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7cb009f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5ccd5cd..6609b05 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.15
+ * Fix AssertionError in short read protection (CASSANDRA-13747)
  * Don't skip corrupted sstables on startup (CASSANDRA-13620)
  * Fix the merging of cells with different user type versions (CASSANDRA-13776)
  * Copy session properties on cqlsh.py do_login (CASSANDRA-13640)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7cb009f/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
index 1abbb19..4e0ac1b 100644
--- a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
+++ b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
@@ -77,12 +77,6 @@ public abstract class UnfilteredPartitionIterators
         return Transformation.apply(toReturn, new Close());
     }
 
-    public static PartitionIterator mergeAndFilter(List<UnfilteredPartitionIterator> iterators, int nowInSec, MergeListener listener)
-    {
-        // TODO: we could have a somewhat faster version if we were to merge the UnfilteredRowIterators directly as RowIterators
-        return filter(merge(iterators, nowInSec, listener), nowInSec);
-    }
-
     public static PartitionIterator filter(final UnfilteredPartitionIterator iterator, final int nowInSec)
     {
         return FilteredPartitions.filter(iterator, nowInSec);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7cb009f/src/java/org/apache/cassandra/db/transform/EmptyPartitionsDiscarder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/transform/EmptyPartitionsDiscarder.java b/src/java/org/apache/cassandra/db/transform/EmptyPartitionsDiscarder.java
new file mode 100644
index 0000000..5e41cec
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/transform/EmptyPartitionsDiscarder.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.transform;
+
+import org.apache.cassandra.db.rows.BaseRowIterator;
+
+public final class EmptyPartitionsDiscarder extends Transformation<BaseRowIterator<?>>
+{
+    @Override
+    protected BaseRowIterator applyToPartition(BaseRowIterator iterator)
+    {
+        if (iterator.isEmpty())
+        {
+            iterator.close();
+            return null;
+        }
+
+        return iterator;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7cb009f/src/java/org/apache/cassandra/db/transform/Filter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/transform/Filter.java b/src/java/org/apache/cassandra/db/transform/Filter.java
index 48c8b1a..747983f 100644
--- a/src/java/org/apache/cassandra/db/transform/Filter.java
+++ b/src/java/org/apache/cassandra/db/transform/Filter.java
@@ -23,27 +23,21 @@ package org.apache.cassandra.db.transform;
 import org.apache.cassandra.db.DeletionPurger;
 import org.apache.cassandra.db.rows.*;
 
-final class Filter extends Transformation
+public final class Filter extends Transformation
 {
-    private final boolean filterEmpty; // generally maps to !isForThrift, but also false for direct row filtration
     private final int nowInSec;
-    public Filter(boolean filterEmpty, int nowInSec)
+
+    public Filter(int nowInSec)
     {
-        this.filterEmpty = filterEmpty;
         this.nowInSec = nowInSec;
     }
 
     @Override
     protected RowIterator applyToPartition(BaseRowIterator iterator)
     {
-        RowIterator filtered = iterator instanceof UnfilteredRows
-                               ? new FilteredRows(this, (UnfilteredRows) iterator)
-                               : new FilteredRows((UnfilteredRowIterator) iterator, this);
-
-        if (filterEmpty && closeIfEmpty(filtered))
-            return null;
-
-        return filtered;
+        return iterator instanceof UnfilteredRows
+             ? new FilteredRows(this, (UnfilteredRows) iterator)
+             : new FilteredRows((UnfilteredRowIterator) iterator, this);
     }
 
     @Override
@@ -67,14 +61,4 @@ final class Filter extends Transformation
     {
         return null;
     }
-
-    private static boolean closeIfEmpty(BaseRowIterator<?> iter)
-    {
-        if (iter.isEmpty())
-        {
-            iter.close();
-            return true;
-        }
-        return false;
-    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7cb009f/src/java/org/apache/cassandra/db/transform/FilteredPartitions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/transform/FilteredPartitions.java b/src/java/org/apache/cassandra/db/transform/FilteredPartitions.java
index 09e36b4..ad9446d 100644
--- a/src/java/org/apache/cassandra/db/transform/FilteredPartitions.java
+++ b/src/java/org/apache/cassandra/db/transform/FilteredPartitions.java
@@ -50,11 +50,19 @@ public final class FilteredPartitions extends BasePartitions<RowIterator, BasePa
     /**
      * Filter any RangeTombstoneMarker from the iterator's iterators, transforming it into a PartitionIterator.
      */
-    public static PartitionIterator filter(UnfilteredPartitionIterator iterator, int nowInSecs)
+    public static FilteredPartitions filter(UnfilteredPartitionIterator iterator, int nowInSecs)
     {
-        Filter filter = new Filter(!iterator.isForThrift(), nowInSecs);
-        if (iterator instanceof UnfilteredPartitions)
-            return new FilteredPartitions(filter, (UnfilteredPartitions) iterator);
-        return new FilteredPartitions(iterator, filter);
+        FilteredPartitions filtered = filter(iterator, new Filter(nowInSecs));
+
+        return iterator.isForThrift()
+             ? filtered
+             : (FilteredPartitions) Transformation.apply(filtered, new EmptyPartitionsDiscarder());
+    }
+
+    public static FilteredPartitions filter(UnfilteredPartitionIterator iterator, Filter filter)
+    {
+        return iterator instanceof UnfilteredPartitions
+             ? new FilteredPartitions(filter, (UnfilteredPartitions) iterator)
+             : new FilteredPartitions(iterator, filter);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7cb009f/src/java/org/apache/cassandra/db/transform/FilteredRows.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/transform/FilteredRows.java b/src/java/org/apache/cassandra/db/transform/FilteredRows.java
index 818d3bb..5b635eb 100644
--- a/src/java/org/apache/cassandra/db/transform/FilteredRows.java
+++ b/src/java/org/apache/cassandra/db/transform/FilteredRows.java
@@ -55,6 +55,6 @@ public final class FilteredRows extends BaseRows<Row, BaseRowIterator<?>> implem
      */
     public static RowIterator filter(UnfilteredRowIterator iterator, int nowInSecs)
     {
-        return new Filter(false, nowInSecs).applyToPartition(iterator);
+        return new Filter(nowInSecs).applyToPartition(iterator);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7cb009f/src/java/org/apache/cassandra/metrics/TableMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/TableMetrics.java b/src/java/org/apache/cassandra/metrics/TableMetrics.java
index a493836..fe88a63 100644
--- a/src/java/org/apache/cassandra/metrics/TableMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java
@@ -151,6 +151,8 @@ public class TableMetrics
     public final static LatencyMetrics globalWriteLatency = new LatencyMetrics(globalFactory, globalAliasFactory, "Write");
     public final static LatencyMetrics globalRangeLatency = new LatencyMetrics(globalFactory, globalAliasFactory, "Range");
 
+    public final Meter shortReadProtectionRequests;
+
     public final Map<Sampler, TopKSampler<ByteBuffer>> samplers;
     /**
      * stores metrics that will be rolled into a single global metric
@@ -645,6 +647,8 @@ public class TableMetrics
         casPrepare = new LatencyMetrics(factory, "CasPrepare", cfs.keyspace.metric.casPrepare);
         casPropose = new LatencyMetrics(factory, "CasPropose", cfs.keyspace.metric.casPropose);
         casCommit = new LatencyMetrics(factory, "CasCommit", cfs.keyspace.metric.casCommit);
+
+        shortReadProtectionRequests = Metrics.meter(factory.createMetricName("ShortReadProtectionRequests"));
     }
 
     public void updateSSTableIterated(int count)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7cb009f/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DataResolver.java b/src/java/org/apache/cassandra/service/DataResolver.java
index 26b1b2a..72c4950 100644
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@ -27,16 +27,13 @@ import com.google.common.collect.Iterables;
 
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.*;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.ClusteringIndexFilter;
 import org.apache.cassandra.db.filter.DataLimits;
 import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.db.transform.MoreRows;
-import org.apache.cassandra.db.transform.Transformation;
+import org.apache.cassandra.db.transform.*;
 import org.apache.cassandra.exceptions.ReadTimeoutException;
 import org.apache.cassandra.net.*;
 import org.apache.cassandra.tracing.Tracing;
@@ -72,10 +69,29 @@ public class DataResolver extends ResponseResolver
             sources[i] = msg.from;
         }
 
-        // Even though every responses should honor the limit, we might have more than requested post reconciliation,
-        // so ensure we're respecting the limit.
+        /*
+         * Even though every response, individually, will honor the limit, it is possible that we will, after the merge,
+         * have more rows than the client requested. To make sure that we still conform to the original limit,
+         * we apply a top-level post-reconciliation counter to the merged partition iterator.
+         *
+         * Short read protection logic (ShortReadRowProtection.moreContents()) relies on this counter to be applied
+         * to the current partition to work. For this reason we have to apply the counter transformation before
+         * empty partition discard logic kicks in - for it will eagerly consume the iterator.
+         *
+         * That's why the order here is: 1) merge; 2) filter rows; 3) count; 4) discard empty partitions
+         *
+         * See CASSANDRA-13747 for more details.
+         */
+
         DataLimits.Counter counter = command.limits().newCounter(command.nowInSec(), true, command.selectsFullPartition());
-        return counter.applyTo(mergeWithShortReadProtection(iters, sources, counter));
+
+        UnfilteredPartitionIterator merged = mergeWithShortReadProtection(iters, sources, counter);
+        FilteredPartitions filtered = FilteredPartitions.filter(merged, new Filter(command.nowInSec()));
+        PartitionIterator counted = counter.applyTo(filtered);
+
+        return command.isForThrift()
+             ? counted
+             : Transformation.apply(counted, new EmptyPartitionsDiscarder());
     }
 
     public void compareResponses()
@@ -87,11 +103,13 @@ public class DataResolver extends ResponseResolver
         }
     }
 
-    private PartitionIterator mergeWithShortReadProtection(List<UnfilteredPartitionIterator> results, InetAddress[] sources, DataLimits.Counter resultCounter)
+    private UnfilteredPartitionIterator mergeWithShortReadProtection(List<UnfilteredPartitionIterator> results,
+                                                                     InetAddress[] sources,
+                                                                     DataLimits.Counter resultCounter)
     {
         // If we have only one results, there is no read repair to do and we can't get short reads
         if (results.size() == 1)
-            return UnfilteredPartitionIterators.filter(results.get(0), command.nowInSec());
+            return results.get(0);
 
         UnfilteredPartitionIterators.MergeListener listener = new RepairMergeListener(sources);
 
@@ -103,7 +121,7 @@ public class DataResolver extends ResponseResolver
                 results.set(i, Transformation.apply(results.get(i), new ShortReadProtection(sources[i], resultCounter)));
         }
 
-        return UnfilteredPartitionIterators.mergeAndFilter(results, command.nowInSec(), listener);
+        return UnfilteredPartitionIterators.merge(results, command.nowInSec(), listener);
     }
 
     private class RepairMergeListener implements UnfilteredPartitionIterators.MergeListener
@@ -510,7 +528,7 @@ public class DataResolver extends ResponseResolver
                 // counting iterator.
                 int n = postReconciliationCounter.countedInCurrentPartition();
                 int x = counter.countedInCurrentPartition();
-                int toQuery = Math.max(((n * n) / x) - n, 1);
+                int toQuery = Math.max(((n * n) / Math.max(x, 1)) - n, 1);
 
                 DataLimits retryLimits = command.limits().forShortReadRetry(toQuery);
                 ClusteringIndexFilter filter = command.clusteringIndexFilter(partitionKey);
@@ -523,6 +541,9 @@ public class DataResolver extends ResponseResolver
                                                                                    partitionKey,
                                                                                    retryFilter);
 
+                Tracing.trace("Requesting {} extra rows from {} for short read protection", toQuery, source);
+                Schema.instance.getColumnFamilyStoreInstance(cmd.metadata().cfId).metric.shortReadProtectionRequests.mark();
+
                 return doShortReadRetry(cmd);
             }
 
@@ -531,7 +552,7 @@ public class DataResolver extends ResponseResolver
                 DataResolver resolver = new DataResolver(keyspace, retryCommand, ConsistencyLevel.ONE, 1);
                 ReadCallback handler = new ReadCallback(resolver, ConsistencyLevel.ONE, retryCommand, Collections.singletonList(source));
                 if (StorageProxy.canDoLocalRequest(source))
-                      StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(retryCommand, handler));
+                    StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(retryCommand, handler));
                 else
                     MessagingService.instance().sendRRWithFailure(retryCommand.createMessage(MessagingService.current_version), source, handler);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org