Posted to pr@cassandra.apache.org by GitBox <gi...@apache.org> on 2020/07/02 12:41:14 UTC

[GitHub] [cassandra] adelapena commented on a change in pull request #659: Operational Improvements & Hardening for Replica Filtering Protection

adelapena commented on a change in pull request #659:
URL: https://github.com/apache/cassandra/pull/659#discussion_r448967271



##########
File path: src/java/org/apache/cassandra/service/ReplicaFilteringProtection.java
##########
@@ -239,7 +253,21 @@ public Row onMergedRows(Row merged, Row[] versions)
                 {
                     // cache the row versions to be able to regenerate the original row iterator
                     for (int i = 0; i < versions.length; i++)
+                    {
                         builders[i].addRow(versions[i]);
+                        rowsCached++;
+
+                        if (rowsCached == cachedRowsFailThreshold)

Review comment:
       I wonder if it would be better to use `>` instead of `==`. If we used `>` and counted merged materialized/cached rows instead of per-replica rows, we could guarantee that, in the absence of conflicts, a threshold of N rows allows a query limit of up to N rows. Also, I think that `tombstone_failure_threshold` uses `>`, so this would be more consistent with how that threshold value is interpreted. WDYT?
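
To illustrate the difference, here is a minimal sketch, not the code in the patch. `ThresholdSemantics` and `rowsAcceptedBeforeFailure` are hypothetical names, and the counter is a simplified stand-in for `rowsCached`. It only shows how many rows get through before the check fires under each comparison.

```java
// Hypothetical stand-in for the counter in ReplicaFilteringProtection; not the actual patch.
final class ThresholdSemantics
{
    /**
     * Returns how many rows are accepted before the failure check fires,
     * or rowsOffered if the check never fires.
     */
    static int rowsAcceptedBeforeFailure(int threshold, boolean strictlyGreater, int rowsOffered)
    {
        int rowsCached = 0;
        for (int i = 0; i < rowsOffered; i++)
        {
            rowsCached++;
            boolean fail = strictlyGreater ? rowsCached > threshold
                                           : rowsCached == threshold;
            if (fail)
                return rowsCached - 1; // the row that tripped the check is not served
        }
        return rowsCached;
    }

    public static void main(String[] args)
    {
        // With ==, the query fails on the 1000th row, so only 999 rows get through.
        System.out.println(rowsAcceptedBeforeFailure(1000, false, 5000)); // prints 999
        // With >, the query fails on the 1001st row, so exactly 1000 rows get through.
        System.out.println(rowsAcceptedBeforeFailure(1000, true, 5000));  // prints 1000
    }
}
```

Under this simplified model, a query that materializes exactly N rows fails with `==` and succeeds with `>`.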

##########
File path: src/java/org/apache/cassandra/config/Config.java
##########
@@ -280,6 +280,9 @@
     public volatile int tombstone_warn_threshold = 1000;
     public volatile int tombstone_failure_threshold = 100000;
 
+    public volatile int cached_replica_rows_warn_threshold = 1024;
+    public volatile int cached_replica_rows_fail_threshold = 16384;

Review comment:
       I don't know if using powers of two gives us any advantage here, and I'm a bit afraid it could make users think that this is somehow measuring memory sizes. Given that this seems more related to the query limit, perhaps we should use something like 1000, WDYT?
   
   Also, that is the number of rows across all replicas, so if we are reading from two in-sync replicas the warn threshold will be reached at 512 results, won't it? Perhaps we should count the number of merged materialized/cached rows, ignoring the replicas, so the thresholds can be more easily related to the query limit.
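
The arithmetic behind the 512 figure can be sketched as follows. This is an illustrative model only: `CachedRowCounting` and its methods are hypothetical names, and it assumes every merged row contributes exactly one cached row per replica, as the loop over `versions` in the patch suggests.

```java
// Hypothetical model of the two counting strategies; not the code in the patch.
final class CachedRowCounting
{
    /**
     * Number of merged rows seen when the counter reaches the threshold,
     * assuming each merged row adds one cached row per replica.
     */
    static int mergedRowsWhenCountingPerReplica(int threshold, int replicas)
    {
        int rowsCached = 0;
        int mergedRows = 0;
        while (rowsCached < threshold)
        {
            mergedRows++;
            rowsCached += replicas; // one cached version per replica
        }
        return mergedRows;
    }

    /** Same question when each merged row is counted once, regardless of replicas. */
    static int mergedRowsWhenCountingMerged(int threshold)
    {
        return mergedRowsWhenCountingPerReplica(threshold, 1);
    }

    public static void main(String[] args)
    {
        System.out.println(mergedRowsWhenCountingPerReplica(1024, 2)); // prints 512
        System.out.println(mergedRowsWhenCountingPerReplica(1024, 3)); // prints 342
        System.out.println(mergedRowsWhenCountingMerged(1024));        // prints 1024
    }
}
```

Counting merged rows makes the threshold independent of the number of replicas contacted, which is what lets it be compared directly with the query limit.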

##########
File path: conf/cassandra.yaml
##########
@@ -661,6 +661,23 @@ auto_snapshot: true
 tombstone_warn_threshold: 1000
 tombstone_failure_threshold: 100000
 
+# Filtering and secondary index queries at read consistency levels above ONE/LOCAL_ONE use a
+# mechanism called replica filtering protection to ensure that results from stale replicas do
+# not violate consistency. This mechanism, which operates in two phases, caches replica
+# results on-heap at the coordinator. The more stale results returned by the former, the more
+# rows cached at the latter.
+#
+# These thresholds exist to limit the damage very stale replicas can cause during these queries.
+# They essentially define the memory budget individal index and filtering queries have to meet

Review comment:
       We could explicitly say that the thresholds are for the number of rows. Also, that this number is per query (or per query page), and perhaps that the rows only live for the duration of the query, especially if we use "cached" instead of "materialized".

##########
File path: src/java/org/apache/cassandra/db/rows/EncodingStats.java
##########
@@ -107,6 +108,29 @@ public EncodingStats mergeWith(EncodingStats that)
         return new EncodingStats(minTimestamp, minDelTime, minTTL);
     }
 
+    /**
+     * Merge one or more EncodingStats, that are lazily materialized from some list of arbitrary type by the provided function
+     */
+    public static <V, F extends Function<V, EncodingStats>> EncodingStats merge(List<V> values, F function)
+    {
+        if (values.size() == 1)
+            return function.apply(values.get(0));
+
+        Collector collector = new Collector();
+        for (int i=0, isize=values.size(); i<isize; i++)

Review comment:
       Nit:
   ```suggestion
           for (int i = 0, isize = values.size(); i < isize; i++)
   ```
   Also, perhaps `size` or `iSize` instead of `isize`?

##########
File path: conf/cassandra.yaml
##########
@@ -661,6 +661,23 @@ auto_snapshot: true
 tombstone_warn_threshold: 1000
 tombstone_failure_threshold: 100000
 
+# Filtering and secondary index queries at read consistency levels above ONE/LOCAL_ONE use a
+# mechanism called replica filtering protection to ensure that results from stale replicas do
+# not violate consistency. This mechanism, which operates in two phases, caches replica
+# results on-heap at the coordinator. The more stale results returned by the former, the more
+# rows cached at the latter.
+#
+# These thresholds exist to limit the damage very stale replicas can cause during these queries.
+# They essentially define the memory budget individal index and filtering queries have to meet
+# the client-desired consistency level.
+#
+# "cached_replica_rows_warn_threshold" is the threshold at which a warning will be logged.
+# "cached_replica_rows_fail_threshold" is the threshold at which the query will fail.
+#
+# These thresholds may also be adjusted at runtime using the StorageService mbean.
+cached_replica_rows_warn_threshold: 1024
+cached_replica_rows_fail_threshold: 16384

Review comment:
       Nice description. I think I'd prefer "materialized" rows rather than "cached" rows, since the latter could be a bit misleading and make the unaware reader think that this is doing something different from what it actually does. WDYT?
   
   Also, we could include a reference to the RFP ticket (CASSANDRA-8272) for further details, like the ones we have for other tickets in this file.

##########
File path: src/java/org/apache/cassandra/db/rows/EncodingStats.java
##########
@@ -107,6 +108,29 @@ public EncodingStats mergeWith(EncodingStats that)
         return new EncodingStats(minTimestamp, minDelTime, minTTL);
     }
 
+    /**
+     * Merge one or more EncodingStats, that are lazily materialized from some list of arbitrary type by the provided function
+     */
+    public static <V, F extends Function<V, EncodingStats>> EncodingStats merge(List<V> values, F function)
+    {
+        if (values.size() == 1)
+            return function.apply(values.get(0));
+
+        Collector collector = new Collector();
+        for (int i=0, isize=values.size(); i<isize; i++)
+        {
+            V v = values.get(i);
+            EncodingStats stats = function.apply(v);
+            if (stats.minTimestamp != TIMESTAMP_EPOCH)
+                collector.updateTimestamp(stats.minTimestamp);
+            if(stats.minLocalDeletionTime != DELETION_TIME_EPOCH)
+                collector.updateLocalDeletionTime(stats.minLocalDeletionTime);
+            if(stats.minTTL != TTL_EPOCH)

Review comment:
       ```suggestion
               if (stats.minTTL != TTL_EPOCH)
   ```

##########
File path: src/java/org/apache/cassandra/service/ReplicaFilteringProtection.java
##########
@@ -239,7 +253,21 @@ public Row onMergedRows(Row merged, Row[] versions)
                 {
                     // cache the row versions to be able to regenerate the original row iterator
                     for (int i = 0; i < versions.length; i++)
+                    {
                         builders[i].addRow(versions[i]);
+                        rowsCached++;

Review comment:
       Are these limited by `tombstone_failure_threshold`?

##########
File path: src/java/org/apache/cassandra/db/rows/EncodingStats.java
##########
@@ -107,6 +108,29 @@ public EncodingStats mergeWith(EncodingStats that)
         return new EncodingStats(minTimestamp, minDelTime, minTTL);
     }
 
+    /**
+     * Merge one or more EncodingStats, that are lazily materialized from some list of arbitrary type by the provided function
+     */
+    public static <V, F extends Function<V, EncodingStats>> EncodingStats merge(List<V> values, F function)
+    {
+        if (values.size() == 1)
+            return function.apply(values.get(0));
+
+        Collector collector = new Collector();
+        for (int i=0, isize=values.size(); i<isize; i++)
+        {
+            V v = values.get(i);
+            EncodingStats stats = function.apply(v);
+            if (stats.minTimestamp != TIMESTAMP_EPOCH)
+                collector.updateTimestamp(stats.minTimestamp);
+            if(stats.minLocalDeletionTime != DELETION_TIME_EPOCH)

Review comment:
       ```suggestion
               if (stats.minLocalDeletionTime != DELETION_TIME_EPOCH)
   ```

##########
File path: src/java/org/apache/cassandra/service/DataResolver.java
##########
@@ -171,7 +171,14 @@ private PartitionIterator resolveWithReplicaFilteringProtection()
         // We need separate contexts, as each context has his own counter
         ResolveContext firstPhaseContext = new ResolveContext(count);
         ResolveContext secondPhaseContext = new ResolveContext(count);
-        ReplicaFilteringProtection rfp = new ReplicaFilteringProtection(keyspace, command, consistency, firstPhaseContext.sources);
+
+        ReplicaFilteringProtection rfp = new ReplicaFilteringProtection(keyspace,
+                                                                        command,
+                                                                        consistency,
+                                                                        firstPhaseContext.sources,
+                                                                        DatabaseDescriptor.getCachedReplicaRowsWarnThreshold(),
+                                                                        DatabaseDescriptor.getCachedReplicaRowsFailThreshold());
+

Review comment:
       Have you intentionally withdrawn the caching of `UnaryOperator.identity()` below?

##########
File path: src/java/org/apache/cassandra/service/ReplicaFilteringProtection.java
##########
@@ -132,34 +147,33 @@
      */
     UnfilteredPartitionIterator queryProtectedPartitions(int source)
     {
-        UnfilteredPartitionIterator original = makeIterator(originalPartitions.get(source));
-        SortedMap<DecoratedKey, BTreeSet.Builder<Clustering>> toFetch = rowsToFetch.get(source);
+        UnfilteredPartitionIterator original = makeIterator(originalPartitions.set(source, null));
+        SortedMap<DecoratedKey, BTreeSet.Builder<Clustering>> toFetch = rowsToFetch.set(source, null);
 
         if (toFetch.isEmpty())
             return original;
 
-        // TODO: this would be more efficient if we had multi-key queries internally
+        // TODO: This would be more efficient if we had multi-key queries internally (see CASSANDRA-15910)

Review comment:
       +1



