Posted to issues@ozone.apache.org by "hemantk-12 (via GitHub)" <gi...@apache.org> on 2023/06/09 22:09:51 UTC

[GitHub] [ozone] hemantk-12 commented on a diff in pull request #4567: HDDS-8528. [Snapshot] Custom SnapshotCache implementation to replace LoadingCache

hemantk-12 commented on code in PR #4567:
URL: https://github.com/apache/ozone/pull/4567#discussion_r1224624077


##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotCache.java:
##########
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.om.snapshot;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.cache.CacheLoader;
+import org.apache.hadoop.ozone.om.IOmMetadataReader;
+import org.apache.hadoop.ozone.om.OmSnapshot;
+import org.apache.hadoop.ozone.om.OmSnapshotManager;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.FILE_NOT_FOUND;
+import static org.apache.hadoop.ozone.om.helpers.SnapshotInfo.SnapshotStatus.SNAPSHOT_ACTIVE;
+
+/**
+ * Thread-safe custom unbounded LRU cache to manage open snapshot DB instances.
+ */
+public class SnapshotCache {
+
+  static final Logger LOG = LoggerFactory.getLogger(SnapshotCache.class);
+
+  // Snapshot cache internal hash map.
+  // Key:   DB snapshot table key
+  // Value: OmSnapshot instance, each holds a DB instance handle inside
+  // TODO: Also wrap SoftReference<> around the value?
+  private final ConcurrentHashMap<String, ReferenceCounted<IOmMetadataReader>>
+      dbMap;
+
+  // Linked hash set that holds OmSnapshot instances whose reference count
+  // has reached zero. Those entries are eligible to be evicted and closed.
+  // Sorted in last used order.
+  // Least-recently-used entry located at the beginning.
+  // TODO: Check thread safety. Try ConcurrentHashMultiset ?

Review Comment:
   If it has to be a concurrent `LinkedHashSet`, I don't think there is any library that can be used directly. You can do something similar to https://stackoverflow.com/a/1391955 and use `Collections.synchronizedSet()`.
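   For illustration, a minimal self-contained sketch of that approach (class and field names here are made up, not from the PR): `Collections.synchronizedSet()` keeps the `LinkedHashSet` insertion order and synchronizes individual operations, while iteration still needs an explicit synchronized block on the wrapper.
   ```
   import java.util.Collections;
   import java.util.LinkedHashSet;
   import java.util.Set;

   public final class SynchronizedLinkedHashSetSketch {
     // Insertion-ordered set wrapped so add/remove/contains are synchronized.
     private final Set<String> pendingEviction =
         Collections.synchronizedSet(new LinkedHashSet<String>());

     void example() {
       pendingEviction.add("snapshotKey1");  // synchronized per operation
       // Iteration is NOT automatically synchronized; lock the wrapper itself.
       synchronized (pendingEviction) {
         for (String key : pendingEviction) {
           System.out.println(key);
         }
       }
     }
   }
   ```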



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/ReferenceCounted.java:
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.om.snapshot;
+
+import com.google.common.base.Preconditions;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Add reference counter to an object instance.
+ */
+public class ReferenceCounted<T> implements AutoCloseable {
+
+  /**
+   * Object that is being reference counted. e.g. OmSnapshot
+   */
+  private final T obj;
+
+  /**
+   * A map of thread IDs holding the reference of the object and its count.
+   */
+  private final ConcurrentHashMap<Long, Long> threadMap;
+
+  /**
+   * Sum of reference counts from all threads.
+   */
+  private final AtomicLong refCount;
+
+  /**
+   * Parent SnapshotCache instance whose callback will be triggered upon this RC
+   * closure.
+   */
+  private final SnapshotCache parentSnapshotCache;
+
+  public ReferenceCounted(T obj, boolean disableCounter,
+      SnapshotCache parentSnapshotCache) {
+    // A param to allow disabling ref counting to reduce active DB
+    //  access penalties due to AtomicLong operations.
+    this.obj = obj;
+    if (disableCounter) {
+      this.threadMap = null;
+      this.refCount = null;
+    } else {
+      this.threadMap = new ConcurrentHashMap<>();
+      this.refCount = new AtomicLong(0L);
+    }
+    this.parentSnapshotCache = parentSnapshotCache;
+  }
+
+  /**
+   * @return Object being referenced counted.
+   */
+  public T get() {
+    return obj;
+  }
+
+  public long incrementRefCount() { // TODO: [SNAPSHOT] Rename to increment()
+    if (refCount == null) {
+      return -1L;
+    }
+
+    long tid = Thread.currentThread().getId();
+
+    // Put the new mapping if absent, atomically
+    threadMap.putIfAbsent(tid, 0L);

Review Comment:
   Add a null check on `threadMap` because it can be null. Or do an assert.
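   A minimal sketch of that guard, reusing the field names from this PR (illustrative only, not a prescription):
   ```
   public long incrementRefCount() {
     if (refCount == null) {
       return -1L;
     }
     // threadMap is nulled out together with refCount when counting is
     // disabled, so make that invariant explicit before using it.
     Preconditions.checkNotNull(threadMap,
         "threadMap must not be null when reference counting is enabled");

     long tid = Thread.currentThread().getId();
     threadMap.putIfAbsent(tid, 0L);
     // ... rest unchanged
   }
   ```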



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotCache.java:
##########
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.om.snapshot;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.cache.CacheLoader;
+import org.apache.hadoop.ozone.om.IOmMetadataReader;
+import org.apache.hadoop.ozone.om.OmSnapshot;
+import org.apache.hadoop.ozone.om.OmSnapshotManager;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.FILE_NOT_FOUND;
+import static org.apache.hadoop.ozone.om.helpers.SnapshotInfo.SnapshotStatus.SNAPSHOT_ACTIVE;
+
+/**
+ * Thread-safe custom unbounded LRU cache to manage open snapshot DB instances.
+ */
+public class SnapshotCache {
+
+  static final Logger LOG = LoggerFactory.getLogger(SnapshotCache.class);
+
+  // Snapshot cache internal hash map.
+  // Key:   DB snapshot table key
+  // Value: OmSnapshot instance, each holds a DB instance handle inside
+  // TODO: Also wrap SoftReference<> around the value?
+  private final ConcurrentHashMap<String, ReferenceCounted<IOmMetadataReader>>
+      dbMap;
+
+  // Linked hash set that holds OmSnapshot instances whose reference count
+  // has reached zero. Those entries are eligible to be evicted and closed.
+  // Sorted in last used order.
+  // Least-recently-used entry located at the beginning.
+  // TODO: Check thread safety. Try ConcurrentHashMultiset ?
+  private final LinkedHashSet<ReferenceCounted<IOmMetadataReader>>
+      pendingEvictionList;
+  private final OmSnapshotManager omSnapshotManager;
+  private final CacheLoader<String, OmSnapshot> cacheLoader;
+  // Soft-limit of the total number of snapshot DB instances allowed to be
+  // opened on the OM.
+  private final int cacheSizeLimit;
+
+  public SnapshotCache(
+      OmSnapshotManager omSnapshotManager,
+      CacheLoader<String, OmSnapshot> cacheLoader,
+      int cacheSizeLimit) {
+    this.dbMap = new ConcurrentHashMap<>();
+    this.pendingEvictionList = new LinkedHashSet<>();
+    this.omSnapshotManager = omSnapshotManager;
+    this.cacheLoader = cacheLoader;
+    this.cacheSizeLimit = cacheSizeLimit;
+  }
+
+  @VisibleForTesting
+  ConcurrentHashMap<String, ReferenceCounted<IOmMetadataReader>> getDbMap() {
+    return dbMap;
+  }
+
+  @VisibleForTesting
+  LinkedHashSet<ReferenceCounted<IOmMetadataReader>> getPendingEvictionList() {
+    return pendingEvictionList;
+  }
+
+  /**
+   * @return number of DB instances currently held in cache.
+   */
+  public int size() {
+    return dbMap.size();
+  }
+
+  /**
+   * Immediately invalidate an entry.
+   * @param key DB snapshot table key
+   */
+  public void invalidate(String key) throws IOException {
+    dbMap.computeIfPresent(key, (k, v) -> {
+      pendingEvictionList.remove(v);
+      try {
+        ((OmSnapshot) v.get()).close();
+      } catch (IOException e) {
+        throw new IllegalStateException("Failed to close snapshot: " + key, e);
+      }
+      // Remove the entry from map by returning null
+      return null;
+    });
+  }
+
+  /**
+   * Immediately invalidate all entries and close their DB instances in cache.
+   */
+  public void invalidateAll() {
+    Iterator<Map.Entry<String, ReferenceCounted<IOmMetadataReader>>>
+        it = dbMap.entrySet().iterator();
+
+    while (it.hasNext()) {
+      Map.Entry<String, ReferenceCounted<IOmMetadataReader>> entry = it.next();
+      pendingEvictionList.remove(entry.getValue());
+      OmSnapshot omSnapshot = (OmSnapshot) entry.getValue().get();
+      try {
+        // TODO: If wrapped with SoftReference<>, omSnapshot could be null?
+        omSnapshot.close();
+      } catch (IOException e) {
+        throw new IllegalStateException("Failed to close snapshot", e);
+      }
+      it.remove();
+    }
+  }
+
+  /**
+   * State the reason the current thread is getting the OmSnapshot instance.
+   */
+  public enum Reason {
+    FS_API_READ,
+    SNAPDIFF_READ,
+    DEEP_CLEAN_WRITE,
+    GARBAGE_COLLECTION_WRITE
+  }
+
+
+  public ReferenceCounted<IOmMetadataReader> get(String key)
+      throws IOException {
+    return get(key, false);
+  }
+
+  /**
+   * Get or load OmSnapshot. Shall be close()d after use.
+   * TODO: [SNAPSHOT] Can add reason enum to param list later.
+   * @param key snapshot table key
+   * @return an OmSnapshot instance, or null on error
+   */
+  public ReferenceCounted<IOmMetadataReader> get(String key,
+      boolean skipActiveCheck) throws IOException {
+    // Atomic operation to initialize the OmSnapshot instance (once) if the key
+    // does not exist.
+    ReferenceCounted<IOmMetadataReader> rcOmSnapshot =
+        dbMap.computeIfAbsent(key, k -> {
+          LOG.info("Loading snapshot. Table key: {}", k);
+          try {
+            return new ReferenceCounted<>(cacheLoader.load(k), false, this);
+          } catch (OMException omEx) {
+            // Return null if the snapshot is no longer active
+            if (!omEx.getResult().equals(FILE_NOT_FOUND)) {
+              throw new IllegalStateException(omEx);
+            }
+          } catch (IOException ioEx) {
+            // Failed to load snapshot DB
+            throw new IllegalStateException(ioEx);
+          } catch (Exception ex) {
+            // Unexpected and unknown exception thrown from CacheLoader#load
+            throw new IllegalStateException(ex);
+          }
+          // Do not put the value in the map on exception
+          return null;
+        });
+
+    if (rcOmSnapshot == null) {
+      // The only exception that would fall through the loader logic above
+      // is OMException with FILE_NOT_FOUND.
+      throw new OMException("Snapshot table key '" + key + "' not found, "
+          + "or the snapshot is no longer active",
+          OMException.ResultCodes.FILE_NOT_FOUND);
+    }
+
+    // If the snapshot is already loaded in cache, the check inside the loader
+    // above is ignored. But we would still want to reject all get()s except
+    // when called from SDT (and some) if the snapshot is not active any more.
+    if (!omSnapshotManager.isSnapshotStatus(key, SNAPSHOT_ACTIVE) &&
+        !skipActiveCheck) {
+      throw new OMException("Unable to load snapshot. " +
+          "Snapshot with table key '" + key + "' is no longer active",
+          FILE_NOT_FOUND);
+    }
+
+    // Increment the reference count on the instance.
+    rcOmSnapshot.incrementRefCount();
+
+    // Remove instance from clean up list when it exists.
+    // TODO: [SNAPSHOT] Check thread safety with release()
+    pendingEvictionList.remove(rcOmSnapshot);
+
+    // Check if any entries can be cleaned up.
+    // At this point, cache size might temporarily exceed cacheSizeLimit
+    // even if there are entries that can be evicted, which is fine since it
+    // is a soft limit.
+    cleanup();
+
+    return rcOmSnapshot;
+  }
+
+  /**
+   * Release the reference count on the OmSnapshot instance.
+   * @param key snapshot table key
+   */
+  public void release(String key) {
+    ReferenceCounted<IOmMetadataReader> rcOmSnapshot = dbMap.get(key);
+    Preconditions.checkNotNull(rcOmSnapshot,

Review Comment:
   Is it possible that release() can be called twice on the same key? If yes, then this condition check would fail. In that scenario, I don't think it is a good idea to have this condition check.
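   One way to make `release()` tolerant of a double release, reusing the fields from this PR (an illustrative sketch only): log and return instead of failing a precondition.
   ```
   public void release(String key) {
     ReferenceCounted<IOmMetadataReader> rcOmSnapshot = dbMap.get(key);
     if (rcOmSnapshot == null) {
       // Key already released or evicted; don't treat it as a fatal error.
       LOG.warn("Key '{}' does not exist in cache, ignoring release()", key);
       return;
     }
     if (rcOmSnapshot.decrementRefCount() == 0L) {
       pendingEvictionList.add(rcOmSnapshot);
       cleanup();
     }
   }
   ```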



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/ReferenceCounted.java:
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.om.snapshot;
+
+import com.google.common.base.Preconditions;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Add reference counter to an object instance.
+ */
+public class ReferenceCounted<T> implements AutoCloseable {
+
+  /**
+   * Object that is being reference counted. e.g. OmSnapshot
+   */
+  private final T obj;
+
+  /**
+   * A map of thread IDs holding the reference of the object and its count.
+   */
+  private final ConcurrentHashMap<Long, Long> threadMap;
+
+  /**
+   * Sum of reference counts from all threads.
+   */
+  private final AtomicLong refCount;
+
+  /**
+   * Parent SnapshotCache instance whose callback will be triggered upon this RC
+   * closure.
+   */
+  private final SnapshotCache parentSnapshotCache;
+
+  public ReferenceCounted(T obj, boolean disableCounter,
+      SnapshotCache parentSnapshotCache) {
+    // A param to allow disabling ref counting to reduce active DB
+    //  access penalties due to AtomicLong operations.
+    this.obj = obj;
+    if (disableCounter) {
+      this.threadMap = null;
+      this.refCount = null;
+    } else {
+      this.threadMap = new ConcurrentHashMap<>();
+      this.refCount = new AtomicLong(0L);
+    }
+    this.parentSnapshotCache = parentSnapshotCache;
+  }
+
+  /**
+   * @return Object being referenced counted.
+   */
+  public T get() {
+    return obj;
+  }
+
+  public long incrementRefCount() { // TODO: [SNAPSHOT] Rename to increment()
+    if (refCount == null) {
+      return -1L;
+    }
+
+    long tid = Thread.currentThread().getId();
+
+    // Put the new mapping if absent, atomically

Review Comment:
   nit: in my opinion these types of comments are not needed. `putIfAbsent` (the method name) itself says that it will put the value if the key is absent, and `threadMap` is a concurrent `HashMap`, so the operation would be atomic.
   In short, they are not adding any value for me.

   Same for line 83.



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotCache.java:
##########
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.om.snapshot;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.cache.CacheLoader;
+import org.apache.hadoop.ozone.om.IOmMetadataReader;
+import org.apache.hadoop.ozone.om.OmSnapshot;
+import org.apache.hadoop.ozone.om.OmSnapshotManager;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.FILE_NOT_FOUND;
+import static org.apache.hadoop.ozone.om.helpers.SnapshotInfo.SnapshotStatus.SNAPSHOT_ACTIVE;
+
+/**
+ * Thread-safe custom unbounded LRU cache to manage open snapshot DB instances.
+ */
+public class SnapshotCache {
+
+  static final Logger LOG = LoggerFactory.getLogger(SnapshotCache.class);
+
+  // Snapshot cache internal hash map.
+  // Key:   DB snapshot table key
+  // Value: OmSnapshot instance, each holds a DB instance handle inside
+  // TODO: Also wrap SoftReference<> around the value?
+  private final ConcurrentHashMap<String, ReferenceCounted<IOmMetadataReader>>
+      dbMap;
+
+  // Linked hash set that holds OmSnapshot instances whose reference count
+  // has reached zero. Those entries are eligible to be evicted and closed.
+  // Sorted in last used order.
+  // Least-recently-used entry located at the beginning.
+  // TODO: Check thread safety. Try ConcurrentHashMultiset ?
+  private final LinkedHashSet<ReferenceCounted<IOmMetadataReader>>
+      pendingEvictionList;
+  private final OmSnapshotManager omSnapshotManager;
+  private final CacheLoader<String, OmSnapshot> cacheLoader;
+  // Soft-limit of the total number of snapshot DB instances allowed to be
+  // opened on the OM.
+  private final int cacheSizeLimit;
+
+  public SnapshotCache(
+      OmSnapshotManager omSnapshotManager,
+      CacheLoader<String, OmSnapshot> cacheLoader,
+      int cacheSizeLimit) {
+    this.dbMap = new ConcurrentHashMap<>();
+    this.pendingEvictionList = new LinkedHashSet<>();
+    this.omSnapshotManager = omSnapshotManager;
+    this.cacheLoader = cacheLoader;
+    this.cacheSizeLimit = cacheSizeLimit;
+  }
+
+  @VisibleForTesting
+  ConcurrentHashMap<String, ReferenceCounted<IOmMetadataReader>> getDbMap() {
+    return dbMap;
+  }
+
+  @VisibleForTesting
+  LinkedHashSet<ReferenceCounted<IOmMetadataReader>> getPendingEvictionList() {
+    return pendingEvictionList;
+  }
+
+  /**
+   * @return number of DB instances currently held in cache.
+   */
+  public int size() {
+    return dbMap.size();
+  }
+
+  /**
+   * Immediately invalidate an entry.
+   * @param key DB snapshot table key
+   */
+  public void invalidate(String key) throws IOException {
+    dbMap.computeIfPresent(key, (k, v) -> {
+      pendingEvictionList.remove(v);
+      try {
+        ((OmSnapshot) v.get()).close();
+      } catch (IOException e) {
+        throw new IllegalStateException("Failed to close snapshot: " + key, e);
+      }
+      // Remove the entry from map by returning null
+      return null;
+    });
+  }
+
+  /**
+   * Immediately invalidate all entries and close their DB instances in cache.
+   */
+  public void invalidateAll() {
+    Iterator<Map.Entry<String, ReferenceCounted<IOmMetadataReader>>>
+        it = dbMap.entrySet().iterator();
+
+    while (it.hasNext()) {
+      Map.Entry<String, ReferenceCounted<IOmMetadataReader>> entry = it.next();
+      pendingEvictionList.remove(entry.getValue());
+      OmSnapshot omSnapshot = (OmSnapshot) entry.getValue().get();
+      try {
+        // TODO: If wrapped with SoftReference<>, omSnapshot could be null?
+        omSnapshot.close();
+      } catch (IOException e) {
+        throw new IllegalStateException("Failed to close snapshot", e);
+      }
+      it.remove();
+    }
+  }
+
+  /**
+   * State the reason the current thread is getting the OmSnapshot instance.
+   */
+  public enum Reason {
+    FS_API_READ,
+    SNAPDIFF_READ,
+    DEEP_CLEAN_WRITE,
+    GARBAGE_COLLECTION_WRITE
+  }
+
+
+  public ReferenceCounted<IOmMetadataReader> get(String key)
+      throws IOException {
+    return get(key, false);
+  }
+
+  /**
+   * Get or load OmSnapshot. Shall be close()d after use.
+   * TODO: [SNAPSHOT] Can add reason enum to param list later.
+   * @param key snapshot table key
+   * @return an OmSnapshot instance, or null on error
+   */
+  public ReferenceCounted<IOmMetadataReader> get(String key,
+      boolean skipActiveCheck) throws IOException {
+    // Atomic operation to initialize the OmSnapshot instance (once) if the key
+    // does not exist.
+    ReferenceCounted<IOmMetadataReader> rcOmSnapshot =
+        dbMap.computeIfAbsent(key, k -> {
+          LOG.info("Loading snapshot. Table key: {}", k);
+          try {
+            return new ReferenceCounted<>(cacheLoader.load(k), false, this);
+          } catch (OMException omEx) {
+            // Return null if the snapshot is no longer active
+            if (!omEx.getResult().equals(FILE_NOT_FOUND)) {
+              throw new IllegalStateException(omEx);
+            }
+          } catch (IOException ioEx) {
+            // Failed to load snapshot DB
+            throw new IllegalStateException(ioEx);
+          } catch (Exception ex) {
+            // Unexpected and unknown exception thrown from CacheLoader#load
+            throw new IllegalStateException(ex);
+          }
+          // Do not put the value in the map on exception
+          return null;
+        });
+
+    if (rcOmSnapshot == null) {
+      // The only exception that would fall through the loader logic above
+      // is OMException with FILE_NOT_FOUND.
+      throw new OMException("Snapshot table key '" + key + "' not found, "
+          + "or the snapshot is no longer active",
+          OMException.ResultCodes.FILE_NOT_FOUND);
+    }
+
+    // If the snapshot is already loaded in cache, the check inside the loader
+    // above is ignored. But we would still want to reject all get()s except
+    // when called from SDT (and some) if the snapshot is not active any more.
+    if (!omSnapshotManager.isSnapshotStatus(key, SNAPSHOT_ACTIVE) &&
+        !skipActiveCheck) {
+      throw new OMException("Unable to load snapshot. " +
+          "Snapshot with table key '" + key + "' is no longer active",
+          FILE_NOT_FOUND);
+    }
+
+    // Increment the reference count on the instance.
+    rcOmSnapshot.incrementRefCount();
+
+    // Remove instance from clean up list when it exists.
+    // TODO: [SNAPSHOT] Check thread safety with release()
+    pendingEvictionList.remove(rcOmSnapshot);
+
+    // Check if any entries can be cleaned up.
+    // At this point, cache size might temporarily exceed cacheSizeLimit
+    // even if there are entries that can be evicted, which is fine since it
+    // is a soft limit.
+    cleanup();
+
+    return rcOmSnapshot;
+  }
+
+  /**
+   * Release the reference count on the OmSnapshot instance.
+   * @param key snapshot table key
+   */
+  public void release(String key) {
+    ReferenceCounted<IOmMetadataReader> rcOmSnapshot = dbMap.get(key);
+    Preconditions.checkNotNull(rcOmSnapshot,
+        "Key '" + key + "' does not exist in cache");
+
+    if (rcOmSnapshot.decrementRefCount() == 0L) {
+      // Eligible to be closed, add it to the list.
+      pendingEvictionList.add(rcOmSnapshot);
+      cleanup();
+    }
+  }
+
+  /**
+   * Alternatively, can release with OmSnapshot instance directly.
+   * @param omSnapshot OmSnapshot
+   */
+  public void release(OmSnapshot omSnapshot) {
+    final String key = omSnapshot.getSnapshotTableKey();
+    release(key);
+  }
+
+  /**
+   * Callback method used to enqueue or dequeue ReferenceCounted from
+   * pendingEvictionList.
+   * @param referenceCounted ReferenceCounted object
+   */
+  public void callback(ReferenceCounted referenceCounted) {
+    if (referenceCounted.getTotalRefCount() == 0L) {
+      // Reference count reaches zero, add to pendingEvictionList
+      Preconditions.checkState(!pendingEvictionList.contains(referenceCounted),
+          "SnapshotCache is inconsistent. Entry should not be in the "
+              + "pendingEvictionList when ref count just reached zero.");
+      pendingEvictionList.add(referenceCounted);
+    } else if (referenceCounted.getTotalRefCount() == 1L) {
+      pendingEvictionList.remove(referenceCounted);
+    }
+  }
+
+  /**
+   * If cache size exceeds soft limit, attempt to clean up and close the
+   * instances that has zero reference count.
+   * TODO: [SNAPSHOT] Add new ozone debug CLI command to trigger this directly.
+   */
+  private void cleanup() {
+    long numEntriesToEvict = (long) dbMap.size() - cacheSizeLimit;
+    while (pendingEvictionList.size() > 0 && numEntriesToEvict > 0L) {
+      // Get the first instance in the clean up list
+      ReferenceCounted<IOmMetadataReader> rcOmSnapshot =
+          pendingEvictionList.iterator().next();
+      OmSnapshot omSnapshot = (OmSnapshot) rcOmSnapshot.get();
+      LOG.debug("Evicting OmSnapshot instance {} with table key {}",
+          rcOmSnapshot, omSnapshot.getSnapshotTableKey());
+      // Sanity check
+      Preconditions.checkState(rcOmSnapshot.getTotalRefCount() == 0L,
+          "Illegal state: OmSnapshot reference count non-zero ("
+              + rcOmSnapshot.getTotalRefCount() + ") but shows up in the "
+              + "clean up list");
+
+      final String key = omSnapshot.getSnapshotTableKey();
+      final ReferenceCounted<IOmMetadataReader> result = dbMap.remove(key);
+      // Sanity check
+      Preconditions.checkState(rcOmSnapshot == result,
+          "Cache map entry removal failure. The cache is in an inconsistent "
+              + "state. Expected OmSnapshot instance: " + rcOmSnapshot
+              + ", actual: " + result);
+
+      pendingEvictionList.remove(result);
+
+      // Close the instance, which also closes its DB handle.
+      try {
+        ((OmSnapshot) rcOmSnapshot.get()).close();
+      } catch (IOException ex) {
+        throw new IllegalStateException("Error while closing snapshot DB", ex);
+      }
+
+      --numEntriesToEvict;
+    }
+
+    // Print warning message if actual cache size is exceeding the soft limit
+    // even after the cleanup procedure above.
+    if ((long) dbMap.size() > cacheSizeLimit) {
+      LOG.warn("Current snapshot cache size ({}) is exceeding configured "
+          + "soft-limit ({}) after possible evictions.",
+          dbMap.size(), cacheSizeLimit);
+
+      Preconditions.checkState(pendingEvictionList.size() == 0);

Review Comment:
   I'm skeptical about it and would prefer to add it in the if block, because there is no synchronization between this and `pendingEvictionList.add(referenceCounted);` at line # 249.



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotCache.java:
##########
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.om.snapshot;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.cache.CacheLoader;
+import org.apache.hadoop.ozone.om.IOmMetadataReader;
+import org.apache.hadoop.ozone.om.OmSnapshot;
+import org.apache.hadoop.ozone.om.OmSnapshotManager;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.FILE_NOT_FOUND;
+import static org.apache.hadoop.ozone.om.helpers.SnapshotInfo.SnapshotStatus.SNAPSHOT_ACTIVE;
+
+/**
+ * Thread-safe custom unbounded LRU cache to manage open snapshot DB instances.
+ */
+public class SnapshotCache {
+
+  static final Logger LOG = LoggerFactory.getLogger(SnapshotCache.class);
+
+  // Snapshot cache internal hash map.
+  // Key:   DB snapshot table key
+  // Value: OmSnapshot instance, each holds a DB instance handle inside
+  // TODO: Also wrap SoftReference<> around the value?
+  private final ConcurrentHashMap<String, ReferenceCounted<IOmMetadataReader>>
+      dbMap;
+
+  // Linked hash set that holds OmSnapshot instances whose reference count
+  // has reached zero. Those entries are eligible to be evicted and closed.
+  // Sorted in last used order.
+  // Least-recently-used entry located at the beginning.
+  // TODO: Check thread safety. Try ConcurrentHashMultiset ?
+  private final LinkedHashSet<ReferenceCounted<IOmMetadataReader>>
+      pendingEvictionList;
+  private final OmSnapshotManager omSnapshotManager;
+  private final CacheLoader<String, OmSnapshot> cacheLoader;
+  // Soft-limit of the total number of snapshot DB instances allowed to be
+  // opened on the OM.
+  private final int cacheSizeLimit;
+
+  public SnapshotCache(
+      OmSnapshotManager omSnapshotManager,
+      CacheLoader<String, OmSnapshot> cacheLoader,
+      int cacheSizeLimit) {
+    this.dbMap = new ConcurrentHashMap<>();
+    this.pendingEvictionList = new LinkedHashSet<>();
+    this.omSnapshotManager = omSnapshotManager;
+    this.cacheLoader = cacheLoader;
+    this.cacheSizeLimit = cacheSizeLimit;
+  }
+
+  @VisibleForTesting
+  ConcurrentHashMap<String, ReferenceCounted<IOmMetadataReader>> getDbMap() {
+    return dbMap;
+  }
+
+  @VisibleForTesting
+  LinkedHashSet<ReferenceCounted<IOmMetadataReader>> getPendingEvictionList() {
+    return pendingEvictionList;
+  }
+
+  /**
+   * @return number of DB instances currently held in cache.
+   */
+  public int size() {
+    return dbMap.size();
+  }
+
+  /**
+   * Immediately invalidate an entry.
+   * @param key DB snapshot table key
+   */
+  public void invalidate(String key) throws IOException {
+    dbMap.computeIfPresent(key, (k, v) -> {
+      pendingEvictionList.remove(v);
+      try {
+        ((OmSnapshot) v.get()).close();
+      } catch (IOException e) {
+        throw new IllegalStateException("Failed to close snapshot: " + key, e);
+      }
+      // Remove the entry from map by returning null
+      return null;
+    });
+  }
+
+  /**
+   * Immediately invalidate all entries and close their DB instances in cache.
+   */
+  public void invalidateAll() {
+    Iterator<Map.Entry<String, ReferenceCounted<IOmMetadataReader>>>
+        it = dbMap.entrySet().iterator();
+
+    while (it.hasNext()) {
+      Map.Entry<String, ReferenceCounted<IOmMetadataReader>> entry = it.next();
+      pendingEvictionList.remove(entry.getValue());
+      OmSnapshot omSnapshot = (OmSnapshot) entry.getValue().get();
+      try {
+        // TODO: If wrapped with SoftReference<>, omSnapshot could be null?
+        omSnapshot.close();
+      } catch (IOException e) {
+        throw new IllegalStateException("Failed to close snapshot", e);
+      }
+      it.remove();
+    }
+  }
+
+  /**
+   * State the reason the current thread is getting the OmSnapshot instance.
+   */
+  public enum Reason {
+    FS_API_READ,
+    SNAPDIFF_READ,
+    DEEP_CLEAN_WRITE,
+    GARBAGE_COLLECTION_WRITE
+  }
+
+
+  public ReferenceCounted<IOmMetadataReader> get(String key)
+      throws IOException {
+    return get(key, false);
+  }
+
+  /**
+   * Get or load OmSnapshot. Shall be close()d after use.
+   * TODO: [SNAPSHOT] Can add reason enum to param list later.
+   * @param key snapshot table key
+   * @return an OmSnapshot instance, or null on error
+   */
+  public ReferenceCounted<IOmMetadataReader> get(String key,
+      boolean skipActiveCheck) throws IOException {
+    // Atomic operation to initialize the OmSnapshot instance (once) if the key
+    // does not exist.
+    ReferenceCounted<IOmMetadataReader> rcOmSnapshot =
+        dbMap.computeIfAbsent(key, k -> {
+          LOG.info("Loading snapshot. Table key: {}", k);
+          try {
+            return new ReferenceCounted<>(cacheLoader.load(k), false, this);
+          } catch (OMException omEx) {
+            // Return null if the snapshot is no longer active
+            if (!omEx.getResult().equals(FILE_NOT_FOUND)) {
+              throw new IllegalStateException(omEx);
+            }
+          } catch (IOException ioEx) {
+            // Failed to load snapshot DB
+            throw new IllegalStateException(ioEx);
+          } catch (Exception ex) {
+            // Unexpected and unknown exception thrown from CacheLoader#load
+            throw new IllegalStateException(ex);
+          }
+          // Do not put the value in the map on exception
+          return null;
+        });
+
+    if (rcOmSnapshot == null) {
+      // The only exception that would fall through the loader logic above
+      // is OMException with FILE_NOT_FOUND.
+      throw new OMException("Snapshot table key '" + key + "' not found, "
+          + "or the snapshot is no longer active",
+          OMException.ResultCodes.FILE_NOT_FOUND);
+    }
+
+    // If the snapshot is already loaded in cache, the check inside the loader
+    // above is ignored. But we would still want to reject all get()s except
+    // when called from SDT (and some) if the snapshot is not active any more.

Review Comment:
   nit:
   ```suggestion
       // when called from SDT (and some) if the snapshot is not active anymore.
   ```



##########
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestSnapshotDeletingService.java:
##########
@@ -471,4 +471,3 @@ private boolean assertTableRowCount(int expectedCount,
     return count == expectedCount;
   }
 }
-

Review Comment:
   Why? :P
   
   I think every file should have an empty line at the end of the file.



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/ReferenceCounted.java:
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.om.snapshot;
+
+import com.google.common.base.Preconditions;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Add reference counter to an object instance.
+ */
+public class ReferenceCounted<T> implements AutoCloseable {
+
+  /**
+   * Object that is being reference counted. e.g. OmSnapshot
+   */
+  private final T obj;
+
+  /**
+   * A map of thread IDs holding the reference of the object and its count.
+   */
+  private final ConcurrentHashMap<Long, Long> threadMap;
+
+  /**
+   * Sum of reference counts from all threads.
+   */
+  private final AtomicLong refCount;
+
+  /**
+   * Parent SnapshotCache instance whose callback will be triggered upon this RC
+   * closure.
+   */
+  private final SnapshotCache parentSnapshotCache;
+
+  public ReferenceCounted(T obj, boolean disableCounter,

Review Comment:
   I am not aligned with providing the `disableCounter` flexibility. If someone doesn't need reference counting, then don't wrap the object with `ReferenceCounted`. I see it is used for the active DB for consistency, but you can still keep the count; it is not harmful, IMO. Providing `disableCounter` just makes it overly complicated, and it also defeats the whole purpose of reference counting.



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/ReferenceCounted.java:
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.om.snapshot;
+
+import com.google.common.base.Preconditions;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Add reference counter to an object instance.
+ */
+public class ReferenceCounted<T> implements AutoCloseable {
+
+  /**
+   * Object that is being reference counted. e.g. OmSnapshot
+   */
+  private final T obj;
+
+  /**
+   * A map of thread IDs holding the reference of the object and its count.
+   */
+  private final ConcurrentHashMap<Long, Long> threadMap;
+
+  /**
+   * Sum of reference counts from all threads.
+   */
+  private final AtomicLong refCount;
+
+  /**
+   * Parent SnapshotCache instance whose callback will be triggered upon this RC
+   * closure.
+   */
+  private final SnapshotCache parentSnapshotCache;
+
+  public ReferenceCounted(T obj, boolean disableCounter,
+      SnapshotCache parentSnapshotCache) {
+    // A param to allow disabling ref counting to reduce active DB
+    //  access penalties due to AtomicLong operations.
+    this.obj = obj;
+    if (disableCounter) {
+      this.threadMap = null;
+      this.refCount = null;
+    } else {
+      this.threadMap = new ConcurrentHashMap<>();
+      this.refCount = new AtomicLong(0L);
+    }
+    this.parentSnapshotCache = parentSnapshotCache;
+  }
+
+  /**
+   * @return Object being referenced counted.
+   */
+  public T get() {
+    return obj;
+  }
+
+  public long incrementRefCount() { // TODO: [SNAPSHOT] Rename to increment()
+    if (refCount == null) {
+      return -1L;
+    }
+
+    long tid = Thread.currentThread().getId();
+
+    // Put the new mapping if absent, atomically
+    threadMap.putIfAbsent(tid, 0L);
+
+    // Update the value and do some checks, atomically
+    threadMap.computeIfPresent(tid, (k, v) -> {
+      long newVal = v + 1;
+      Preconditions.checkState(newVal > 0L, "Thread reference count overflown");
+
+      long newValTotal = refCount.incrementAndGet();
+      Preconditions.checkState(newValTotal > 0L,
+          "Total reference count overflown");
+
+      if (refCount.get() == 1L) {
+        // ref count increased to one (from zero), remove from
+        // pendingEvictionList if added
+        parentSnapshotCache.callback(this);
+      }
+
+      return newVal;
+    });
+
+    return refCount.get();
+  }
+
+  public long decrementRefCount() {
+    if (refCount == null) {
+      return -1L;
+    }
+
+    long tid = Thread.currentThread().getId();
+
+    Preconditions.checkState(threadMap.containsKey(tid),
+        "Current thread have not holden reference before");
+
+    Preconditions.checkNotNull(threadMap.get(tid), "This thread " + tid +
+        " has not incremented the reference count before.");

Review Comment:
   1. Are these precondition checks meant to make sure that `decrementRefCount` and `incrementRefCount` are not getting called two or more times for the same object, and to catch any bug?
   2. There is no difference between `Preconditions.checkState(threadMap.containsKey(tid), ...)` and `Preconditions.checkNotNull(threadMap.get(tid), ..)`. If `threadMap` contains tid and the value is 0L (unboxed), its boxed value would be 0L (boxed), not null. The second condition should be `Preconditions.checkState(threadMap.get(tid) > 0L, ..)`.
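   To illustrate point 2 with the fields from this PR (sketch only):
   ```
   long tid = Thread.currentThread().getId();

   // If threadMap contains tid mapped to 0L, get(tid) returns a boxed 0L,
   // never null, so checkNotNull adds nothing beyond containsKey.
   Preconditions.checkState(threadMap.containsKey(tid),
       "Current thread has not held a reference before");

   // Suggested replacement for the second check: require a positive count.
   Preconditions.checkState(threadMap.get(tid) > 0L,
       "Thread " + tid + " has not incremented the reference count before.");
   ```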



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/ReferenceCounted.java:
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.om.snapshot;
+
+import com.google.common.base.Preconditions;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Add reference counter to an object instance.
+ */
+public class ReferenceCounted<T> implements AutoCloseable {
+
+  /**
+   * Object that is being reference counted. e.g. OmSnapshot
+   */
+  private final T obj;
+
+  /**
+   * A map of thread IDs holding the reference of the object and its count.
+   */
+  private final ConcurrentHashMap<Long, Long> threadMap;
+
+  /**
+   * Sum of reference counts from all threads.
+   */
+  private final AtomicLong refCount;
+
+  /**
+   * Parent SnapshotCache instance whose callback will be triggered upon this RC
+   * closure.
+   */
+  private final SnapshotCache parentSnapshotCache;

Review Comment:
   Does it have to be `SnapshotCache`? Should it be a type parameter T or P? `ReferenceCounted` is no longer generic and is getting coupled with Snapshot. I'm fine if you make `ReferenceCounted` snapshot-only, but then it should not have the type parameter T.
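   For illustration, one way to keep `ReferenceCounted` generic is to take a callback instead of `SnapshotCache` directly. The names and the `Consumer`-based shape below are assumptions for the sketch, not the PR's design:
   ```
   import java.util.concurrent.atomic.AtomicLong;
   import java.util.function.Consumer;

   public class ReferenceCountedSketch<T> implements AutoCloseable {
     private final T obj;
     private final AtomicLong refCount = new AtomicLong(0L);
     // Any owner can register interest in count changes, e.g.
     // SnapshotCache could pass a method reference to its callback here.
     private final Consumer<ReferenceCountedSketch<T>> onCountChange;

     public ReferenceCountedSketch(T obj,
         Consumer<ReferenceCountedSketch<T>> onCountChange) {
       this.obj = obj;
       this.onCountChange = onCountChange;
     }

     public T get() {
       return obj;
     }

     public long increment() {
       long newVal = refCount.incrementAndGet();
       onCountChange.accept(this);
       return newVal;
     }

     public long decrement() {
       long newVal = refCount.decrementAndGet();
       onCountChange.accept(this);
       return newVal;
     }

     @Override
     public void close() {
       decrement();
     }
   }
   ```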



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/ReferenceCounted.java:
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.om.snapshot;
+
+import com.google.common.base.Preconditions;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Add reference counter to an object instance.
+ */
+public class ReferenceCounted<T> implements AutoCloseable {
+
+  /**
+   * Object that is being reference counted. e.g. OmSnapshot
+   */
+  private final T obj;
+
+  /**
+   * A map of thread IDs holding the reference of the object and its count.
+   */
+  private final ConcurrentHashMap<Long, Long> threadMap;

Review Comment:
   What do we want to achieve with this `threadMap`? I don't see any use for it as of now.



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotCache.java:
##########
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.om.snapshot;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.cache.CacheLoader;
+import org.apache.hadoop.ozone.om.IOmMetadataReader;
+import org.apache.hadoop.ozone.om.OmSnapshot;
+import org.apache.hadoop.ozone.om.OmSnapshotManager;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.FILE_NOT_FOUND;
+import static org.apache.hadoop.ozone.om.helpers.SnapshotInfo.SnapshotStatus.SNAPSHOT_ACTIVE;
+
+/**
+ * Thread-safe custom unbounded LRU cache to manage open snapshot DB instances.
+ */
+public class SnapshotCache {
+
+  static final Logger LOG = LoggerFactory.getLogger(SnapshotCache.class);
+
+  // Snapshot cache internal hash map.
+  // Key:   DB snapshot table key
+  // Value: OmSnapshot instance, each holds a DB instance handle inside
+  // TODO: Also wrap SoftReference<> around the value?
+  private final ConcurrentHashMap<String, ReferenceCounted<IOmMetadataReader>>
+      dbMap;
+
+  // Linked hash set that holds OmSnapshot instances whose reference count
+  // has reached zero. Those entries are eligible to be evicted and closed.
+  // Sorted in last used order.
+  // Least-recently-used entry located at the beginning.
+  // TODO: Check thread safety. Try ConcurrentHashMultiset ?
+  private final LinkedHashSet<ReferenceCounted<IOmMetadataReader>>
+      pendingEvictionList;
+  private final OmSnapshotManager omSnapshotManager;
+  private final CacheLoader<String, OmSnapshot> cacheLoader;
+  // Soft-limit of the total number of snapshot DB instances allowed to be
+  // opened on the OM.
+  private final int cacheSizeLimit;
+
+  public SnapshotCache(
+      OmSnapshotManager omSnapshotManager,
+      CacheLoader<String, OmSnapshot> cacheLoader,
+      int cacheSizeLimit) {
+    this.dbMap = new ConcurrentHashMap<>();
+    this.pendingEvictionList = new LinkedHashSet<>();
+    this.omSnapshotManager = omSnapshotManager;
+    this.cacheLoader = cacheLoader;
+    this.cacheSizeLimit = cacheSizeLimit;
+  }
+
+  @VisibleForTesting
+  ConcurrentHashMap<String, ReferenceCounted<IOmMetadataReader>> getDbMap() {
+    return dbMap;
+  }
+
+  @VisibleForTesting
+  LinkedHashSet<ReferenceCounted<IOmMetadataReader>> getPendingEvictionList() {
+    return pendingEvictionList;
+  }
+
+  /**
+   * @return number of DB instances currently held in cache.
+   */
+  public int size() {
+    return dbMap.size();
+  }
+
+  /**
+   * Immediately invalidate an entry.
+   * @param key DB snapshot table key
+   */
+  public void invalidate(String key) throws IOException {
+    dbMap.computeIfPresent(key, (k, v) -> {
+      pendingEvictionList.remove(v);
+      try {
+        ((OmSnapshot) v.get()).close();
+      } catch (IOException e) {
+        throw new IllegalStateException("Failed to close snapshot: " + key, e);
+      }
+      // Remove the entry from map by returning null
+      return null;
+    });
+  }
+
+  /**
+   * Immediately invalidate all entries and close their DB instances in cache.
+   */
+  public void invalidateAll() {
+    Iterator<Map.Entry<String, ReferenceCounted<IOmMetadataReader>>>
+        it = dbMap.entrySet().iterator();
+
+    while (it.hasNext()) {
+      Map.Entry<String, ReferenceCounted<IOmMetadataReader>> entry = it.next();
+      pendingEvictionList.remove(entry.getValue());
+      OmSnapshot omSnapshot = (OmSnapshot) entry.getValue().get();
+      try {
+        // TODO: If wrapped with SoftReference<>, omSnapshot could be null?
+        omSnapshot.close();
+      } catch (IOException e) {
+        throw new IllegalStateException("Failed to close snapshot", e);
+      }
+      it.remove();
+    }
+  }
+
+  /**
+   * State the reason the current thread is getting the OmSnapshot instance.
+   */
+  public enum Reason {
+    FS_API_READ,
+    SNAPDIFF_READ,
+    DEEP_CLEAN_WRITE,
+    GARBAGE_COLLECTION_WRITE
+  }
+
+
+  public ReferenceCounted<IOmMetadataReader> get(String key)
+      throws IOException {
+    return get(key, false);
+  }
+
+  /**
+   * Get or load OmSnapshot. Shall be close()d after use.
+   * TODO: [SNAPSHOT] Can add reason enum to param list later.
+   * @param key snapshot table key
+   * @return an OmSnapshot instance, or null on error
+   */
+  public ReferenceCounted<IOmMetadataReader> get(String key,
+      boolean skipActiveCheck) throws IOException {
+    // Atomic operation to initialize the OmSnapshot instance (once) if the key
+    // does not exist.
+    ReferenceCounted<IOmMetadataReader> rcOmSnapshot =
+        dbMap.computeIfAbsent(key, k -> {
+          LOG.info("Loading snapshot. Table key: {}", k);
+          try {
+            return new ReferenceCounted<>(cacheLoader.load(k), false, this);
+          } catch (OMException omEx) {
+            // Return null if the snapshot is no longer active
+            if (!omEx.getResult().equals(FILE_NOT_FOUND)) {
+              throw new IllegalStateException(omEx);
+            }
+          } catch (IOException ioEx) {
+            // Failed to load snapshot DB
+            throw new IllegalStateException(ioEx);
+          } catch (Exception ex) {
+            // Unexpected and unknown exception thrown from CacheLoader#load
+            throw new IllegalStateException(ex);
+          }
+          // Do not put the value in the map on exception
+          return null;
+        });
+
+    if (rcOmSnapshot == null) {
+      // The only exception that would fall through the loader logic above
+      // is OMException with FILE_NOT_FOUND.
+      throw new OMException("Snapshot table key '" + key + "' not found, "
+          + "or the snapshot is no longer active",
+          OMException.ResultCodes.FILE_NOT_FOUND);
+    }
+
+    // If the snapshot is already loaded in cache, the check inside the loader
+    // above is ignored. But we would still want to reject all get()s except
+    // when called from SDT (and some) if the snapshot is not active any more.
+    if (!omSnapshotManager.isSnapshotStatus(key, SNAPSHOT_ACTIVE) &&

Review Comment:
   nit: to fail fast.
   ```suggestion
       if (!skipActiveCheck &&
        !omSnapshotManager.isSnapshotStatus(key, SNAPSHOT_ACTIVE)) {
   ```



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/ReferenceCounted.java:
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.om.snapshot;
+
+import com.google.common.base.Preconditions;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Add reference counter to an object instance.
+ */
+public class ReferenceCounted<T> implements AutoCloseable {
+
+  /**
+   * Object that is being reference counted. e.g. OmSnapshot
+   */
+  private final T obj;
+
+  /**
+   * A map of thread IDs holding the reference of the object and its count.
+   */
+  private final ConcurrentHashMap<Long, Long> threadMap;
+
+  /**
+   * Sum of reference counts from all threads.
+   */
+  private final AtomicLong refCount;
+
+  /**
+   * Parent SnapshotCache instance whose callback will be triggered upon this RC
+   * closure.
+   */
+  private final SnapshotCache parentSnapshotCache;
+
+  public ReferenceCounted(T obj, boolean disableCounter,
+      SnapshotCache parentSnapshotCache) {
+    // A param to allow disabling ref counting to reduce active DB
+    //  access penalties due to AtomicLong operations.
+    this.obj = obj;
+    if (disableCounter) {
+      this.threadMap = null;
+      this.refCount = null;
+    } else {
+      this.threadMap = new ConcurrentHashMap<>();
+      this.refCount = new AtomicLong(0L);
+    }
+    this.parentSnapshotCache = parentSnapshotCache;
+  }
+
+  /**
+   * @return Object being referenced counted.
+   */
+  public T get() {
+    return obj;
+  }
+
+  public long incrementRefCount() { // TODO: [SNAPSHOT] Rename to increment()
+    if (refCount == null) {
+      return -1L;
+    }
+
+    long tid = Thread.currentThread().getId();
+
+    // Put the new mapping if absent, atomically
+    threadMap.putIfAbsent(tid, 0L);
+
+    // Update the value and do some checks, atomically
+    threadMap.computeIfPresent(tid, (k, v) -> {
+      long newVal = v + 1;
+      Preconditions.checkState(newVal > 0L, "Thread reference count overflown");
+
+      long newValTotal = refCount.incrementAndGet();
+      Preconditions.checkState(newValTotal > 0L,
+          "Total reference count overflown");
+
+      if (refCount.get() == 1L) {
+        // ref count increased to one (from zero), remove from
+        // pendingEvictionList if added
+        parentSnapshotCache.callback(this);
+      }

Review Comment:
   This should be moved out of `computeIfPresent`. I believe you did it this way to keep the global and thread-level updates synchronized. It would be cleaner to use a synchronized block on `refCount`.
   
   ```
       synchronized (refCount) {
         threadMap.computeIfPresent(tid, (k, v) -> {
           long newVal = v + 1;
           Preconditions.checkState(newVal > 0L,
               "Thread reference count overflown");
           return newVal;
         });
   
         long newValTotal = refCount.incrementAndGet();
         Preconditions.checkState(newValTotal > 0L,
             "Total reference count overflown");
   
         if (refCount.get() == 1L) {
           // ref count increased to one (from zero), remove from
           // pendingEvictionList if added
           parentSnapshotCache.callback(this);
         }
       }
   ```
   
   same for `decrementRefCount`.
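   
   Something along these lines could work for the mirrored decrement path (untested sketch; the underflow checks, removing the per-thread entry when it reaches zero, and invoking the callback when the total drops to zero are just my assumptions mirroring the increment logic above):
   
   ```
       synchronized (refCount) {
         threadMap.computeIfPresent(tid, (k, v) -> {
           long newVal = v - 1;
           Preconditions.checkState(newVal >= 0L,
               "Thread reference count underflow");
           // Returning null removes the per-thread entry once it hits zero
           return newVal == 0L ? null : newVal;
         });
   
         long newValTotal = refCount.decrementAndGet();
         Preconditions.checkState(newValTotal >= 0L,
             "Total reference count underflow");
   
         if (newValTotal == 0L) {
           // Ref count dropped back to zero; notify the cache so it can
           // add this entry to pendingEvictionList
           parentSnapshotCache.callback(this);
         }
       }
   ```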



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotCache.java:
##########
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.om.snapshot;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.cache.CacheLoader;
+import org.apache.hadoop.ozone.om.IOmMetadataReader;
+import org.apache.hadoop.ozone.om.OmSnapshot;
+import org.apache.hadoop.ozone.om.OmSnapshotManager;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.FILE_NOT_FOUND;
+import static org.apache.hadoop.ozone.om.helpers.SnapshotInfo.SnapshotStatus.SNAPSHOT_ACTIVE;
+
+/**
+ * Thread-safe custom unbounded LRU cache to manage open snapshot DB instances.
+ */
+public class SnapshotCache {
+
+  static final Logger LOG = LoggerFactory.getLogger(SnapshotCache.class);
+
+  // Snapshot cache internal hash map.
+  // Key:   DB snapshot table key
+  // Value: OmSnapshot instance, each holds a DB instance handle inside
+  // TODO: Also wrap SoftReference<> around the value?
+  private final ConcurrentHashMap<String, ReferenceCounted<IOmMetadataReader>>
+      dbMap;
+
+  // Linked hash set that holds OmSnapshot instances whose reference count
+  // has reached zero. Those entries are eligible to be evicted and closed.
+  // Sorted in last used order.
+  // Least-recently-used entry located at the beginning.
+  // TODO: Check thread safety. Try ConcurrentHashMultiset ?
+  private final LinkedHashSet<ReferenceCounted<IOmMetadataReader>>
+      pendingEvictionList;
+  private final OmSnapshotManager omSnapshotManager;
+  private final CacheLoader<String, OmSnapshot> cacheLoader;
+  // Soft-limit of the total number of snapshot DB instances allowed to be
+  // opened on the OM.
+  private final int cacheSizeLimit;
+
+  public SnapshotCache(
+      OmSnapshotManager omSnapshotManager,
+      CacheLoader<String, OmSnapshot> cacheLoader,
+      int cacheSizeLimit) {
+    this.dbMap = new ConcurrentHashMap<>();
+    this.pendingEvictionList = new LinkedHashSet<>();
+    this.omSnapshotManager = omSnapshotManager;
+    this.cacheLoader = cacheLoader;
+    this.cacheSizeLimit = cacheSizeLimit;
+  }
+
+  @VisibleForTesting
+  ConcurrentHashMap<String, ReferenceCounted<IOmMetadataReader>> getDbMap() {
+    return dbMap;
+  }
+
+  @VisibleForTesting
+  LinkedHashSet<ReferenceCounted<IOmMetadataReader>> getPendingEvictionList() {
+    return pendingEvictionList;
+  }
+
+  /**
+   * @return number of DB instances currently held in cache.
+   */
+  public int size() {
+    return dbMap.size();
+  }
+
+  /**
+   * Immediately invalidate an entry.
+   * @param key DB snapshot table key
+   */
+  public void invalidate(String key) throws IOException {
+    dbMap.computeIfPresent(key, (k, v) -> {
+      pendingEvictionList.remove(v);
+      try {
+        ((OmSnapshot) v.get()).close();
+      } catch (IOException e) {
+        throw new IllegalStateException("Failed to close snapshot: " + key, e);
+      }
+      // Remove the entry from map by returning null
+      return null;
+    });
+  }
+
+  /**
+   * Immediately invalidate all entries and close their DB instances in cache.
+   */
+  public void invalidateAll() {
+    Iterator<Map.Entry<String, ReferenceCounted<IOmMetadataReader>>>
+        it = dbMap.entrySet().iterator();
+
+    while (it.hasNext()) {
+      Map.Entry<String, ReferenceCounted<IOmMetadataReader>> entry = it.next();
+      pendingEvictionList.remove(entry.getValue());
+      OmSnapshot omSnapshot = (OmSnapshot) entry.getValue().get();
+      try {
+        // TODO: If wrapped with SoftReference<>, omSnapshot could be null?
+        omSnapshot.close();
+      } catch (IOException e) {
+        throw new IllegalStateException("Failed to close snapshot", e);
+      }
+      it.remove();
+    }
+  }
+
+  /**
+   * State the reason the current thread is getting the OmSnapshot instance.
+   */
+  public enum Reason {
+    FS_API_READ,
+    SNAPDIFF_READ,
+    DEEP_CLEAN_WRITE,
+    GARBAGE_COLLECTION_WRITE
+  }
+
+
+  public ReferenceCounted<IOmMetadataReader> get(String key)
+      throws IOException {
+    return get(key, false);
+  }
+
+  /**
+   * Get or load OmSnapshot. Shall be close()d after use.
+   * TODO: [SNAPSHOT] Can add reason enum to param list later.
+   * @param key snapshot table key
+   * @return an OmSnapshot instance, or null on error
+   */
+  public ReferenceCounted<IOmMetadataReader> get(String key,
+      boolean skipActiveCheck) throws IOException {
+    // Atomic operation to initialize the OmSnapshot instance (once) if the key
+    // does not exist.
+    ReferenceCounted<IOmMetadataReader> rcOmSnapshot =
+        dbMap.computeIfAbsent(key, k -> {
+          LOG.info("Loading snapshot. Table key: {}", k);
+          try {
+            return new ReferenceCounted<>(cacheLoader.load(k), false, this);
+          } catch (OMException omEx) {
+            // Return null if the snapshot is no longer active
+            if (!omEx.getResult().equals(FILE_NOT_FOUND)) {
+              throw new IllegalStateException(omEx);
+            }
+          } catch (IOException ioEx) {
+            // Failed to load snapshot DB
+            throw new IllegalStateException(ioEx);
+          } catch (Exception ex) {
+            // Unexpected and unknown exception thrown from CacheLoader#load
+            throw new IllegalStateException(ex);
+          }
+          // Do not put the value in the map on exception
+          return null;
+        });
+
+    if (rcOmSnapshot == null) {
+      // The only exception that would fall through the loader logic above
+      // is OMException with FILE_NOT_FOUND.
+      throw new OMException("Snapshot table key '" + key + "' not found, "
+          + "or the snapshot is no longer active",
+          OMException.ResultCodes.FILE_NOT_FOUND);
+    }
+
+    // If the snapshot is already loaded in cache, the check inside the loader
+    // above is ignored. But we would still want to reject all get()s except
+    // when called from SDT (and some) if the snapshot is not active any more.
+    if (!omSnapshotManager.isSnapshotStatus(key, SNAPSHOT_ACTIVE) &&
+        !skipActiveCheck) {
+      throw new OMException("Unable to load snapshot. " +
+          "Snapshot with table key '" + key + "' is no longer active",
+          FILE_NOT_FOUND);
+    }
+
+    // Increment the reference count on the instance.
+    rcOmSnapshot.incrementRefCount();
+
+    // Remove instance from clean up list when it exists.
+    // TODO: [SNAPSHOT] Check thread safety with release()
+    pendingEvictionList.remove(rcOmSnapshot);
+
+    // Check if any entries can be cleaned up.
+    // At this point, cache size might temporarily exceed cacheSizeLimit
+    // even if there are entries that can be evicted, which is fine since it
+    // is a soft limit.
+    cleanup();
+
+    return rcOmSnapshot;
+  }
+
+  /**
+   * Release the reference count on the OmSnapshot instance.
+   * @param key snapshot table key
+   */
+  public void release(String key) {
+    ReferenceCounted<IOmMetadataReader> rcOmSnapshot = dbMap.get(key);
+    Preconditions.checkNotNull(rcOmSnapshot,
+        "Key '" + key + "' does not exist in cache");
+
+    if (rcOmSnapshot.decrementRefCount() == 0L) {
+      // Eligible to be closed, add it to the list.
+      pendingEvictionList.add(rcOmSnapshot);
+      cleanup();
+    }
+  }
+
+  /**
+   * Alternatively, can release with OmSnapshot instance directly.
+   * @param omSnapshot OmSnapshot
+   */
+  public void release(OmSnapshot omSnapshot) {
+    final String key = omSnapshot.getSnapshotTableKey();
+    release(key);
+  }
+
+  /**
+   * Callback method used to enqueue or dequeue ReferenceCounted from
+   * pendingEvictionList.
+   * @param referenceCounted ReferenceCounted object
+   */
+  public void callback(ReferenceCounted referenceCounted) {
+    if (referenceCounted.getTotalRefCount() == 0L) {
+      // Reference count reaches zero, add to pendingEvictionList
+      Preconditions.checkState(!pendingEvictionList.contains(referenceCounted),
+          "SnapshotCache is inconsistent. Entry should not be in the "
+              + "pendingEvictionList when ref count just reached zero.");
+      pendingEvictionList.add(referenceCounted);
+    } else if (referenceCounted.getTotalRefCount() == 1L) {
+      pendingEvictionList.remove(referenceCounted);
+    }
+  }
+
+  /**
+   * If cache size exceeds soft limit, attempt to clean up and close the
+   * instances that has zero reference count.
+   * TODO: [SNAPSHOT] Add new ozone debug CLI command to trigger this directly.
+   */
+  private void cleanup() {
+    long numEntriesToEvict = (long) dbMap.size() - cacheSizeLimit;
+    while (pendingEvictionList.size() > 0 && numEntriesToEvict > 0L) {

Review Comment:
   nit: check `numEntriesToEvict` first to fail fast.
   ```suggestion
       while (numEntriesToEvict > 0L && pendingEvictionList.size() > 0) { 
   ```



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotCache.java:
##########
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.om.snapshot;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.cache.CacheLoader;
+import org.apache.hadoop.ozone.om.IOmMetadataReader;
+import org.apache.hadoop.ozone.om.OmSnapshot;
+import org.apache.hadoop.ozone.om.OmSnapshotManager;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.FILE_NOT_FOUND;
+import static org.apache.hadoop.ozone.om.helpers.SnapshotInfo.SnapshotStatus.SNAPSHOT_ACTIVE;
+
+/**
+ * Thread-safe custom unbounded LRU cache to manage open snapshot DB instances.
+ */
+public class SnapshotCache {
+
+  static final Logger LOG = LoggerFactory.getLogger(SnapshotCache.class);
+
+  // Snapshot cache internal hash map.
+  // Key:   DB snapshot table key
+  // Value: OmSnapshot instance, each holds a DB instance handle inside
+  // TODO: Also wrap SoftReference<> around the value?
+  private final ConcurrentHashMap<String, ReferenceCounted<IOmMetadataReader>>
+      dbMap;
+
+  // Linked hash set that holds OmSnapshot instances whose reference count
+  // has reached zero. Those entries are eligible to be evicted and closed.
+  // Sorted in last used order.
+  // Least-recently-used entry located at the beginning.
+  // TODO: Check thread safety. Try ConcurrentHashMultiset ?
+  private final LinkedHashSet<ReferenceCounted<IOmMetadataReader>>
+      pendingEvictionList;
+  private final OmSnapshotManager omSnapshotManager;
+  private final CacheLoader<String, OmSnapshot> cacheLoader;
+  // Soft-limit of the total number of snapshot DB instances allowed to be
+  // opened on the OM.
+  private final int cacheSizeLimit;
+
+  public SnapshotCache(
+      OmSnapshotManager omSnapshotManager,
+      CacheLoader<String, OmSnapshot> cacheLoader,
+      int cacheSizeLimit) {
+    this.dbMap = new ConcurrentHashMap<>();
+    this.pendingEvictionList = new LinkedHashSet<>();
+    this.omSnapshotManager = omSnapshotManager;
+    this.cacheLoader = cacheLoader;
+    this.cacheSizeLimit = cacheSizeLimit;
+  }
+
+  @VisibleForTesting
+  ConcurrentHashMap<String, ReferenceCounted<IOmMetadataReader>> getDbMap() {
+    return dbMap;
+  }
+
+  @VisibleForTesting
+  LinkedHashSet<ReferenceCounted<IOmMetadataReader>> getPendingEvictionList() {
+    return pendingEvictionList;
+  }
+
+  /**
+   * @return number of DB instances currently held in cache.
+   */
+  public int size() {
+    return dbMap.size();
+  }
+
+  /**
+   * Immediately invalidate an entry.
+   * @param key DB snapshot table key
+   */
+  public void invalidate(String key) throws IOException {
+    dbMap.computeIfPresent(key, (k, v) -> {
+      pendingEvictionList.remove(v);
+      try {
+        ((OmSnapshot) v.get()).close();
+      } catch (IOException e) {
+        throw new IllegalStateException("Failed to close snapshot: " + key, e);
+      }
+      // Remove the entry from map by returning null
+      return null;
+    });
+  }
+
+  /**
+   * Immediately invalidate all entries and close their DB instances in cache.
+   */
+  public void invalidateAll() {
+    Iterator<Map.Entry<String, ReferenceCounted<IOmMetadataReader>>>
+        it = dbMap.entrySet().iterator();
+
+    while (it.hasNext()) {
+      Map.Entry<String, ReferenceCounted<IOmMetadataReader>> entry = it.next();
+      pendingEvictionList.remove(entry.getValue());
+      OmSnapshot omSnapshot = (OmSnapshot) entry.getValue().get();
+      try {
+        // TODO: If wrapped with SoftReference<>, omSnapshot could be null?
+        omSnapshot.close();
+      } catch (IOException e) {
+        throw new IllegalStateException("Failed to close snapshot", e);
+      }
+      it.remove();
+    }
+  }
+
+  /**
+   * State the reason the current thread is getting the OmSnapshot instance.
+   */
+  public enum Reason {
+    FS_API_READ,
+    SNAPDIFF_READ,
+    DEEP_CLEAN_WRITE,
+    GARBAGE_COLLECTION_WRITE
+  }
+
+
+  public ReferenceCounted<IOmMetadataReader> get(String key)
+      throws IOException {
+    return get(key, false);
+  }
+
+  /**
+   * Get or load OmSnapshot. Shall be close()d after use.
+   * TODO: [SNAPSHOT] Can add reason enum to param list later.
+   * @param key snapshot table key
+   * @return an OmSnapshot instance, or null on error
+   */
+  public ReferenceCounted<IOmMetadataReader> get(String key,
+      boolean skipActiveCheck) throws IOException {
+    // Atomic operation to initialize the OmSnapshot instance (once) if the key
+    // does not exist.
+    ReferenceCounted<IOmMetadataReader> rcOmSnapshot =
+        dbMap.computeIfAbsent(key, k -> {
+          LOG.info("Loading snapshot. Table key: {}", k);
+          try {
+            return new ReferenceCounted<>(cacheLoader.load(k), false, this);
+          } catch (OMException omEx) {
+            // Return null if the snapshot is no longer active
+            if (!omEx.getResult().equals(FILE_NOT_FOUND)) {
+              throw new IllegalStateException(omEx);
+            }
+          } catch (IOException ioEx) {
+            // Failed to load snapshot DB
+            throw new IllegalStateException(ioEx);
+          } catch (Exception ex) {
+            // Unexpected and unknown exception thrown from CacheLoader#load
+            throw new IllegalStateException(ex);
+          }
+          // Do not put the value in the map on exception
+          return null;
+        });
+
+    if (rcOmSnapshot == null) {
+      // The only exception that would fall through the loader logic above
+      // is OMException with FILE_NOT_FOUND.
+      throw new OMException("Snapshot table key '" + key + "' not found, "
+          + "or the snapshot is no longer active",
+          OMException.ResultCodes.FILE_NOT_FOUND);
+    }
+
+    // If the snapshot is already loaded in cache, the check inside the loader
+    // above is ignored. But we would still want to reject all get()s except
+    // when called from SDT (and some) if the snapshot is not active any more.
+    if (!omSnapshotManager.isSnapshotStatus(key, SNAPSHOT_ACTIVE) &&
+        !skipActiveCheck) {
+      throw new OMException("Unable to load snapshot. " +
+          "Snapshot with table key '" + key + "' is no longer active",
+          FILE_NOT_FOUND);
+    }
+
+    // Increment the reference count on the instance.
+    rcOmSnapshot.incrementRefCount();
+
+    // Remove instance from clean up list when it exists.
+    // TODO: [SNAPSHOT] Check thread safety with release()
+    pendingEvictionList.remove(rcOmSnapshot);
+
+    // Check if any entries can be cleaned up.
+    // At this point, cache size might temporarily exceed cacheSizeLimit
+    // even if there are entries that can be evicted, which is fine since it
+    // is a soft limit.
+    cleanup();
+
+    return rcOmSnapshot;
+  }
+
+  /**
+   * Release the reference count on the OmSnapshot instance.
+   * @param key snapshot table key
+   */
+  public void release(String key) {
+    ReferenceCounted<IOmMetadataReader> rcOmSnapshot = dbMap.get(key);
+    Preconditions.checkNotNull(rcOmSnapshot,
+        "Key '" + key + "' does not exist in cache");
+
+    if (rcOmSnapshot.decrementRefCount() == 0L) {
+      // Eligible to be closed, add it to the list.
+      pendingEvictionList.add(rcOmSnapshot);
+      cleanup();
+    }
+  }
+
+  /**
+   * Alternatively, can release with OmSnapshot instance directly.
+   * @param omSnapshot OmSnapshot
+   */
+  public void release(OmSnapshot omSnapshot) {
+    final String key = omSnapshot.getSnapshotTableKey();
+    release(key);
+  }
+
+  /**
+   * Callback method used to enqueue or dequeue ReferenceCounted from
+   * pendingEvictionList.
+   * @param referenceCounted ReferenceCounted object
+   */
+  public void callback(ReferenceCounted referenceCounted) {

Review Comment:
   Make it typed; the raw `ReferenceCounted` otherwise leaks into `pendingEvictionList`, which is parameterized.
   ```suggestion
     public void callback(ReferenceCounted<IOmMetadataReader> referenceCounted)
   ```



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotCache.java:
##########
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.om.snapshot;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.cache.CacheLoader;
+import org.apache.hadoop.ozone.om.IOmMetadataReader;
+import org.apache.hadoop.ozone.om.OmSnapshot;
+import org.apache.hadoop.ozone.om.OmSnapshotManager;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.FILE_NOT_FOUND;
+import static org.apache.hadoop.ozone.om.helpers.SnapshotInfo.SnapshotStatus.SNAPSHOT_ACTIVE;
+
+/**
+ * Thread-safe custom unbounded LRU cache to manage open snapshot DB instances.
+ */
+public class SnapshotCache {
+
+  static final Logger LOG = LoggerFactory.getLogger(SnapshotCache.class);
+
+  // Snapshot cache internal hash map.
+  // Key:   DB snapshot table key
+  // Value: OmSnapshot instance, each holds a DB instance handle inside
+  // TODO: Also wrap SoftReference<> around the value?
+  private final ConcurrentHashMap<String, ReferenceCounted<IOmMetadataReader>>
+      dbMap;
+
+  // Linked hash set that holds OmSnapshot instances whose reference count
+  // has reached zero. Those entries are eligible to be evicted and closed.
+  // Sorted in last used order.
+  // Least-recently-used entry located at the beginning.
+  // TODO: Check thread safety. Try ConcurrentHashMultiset ?
+  private final LinkedHashSet<ReferenceCounted<IOmMetadataReader>>
+      pendingEvictionList;
+  private final OmSnapshotManager omSnapshotManager;
+  private final CacheLoader<String, OmSnapshot> cacheLoader;
+  // Soft-limit of the total number of snapshot DB instances allowed to be
+  // opened on the OM.
+  private final int cacheSizeLimit;
+
+  public SnapshotCache(
+      OmSnapshotManager omSnapshotManager,
+      CacheLoader<String, OmSnapshot> cacheLoader,
+      int cacheSizeLimit) {
+    this.dbMap = new ConcurrentHashMap<>();
+    this.pendingEvictionList = new LinkedHashSet<>();
+    this.omSnapshotManager = omSnapshotManager;
+    this.cacheLoader = cacheLoader;
+    this.cacheSizeLimit = cacheSizeLimit;
+  }
+
+  @VisibleForTesting
+  ConcurrentHashMap<String, ReferenceCounted<IOmMetadataReader>> getDbMap() {
+    return dbMap;
+  }
+
+  @VisibleForTesting
+  LinkedHashSet<ReferenceCounted<IOmMetadataReader>> getPendingEvictionList() {
+    return pendingEvictionList;
+  }
+
+  /**
+   * @return number of DB instances currently held in cache.
+   */
+  public int size() {
+    return dbMap.size();
+  }
+
+  /**
+   * Immediately invalidate an entry.
+   * @param key DB snapshot table key
+   */
+  public void invalidate(String key) throws IOException {
+    dbMap.computeIfPresent(key, (k, v) -> {
+      pendingEvictionList.remove(v);
+      try {
+        ((OmSnapshot) v.get()).close();
+      } catch (IOException e) {
+        throw new IllegalStateException("Failed to close snapshot: " + key, e);
+      }
+      // Remove the entry from map by returning null
+      return null;
+    });
+  }
+
+  /**
+   * Immediately invalidate all entries and close their DB instances in cache.
+   */
+  public void invalidateAll() {
+    Iterator<Map.Entry<String, ReferenceCounted<IOmMetadataReader>>>
+        it = dbMap.entrySet().iterator();
+
+    while (it.hasNext()) {
+      Map.Entry<String, ReferenceCounted<IOmMetadataReader>> entry = it.next();
+      pendingEvictionList.remove(entry.getValue());
+      OmSnapshot omSnapshot = (OmSnapshot) entry.getValue().get();
+      try {
+        // TODO: If wrapped with SoftReference<>, omSnapshot could be null?
+        omSnapshot.close();
+      } catch (IOException e) {
+        throw new IllegalStateException("Failed to close snapshot", e);
+      }
+      it.remove();
+    }
+  }
+
+  /**
+   * State the reason the current thread is getting the OmSnapshot instance.
+   */
+  public enum Reason {
+    FS_API_READ,
+    SNAPDIFF_READ,
+    DEEP_CLEAN_WRITE,
+    GARBAGE_COLLECTION_WRITE
+  }
+
+
+  public ReferenceCounted<IOmMetadataReader> get(String key)
+      throws IOException {
+    return get(key, false);
+  }
+
+  /**
+   * Get or load OmSnapshot. Shall be close()d after use.
+   * TODO: [SNAPSHOT] Can add reason enum to param list later.
+   * @param key snapshot table key
+   * @return an OmSnapshot instance, or null on error
+   */
+  public ReferenceCounted<IOmMetadataReader> get(String key,
+      boolean skipActiveCheck) throws IOException {
+    // Atomic operation to initialize the OmSnapshot instance (once) if the key
+    // does not exist.
+    ReferenceCounted<IOmMetadataReader> rcOmSnapshot =
+        dbMap.computeIfAbsent(key, k -> {
+          LOG.info("Loading snapshot. Table key: {}", k);
+          try {
+            return new ReferenceCounted<>(cacheLoader.load(k), false, this);
+          } catch (OMException omEx) {
+            // Return null if the snapshot is no longer active
+            if (!omEx.getResult().equals(FILE_NOT_FOUND)) {
+              throw new IllegalStateException(omEx);
+            }
+          } catch (IOException ioEx) {
+            // Failed to load snapshot DB
+            throw new IllegalStateException(ioEx);
+          } catch (Exception ex) {
+            // Unexpected and unknown exception thrown from CacheLoader#load
+            throw new IllegalStateException(ex);
+          }
+          // Do not put the value in the map on exception
+          return null;
+        });
+
+    if (rcOmSnapshot == null) {
+      // The only exception that would fall through the loader logic above
+      // is OMException with FILE_NOT_FOUND.
+      throw new OMException("Snapshot table key '" + key + "' not found, "
+          + "or the snapshot is no longer active",
+          OMException.ResultCodes.FILE_NOT_FOUND);
+    }
+
+    // If the snapshot is already loaded in cache, the check inside the loader
+    // above is ignored. But we would still want to reject all get()s except
+    // when called from SDT (and some) if the snapshot is not active any more.
+    if (!omSnapshotManager.isSnapshotStatus(key, SNAPSHOT_ACTIVE) &&
+        !skipActiveCheck) {
+      throw new OMException("Unable to load snapshot. " +
+          "Snapshot with table key '" + key + "' is no longer active",
+          FILE_NOT_FOUND);
+    }
+
+    // Increment the reference count on the instance.
+    rcOmSnapshot.incrementRefCount();
+
+    // Remove instance from clean up list when it exists.
+    // TODO: [SNAPSHOT] Check thread safety with release()

Review Comment:
   I'm guessing you meant the `pendingEvictionList.remove(result);` in cleanup().



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org