You are viewing a plain text version of this content. The canonical link for it is here.
Posted to pr@cassandra.apache.org by "iamaleksey (via GitHub)" <gi...@apache.org> on 2023/06/07 13:45:37 UTC

[GitHub] [cassandra] iamaleksey opened a new pull request, #2395: CEP-15: Convert AccordStateCache cache from write-through to write-back

iamaleksey opened a new pull request, #2395:
URL: https://github.com/apache/cassandra/pull/2395

   patch by Aleksey Yeschenko; reviewed by Blake Eggleston for CASSANDRA-18563


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] [cassandra] bdeggleston commented on a diff in pull request #2395: CEP-15: Convert AccordStateCache cache from write-through to write-back

Posted by "bdeggleston (via GitHub)" <gi...@apache.org>.
bdeggleston commented on code in PR #2395:
URL: https://github.com/apache/cassandra/pull/2395#discussion_r1231332508


##########
src/java/org/apache/cassandra/service/accord/AccordStateCache.java:
##########
@@ -15,547 +15,442 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.cassandra.service.accord;
 
 import java.util.HashMap;
-import java.util.HashSet;
+import java.util.Iterator;
 import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
+import java.util.function.BiFunction;
 import java.util.function.Function;
 import java.util.function.ToLongFunction;
 import java.util.stream.Stream;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import accord.utils.Invariants;
-import accord.utils.async.AsyncChains;
-import accord.utils.async.AsyncResult;
-import org.apache.cassandra.utils.ObjectSizes;
+import accord.utils.IntrusiveLinkedList;
+import org.apache.cassandra.concurrent.ExecutorPlus;
+import org.apache.cassandra.service.accord.AccordCachingState.Status;
 
-import static org.apache.cassandra.service.accord.AccordLoadingState.LoadingState.FAILED;
-import static org.apache.cassandra.service.accord.AccordLoadingState.LoadingState.LOADED;
+import static accord.utils.Invariants.checkState;
+import static java.lang.String.format;
+import static org.apache.cassandra.service.accord.AccordCachingState.Status.EVICTED;
+import static org.apache.cassandra.service.accord.AccordCachingState.Status.FAILED_TO_LOAD;
+import static org.apache.cassandra.service.accord.AccordCachingState.Status.LOADING;
+import static org.apache.cassandra.service.accord.AccordCachingState.Status.SAVING;
 
 /**
  * Cache for AccordCommand and AccordCommandsForKey, available memory is shared between the two object types.
- *
+ * <p>
  * Supports dynamic object sizes. After each acquire/free cycle, the cacheable object's size is recomputed to
  * account for data added/removed during txn processing if its modified flag is set
  */
-public class AccordStateCache
+public class AccordStateCache extends IntrusiveLinkedList<AccordCachingState<?,?>>
 {
     private static final Logger logger = LoggerFactory.getLogger(AccordStateCache.class);
 
-    public static class Node<K, V> extends AccordLoadingState<K, V>
-    {
-        static final long EMPTY_SIZE = ObjectSizes.measure(new AccordStateCache.Node(null));
-
-        private Node<?, ?> prev;
-        private Node<?, ?> next;
-        private int references = 0;
-        private long lastQueriedEstimatedSizeOnHeap = 0;
-
-        public Node(K key)
-        {
-            super(key);
-        }
-
-        public int referenceCount()
-        {
-            return references;
-        }
-
-        boolean isLoaded()
-        {
-            return state() == LOADED;
-        }
-
-        public boolean isComplete()
-        {
-            switch (state())
-            {
-                case PENDING:
-                case UNINITIALIZED:
-                    return false;
-                case FAILED:
-                case LOADED:
-                    return true;
-                default: throw new UnsupportedOperationException("Unknown state: " + state());
-            }
-        }
-
-        private boolean isInQueue()
-        {
-            return prev != null && next != null;
-        }
-
-        long estimatedSizeOnHeap(ToLongFunction<V> estimator)
-        {
-            long result = EMPTY_SIZE;
-            V v;
-            if (isLoaded() && (v = value()) != null)
-                result += estimator.applyAsLong(v);
-            lastQueriedEstimatedSizeOnHeap = result;
-            return result;
-        }
-
-        long estimatedSizeOnHeapDelta(ToLongFunction<V> estimator)
-        {
-            long prevSize = lastQueriedEstimatedSizeOnHeap;
-            return estimatedSizeOnHeap(estimator) - prevSize;
-        }
-
-        boolean shouldUpdateSize()
-        {
-            return isLoaded() && lastQueriedEstimatedSizeOnHeap == EMPTY_SIZE;
-        }
-
-        void maybeCleanupLoad()
-        {
-            state();
-        }
-
-        @Override
-        public String toString()
-        {
-            return "Node{" + state() +
-                   ", key=" + key() +
-                   ", references=" + references +
-                   "}@" + Integer.toHexString(System.identityHashCode(this));
-        }
-    }
-
     static class Stats
     {
         private long queries;
         private long hits;
         private long misses;
     }
 
-    private static class NamedMap<K, V> extends HashMap<K, V>
-    {
-        final String name;
+    private final Map<Object, AccordCachingState<?, ?>> cache = new HashMap<>();
+    private final HashMap<Class<?>, Instance<?, ?, ?>> instances = new HashMap<>();
 
-        public NamedMap(String name)
-        {
-            this.name = name;
-        }
-    }
-
-    private final Map<Object, Node<?, ?>> cache = new HashMap<>();
-    private final Set<Instance<?, ?, ?>> instances = new HashSet<>();
-
-    private final NamedMap<Object, AsyncResult<Void>> saveResults = new NamedMap<>("saveResults");
+    private final ExecutorPlus loadExecutor, saveExecutor;
 
     private int unreferenced = 0;
-    Node<?, ?> head;
-    Node<?, ?> tail;
     private long maxSizeInBytes;
     private long bytesCached = 0;
     private final Stats stats = new Stats();
 
-    public AccordStateCache(long maxSizeInBytes)
+    public AccordStateCache(ExecutorPlus loadExecutor, ExecutorPlus saveExecutor, long maxSizeInBytes)
     {
+        this.loadExecutor = loadExecutor;
+        this.saveExecutor = saveExecutor;
         this.maxSizeInBytes = maxSizeInBytes;
     }
 
     public void setMaxSize(long size)
     {
         maxSizeInBytes = size;
-        maybeEvict();
+        maybeEvictSomeNodes();
     }
 
     public long getMaxSize()
     {
         return maxSizeInBytes;
     }
 
-    @VisibleForTesting
-    public void clear()
-    {
-        head = tail = null;
-        cache.clear();
-        saveResults.clear();
-    }
-
-    @VisibleForTesting
-    public Map<Object, AsyncResult<Void>> saveResults()
-    {
-        return saveResults;
-    }
-
-    private void unlink(Node<?, ?> node)
+    private void unlink(AccordCachingState<?, ?> node)
     {
-        Node<?, ?> prev = node.prev;
-        Node<?, ?> next = node.next;
-
-        if (prev == null)
-        {
-            Preconditions.checkState(head == node, "previous is null but the head isnt the provided node!");
-            head = next;
-        }
-        else
-        {
-            prev.next = next;
-        }
-
-        if (next == null)
-        {
-            Preconditions.checkState(tail == node, "next is null but the tail isnt the provided node!");
-            tail = prev;
-        }
-        else
-        {
-            next.prev = prev;
-        }
-
-        node.prev = null;
-        node.next = null;
+        node.unlink();
         unreferenced--;
     }
 
-    private void push(Node<?, ?> node)
+    private void link(AccordCachingState<?, ?> node)
     {
-        if (head != null)
-        {
-            node.prev = null;
-            node.next = head;
-            head.prev = node;
-            head = node;
-        }
-        else
-        {
-            head = node;
-            tail = node;
-        }
+        addLast(node);
         unreferenced++;
     }
 
-    private <K, V> void updateSize(Node<K, V> node, ToLongFunction<V> estimator)
-    {
-        bytesCached += node.estimatedSizeOnHeapDelta(estimator);
-    }
-
-    // don't evict if there's an outstanding save result. If an item is evicted then reloaded
-    // before it's mutation is applied, out of date info will be loaded
-    private boolean canEvict(Node<?, ?> node)
+    @SuppressWarnings("unchecked")
+    private <K, V> void maybeUpdateSize(AccordCachingState<?, ?> node, ToLongFunction<?> estimator)
     {
-        Invariants.checkState(node.references == 0);
-        return node.state() == FAILED || !hasActiveAsyncResult(saveResults, node.key());
+        if (node.shouldUpdateSize())
+            bytesCached += ((AccordCachingState<K, V>) node).estimatedSizeOnHeapDelta((ToLongFunction<V>) estimator);
     }
 
-    private void maybeEvict()
+    /*
+     * Roughly respects LRU semantics when evicting. Might consider prioritising keeping MODIFIED nodes around
+     * for longer to maximise the chances of hitting system tables fewer times (or not at all).
+     */
+    private void maybeEvictSomeNodes()
     {
         if (bytesCached <= maxSizeInBytes)
             return;
 
-        Node<?, ?> current = tail;
-        while (current != null && bytesCached > maxSizeInBytes)
+        Iterator<AccordCachingState<?, ?>> iter = this.iterator();
+        while (iter.hasNext() && bytesCached > maxSizeInBytes)
         {
-            Node<?, ?> evict = current;
-            current = current.prev;
-
-            // TODO (expected, efficiency): can this be reworked so we're not skipping unevictable nodes everytime we try to evict?
-            if (!canEvict(evict))
-                continue;
+            AccordCachingState<?, ?> node = iter.next();
+            checkState(node.references == 0);
 
-            evict(evict, true);
+            /*
+             * TODO (expected, efficiency):
+             *    can this be reworked so we're not skipping unevictable nodes every time we try to evict?
+             */
+            Status status = node.status(); // status() call completes (if completeable)
+            switch (status)
+            {
+                default: throw new IllegalStateException("Unhandled status " + status);
+                case LOADED:
+                    unlink(node);
+                    evict(node);
+                    break;
+                case MODIFIED:
+                    // schedule a save to disk, keep linked and in the cache map
+                    Instance<?, ?, ?> instance = instanceForNode(node);
+                    node.save(saveExecutor, instance.saveFunction);
+                    maybeUpdateSize(node, instance.heapEstimator);
+                    break;
+                case SAVING:
+                    // skip over until it completes to LOADED or FAILED_TO_SAVE

Review Comment:
   heh, oh right, missed it there :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] [cassandra] iamaleksey commented on a diff in pull request #2395: CEP-15: Convert AccordStateCache cache from write-through to write-back

Posted by "iamaleksey (via GitHub)" <gi...@apache.org>.
iamaleksey commented on code in PR #2395:
URL: https://github.com/apache/cassandra/pull/2395#discussion_r1231300788


##########
src/java/org/apache/cassandra/service/accord/AccordStateCache.java:
##########
@@ -15,547 +15,442 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.cassandra.service.accord;
 
 import java.util.HashMap;
-import java.util.HashSet;
+import java.util.Iterator;
 import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
+import java.util.function.BiFunction;
 import java.util.function.Function;
 import java.util.function.ToLongFunction;
 import java.util.stream.Stream;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import accord.utils.Invariants;
-import accord.utils.async.AsyncChains;
-import accord.utils.async.AsyncResult;
-import org.apache.cassandra.utils.ObjectSizes;
+import accord.utils.IntrusiveLinkedList;
+import org.apache.cassandra.concurrent.ExecutorPlus;
+import org.apache.cassandra.service.accord.AccordCachingState.Status;
 
-import static org.apache.cassandra.service.accord.AccordLoadingState.LoadingState.FAILED;
-import static org.apache.cassandra.service.accord.AccordLoadingState.LoadingState.LOADED;
+import static accord.utils.Invariants.checkState;
+import static java.lang.String.format;
+import static org.apache.cassandra.service.accord.AccordCachingState.Status.EVICTED;
+import static org.apache.cassandra.service.accord.AccordCachingState.Status.FAILED_TO_LOAD;
+import static org.apache.cassandra.service.accord.AccordCachingState.Status.LOADING;
+import static org.apache.cassandra.service.accord.AccordCachingState.Status.SAVING;
 
 /**
  * Cache for AccordCommand and AccordCommandsForKey, available memory is shared between the two object types.
- *
+ * <p>
  * Supports dynamic object sizes. After each acquire/free cycle, the cacheable object's size is recomputed to
  * account for data added/removed during txn processing if its modified flag is set
  */
-public class AccordStateCache
+public class AccordStateCache extends IntrusiveLinkedList<AccordCachingState<?,?>>
 {
     private static final Logger logger = LoggerFactory.getLogger(AccordStateCache.class);
 
-    public static class Node<K, V> extends AccordLoadingState<K, V>
-    {
-        static final long EMPTY_SIZE = ObjectSizes.measure(new AccordStateCache.Node(null));
-
-        private Node<?, ?> prev;
-        private Node<?, ?> next;
-        private int references = 0;
-        private long lastQueriedEstimatedSizeOnHeap = 0;
-
-        public Node(K key)
-        {
-            super(key);
-        }
-
-        public int referenceCount()
-        {
-            return references;
-        }
-
-        boolean isLoaded()
-        {
-            return state() == LOADED;
-        }
-
-        public boolean isComplete()
-        {
-            switch (state())
-            {
-                case PENDING:
-                case UNINITIALIZED:
-                    return false;
-                case FAILED:
-                case LOADED:
-                    return true;
-                default: throw new UnsupportedOperationException("Unknown state: " + state());
-            }
-        }
-
-        private boolean isInQueue()
-        {
-            return prev != null && next != null;
-        }
-
-        long estimatedSizeOnHeap(ToLongFunction<V> estimator)
-        {
-            long result = EMPTY_SIZE;
-            V v;
-            if (isLoaded() && (v = value()) != null)
-                result += estimator.applyAsLong(v);
-            lastQueriedEstimatedSizeOnHeap = result;
-            return result;
-        }
-
-        long estimatedSizeOnHeapDelta(ToLongFunction<V> estimator)
-        {
-            long prevSize = lastQueriedEstimatedSizeOnHeap;
-            return estimatedSizeOnHeap(estimator) - prevSize;
-        }
-
-        boolean shouldUpdateSize()
-        {
-            return isLoaded() && lastQueriedEstimatedSizeOnHeap == EMPTY_SIZE;
-        }
-
-        void maybeCleanupLoad()
-        {
-            state();
-        }
-
-        @Override
-        public String toString()
-        {
-            return "Node{" + state() +
-                   ", key=" + key() +
-                   ", references=" + references +
-                   "}@" + Integer.toHexString(System.identityHashCode(this));
-        }
-    }
-
     static class Stats
     {
         private long queries;
         private long hits;
         private long misses;
     }
 
-    private static class NamedMap<K, V> extends HashMap<K, V>
-    {
-        final String name;
+    private final Map<Object, AccordCachingState<?, ?>> cache = new HashMap<>();
+    private final HashMap<Class<?>, Instance<?, ?, ?>> instances = new HashMap<>();
 
-        public NamedMap(String name)
-        {
-            this.name = name;
-        }
-    }
-
-    private final Map<Object, Node<?, ?>> cache = new HashMap<>();
-    private final Set<Instance<?, ?, ?>> instances = new HashSet<>();
-
-    private final NamedMap<Object, AsyncResult<Void>> saveResults = new NamedMap<>("saveResults");
+    private final ExecutorPlus loadExecutor, saveExecutor;
 
     private int unreferenced = 0;
-    Node<?, ?> head;
-    Node<?, ?> tail;
     private long maxSizeInBytes;
     private long bytesCached = 0;
     private final Stats stats = new Stats();
 
-    public AccordStateCache(long maxSizeInBytes)
+    public AccordStateCache(ExecutorPlus loadExecutor, ExecutorPlus saveExecutor, long maxSizeInBytes)
     {
+        this.loadExecutor = loadExecutor;
+        this.saveExecutor = saveExecutor;
         this.maxSizeInBytes = maxSizeInBytes;
     }
 
     public void setMaxSize(long size)
     {
         maxSizeInBytes = size;
-        maybeEvict();
+        maybeEvictSomeNodes();
     }
 
     public long getMaxSize()
     {
         return maxSizeInBytes;
     }
 
-    @VisibleForTesting
-    public void clear()
-    {
-        head = tail = null;
-        cache.clear();
-        saveResults.clear();
-    }
-
-    @VisibleForTesting
-    public Map<Object, AsyncResult<Void>> saveResults()
-    {
-        return saveResults;
-    }
-
-    private void unlink(Node<?, ?> node)
+    private void unlink(AccordCachingState<?, ?> node)
     {
-        Node<?, ?> prev = node.prev;
-        Node<?, ?> next = node.next;
-
-        if (prev == null)
-        {
-            Preconditions.checkState(head == node, "previous is null but the head isnt the provided node!");
-            head = next;
-        }
-        else
-        {
-            prev.next = next;
-        }
-
-        if (next == null)
-        {
-            Preconditions.checkState(tail == node, "next is null but the tail isnt the provided node!");
-            tail = prev;
-        }
-        else
-        {
-            next.prev = prev;
-        }
-
-        node.prev = null;
-        node.next = null;
+        node.unlink();
         unreferenced--;
     }
 
-    private void push(Node<?, ?> node)
+    private void link(AccordCachingState<?, ?> node)
     {
-        if (head != null)
-        {
-            node.prev = null;
-            node.next = head;
-            head.prev = node;
-            head = node;
-        }
-        else
-        {
-            head = node;
-            tail = node;
-        }
+        addLast(node);
         unreferenced++;
     }
 
-    private <K, V> void updateSize(Node<K, V> node, ToLongFunction<V> estimator)
-    {
-        bytesCached += node.estimatedSizeOnHeapDelta(estimator);
-    }
-
-    // don't evict if there's an outstanding save result. If an item is evicted then reloaded
-    // before it's mutation is applied, out of date info will be loaded
-    private boolean canEvict(Node<?, ?> node)
+    @SuppressWarnings("unchecked")
+    private <K, V> void maybeUpdateSize(AccordCachingState<?, ?> node, ToLongFunction<?> estimator)
     {
-        Invariants.checkState(node.references == 0);
-        return node.state() == FAILED || !hasActiveAsyncResult(saveResults, node.key());
+        if (node.shouldUpdateSize())
+            bytesCached += ((AccordCachingState<K, V>) node).estimatedSizeOnHeapDelta((ToLongFunction<V>) estimator);
     }
 
-    private void maybeEvict()
+    /*
+     * Roughly respects LRU semantics when evicting. Might consider prioritising keeping MODIFIED nodes around
+     * for longer to maximise the chances of hitting system tables fewer times (or not at all).
+     */
+    private void maybeEvictSomeNodes()
     {
         if (bytesCached <= maxSizeInBytes)
             return;
 
-        Node<?, ?> current = tail;
-        while (current != null && bytesCached > maxSizeInBytes)
+        Iterator<AccordCachingState<?, ?>> iter = this.iterator();

Review Comment:
   Introduced iterator caching to `IntrusiveLinkedList`, like Agrona does in all of its single-threaded collections, here: https://github.com/apache/cassandra-accord/pull/51



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] [cassandra] iamaleksey commented on a diff in pull request #2395: CEP-15: Convert AccordStateCache cache from write-through to write-back

Posted by "iamaleksey (via GitHub)" <gi...@apache.org>.
iamaleksey commented on code in PR #2395:
URL: https://github.com/apache/cassandra/pull/2395#discussion_r1231305832


##########
src/java/org/apache/cassandra/service/accord/AccordStateCache.java:
##########
@@ -15,547 +15,442 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.cassandra.service.accord;
 
 import java.util.HashMap;
-import java.util.HashSet;
+import java.util.Iterator;
 import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
+import java.util.function.BiFunction;
 import java.util.function.Function;
 import java.util.function.ToLongFunction;
 import java.util.stream.Stream;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import accord.utils.Invariants;
-import accord.utils.async.AsyncChains;
-import accord.utils.async.AsyncResult;
-import org.apache.cassandra.utils.ObjectSizes;
+import accord.utils.IntrusiveLinkedList;
+import org.apache.cassandra.concurrent.ExecutorPlus;
+import org.apache.cassandra.service.accord.AccordCachingState.Status;
 
-import static org.apache.cassandra.service.accord.AccordLoadingState.LoadingState.FAILED;
-import static org.apache.cassandra.service.accord.AccordLoadingState.LoadingState.LOADED;
+import static accord.utils.Invariants.checkState;
+import static java.lang.String.format;
+import static org.apache.cassandra.service.accord.AccordCachingState.Status.EVICTED;
+import static org.apache.cassandra.service.accord.AccordCachingState.Status.FAILED_TO_LOAD;
+import static org.apache.cassandra.service.accord.AccordCachingState.Status.LOADING;
+import static org.apache.cassandra.service.accord.AccordCachingState.Status.SAVING;
 
 /**
  * Cache for AccordCommand and AccordCommandsForKey, available memory is shared between the two object types.
- *
+ * <p>
  * Supports dynamic object sizes. After each acquire/free cycle, the cacheable object's size is recomputed to
  * account for data added/removed during txn processing if its modified flag is set
  */
-public class AccordStateCache
+public class AccordStateCache extends IntrusiveLinkedList<AccordCachingState<?,?>>
 {
     private static final Logger logger = LoggerFactory.getLogger(AccordStateCache.class);
 
-    public static class Node<K, V> extends AccordLoadingState<K, V>
-    {
-        static final long EMPTY_SIZE = ObjectSizes.measure(new AccordStateCache.Node(null));
-
-        private Node<?, ?> prev;
-        private Node<?, ?> next;
-        private int references = 0;
-        private long lastQueriedEstimatedSizeOnHeap = 0;
-
-        public Node(K key)
-        {
-            super(key);
-        }
-
-        public int referenceCount()
-        {
-            return references;
-        }
-
-        boolean isLoaded()
-        {
-            return state() == LOADED;
-        }
-
-        public boolean isComplete()
-        {
-            switch (state())
-            {
-                case PENDING:
-                case UNINITIALIZED:
-                    return false;
-                case FAILED:
-                case LOADED:
-                    return true;
-                default: throw new UnsupportedOperationException("Unknown state: " + state());
-            }
-        }
-
-        private boolean isInQueue()
-        {
-            return prev != null && next != null;
-        }
-
-        long estimatedSizeOnHeap(ToLongFunction<V> estimator)
-        {
-            long result = EMPTY_SIZE;
-            V v;
-            if (isLoaded() && (v = value()) != null)
-                result += estimator.applyAsLong(v);
-            lastQueriedEstimatedSizeOnHeap = result;
-            return result;
-        }
-
-        long estimatedSizeOnHeapDelta(ToLongFunction<V> estimator)
-        {
-            long prevSize = lastQueriedEstimatedSizeOnHeap;
-            return estimatedSizeOnHeap(estimator) - prevSize;
-        }
-
-        boolean shouldUpdateSize()
-        {
-            return isLoaded() && lastQueriedEstimatedSizeOnHeap == EMPTY_SIZE;
-        }
-
-        void maybeCleanupLoad()
-        {
-            state();
-        }
-
-        @Override
-        public String toString()
-        {
-            return "Node{" + state() +
-                   ", key=" + key() +
-                   ", references=" + references +
-                   "}@" + Integer.toHexString(System.identityHashCode(this));
-        }
-    }
-
     static class Stats
     {
         private long queries;
         private long hits;
         private long misses;
     }
 
-    private static class NamedMap<K, V> extends HashMap<K, V>
-    {
-        final String name;
+    private final Map<Object, AccordCachingState<?, ?>> cache = new HashMap<>();
+    private final HashMap<Class<?>, Instance<?, ?, ?>> instances = new HashMap<>();
 
-        public NamedMap(String name)
-        {
-            this.name = name;
-        }
-    }
-
-    private final Map<Object, Node<?, ?>> cache = new HashMap<>();
-    private final Set<Instance<?, ?, ?>> instances = new HashSet<>();
-
-    private final NamedMap<Object, AsyncResult<Void>> saveResults = new NamedMap<>("saveResults");
+    private final ExecutorPlus loadExecutor, saveExecutor;
 
     private int unreferenced = 0;
-    Node<?, ?> head;
-    Node<?, ?> tail;
     private long maxSizeInBytes;
     private long bytesCached = 0;
     private final Stats stats = new Stats();
 
-    public AccordStateCache(long maxSizeInBytes)
+    public AccordStateCache(ExecutorPlus loadExecutor, ExecutorPlus saveExecutor, long maxSizeInBytes)
     {
+        this.loadExecutor = loadExecutor;
+        this.saveExecutor = saveExecutor;
         this.maxSizeInBytes = maxSizeInBytes;
     }
 
     public void setMaxSize(long size)
     {
         maxSizeInBytes = size;
-        maybeEvict();
+        maybeEvictSomeNodes();
     }
 
     public long getMaxSize()
     {
         return maxSizeInBytes;
     }
 
-    @VisibleForTesting
-    public void clear()
-    {
-        head = tail = null;
-        cache.clear();
-        saveResults.clear();
-    }
-
-    @VisibleForTesting
-    public Map<Object, AsyncResult<Void>> saveResults()
-    {
-        return saveResults;
-    }
-
-    private void unlink(Node<?, ?> node)
+    private void unlink(AccordCachingState<?, ?> node)
     {
-        Node<?, ?> prev = node.prev;
-        Node<?, ?> next = node.next;
-
-        if (prev == null)
-        {
-            Preconditions.checkState(head == node, "previous is null but the head isnt the provided node!");
-            head = next;
-        }
-        else
-        {
-            prev.next = next;
-        }
-
-        if (next == null)
-        {
-            Preconditions.checkState(tail == node, "next is null but the tail isnt the provided node!");
-            tail = prev;
-        }
-        else
-        {
-            next.prev = prev;
-        }
-
-        node.prev = null;
-        node.next = null;
+        node.unlink();
         unreferenced--;
     }
 
-    private void push(Node<?, ?> node)
+    private void link(AccordCachingState<?, ?> node)
     {
-        if (head != null)
-        {
-            node.prev = null;
-            node.next = head;
-            head.prev = node;
-            head = node;
-        }
-        else
-        {
-            head = node;
-            tail = node;
-        }
+        addLast(node);
         unreferenced++;
     }
 
-    private <K, V> void updateSize(Node<K, V> node, ToLongFunction<V> estimator)
-    {
-        bytesCached += node.estimatedSizeOnHeapDelta(estimator);
-    }
-
-    // don't evict if there's an outstanding save result. If an item is evicted then reloaded
-    // before it's mutation is applied, out of date info will be loaded
-    private boolean canEvict(Node<?, ?> node)
+    @SuppressWarnings("unchecked")
+    private <K, V> void maybeUpdateSize(AccordCachingState<?, ?> node, ToLongFunction<?> estimator)
     {
-        Invariants.checkState(node.references == 0);
-        return node.state() == FAILED || !hasActiveAsyncResult(saveResults, node.key());
+        if (node.shouldUpdateSize())
+            bytesCached += ((AccordCachingState<K, V>) node).estimatedSizeOnHeapDelta((ToLongFunction<V>) estimator);
     }
 
-    private void maybeEvict()
+    /*
+     * Roughly respects LRU semantics when evicting. Might consider prioritising keeping MODIFIED nodes around
+     * for longer to maximise the chances of hitting system tables fewer times (or not at all).
+     */
+    private void maybeEvictSomeNodes()
     {
         if (bytesCached <= maxSizeInBytes)
             return;
 
-        Node<?, ?> current = tail;
-        while (current != null && bytesCached > maxSizeInBytes)
+        Iterator<AccordCachingState<?, ?>> iter = this.iterator();
+        while (iter.hasNext() && bytesCached > maxSizeInBytes)
         {
-            Node<?, ?> evict = current;
-            current = current.prev;
-
-            // TODO (expected, efficiency): can this be reworked so we're not skipping unevictable nodes everytime we try to evict?
-            if (!canEvict(evict))
-                continue;
+            AccordCachingState<?, ?> node = iter.next();
+            checkState(node.references == 0);
 
-            evict(evict, true);
+            /*
+             * TODO (expected, efficiency):
+             *    can this be reworked so we're not skipping unevictable nodes every time we try to evict?
+             */
+            Status status = node.status(); // status() call completes (if completeable)
+            switch (status)
+            {
+                default: throw new IllegalStateException("Unhandled status " + status);
+                case LOADED:
+                    unlink(node);
+                    evict(node);
+                    break;
+                case MODIFIED:
+                    // schedule a save to disk, keep linked and in the cache map
+                    Instance<?, ?, ?> instance = instanceForNode(node);
+                    node.save(saveExecutor, instance.saveFunction);
+                    maybeUpdateSize(node, instance.heapEstimator);
+                    break;
+                case SAVING:
+                    // skip over until it completes to LOADED or FAILED_TO_SAVE
+                    break;
+                case FAILED_TO_SAVE:
+                    // permanently unlink, but keep in the map; consider panicking instead when this happens

Review Comment:
   Sure, converted to a TODO.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] [cassandra] iamaleksey commented on a diff in pull request #2395: CEP-15: Convert AccordStateCache cache from write-through to write-back

Posted by "iamaleksey (via GitHub)" <gi...@apache.org>.
iamaleksey commented on code in PR #2395:
URL: https://github.com/apache/cassandra/pull/2395#discussion_r1232315862


##########
src/java/org/apache/cassandra/service/accord/AccordStateCache.java:
##########
@@ -15,547 +15,442 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.cassandra.service.accord;
 
 import java.util.HashMap;
-import java.util.HashSet;
+import java.util.Iterator;
 import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
+import java.util.function.BiFunction;
 import java.util.function.Function;
 import java.util.function.ToLongFunction;
 import java.util.stream.Stream;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import accord.utils.Invariants;
-import accord.utils.async.AsyncChains;
-import accord.utils.async.AsyncResult;
-import org.apache.cassandra.utils.ObjectSizes;
+import accord.utils.IntrusiveLinkedList;
+import org.apache.cassandra.concurrent.ExecutorPlus;
+import org.apache.cassandra.service.accord.AccordCachingState.Status;
 
-import static org.apache.cassandra.service.accord.AccordLoadingState.LoadingState.FAILED;
-import static org.apache.cassandra.service.accord.AccordLoadingState.LoadingState.LOADED;
+import static accord.utils.Invariants.checkState;
+import static java.lang.String.format;
+import static org.apache.cassandra.service.accord.AccordCachingState.Status.EVICTED;
+import static org.apache.cassandra.service.accord.AccordCachingState.Status.FAILED_TO_LOAD;
+import static org.apache.cassandra.service.accord.AccordCachingState.Status.LOADING;
+import static org.apache.cassandra.service.accord.AccordCachingState.Status.SAVING;
 
 /**
  * Cache for AccordCommand and AccordCommandsForKey, available memory is shared between the two object types.
- *
+ * </p>
  * Supports dynamic object sizes. After each acquire/free cycle, the cacheable objects size is recomputed to
  * account for data added/removed during txn processing if it's modified flag is set
  */
-public class AccordStateCache
+public class AccordStateCache extends IntrusiveLinkedList<AccordCachingState<?,?>>
 {
     private static final Logger logger = LoggerFactory.getLogger(AccordStateCache.class);
 
-    public static class Node<K, V> extends AccordLoadingState<K, V>
-    {
-        static final long EMPTY_SIZE = ObjectSizes.measure(new AccordStateCache.Node(null));
-
-        private Node<?, ?> prev;
-        private Node<?, ?> next;
-        private int references = 0;
-        private long lastQueriedEstimatedSizeOnHeap = 0;
-
-        public Node(K key)
-        {
-            super(key);
-        }
-
-        public int referenceCount()
-        {
-            return references;
-        }
-
-        boolean isLoaded()
-        {
-            return state() == LOADED;
-        }
-
-        public boolean isComplete()
-        {
-            switch (state())
-            {
-                case PENDING:
-                case UNINITIALIZED:
-                    return false;
-                case FAILED:
-                case LOADED:
-                    return true;
-                default: throw new UnsupportedOperationException("Unknown state: " + state());
-            }
-        }
-
-        private boolean isInQueue()
-        {
-            return prev != null && next != null;
-        }
-
-        long estimatedSizeOnHeap(ToLongFunction<V> estimator)
-        {
-            long result = EMPTY_SIZE;
-            V v;
-            if (isLoaded() && (v = value()) != null)
-                result += estimator.applyAsLong(v);
-            lastQueriedEstimatedSizeOnHeap = result;
-            return result;
-        }
-
-        long estimatedSizeOnHeapDelta(ToLongFunction<V> estimator)
-        {
-            long prevSize = lastQueriedEstimatedSizeOnHeap;
-            return estimatedSizeOnHeap(estimator) - prevSize;
-        }
-
-        boolean shouldUpdateSize()
-        {
-            return isLoaded() && lastQueriedEstimatedSizeOnHeap == EMPTY_SIZE;
-        }
-
-        void maybeCleanupLoad()
-        {
-            state();
-        }
-
-        @Override
-        public String toString()
-        {
-            return "Node{" + state() +
-                   ", key=" + key() +
-                   ", references=" + references +
-                   "}@" + Integer.toHexString(System.identityHashCode(this));
-        }
-    }
-
     static class Stats
     {
         private long queries;
         private long hits;
         private long misses;
     }
 
-    private static class NamedMap<K, V> extends HashMap<K, V>
-    {
-        final String name;
+    private final Map<Object, AccordCachingState<?, ?>> cache = new HashMap<>();
+    private final HashMap<Class<?>, Instance<?, ?, ?>> instances = new HashMap<>();
 
-        public NamedMap(String name)
-        {
-            this.name = name;
-        }
-    }
-
-    private final Map<Object, Node<?, ?>> cache = new HashMap<>();
-    private final Set<Instance<?, ?, ?>> instances = new HashSet<>();
-
-    private final NamedMap<Object, AsyncResult<Void>> saveResults = new NamedMap<>("saveResults");
+    private final ExecutorPlus loadExecutor, saveExecutor;
 
     private int unreferenced = 0;
-    Node<?, ?> head;
-    Node<?, ?> tail;
     private long maxSizeInBytes;
     private long bytesCached = 0;
     private final Stats stats = new Stats();
 
-    public AccordStateCache(long maxSizeInBytes)
+    public AccordStateCache(ExecutorPlus loadExecutor, ExecutorPlus saveExecutor, long maxSizeInBytes)
     {
+        this.loadExecutor = loadExecutor;
+        this.saveExecutor = saveExecutor;
         this.maxSizeInBytes = maxSizeInBytes;
     }
 
     public void setMaxSize(long size)
     {
         maxSizeInBytes = size;
-        maybeEvict();
+        maybeEvictSomeNodes();
     }
 
     public long getMaxSize()
     {
         return maxSizeInBytes;
     }
 
-    @VisibleForTesting
-    public void clear()
-    {
-        head = tail = null;
-        cache.clear();
-        saveResults.clear();
-    }
-
-    @VisibleForTesting
-    public Map<Object, AsyncResult<Void>> saveResults()
-    {
-        return saveResults;
-    }
-
-    private void unlink(Node<?, ?> node)
+    private void unlink(AccordCachingState<?, ?> node)
     {
-        Node<?, ?> prev = node.prev;
-        Node<?, ?> next = node.next;
-
-        if (prev == null)
-        {
-            Preconditions.checkState(head == node, "previous is null but the head isnt the provided node!");
-            head = next;
-        }
-        else
-        {
-            prev.next = next;
-        }
-
-        if (next == null)
-        {
-            Preconditions.checkState(tail == node, "next is null but the tail isnt the provided node!");
-            tail = prev;
-        }
-        else
-        {
-            next.prev = prev;
-        }
-
-        node.prev = null;
-        node.next = null;
+        node.unlink();
         unreferenced--;
     }
 
-    private void push(Node<?, ?> node)
+    private void link(AccordCachingState<?, ?> node)
     {
-        if (head != null)
-        {
-            node.prev = null;
-            node.next = head;
-            head.prev = node;
-            head = node;
-        }
-        else
-        {
-            head = node;
-            tail = node;
-        }
+        addLast(node);
         unreferenced++;
     }
 
-    private <K, V> void updateSize(Node<K, V> node, ToLongFunction<V> estimator)
-    {
-        bytesCached += node.estimatedSizeOnHeapDelta(estimator);
-    }
-
-    // don't evict if there's an outstanding save result. If an item is evicted then reloaded
-    // before it's mutation is applied, out of date info will be loaded
-    private boolean canEvict(Node<?, ?> node)
+    @SuppressWarnings("unchecked")
+    private <K, V> void maybeUpdateSize(AccordCachingState<?, ?> node, ToLongFunction<?> estimator)
     {
-        Invariants.checkState(node.references == 0);
-        return node.state() == FAILED || !hasActiveAsyncResult(saveResults, node.key());
+        if (node.shouldUpdateSize())
+            bytesCached += ((AccordCachingState<K, V>) node).estimatedSizeOnHeapDelta((ToLongFunction<V>) estimator);
     }
 
-    private void maybeEvict()
+    /*
+     * Roughly respects LRU semantics when evicting. Might consider prioritising keeping MODIFIED nodes around
+     * for longer to maximise the chances of hitting system tables fewer times (or not at all).
+     */
+    private void maybeEvictSomeNodes()
     {
         if (bytesCached <= maxSizeInBytes)
             return;
 
-        Node<?, ?> current = tail;
-        while (current != null && bytesCached > maxSizeInBytes)
+        Iterator<AccordCachingState<?, ?>> iter = this.iterator();

Review Comment:
   This ended up being unnecessary (see comments on the linked cassandra-accord PR).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] [cassandra] iamaleksey closed pull request #2395: CEP-15: Convert AccordStateCache cache from write-through to write-back

Posted by "iamaleksey (via GitHub)" <gi...@apache.org>.
iamaleksey closed pull request #2395: CEP-15: Convert AccordStateCache cache from write-through to write-back
URL: https://github.com/apache/cassandra/pull/2395


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] [cassandra] iamaleksey commented on a diff in pull request #2395: CEP-15: Convert AccordStateCache cache from write-through to write-back

Posted by "iamaleksey (via GitHub)" <gi...@apache.org>.
iamaleksey commented on code in PR #2395:
URL: https://github.com/apache/cassandra/pull/2395#discussion_r1231307214


##########
src/java/org/apache/cassandra/service/accord/AccordStateCache.java:
##########
@@ -15,547 +15,442 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.cassandra.service.accord;
 
 import java.util.HashMap;
-import java.util.HashSet;
+import java.util.Iterator;
 import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
+import java.util.function.BiFunction;
 import java.util.function.Function;
 import java.util.function.ToLongFunction;
 import java.util.stream.Stream;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import accord.utils.Invariants;
-import accord.utils.async.AsyncChains;
-import accord.utils.async.AsyncResult;
-import org.apache.cassandra.utils.ObjectSizes;
+import accord.utils.IntrusiveLinkedList;
+import org.apache.cassandra.concurrent.ExecutorPlus;
+import org.apache.cassandra.service.accord.AccordCachingState.Status;
 
-import static org.apache.cassandra.service.accord.AccordLoadingState.LoadingState.FAILED;
-import static org.apache.cassandra.service.accord.AccordLoadingState.LoadingState.LOADED;
+import static accord.utils.Invariants.checkState;
+import static java.lang.String.format;
+import static org.apache.cassandra.service.accord.AccordCachingState.Status.EVICTED;
+import static org.apache.cassandra.service.accord.AccordCachingState.Status.FAILED_TO_LOAD;
+import static org.apache.cassandra.service.accord.AccordCachingState.Status.LOADING;
+import static org.apache.cassandra.service.accord.AccordCachingState.Status.SAVING;
 
 /**
  * Cache for AccordCommand and AccordCommandsForKey, available memory is shared between the two object types.
- *
+ * </p>
  * Supports dynamic object sizes. After each acquire/free cycle, the cacheable objects size is recomputed to
  * account for data added/removed during txn processing if it's modified flag is set
  */
-public class AccordStateCache
+public class AccordStateCache extends IntrusiveLinkedList<AccordCachingState<?,?>>
 {
     private static final Logger logger = LoggerFactory.getLogger(AccordStateCache.class);
 
-    public static class Node<K, V> extends AccordLoadingState<K, V>
-    {
-        static final long EMPTY_SIZE = ObjectSizes.measure(new AccordStateCache.Node(null));
-
-        private Node<?, ?> prev;
-        private Node<?, ?> next;
-        private int references = 0;
-        private long lastQueriedEstimatedSizeOnHeap = 0;
-
-        public Node(K key)
-        {
-            super(key);
-        }
-
-        public int referenceCount()
-        {
-            return references;
-        }
-
-        boolean isLoaded()
-        {
-            return state() == LOADED;
-        }
-
-        public boolean isComplete()
-        {
-            switch (state())
-            {
-                case PENDING:
-                case UNINITIALIZED:
-                    return false;
-                case FAILED:
-                case LOADED:
-                    return true;
-                default: throw new UnsupportedOperationException("Unknown state: " + state());
-            }
-        }
-
-        private boolean isInQueue()
-        {
-            return prev != null && next != null;
-        }
-
-        long estimatedSizeOnHeap(ToLongFunction<V> estimator)
-        {
-            long result = EMPTY_SIZE;
-            V v;
-            if (isLoaded() && (v = value()) != null)
-                result += estimator.applyAsLong(v);
-            lastQueriedEstimatedSizeOnHeap = result;
-            return result;
-        }
-
-        long estimatedSizeOnHeapDelta(ToLongFunction<V> estimator)
-        {
-            long prevSize = lastQueriedEstimatedSizeOnHeap;
-            return estimatedSizeOnHeap(estimator) - prevSize;
-        }
-
-        boolean shouldUpdateSize()
-        {
-            return isLoaded() && lastQueriedEstimatedSizeOnHeap == EMPTY_SIZE;
-        }
-
-        void maybeCleanupLoad()
-        {
-            state();
-        }
-
-        @Override
-        public String toString()
-        {
-            return "Node{" + state() +
-                   ", key=" + key() +
-                   ", references=" + references +
-                   "}@" + Integer.toHexString(System.identityHashCode(this));
-        }
-    }
-
     static class Stats
     {
         private long queries;
         private long hits;
         private long misses;
     }
 
-    private static class NamedMap<K, V> extends HashMap<K, V>
-    {
-        final String name;
+    private final Map<Object, AccordCachingState<?, ?>> cache = new HashMap<>();
+    private final HashMap<Class<?>, Instance<?, ?, ?>> instances = new HashMap<>();
 
-        public NamedMap(String name)
-        {
-            this.name = name;
-        }
-    }
-
-    private final Map<Object, Node<?, ?>> cache = new HashMap<>();
-    private final Set<Instance<?, ?, ?>> instances = new HashSet<>();
-
-    private final NamedMap<Object, AsyncResult<Void>> saveResults = new NamedMap<>("saveResults");
+    private final ExecutorPlus loadExecutor, saveExecutor;
 
     private int unreferenced = 0;
-    Node<?, ?> head;
-    Node<?, ?> tail;
     private long maxSizeInBytes;
     private long bytesCached = 0;
     private final Stats stats = new Stats();
 
-    public AccordStateCache(long maxSizeInBytes)
+    public AccordStateCache(ExecutorPlus loadExecutor, ExecutorPlus saveExecutor, long maxSizeInBytes)
     {
+        this.loadExecutor = loadExecutor;
+        this.saveExecutor = saveExecutor;
         this.maxSizeInBytes = maxSizeInBytes;
     }
 
     public void setMaxSize(long size)
     {
         maxSizeInBytes = size;
-        maybeEvict();
+        maybeEvictSomeNodes();
     }
 
     public long getMaxSize()
     {
         return maxSizeInBytes;
     }
 
-    @VisibleForTesting
-    public void clear()
-    {
-        head = tail = null;
-        cache.clear();
-        saveResults.clear();
-    }
-
-    @VisibleForTesting
-    public Map<Object, AsyncResult<Void>> saveResults()
-    {
-        return saveResults;
-    }
-
-    private void unlink(Node<?, ?> node)
+    private void unlink(AccordCachingState<?, ?> node)
     {
-        Node<?, ?> prev = node.prev;
-        Node<?, ?> next = node.next;
-
-        if (prev == null)
-        {
-            Preconditions.checkState(head == node, "previous is null but the head isnt the provided node!");
-            head = next;
-        }
-        else
-        {
-            prev.next = next;
-        }
-
-        if (next == null)
-        {
-            Preconditions.checkState(tail == node, "next is null but the tail isnt the provided node!");
-            tail = prev;
-        }
-        else
-        {
-            next.prev = prev;
-        }
-
-        node.prev = null;
-        node.next = null;
+        node.unlink();
         unreferenced--;
     }
 
-    private void push(Node<?, ?> node)
+    private void link(AccordCachingState<?, ?> node)
     {
-        if (head != null)
-        {
-            node.prev = null;
-            node.next = head;
-            head.prev = node;
-            head = node;
-        }
-        else
-        {
-            head = node;
-            tail = node;
-        }
+        addLast(node);
         unreferenced++;
     }
 
-    private <K, V> void updateSize(Node<K, V> node, ToLongFunction<V> estimator)
-    {
-        bytesCached += node.estimatedSizeOnHeapDelta(estimator);
-    }
-
-    // don't evict if there's an outstanding save result. If an item is evicted then reloaded
-    // before it's mutation is applied, out of date info will be loaded
-    private boolean canEvict(Node<?, ?> node)
+    @SuppressWarnings("unchecked")
+    private <K, V> void maybeUpdateSize(AccordCachingState<?, ?> node, ToLongFunction<?> estimator)
     {
-        Invariants.checkState(node.references == 0);
-        return node.state() == FAILED || !hasActiveAsyncResult(saveResults, node.key());
+        if (node.shouldUpdateSize())
+            bytesCached += ((AccordCachingState<K, V>) node).estimatedSizeOnHeapDelta((ToLongFunction<V>) estimator);
     }
 
-    private void maybeEvict()
+    /*
+     * Roughly respects LRU semantics when evicting. Might consider prioritising keeping MODIFIED nodes around
+     * for longer to maximise the chances of hitting system tables fewer times (or not at all).
+     */
+    private void maybeEvictSomeNodes()
     {
         if (bytesCached <= maxSizeInBytes)
             return;
 
-        Node<?, ?> current = tail;
-        while (current != null && bytesCached > maxSizeInBytes)
+        Iterator<AccordCachingState<?, ?>> iter = this.iterator();
+        while (iter.hasNext() && bytesCached > maxSizeInBytes)
         {
-            Node<?, ?> evict = current;
-            current = current.prev;
-
-            // TODO (expected, efficiency): can this be reworked so we're not skipping unevictable nodes everytime we try to evict?
-            if (!canEvict(evict))
-                continue;
+            AccordCachingState<?, ?> node = iter.next();
+            checkState(node.references == 0);
 
-            evict(evict, true);
+            /*
+             * TODO (expected, efficiency):
+             *    can this be reworked so we're not skipping unevictable nodes everytime we try to evict?
+             */
+            Status status = node.status(); // status() call completes (if completeable)
+            switch (status)
+            {
+                default: throw new IllegalStateException("Unhandled status " + status);
+                case LOADED:
+                    unlink(node);
+                    evict(node);
+                    break;
+                case MODIFIED:
+                    // schedule a save to disk, keep linked and in the cache map
+                    Instance<?, ?, ?> instance = instanceForNode(node);
+                    node.save(saveExecutor, instance.saveFunction);
+                    maybeUpdateSize(node, instance.heapEstimator);
+                    break;
+                case SAVING:
+                    // skip over until completes to LOADED or FAILED_TO_SAVE

Review Comment:
   I think it's sufficiently preserved in the TODO comment just above the switch statement? Unless you mean something else.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] [cassandra] iamaleksey commented on pull request #2395: CEP-15: Convert AccordStateCache cache from write-through to write-back

Posted by "iamaleksey (via GitHub)" <gi...@apache.org>.
iamaleksey commented on PR #2395:
URL: https://github.com/apache/cassandra/pull/2395#issuecomment-1594761333

   Merged manually, cheers.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] [cassandra] bdeggleston commented on a diff in pull request #2395: CEP-15: Convert AccordStateCache cache from write-through to write-back

Posted by "bdeggleston (via GitHub)" <gi...@apache.org>.
bdeggleston commented on code in PR #2395:
URL: https://github.com/apache/cassandra/pull/2395#discussion_r1231221709


##########
src/java/org/apache/cassandra/service/accord/AccordStateCache.java:
##########
@@ -15,547 +15,442 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.cassandra.service.accord;
 
 import java.util.HashMap;
-import java.util.HashSet;
+import java.util.Iterator;
 import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
+import java.util.function.BiFunction;
 import java.util.function.Function;
 import java.util.function.ToLongFunction;
 import java.util.stream.Stream;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import accord.utils.Invariants;
-import accord.utils.async.AsyncChains;
-import accord.utils.async.AsyncResult;
-import org.apache.cassandra.utils.ObjectSizes;
+import accord.utils.IntrusiveLinkedList;
+import org.apache.cassandra.concurrent.ExecutorPlus;
+import org.apache.cassandra.service.accord.AccordCachingState.Status;
 
-import static org.apache.cassandra.service.accord.AccordLoadingState.LoadingState.FAILED;
-import static org.apache.cassandra.service.accord.AccordLoadingState.LoadingState.LOADED;
+import static accord.utils.Invariants.checkState;
+import static java.lang.String.format;
+import static org.apache.cassandra.service.accord.AccordCachingState.Status.EVICTED;
+import static org.apache.cassandra.service.accord.AccordCachingState.Status.FAILED_TO_LOAD;
+import static org.apache.cassandra.service.accord.AccordCachingState.Status.LOADING;
+import static org.apache.cassandra.service.accord.AccordCachingState.Status.SAVING;
 
 /**
  * Cache for AccordCommand and AccordCommandsForKey, available memory is shared between the two object types.
- *
+ * </p>
  * Supports dynamic object sizes. After each acquire/free cycle, the cacheable objects size is recomputed to
  * account for data added/removed during txn processing if it's modified flag is set
  */
-public class AccordStateCache
+public class AccordStateCache extends IntrusiveLinkedList<AccordCachingState<?,?>>
 {
     private static final Logger logger = LoggerFactory.getLogger(AccordStateCache.class);
 
-    public static class Node<K, V> extends AccordLoadingState<K, V>
-    {
-        static final long EMPTY_SIZE = ObjectSizes.measure(new AccordStateCache.Node(null));
-
-        private Node<?, ?> prev;
-        private Node<?, ?> next;
-        private int references = 0;
-        private long lastQueriedEstimatedSizeOnHeap = 0;
-
-        public Node(K key)
-        {
-            super(key);
-        }
-
-        public int referenceCount()
-        {
-            return references;
-        }
-
-        boolean isLoaded()
-        {
-            return state() == LOADED;
-        }
-
-        public boolean isComplete()
-        {
-            switch (state())
-            {
-                case PENDING:
-                case UNINITIALIZED:
-                    return false;
-                case FAILED:
-                case LOADED:
-                    return true;
-                default: throw new UnsupportedOperationException("Unknown state: " + state());
-            }
-        }
-
-        private boolean isInQueue()
-        {
-            return prev != null && next != null;
-        }
-
-        long estimatedSizeOnHeap(ToLongFunction<V> estimator)
-        {
-            long result = EMPTY_SIZE;
-            V v;
-            if (isLoaded() && (v = value()) != null)
-                result += estimator.applyAsLong(v);
-            lastQueriedEstimatedSizeOnHeap = result;
-            return result;
-        }
-
-        long estimatedSizeOnHeapDelta(ToLongFunction<V> estimator)
-        {
-            long prevSize = lastQueriedEstimatedSizeOnHeap;
-            return estimatedSizeOnHeap(estimator) - prevSize;
-        }
-
-        boolean shouldUpdateSize()
-        {
-            return isLoaded() && lastQueriedEstimatedSizeOnHeap == EMPTY_SIZE;
-        }
-
-        void maybeCleanupLoad()
-        {
-            state();
-        }
-
-        @Override
-        public String toString()
-        {
-            return "Node{" + state() +
-                   ", key=" + key() +
-                   ", references=" + references +
-                   "}@" + Integer.toHexString(System.identityHashCode(this));
-        }
-    }
-
     static class Stats
     {
         private long queries;
         private long hits;
         private long misses;
     }
 
-    private static class NamedMap<K, V> extends HashMap<K, V>
-    {
-        final String name;
+    private final Map<Object, AccordCachingState<?, ?>> cache = new HashMap<>();
+    private final HashMap<Class<?>, Instance<?, ?, ?>> instances = new HashMap<>();
 
-        public NamedMap(String name)
-        {
-            this.name = name;
-        }
-    }
-
-    private final Map<Object, Node<?, ?>> cache = new HashMap<>();
-    private final Set<Instance<?, ?, ?>> instances = new HashSet<>();
-
-    private final NamedMap<Object, AsyncResult<Void>> saveResults = new NamedMap<>("saveResults");
+    private final ExecutorPlus loadExecutor, saveExecutor;
 
     private int unreferenced = 0;
-    Node<?, ?> head;
-    Node<?, ?> tail;
     private long maxSizeInBytes;
     private long bytesCached = 0;
     private final Stats stats = new Stats();
 
-    public AccordStateCache(long maxSizeInBytes)
+    public AccordStateCache(ExecutorPlus loadExecutor, ExecutorPlus saveExecutor, long maxSizeInBytes)
     {
+        this.loadExecutor = loadExecutor;
+        this.saveExecutor = saveExecutor;
         this.maxSizeInBytes = maxSizeInBytes;
     }
 
     public void setMaxSize(long size)
     {
         maxSizeInBytes = size;
-        maybeEvict();
+        maybeEvictSomeNodes();
     }
 
     public long getMaxSize()
     {
         return maxSizeInBytes;
     }
 
-    @VisibleForTesting
-    public void clear()
-    {
-        head = tail = null;
-        cache.clear();
-        saveResults.clear();
-    }
-
-    @VisibleForTesting
-    public Map<Object, AsyncResult<Void>> saveResults()
-    {
-        return saveResults;
-    }
-
-    private void unlink(Node<?, ?> node)
+    private void unlink(AccordCachingState<?, ?> node)
     {
-        Node<?, ?> prev = node.prev;
-        Node<?, ?> next = node.next;
-
-        if (prev == null)
-        {
-            Preconditions.checkState(head == node, "previous is null but the head isnt the provided node!");
-            head = next;
-        }
-        else
-        {
-            prev.next = next;
-        }
-
-        if (next == null)
-        {
-            Preconditions.checkState(tail == node, "next is null but the tail isnt the provided node!");
-            tail = prev;
-        }
-        else
-        {
-            next.prev = prev;
-        }
-
-        node.prev = null;
-        node.next = null;
+        node.unlink();
         unreferenced--;
     }
 
-    private void push(Node<?, ?> node)
+    private void link(AccordCachingState<?, ?> node)
     {
-        if (head != null)
-        {
-            node.prev = null;
-            node.next = head;
-            head.prev = node;
-            head = node;
-        }
-        else
-        {
-            head = node;
-            tail = node;
-        }
+        addLast(node);
         unreferenced++;
     }
 
-    private <K, V> void updateSize(Node<K, V> node, ToLongFunction<V> estimator)
-    {
-        bytesCached += node.estimatedSizeOnHeapDelta(estimator);
-    }
-
-    // don't evict if there's an outstanding save result. If an item is evicted then reloaded
-    // before it's mutation is applied, out of date info will be loaded
-    private boolean canEvict(Node<?, ?> node)
+    @SuppressWarnings("unchecked")
+    private <K, V> void maybeUpdateSize(AccordCachingState<?, ?> node, ToLongFunction<?> estimator)
     {
-        Invariants.checkState(node.references == 0);
-        return node.state() == FAILED || !hasActiveAsyncResult(saveResults, node.key());
+        if (node.shouldUpdateSize())
+            bytesCached += ((AccordCachingState<K, V>) node).estimatedSizeOnHeapDelta((ToLongFunction<V>) estimator);
     }
 
-    private void maybeEvict()
+    /*
+     * Roughly respects LRU semantics when evicting. Might consider prioritising keeping MODIFIED nodes around
+     * for longer to maximise the chances of hitting system tables fewer times (or not at all).
+     */
+    private void maybeEvictSomeNodes()
     {
         if (bytesCached <= maxSizeInBytes)
             return;
 
-        Node<?, ?> current = tail;
-        while (current != null && bytesCached > maxSizeInBytes)
+        Iterator<AccordCachingState<?, ?>> iter = this.iterator();

Review Comment:
   Can we avoid allocating an iterator here? This is called all the time. Maybe we could add a peek method and manually iterate through the linked list?



##########
src/java/org/apache/cassandra/service/accord/AccordStateCache.java:
##########
@@ -15,547 +15,442 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.cassandra.service.accord;
 
 import java.util.HashMap;
-import java.util.HashSet;
+import java.util.Iterator;
 import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
+import java.util.function.BiFunction;
 import java.util.function.Function;
 import java.util.function.ToLongFunction;
 import java.util.stream.Stream;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import accord.utils.Invariants;
-import accord.utils.async.AsyncChains;
-import accord.utils.async.AsyncResult;
-import org.apache.cassandra.utils.ObjectSizes;
+import accord.utils.IntrusiveLinkedList;
+import org.apache.cassandra.concurrent.ExecutorPlus;
+import org.apache.cassandra.service.accord.AccordCachingState.Status;
 
-import static org.apache.cassandra.service.accord.AccordLoadingState.LoadingState.FAILED;
-import static org.apache.cassandra.service.accord.AccordLoadingState.LoadingState.LOADED;
+import static accord.utils.Invariants.checkState;
+import static java.lang.String.format;
+import static org.apache.cassandra.service.accord.AccordCachingState.Status.EVICTED;
+import static org.apache.cassandra.service.accord.AccordCachingState.Status.FAILED_TO_LOAD;
+import static org.apache.cassandra.service.accord.AccordCachingState.Status.LOADING;
+import static org.apache.cassandra.service.accord.AccordCachingState.Status.SAVING;
 
 /**
  * Cache for AccordCommand and AccordCommandsForKey, available memory is shared between the two object types.
- *
+ * <p>
  * Supports dynamic object sizes. After each acquire/free cycle, the cacheable objects size is recomputed to
  * account for data added/removed during txn processing if its modified flag is set
  */
-public class AccordStateCache
+public class AccordStateCache extends IntrusiveLinkedList<AccordCachingState<?,?>>
 {
     private static final Logger logger = LoggerFactory.getLogger(AccordStateCache.class);
 
-    public static class Node<K, V> extends AccordLoadingState<K, V>
-    {
-        static final long EMPTY_SIZE = ObjectSizes.measure(new AccordStateCache.Node(null));
-
-        private Node<?, ?> prev;
-        private Node<?, ?> next;
-        private int references = 0;
-        private long lastQueriedEstimatedSizeOnHeap = 0;
-
-        public Node(K key)
-        {
-            super(key);
-        }
-
-        public int referenceCount()
-        {
-            return references;
-        }
-
-        boolean isLoaded()
-        {
-            return state() == LOADED;
-        }
-
-        public boolean isComplete()
-        {
-            switch (state())
-            {
-                case PENDING:
-                case UNINITIALIZED:
-                    return false;
-                case FAILED:
-                case LOADED:
-                    return true;
-                default: throw new UnsupportedOperationException("Unknown state: " + state());
-            }
-        }
-
-        private boolean isInQueue()
-        {
-            return prev != null && next != null;
-        }
-
-        long estimatedSizeOnHeap(ToLongFunction<V> estimator)
-        {
-            long result = EMPTY_SIZE;
-            V v;
-            if (isLoaded() && (v = value()) != null)
-                result += estimator.applyAsLong(v);
-            lastQueriedEstimatedSizeOnHeap = result;
-            return result;
-        }
-
-        long estimatedSizeOnHeapDelta(ToLongFunction<V> estimator)
-        {
-            long prevSize = lastQueriedEstimatedSizeOnHeap;
-            return estimatedSizeOnHeap(estimator) - prevSize;
-        }
-
-        boolean shouldUpdateSize()
-        {
-            return isLoaded() && lastQueriedEstimatedSizeOnHeap == EMPTY_SIZE;
-        }
-
-        void maybeCleanupLoad()
-        {
-            state();
-        }
-
-        @Override
-        public String toString()
-        {
-            return "Node{" + state() +
-                   ", key=" + key() +
-                   ", references=" + references +
-                   "}@" + Integer.toHexString(System.identityHashCode(this));
-        }
-    }
-
     static class Stats
     {
         private long queries;
         private long hits;
         private long misses;
     }
 
-    private static class NamedMap<K, V> extends HashMap<K, V>
-    {
-        final String name;
+    private final Map<Object, AccordCachingState<?, ?>> cache = new HashMap<>();
+    private final HashMap<Class<?>, Instance<?, ?, ?>> instances = new HashMap<>();
 
-        public NamedMap(String name)
-        {
-            this.name = name;
-        }
-    }
-
-    private final Map<Object, Node<?, ?>> cache = new HashMap<>();
-    private final Set<Instance<?, ?, ?>> instances = new HashSet<>();
-
-    private final NamedMap<Object, AsyncResult<Void>> saveResults = new NamedMap<>("saveResults");
+    private final ExecutorPlus loadExecutor, saveExecutor;
 
     private int unreferenced = 0;
-    Node<?, ?> head;
-    Node<?, ?> tail;
     private long maxSizeInBytes;
     private long bytesCached = 0;
     private final Stats stats = new Stats();
 
-    public AccordStateCache(long maxSizeInBytes)
+    public AccordStateCache(ExecutorPlus loadExecutor, ExecutorPlus saveExecutor, long maxSizeInBytes)
     {
+        this.loadExecutor = loadExecutor;
+        this.saveExecutor = saveExecutor;
         this.maxSizeInBytes = maxSizeInBytes;
     }
 
     public void setMaxSize(long size)
     {
         maxSizeInBytes = size;
-        maybeEvict();
+        maybeEvictSomeNodes();
     }
 
     public long getMaxSize()
     {
         return maxSizeInBytes;
     }
 
-    @VisibleForTesting
-    public void clear()
-    {
-        head = tail = null;
-        cache.clear();
-        saveResults.clear();
-    }
-
-    @VisibleForTesting
-    public Map<Object, AsyncResult<Void>> saveResults()
-    {
-        return saveResults;
-    }
-
-    private void unlink(Node<?, ?> node)
+    private void unlink(AccordCachingState<?, ?> node)
     {
-        Node<?, ?> prev = node.prev;
-        Node<?, ?> next = node.next;
-
-        if (prev == null)
-        {
-            Preconditions.checkState(head == node, "previous is null but the head isnt the provided node!");
-            head = next;
-        }
-        else
-        {
-            prev.next = next;
-        }
-
-        if (next == null)
-        {
-            Preconditions.checkState(tail == node, "next is null but the tail isnt the provided node!");
-            tail = prev;
-        }
-        else
-        {
-            next.prev = prev;
-        }
-
-        node.prev = null;
-        node.next = null;
+        node.unlink();
         unreferenced--;
     }
 
-    private void push(Node<?, ?> node)
+    private void link(AccordCachingState<?, ?> node)
     {
-        if (head != null)
-        {
-            node.prev = null;
-            node.next = head;
-            head.prev = node;
-            head = node;
-        }
-        else
-        {
-            head = node;
-            tail = node;
-        }
+        addLast(node);
         unreferenced++;
     }
 
-    private <K, V> void updateSize(Node<K, V> node, ToLongFunction<V> estimator)
-    {
-        bytesCached += node.estimatedSizeOnHeapDelta(estimator);
-    }
-
-    // don't evict if there's an outstanding save result. If an item is evicted then reloaded
-    // before it's mutation is applied, out of date info will be loaded
-    private boolean canEvict(Node<?, ?> node)
+    @SuppressWarnings("unchecked")
+    private <K, V> void maybeUpdateSize(AccordCachingState<?, ?> node, ToLongFunction<?> estimator)
     {
-        Invariants.checkState(node.references == 0);
-        return node.state() == FAILED || !hasActiveAsyncResult(saveResults, node.key());
+        if (node.shouldUpdateSize())
+            bytesCached += ((AccordCachingState<K, V>) node).estimatedSizeOnHeapDelta((ToLongFunction<V>) estimator);
     }
 
-    private void maybeEvict()
+    /*
+     * Roughly respects LRU semantics when evicting. Might consider prioritising keeping MODIFIED nodes around
+     * for longer to maximise the chances of hitting system tables fewer times (or not at all).
+     */
+    private void maybeEvictSomeNodes()
     {
         if (bytesCached <= maxSizeInBytes)
             return;
 
-        Node<?, ?> current = tail;
-        while (current != null && bytesCached > maxSizeInBytes)
+        Iterator<AccordCachingState<?, ?>> iter = this.iterator();
+        while (iter.hasNext() && bytesCached > maxSizeInBytes)
         {
-            Node<?, ?> evict = current;
-            current = current.prev;
-
-            // TODO (expected, efficiency): can this be reworked so we're not skipping unevictable nodes everytime we try to evict?
-            if (!canEvict(evict))
-                continue;
+            AccordCachingState<?, ?> node = iter.next();
+            checkState(node.references == 0);
 
-            evict(evict, true);
+            /*
+             * TODO (expected, efficiency):
+             *    can this be reworked so we're not skipping unevictable nodes everytime we try to evict?
+             */
+            Status status = node.status(); // status() call completes (if completeable)
+            switch (status)
+            {
+                default: throw new IllegalStateException("Unhandled status " + status);
+                case LOADED:
+                    unlink(node);
+                    evict(node);
+                    break;
+                case MODIFIED:
+                    // schedule a save to disk, keep linked and in the cache map
+                    Instance<?, ?, ?> instance = instanceForNode(node);
+                    node.save(saveExecutor, instance.saveFunction);
+                    maybeUpdateSize(node, instance.heapEstimator);
+                    break;
+                case SAVING:
+                    // skip over until completes to LOADED or FAILED_TO_SAVE

Review Comment:
   We should preserve the TODO about skipping unevictable nodes.



##########
src/java/org/apache/cassandra/service/accord/AccordStateCache.java:
##########
@@ -15,547 +15,442 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.cassandra.service.accord;
 
 import java.util.HashMap;
-import java.util.HashSet;
+import java.util.Iterator;
 import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
+import java.util.function.BiFunction;
 import java.util.function.Function;
 import java.util.function.ToLongFunction;
 import java.util.stream.Stream;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import accord.utils.Invariants;
-import accord.utils.async.AsyncChains;
-import accord.utils.async.AsyncResult;
-import org.apache.cassandra.utils.ObjectSizes;
+import accord.utils.IntrusiveLinkedList;
+import org.apache.cassandra.concurrent.ExecutorPlus;
+import org.apache.cassandra.service.accord.AccordCachingState.Status;
 
-import static org.apache.cassandra.service.accord.AccordLoadingState.LoadingState.FAILED;
-import static org.apache.cassandra.service.accord.AccordLoadingState.LoadingState.LOADED;
+import static accord.utils.Invariants.checkState;
+import static java.lang.String.format;
+import static org.apache.cassandra.service.accord.AccordCachingState.Status.EVICTED;
+import static org.apache.cassandra.service.accord.AccordCachingState.Status.FAILED_TO_LOAD;
+import static org.apache.cassandra.service.accord.AccordCachingState.Status.LOADING;
+import static org.apache.cassandra.service.accord.AccordCachingState.Status.SAVING;
 
 /**
  * Cache for AccordCommand and AccordCommandsForKey, available memory is shared between the two object types.
- *
+ * <p>
  * Supports dynamic object sizes. After each acquire/free cycle, the cacheable objects size is recomputed to
  * account for data added/removed during txn processing if its modified flag is set
  */
-public class AccordStateCache
+public class AccordStateCache extends IntrusiveLinkedList<AccordCachingState<?,?>>
 {
     private static final Logger logger = LoggerFactory.getLogger(AccordStateCache.class);
 
-    public static class Node<K, V> extends AccordLoadingState<K, V>
-    {
-        static final long EMPTY_SIZE = ObjectSizes.measure(new AccordStateCache.Node(null));
-
-        private Node<?, ?> prev;
-        private Node<?, ?> next;
-        private int references = 0;
-        private long lastQueriedEstimatedSizeOnHeap = 0;
-
-        public Node(K key)
-        {
-            super(key);
-        }
-
-        public int referenceCount()
-        {
-            return references;
-        }
-
-        boolean isLoaded()
-        {
-            return state() == LOADED;
-        }
-
-        public boolean isComplete()
-        {
-            switch (state())
-            {
-                case PENDING:
-                case UNINITIALIZED:
-                    return false;
-                case FAILED:
-                case LOADED:
-                    return true;
-                default: throw new UnsupportedOperationException("Unknown state: " + state());
-            }
-        }
-
-        private boolean isInQueue()
-        {
-            return prev != null && next != null;
-        }
-
-        long estimatedSizeOnHeap(ToLongFunction<V> estimator)
-        {
-            long result = EMPTY_SIZE;
-            V v;
-            if (isLoaded() && (v = value()) != null)
-                result += estimator.applyAsLong(v);
-            lastQueriedEstimatedSizeOnHeap = result;
-            return result;
-        }
-
-        long estimatedSizeOnHeapDelta(ToLongFunction<V> estimator)
-        {
-            long prevSize = lastQueriedEstimatedSizeOnHeap;
-            return estimatedSizeOnHeap(estimator) - prevSize;
-        }
-
-        boolean shouldUpdateSize()
-        {
-            return isLoaded() && lastQueriedEstimatedSizeOnHeap == EMPTY_SIZE;
-        }
-
-        void maybeCleanupLoad()
-        {
-            state();
-        }
-
-        @Override
-        public String toString()
-        {
-            return "Node{" + state() +
-                   ", key=" + key() +
-                   ", references=" + references +
-                   "}@" + Integer.toHexString(System.identityHashCode(this));
-        }
-    }
-
     static class Stats
     {
         private long queries;
         private long hits;
         private long misses;
     }
 
-    private static class NamedMap<K, V> extends HashMap<K, V>
-    {
-        final String name;
+    private final Map<Object, AccordCachingState<?, ?>> cache = new HashMap<>();
+    private final HashMap<Class<?>, Instance<?, ?, ?>> instances = new HashMap<>();
 
-        public NamedMap(String name)
-        {
-            this.name = name;
-        }
-    }
-
-    private final Map<Object, Node<?, ?>> cache = new HashMap<>();
-    private final Set<Instance<?, ?, ?>> instances = new HashSet<>();
-
-    private final NamedMap<Object, AsyncResult<Void>> saveResults = new NamedMap<>("saveResults");
+    private final ExecutorPlus loadExecutor, saveExecutor;
 
     private int unreferenced = 0;
-    Node<?, ?> head;
-    Node<?, ?> tail;
     private long maxSizeInBytes;
     private long bytesCached = 0;
     private final Stats stats = new Stats();
 
-    public AccordStateCache(long maxSizeInBytes)
+    public AccordStateCache(ExecutorPlus loadExecutor, ExecutorPlus saveExecutor, long maxSizeInBytes)
     {
+        this.loadExecutor = loadExecutor;
+        this.saveExecutor = saveExecutor;
         this.maxSizeInBytes = maxSizeInBytes;
     }
 
     public void setMaxSize(long size)
     {
         maxSizeInBytes = size;
-        maybeEvict();
+        maybeEvictSomeNodes();
     }
 
     public long getMaxSize()
     {
         return maxSizeInBytes;
     }
 
-    @VisibleForTesting
-    public void clear()
-    {
-        head = tail = null;
-        cache.clear();
-        saveResults.clear();
-    }
-
-    @VisibleForTesting
-    public Map<Object, AsyncResult<Void>> saveResults()
-    {
-        return saveResults;
-    }
-
-    private void unlink(Node<?, ?> node)
+    private void unlink(AccordCachingState<?, ?> node)
     {
-        Node<?, ?> prev = node.prev;
-        Node<?, ?> next = node.next;
-
-        if (prev == null)
-        {
-            Preconditions.checkState(head == node, "previous is null but the head isnt the provided node!");
-            head = next;
-        }
-        else
-        {
-            prev.next = next;
-        }
-
-        if (next == null)
-        {
-            Preconditions.checkState(tail == node, "next is null but the tail isnt the provided node!");
-            tail = prev;
-        }
-        else
-        {
-            next.prev = prev;
-        }
-
-        node.prev = null;
-        node.next = null;
+        node.unlink();
         unreferenced--;
     }
 
-    private void push(Node<?, ?> node)
+    private void link(AccordCachingState<?, ?> node)
     {
-        if (head != null)
-        {
-            node.prev = null;
-            node.next = head;
-            head.prev = node;
-            head = node;
-        }
-        else
-        {
-            head = node;
-            tail = node;
-        }
+        addLast(node);
         unreferenced++;
     }
 
-    private <K, V> void updateSize(Node<K, V> node, ToLongFunction<V> estimator)
-    {
-        bytesCached += node.estimatedSizeOnHeapDelta(estimator);
-    }
-
-    // don't evict if there's an outstanding save result. If an item is evicted then reloaded
-    // before it's mutation is applied, out of date info will be loaded
-    private boolean canEvict(Node<?, ?> node)
+    @SuppressWarnings("unchecked")
+    private <K, V> void maybeUpdateSize(AccordCachingState<?, ?> node, ToLongFunction<?> estimator)
     {
-        Invariants.checkState(node.references == 0);
-        return node.state() == FAILED || !hasActiveAsyncResult(saveResults, node.key());
+        if (node.shouldUpdateSize())
+            bytesCached += ((AccordCachingState<K, V>) node).estimatedSizeOnHeapDelta((ToLongFunction<V>) estimator);
     }
 
-    private void maybeEvict()
+    /*
+     * Roughly respects LRU semantics when evicting. Might consider prioritising keeping MODIFIED nodes around
+     * for longer to maximise the chances of hitting system tables fewer times (or not at all).
+     */
+    private void maybeEvictSomeNodes()
     {
         if (bytesCached <= maxSizeInBytes)
             return;
 
-        Node<?, ?> current = tail;
-        while (current != null && bytesCached > maxSizeInBytes)
+        Iterator<AccordCachingState<?, ?>> iter = this.iterator();
+        while (iter.hasNext() && bytesCached > maxSizeInBytes)
         {
-            Node<?, ?> evict = current;
-            current = current.prev;
-
-            // TODO (expected, efficiency): can this be reworked so we're not skipping unevictable nodes everytime we try to evict?
-            if (!canEvict(evict))
-                continue;
+            AccordCachingState<?, ?> node = iter.next();
+            checkState(node.references == 0);
 
-            evict(evict, true);
+            /*
+             * TODO (expected, efficiency):
+             *    can this be reworked so we're not skipping unevictable nodes everytime we try to evict?
+             */
+            Status status = node.status(); // status() call completes (if completeable)
+            switch (status)
+            {
+                default: throw new IllegalStateException("Unhandled status " + status);
+                case LOADED:
+                    unlink(node);
+                    evict(node);
+                    break;
+                case MODIFIED:
+                    // schedule a save to disk, keep linked and in the cache map
+                    Instance<?, ?, ?> instance = instanceForNode(node);
+                    node.save(saveExecutor, instance.saveFunction);
+                    maybeUpdateSize(node, instance.heapEstimator);
+                    break;
+                case SAVING:
+                    // skip over until completes to LOADED or FAILED_TO_SAVE
+                    break;
+                case FAILED_TO_SAVE:
+                    // permanently unlink, but keep in the map; consider panicking instead when this happens

Review Comment:
   Can you add a TODO for this?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org