You are viewing a plain text version of this content. The canonical link for it is here.
Posted to pr@cassandra.apache.org by "iamaleksey (via GitHub)" <gi...@apache.org> on 2023/06/15 16:47:28 UTC

[GitHub] [cassandra] iamaleksey commented on a diff in pull request #2395: CEP-15: Convert AccordStateCache cache from write-through to write-back

iamaleksey commented on code in PR #2395:
URL: https://github.com/apache/cassandra/pull/2395#discussion_r1231300788


##########
src/java/org/apache/cassandra/service/accord/AccordStateCache.java:
##########
@@ -15,547 +15,442 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.cassandra.service.accord;
 
 import java.util.HashMap;
-import java.util.HashSet;
+import java.util.Iterator;
 import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
+import java.util.function.BiFunction;
 import java.util.function.Function;
 import java.util.function.ToLongFunction;
 import java.util.stream.Stream;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import accord.utils.Invariants;
-import accord.utils.async.AsyncChains;
-import accord.utils.async.AsyncResult;
-import org.apache.cassandra.utils.ObjectSizes;
+import accord.utils.IntrusiveLinkedList;
+import org.apache.cassandra.concurrent.ExecutorPlus;
+import org.apache.cassandra.service.accord.AccordCachingState.Status;
 
-import static org.apache.cassandra.service.accord.AccordLoadingState.LoadingState.FAILED;
-import static org.apache.cassandra.service.accord.AccordLoadingState.LoadingState.LOADED;
+import static accord.utils.Invariants.checkState;
+import static java.lang.String.format;
+import static org.apache.cassandra.service.accord.AccordCachingState.Status.EVICTED;
+import static org.apache.cassandra.service.accord.AccordCachingState.Status.FAILED_TO_LOAD;
+import static org.apache.cassandra.service.accord.AccordCachingState.Status.LOADING;
+import static org.apache.cassandra.service.accord.AccordCachingState.Status.SAVING;
 
 /**
  * Cache for AccordCommand and AccordCommandsForKey, available memory is shared between the two object types.
- *
+ * </p>
  * Supports dynamic object sizes. After each acquire/free cycle, the cacheable objects size is recomputed to
  * account for data added/removed during txn processing if it's modified flag is set
  */
-public class AccordStateCache
+public class AccordStateCache extends IntrusiveLinkedList<AccordCachingState<?,?>>
 {
     private static final Logger logger = LoggerFactory.getLogger(AccordStateCache.class);
 
-    public static class Node<K, V> extends AccordLoadingState<K, V>
-    {
-        static final long EMPTY_SIZE = ObjectSizes.measure(new AccordStateCache.Node(null));
-
-        private Node<?, ?> prev;
-        private Node<?, ?> next;
-        private int references = 0;
-        private long lastQueriedEstimatedSizeOnHeap = 0;
-
-        public Node(K key)
-        {
-            super(key);
-        }
-
-        public int referenceCount()
-        {
-            return references;
-        }
-
-        boolean isLoaded()
-        {
-            return state() == LOADED;
-        }
-
-        public boolean isComplete()
-        {
-            switch (state())
-            {
-                case PENDING:
-                case UNINITIALIZED:
-                    return false;
-                case FAILED:
-                case LOADED:
-                    return true;
-                default: throw new UnsupportedOperationException("Unknown state: " + state());
-            }
-        }
-
-        private boolean isInQueue()
-        {
-            return prev != null && next != null;
-        }
-
-        long estimatedSizeOnHeap(ToLongFunction<V> estimator)
-        {
-            long result = EMPTY_SIZE;
-            V v;
-            if (isLoaded() && (v = value()) != null)
-                result += estimator.applyAsLong(v);
-            lastQueriedEstimatedSizeOnHeap = result;
-            return result;
-        }
-
-        long estimatedSizeOnHeapDelta(ToLongFunction<V> estimator)
-        {
-            long prevSize = lastQueriedEstimatedSizeOnHeap;
-            return estimatedSizeOnHeap(estimator) - prevSize;
-        }
-
-        boolean shouldUpdateSize()
-        {
-            return isLoaded() && lastQueriedEstimatedSizeOnHeap == EMPTY_SIZE;
-        }
-
-        void maybeCleanupLoad()
-        {
-            state();
-        }
-
-        @Override
-        public String toString()
-        {
-            return "Node{" + state() +
-                   ", key=" + key() +
-                   ", references=" + references +
-                   "}@" + Integer.toHexString(System.identityHashCode(this));
-        }
-    }
-
     static class Stats
     {
         private long queries;
         private long hits;
         private long misses;
     }
 
-    private static class NamedMap<K, V> extends HashMap<K, V>
-    {
-        final String name;
+    private final Map<Object, AccordCachingState<?, ?>> cache = new HashMap<>();
+    private final HashMap<Class<?>, Instance<?, ?, ?>> instances = new HashMap<>();
 
-        public NamedMap(String name)
-        {
-            this.name = name;
-        }
-    }
-
-    private final Map<Object, Node<?, ?>> cache = new HashMap<>();
-    private final Set<Instance<?, ?, ?>> instances = new HashSet<>();
-
-    private final NamedMap<Object, AsyncResult<Void>> saveResults = new NamedMap<>("saveResults");
+    private final ExecutorPlus loadExecutor, saveExecutor;
 
     private int unreferenced = 0;
-    Node<?, ?> head;
-    Node<?, ?> tail;
     private long maxSizeInBytes;
     private long bytesCached = 0;
     private final Stats stats = new Stats();
 
-    public AccordStateCache(long maxSizeInBytes)
+    public AccordStateCache(ExecutorPlus loadExecutor, ExecutorPlus saveExecutor, long maxSizeInBytes)
     {
+        this.loadExecutor = loadExecutor;
+        this.saveExecutor = saveExecutor;
         this.maxSizeInBytes = maxSizeInBytes;
     }
 
     public void setMaxSize(long size)
     {
         maxSizeInBytes = size;
-        maybeEvict();
+        maybeEvictSomeNodes();
     }
 
     public long getMaxSize()
     {
         return maxSizeInBytes;
     }
 
-    @VisibleForTesting
-    public void clear()
-    {
-        head = tail = null;
-        cache.clear();
-        saveResults.clear();
-    }
-
-    @VisibleForTesting
-    public Map<Object, AsyncResult<Void>> saveResults()
-    {
-        return saveResults;
-    }
-
-    private void unlink(Node<?, ?> node)
+    private void unlink(AccordCachingState<?, ?> node)
     {
-        Node<?, ?> prev = node.prev;
-        Node<?, ?> next = node.next;
-
-        if (prev == null)
-        {
-            Preconditions.checkState(head == node, "previous is null but the head isnt the provided node!");
-            head = next;
-        }
-        else
-        {
-            prev.next = next;
-        }
-
-        if (next == null)
-        {
-            Preconditions.checkState(tail == node, "next is null but the tail isnt the provided node!");
-            tail = prev;
-        }
-        else
-        {
-            next.prev = prev;
-        }
-
-        node.prev = null;
-        node.next = null;
+        node.unlink();
         unreferenced--;
     }
 
-    private void push(Node<?, ?> node)
+    private void link(AccordCachingState<?, ?> node)
     {
-        if (head != null)
-        {
-            node.prev = null;
-            node.next = head;
-            head.prev = node;
-            head = node;
-        }
-        else
-        {
-            head = node;
-            tail = node;
-        }
+        addLast(node);
         unreferenced++;
     }
 
-    private <K, V> void updateSize(Node<K, V> node, ToLongFunction<V> estimator)
-    {
-        bytesCached += node.estimatedSizeOnHeapDelta(estimator);
-    }
-
-    // don't evict if there's an outstanding save result. If an item is evicted then reloaded
-    // before it's mutation is applied, out of date info will be loaded
-    private boolean canEvict(Node<?, ?> node)
+    @SuppressWarnings("unchecked")
+    private <K, V> void maybeUpdateSize(AccordCachingState<?, ?> node, ToLongFunction<?> estimator)
     {
-        Invariants.checkState(node.references == 0);
-        return node.state() == FAILED || !hasActiveAsyncResult(saveResults, node.key());
+        if (node.shouldUpdateSize())
+            bytesCached += ((AccordCachingState<K, V>) node).estimatedSizeOnHeapDelta((ToLongFunction<V>) estimator);
     }
 
-    private void maybeEvict()
+    /*
+     * Roughly respects LRU semantics when evicting. Might consider prioritising keeping MODIFIED nodes around
+     * for longer to maximise the chances of hitting system tables fewer times (or not at all).
+     */
+    private void maybeEvictSomeNodes()
     {
         if (bytesCached <= maxSizeInBytes)
             return;
 
-        Node<?, ?> current = tail;
-        while (current != null && bytesCached > maxSizeInBytes)
+        Iterator<AccordCachingState<?, ?>> iter = this.iterator();

Review Comment:
   I've added iterator caching to `IntrusiveLinkedList`, as Agrona does in all of its single-threaded collections, in this PR: https://github.com/apache/cassandra-accord/pull/51



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org