You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@ozone.apache.org by GitBox <gi...@apache.org> on 2022/10/25 14:49:32 UTC

[GitHub] [ozone] sadanand48 opened a new pull request, #3883: HDDS-6962.[Snapshot] Background Service to delete irrelevant SST files in a snapshot.

sadanand48 opened a new pull request, #3883:
URL: https://github.com/apache/ozone/pull/3883

   ## What changes were proposed in this pull request?
   This PR introduces a background service `SSTFilteringService` to delete irrelevant SST files of a snapshot, It makes use of RocksDB#deleteFile
   API to do so which only permits the deletion of last level of SST files ie if there are n levels of SST's at a given point , the service will delete only nth level SST's. This is still beneficial as the last level SST's are the bulkiest and deletion of these can help save space. On successful deletion , it will update a marker file `filtered-snapshots` and write the snapshot ID of the processed Snapshot to it. Only the` keyTable`, `fileTable` & `directoryTable` keys are deleted as they can grow very large compared to other tables.
   
   ## What is the link to the Apache JIRA
   https://issues.apache.org/jira/browse/HDDS-6962
   
   ## How was this patch tested?
   Unit tests


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org


[GitHub] [ozone] sadanand48 commented on pull request #3883: HDDS-6962. [Snapshot] Background Service to delete irrelevant SST files in a snapshot.

Posted by GitBox <gi...@apache.org>.
sadanand48 commented on PR #3883:
URL: https://github.com/apache/ozone/pull/3883#issuecomment-1302168545

   > This surprises me. I always thought the first level was the bulkiest. Is there any documentation you can point me to?
   
   Compaction is triggered when  number of files in a level reaches a threshold size and the threshold size increases as the levels increase so the last level files will be biggest in size
   https://github.com/EighteenZi/rocksdb_wiki/blob/master/Leveled-Compaction.md#levels-target-size
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org


[GitHub] [ozone] sadanand48 merged pull request #3883: HDDS-6962. [Snapshot] Background Service to delete irrelevant SST files in a snapshot.

Posted by GitBox <gi...@apache.org>.
sadanand48 merged PR #3883:
URL: https://github.com/apache/ozone/pull/3883


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org


[GitHub] [ozone] sadanand48 commented on a diff in pull request #3883: HDDS-6962. [Snapshot] Background Service to delete irrelevant SST files in a snapshot.

Posted by GitBox <gi...@apache.org>.
sadanand48 commented on code in PR #3883:
URL: https://github.com/apache/ozone/pull/3883#discussion_r1017829544


##########
hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java:
##########
@@ -377,6 +407,43 @@ public void compactRange() throws IOException {
     }
   }
 
+  /**
+   * @param cfName columnFamily on which compaction will run.
+   * @throws IOException
+   */
+  public void compactRange(String cfName) throws IOException {
+    ColumnFamilyHandle handle = null;
+    handle = getColumnFamilyHandle(cfName, handle);
+    try {
+      if (handle != null) {
+        db.get().compactRange(handle);
+      } else {
+        LOG.error("Provided column family doesn't exist."
+            + " Calling compactRange on null columnFamily");
+        db.get().compactRange();
+      }
+    } catch (RocksDBException e) {
+      closeOnError(e);
+      throw toIOException(this, "compactRange", e);
+    }
+  }
+
+  private ColumnFamilyHandle getColumnFamilyHandle(String cfName,

Review Comment:
   Done.



##########
hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java:
##########
@@ -359,6 +367,28 @@ public void flush() throws IOException {
     }
   }
 
+  /**
+   * @param cfName columnFamily on which flush will run.
+   * @throws IOException
+   */
+  public void flush(String cfName) throws IOException {
+    ColumnFamilyHandle handle = null;
+    handle = getColumnFamilyHandle(cfName, handle);

Review Comment:
   Done.



##########
hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java:
##########
@@ -523,4 +590,62 @@ public String toString() {
     return name;
   }
 
+  @VisibleForTesting
+  public List<LiveFileMetaData> getSstFileList() {
+    return db.get().getLiveFilesMetaData();
+  }
+
+  /**
+   * return the max compaction level of sst files in the db.
+   * @return level
+   */
+  private int getLastLevel() {
+    return getSstFileList().stream()
+        .max(Comparator.comparing(LiveFileMetaData::level)).get().level();
+  }
+
+  /**
+   * Deletes sst files which do not correspond to prefix
+   * for given table.
+   * @param prefixPairs , a list of pairs (TableName,prefixUsed).

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org


[GitHub] [ozone] sadanand48 commented on a diff in pull request #3883: HDDS-6962. [Snapshot] Background Service to delete irrelevant SST files in a snapshot.

Posted by GitBox <gi...@apache.org>.
sadanand48 commented on code in PR #3883:
URL: https://github.com/apache/ozone/pull/3883#discussion_r1017088131


##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -706,9 +718,48 @@ public synchronized List<String> getSSTDiffList(
       LOG.debug("{}", logSB);
     }
 
+/*    if (src.getTablePrefixes() != null && !src.getTablePrefixes().isEmpty()) {
+    //  filterRelevantSstFiles(fwdDAGDifferentFiles, src.getTablePrefixes());

Review Comment:
   Currently disabling filtering from DAG output, will uncomment once thoroughly tested.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org


[GitHub] [ozone] sadanand48 commented on a diff in pull request #3883: HDDS-6962. [Snapshot] Background Service to delete irrelevant SST files in a snapshot.

Posted by GitBox <gi...@apache.org>.
sadanand48 commented on code in PR #3883:
URL: https://github.com/apache/ozone/pull/3883#discussion_r1025467344


##########
hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestSstFilteringService.java:
##########
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.hadoop.ozone.om;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.server.ServerUtils;
+import org.apache.hadoop.hdds.utils.db.DBConfigFromFile;
+import org.apache.hadoop.hdds.utils.db.DBProfile;
+import org.apache.hadoop.hdds.utils.db.RDBStore;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
+import org.apache.hadoop.security.authentication.client.AuthenticationException;
+import org.apache.ozone.test.GenericTestUtils;
+import org.apache.ratis.util.ExitUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.rocksdb.LiveFileMetaData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_DB_PROFILE;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_DB_NAME;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_DIR;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL;
+
+/**
+ * Test SST Filtering Service.
+ */
+public class TestSstFilteringService {
+  @Rule
+  public TemporaryFolder folder = new TemporaryFolder();
+  private OzoneManagerProtocol writeClient;
+  private OzoneManager om;
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestSstFilteringService.class);
+
+  @BeforeClass
+  public static void setup() {
+    ExitUtils.disableSystemExit();
+  }
+
+  private OzoneConfiguration createConfAndInitValues() throws IOException {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    File newFolder = folder.newFolder();
+    if (!newFolder.exists()) {
+      Assert.assertTrue(newFolder.mkdirs());
+    }
+    System.setProperty(DBConfigFromFile.CONFIG_DIR, "/");
+    ServerUtils.setOzoneMetaDirPath(conf, newFolder.toString());
+    conf.setTimeDuration(HDDS_CONTAINER_REPORT_INTERVAL, 200,
+        TimeUnit.MILLISECONDS);
+    conf.setTimeDuration(OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL, 100,
+        TimeUnit.MILLISECONDS);
+    conf.setEnum(HDDS_DB_PROFILE, DBProfile.TEST);
+    conf.setQuietMode(false);
+
+    return conf;
+  }
+
+  @After
+  public void cleanup() throws Exception {
+    om.stop();
+  }
+
+  /**
+   * Test checks whether for existing snapshots
+   * the checkpoint should not have any sst files that do not correspond to
+   * the bucket on which create snapshot command was issued.
+   *
+   * The SSTFiltering service deletes only the last level of
+   * sst file (rocksdb behaviour).
+   *
+   * 1. Create Keys for vol1/buck1 (L0 ssts will be created for vol1/buck1)
+   * 2. compact the db (new level SSTS will be created for vol1/buck1)
+   * 3. Create keys for vol1/buck2 (L0 ssts will be created for vol1/buck2)
+   * 4. Take snapshot on vol1/buck2.
+   * 5. The snapshot will contain compacted sst files pertaining to vol1/buck1
+   *    Wait till the BG service deletes these.
+   *
+   * @throws IOException - on Failure.
+   */
+
+  @Test
+  public void testIrrelevantSstFileDeletion()

Review Comment:
   I only checked for the non-existence of the irrelevant files , will add an assertion to check existence of the relevant files.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org


[GitHub] [ozone] prashantpogde commented on pull request #3883: HDDS-6962. [Snapshot] Background Service to delete irrelevant SST files in a snapshot.

Posted by GitBox <gi...@apache.org>.
prashantpogde commented on PR #3883:
URL: https://github.com/apache/ozone/pull/3883#issuecomment-1331358650

   @sadanand48 can we resolve the pending comments and merge this PR ? 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org


[GitHub] [ozone] sadanand48 commented on a diff in pull request #3883: HDDS-6962. [Snapshot] Background Service to delete irrelevant SST files in a snapshot.

Posted by GitBox <gi...@apache.org>.
sadanand48 commented on code in PR #3883:
URL: https://github.com/apache/ozone/pull/3883#discussion_r1017830758


##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SstFilteringService.java:
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.hadoop.ozone.om;
+
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.utils.BackgroundService;
+import org.apache.hadoop.hdds.utils.BackgroundTask;
+import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
+import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
+import org.apache.hadoop.hdds.utils.BooleanTriFunction;
+import org.apache.hadoop.hdds.utils.db.RDBStore;
+import org.apache.hadoop.hdds.utils.db.RocksDatabase;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.rocksdb.RocksDBException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.hadoop.ozone.OzoneConsts.FILTERED_SNAPSHOTS;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_DB_NAME;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_DIR;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.SNAPSHOT_SST_DELETING_LIMIT_PER_TASK;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.SNAPSHOT_SST_DELETING_LIMIT_PER_TASK_DEFAULT;
+
+/**
+ * When snapshots are taken, an entire snapshot of the
+ * OM RocksDB is captured and it will contain SST files corresponding
+ * to all volumes/buckets and keys and also have data from
+ * all the tables (columnFamilies) defined in the rocksdb
+ * This is a background service which will cleanup and filter out
+ * all the irrelevant and safe to delete sst files that don't correspond
+ * to the bucket on which the snapshot was taken.
+ */
+public class SstFilteringService extends BackgroundService {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SstFilteringService.class);
+
+  // Use only a single thread for SST deletion. Multiple threads would read
+  // or write to same snapshots and can send deletion requests for same sst
+  // multiple times.
+  private static final int SST_FILTERING_CORE_POOL_SIZE = 1;
+
+  private final OzoneManager ozoneManager;
+
+  // Number of files to be batched in an iteration.
+  private final long snapshotLimitPerTask;
+
+  private AtomicLong snapshotFilteredCount;
+
+  private BooleanTriFunction<String, String, String, Boolean> filterFunction =
+      (first, last, prefix) -> {
+        String firstBucketKey = constructBucketKey(first);
+        String lastBucketKey = constructBucketKey(last);
+        return firstBucketKey.compareTo(prefix) <= 0
+            && prefix.compareTo(lastBucketKey) <= 0;
+      };
+
+  public SstFilteringService(long interval, TimeUnit unit, long serviceTimeout,
+      OzoneManager ozoneManager, OzoneConfiguration configuration) {
+    super("SstFilteringService", interval, unit, SST_FILTERING_CORE_POOL_SIZE,
+        serviceTimeout);
+    this.ozoneManager = ozoneManager;
+    this.snapshotLimitPerTask = configuration
+        .getLong(SNAPSHOT_SST_DELETING_LIMIT_PER_TASK,
+            SNAPSHOT_SST_DELETING_LIMIT_PER_TASK_DEFAULT);
+    snapshotFilteredCount = new AtomicLong(0);
+  }
+
+  private class SSTFilteringTask implements BackgroundTask {
+
+    @Override
+    public BackgroundTaskResult call() throws Exception {
+
+      Table<String, SnapshotInfo> snapshotInfoTable =
+          ozoneManager.getMetadataManager().getSnapshotInfoTable();
+      try (
+          TableIterator<String, ? extends Table.KeyValue
+              <String, SnapshotInfo>> iterator = snapshotInfoTable
+              .iterator()) {
+        iterator.seekToFirst();
+
+        long snapshotLimit = snapshotLimitPerTask;
+
+        while (iterator.hasNext() && snapshotLimit > 0) {

Review Comment:
   Done.



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SstFilteringService.java:
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.hadoop.ozone.om;
+
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.utils.BackgroundService;
+import org.apache.hadoop.hdds.utils.BackgroundTask;
+import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
+import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
+import org.apache.hadoop.hdds.utils.BooleanTriFunction;
+import org.apache.hadoop.hdds.utils.db.RDBStore;
+import org.apache.hadoop.hdds.utils.db.RocksDatabase;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.rocksdb.RocksDBException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.hadoop.ozone.OzoneConsts.FILTERED_SNAPSHOTS;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_DB_NAME;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_DIR;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.SNAPSHOT_SST_DELETING_LIMIT_PER_TASK;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.SNAPSHOT_SST_DELETING_LIMIT_PER_TASK_DEFAULT;
+
+/**
+ * When snapshots are taken, an entire snapshot of the
+ * OM RocksDB is captured and it will contain SST files corresponding
+ * to all volumes/buckets and keys and also have data from
+ * all the tables (columnFamilies) defined in the rocksdb
+ * This is a background service which will cleanup and filter out
+ * all the irrelevant and safe to delete sst files that don't correspond
+ * to the bucket on which the snapshot was taken.
+ */
+public class SstFilteringService extends BackgroundService {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SstFilteringService.class);
+
+  // Use only a single thread for SST deletion. Multiple threads would read
+  // or write to same snapshots and can send deletion requests for same sst
+  // multiple times.
+  private static final int SST_FILTERING_CORE_POOL_SIZE = 1;
+
+  private final OzoneManager ozoneManager;
+
+  // Number of files to be batched in an iteration.
+  private final long snapshotLimitPerTask;
+
+  private AtomicLong snapshotFilteredCount;
+
+  private BooleanTriFunction<String, String, String, Boolean> filterFunction =
+      (first, last, prefix) -> {
+        String firstBucketKey = constructBucketKey(first);
+        String lastBucketKey = constructBucketKey(last);
+        return firstBucketKey.compareTo(prefix) <= 0
+            && prefix.compareTo(lastBucketKey) <= 0;
+      };
+
+  public SstFilteringService(long interval, TimeUnit unit, long serviceTimeout,
+      OzoneManager ozoneManager, OzoneConfiguration configuration) {
+    super("SstFilteringService", interval, unit, SST_FILTERING_CORE_POOL_SIZE,
+        serviceTimeout);
+    this.ozoneManager = ozoneManager;
+    this.snapshotLimitPerTask = configuration
+        .getLong(SNAPSHOT_SST_DELETING_LIMIT_PER_TASK,
+            SNAPSHOT_SST_DELETING_LIMIT_PER_TASK_DEFAULT);
+    snapshotFilteredCount = new AtomicLong(0);
+  }
+
+  private class SSTFilteringTask implements BackgroundTask {

Review Comment:
   Done.



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SstFilteringService.java:
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.hadoop.ozone.om;
+
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.utils.BackgroundService;
+import org.apache.hadoop.hdds.utils.BackgroundTask;
+import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
+import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
+import org.apache.hadoop.hdds.utils.BooleanTriFunction;
+import org.apache.hadoop.hdds.utils.db.RDBStore;
+import org.apache.hadoop.hdds.utils.db.RocksDatabase;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.rocksdb.RocksDBException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.hadoop.ozone.OzoneConsts.FILTERED_SNAPSHOTS;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_DB_NAME;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_DIR;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.SNAPSHOT_SST_DELETING_LIMIT_PER_TASK;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.SNAPSHOT_SST_DELETING_LIMIT_PER_TASK_DEFAULT;
+
+/**
+ * When snapshots are taken, an entire snapshot of the
+ * OM RocksDB is captured and it will contain SST files corresponding
+ * to all volumes/buckets and keys and also have data from
+ * all the tables (columnFamilies) defined in the rocksdb
+ * This is a background service which will cleanup and filter out
+ * all the irrelevant and safe to delete sst files that don't correspond
+ * to the bucket on which the snapshot was taken.
+ */
+public class SstFilteringService extends BackgroundService {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SstFilteringService.class);
+
+  // Use only a single thread for SST deletion. Multiple threads would read
+  // or write to same snapshots and can send deletion requests for same sst
+  // multiple times.
+  private static final int SST_FILTERING_CORE_POOL_SIZE = 1;
+
+  private final OzoneManager ozoneManager;
+
+  // Number of files to be batched in an iteration.
+  private final long snapshotLimitPerTask;
+
+  private AtomicLong snapshotFilteredCount;
+
+  private BooleanTriFunction<String, String, String, Boolean> filterFunction =
+      (first, last, prefix) -> {
+        String firstBucketKey = constructBucketKey(first);
+        String lastBucketKey = constructBucketKey(last);
+        return firstBucketKey.compareTo(prefix) <= 0
+            && prefix.compareTo(lastBucketKey) <= 0;
+      };
+
+  public SstFilteringService(long interval, TimeUnit unit, long serviceTimeout,
+      OzoneManager ozoneManager, OzoneConfiguration configuration) {
+    super("SstFilteringService", interval, unit, SST_FILTERING_CORE_POOL_SIZE,
+        serviceTimeout);
+    this.ozoneManager = ozoneManager;
+    this.snapshotLimitPerTask = configuration
+        .getLong(SNAPSHOT_SST_DELETING_LIMIT_PER_TASK,
+            SNAPSHOT_SST_DELETING_LIMIT_PER_TASK_DEFAULT);
+    snapshotFilteredCount = new AtomicLong(0);
+  }
+
+  private class SSTFilteringTask implements BackgroundTask {
+
+    @Override
+    public BackgroundTaskResult call() throws Exception {
+
+      Table<String, SnapshotInfo> snapshotInfoTable =
+          ozoneManager.getMetadataManager().getSnapshotInfoTable();
+      try (
+          TableIterator<String, ? extends Table.KeyValue
+              <String, SnapshotInfo>> iterator = snapshotInfoTable
+              .iterator()) {
+        iterator.seekToFirst();
+
+        long snapshotLimit = snapshotLimitPerTask;
+
+        while (iterator.hasNext() && snapshotLimit > 0) {
+          Table.KeyValue<String, SnapshotInfo> keyValue = iterator.next();
+          String snapShotTableKey = keyValue.getKey();
+          SnapshotInfo snapshotInfo = keyValue.getValue();
+
+          File omMetadataDir =
+              OMStorage.getOmDbDir(ozoneManager.getConfiguration());
+          String snapshotDir = omMetadataDir + OM_KEY_PREFIX + OM_SNAPSHOT_DIR;
+          Path filePath =
+              Paths.get(snapshotDir + OM_KEY_PREFIX + FILTERED_SNAPSHOTS);
+
+          boolean isSnapshotFiltered = false;
+
+          // If entry for the snapshotID is present in this file,
+          // it has already undergone filtering.
+          if (Files.exists(filePath)) {
+            List<String> processedSnapshotIds = Files.readAllLines(filePath);
+            if (processedSnapshotIds.contains(snapshotInfo.getSnapshotID())) {
+              isSnapshotFiltered = true;
+            }
+          }
+
+          if (!isSnapshotFiltered) {
+            LOG.debug("Processing snapshot {} to filter relevant SST Files",
+                snapShotTableKey);
+
+            List<Pair<String, String>> prefixPairs =
+                constructPrefixPairs(snapshotInfo);
+
+            String dbName = OM_DB_NAME + snapshotInfo.getCheckpointDirName();
+
+            RDBStore rdbStore = (RDBStore) OmMetadataManagerImpl
+                .loadDB(ozoneManager.getConfiguration(), new File(snapshotDir),
+                    dbName, true);
+            RocksDatabase db = rdbStore.getDb();
+            db.deleteFilesNotMatchingPrefix(prefixPairs, filterFunction);
+
+            // mark the snapshot as filtered by writing to the file
+            String content = snapshotInfo.getSnapshotID() + "\n";
+            Files.write(filePath, content.getBytes(StandardCharsets.UTF_8),
+                StandardOpenOption.CREATE, StandardOpenOption.APPEND);
+            snapshotLimit--;
+            snapshotFilteredCount.getAndIncrement();
+          }
+        }
+      } catch (RocksDBException | IOException e) {
+        LOG.error("Error during Snapshot sst filtering ", e);
+      }
+
+      // nothing to return here
+      return BackgroundTaskResult.EmptyTaskResult.newResult();
+    }
+
+    /**
+     * @param snapshotInfo
+     * @return a list of pairs (tableName,keyPrefix).
+     * @throws IOException
+     */
+    private List<Pair<String, String>> constructPrefixPairs(
+        SnapshotInfo snapshotInfo) throws IOException {
+      String volumeName = snapshotInfo.getVolumeName();
+      String bucketName = snapshotInfo.getBucketName();
+
+      long volumeId = ozoneManager.getMetadataManager().getVolumeId(volumeName);
+      // TODO : buckets can be deleted via ofs , handle deletion of bucket case.
+      long bucketId =
+          ozoneManager.getMetadataManager().getBucketId(volumeName, bucketName);
+
+      String filterPrefix =
+          OM_KEY_PREFIX + volumeName + OM_KEY_PREFIX + bucketName;
+
+      String filterPrefixFSO =
+          OM_KEY_PREFIX + volumeId + OM_KEY_PREFIX + bucketId;
+
+      List<Pair<String, String>> prefixPairs = new ArrayList<>();
+      prefixPairs
+          .add(Pair.of(OmMetadataManagerImpl.KEY_TABLE, filterPrefix));
+      prefixPairs.add(
+          Pair.of(OmMetadataManagerImpl.DIRECTORY_TABLE, filterPrefixFSO));
+      prefixPairs
+          .add(Pair.of(OmMetadataManagerImpl.FILE_TABLE, filterPrefixFSO));
+      return prefixPairs;
+    }
+  }
+
+
+  @Override
+  public BackgroundTaskQueue getTasks() {
+    BackgroundTaskQueue queue = new BackgroundTaskQueue();
+    queue.add(new SSTFilteringTask());
+    return queue;
+  }
+
+  public AtomicLong getSnapshotFilteredCount() {
+    return snapshotFilteredCount;
+  }
+
+  String constructBucketKey(String keyName) {

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org


[GitHub] [ozone] sadanand48 commented on a diff in pull request #3883: HDDS-6962. [Snapshot] Background Service to delete irrelevant SST files in a snapshot.

Posted by GitBox <gi...@apache.org>.
sadanand48 commented on code in PR #3883:
URL: https://github.com/apache/ozone/pull/3883#discussion_r1035611295


##########
hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java:
##########
@@ -523,4 +591,64 @@ public String toString() {
     return name;
   }
 
+  @VisibleForTesting
+  public List<LiveFileMetaData> getSstFileList() {
+    return db.get().getLiveFilesMetaData();
+  }
+
+  /**
+   * return the max compaction level of sst files in the db.
+   * @return level
+   */
+  private int getLastLevel() {
+    return getSstFileList().stream()
+        .max(Comparator.comparing(LiveFileMetaData::level)).get().level();
+  }
+
+  /**
+   * Deletes sst files which do not correspond to prefix
+   * for given table.
+   * @param prefixPairs , a list of pairs (TableName,prefixUsed).
+   * @throws RocksDBException
+   */
+  public void deleteFilesNotMatchingPrefix(
+      List<Pair<String, String>> prefixPairs,
+      BooleanTriFunction<String, String, String, Boolean> filterFunction)
+      throws RocksDBException {
+    for (LiveFileMetaData liveFileMetaData : getSstFileList()) {
+      String sstFileColumnFamily =
+          new String(liveFileMetaData.columnFamilyName(),
+              StandardCharsets.UTF_8);
+      int lastLevel = getLastLevel();
+      for (Pair<String, String> prefixPair : prefixPairs) {
+        String columnFamily = prefixPair.getKey();
+        String prefixForColumnFamily = prefixPair.getValue();
+        if (sstFileColumnFamily.equals(columnFamily)) {
+          // RocksDB #deleteFile API allows only to delete the last level of
+          // SST Files. Any level < last level won't get deleted and
+          // only last file of level 0 can be deleted
+          // and will throw warning in the rocksdb manifest.
+          // Instead perform the level check here
+          // itself to avoid failed delete attempts for lower level files.
+          if (liveFileMetaData.level() == lastLevel && lastLevel != 0) {
+            String firstDbKey = new String(liveFileMetaData.smallestKey(),
+                StandardCharsets.UTF_8);
+            String lastDbKey = new String(liveFileMetaData.largestKey(),
+                StandardCharsets.UTF_8);
+            boolean isKeyWithPrefixPresent = filterFunction
+                .apply(firstDbKey, lastDbKey, prefixForColumnFamily);
+            if (!isKeyWithPrefixPresent) {
+              db.get().deleteFile(liveFileMetaData.fileName());

Review Comment:
   Done. Should INFO level be ok here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org


[GitHub] [ozone] sadanand48 commented on a diff in pull request #3883: HDDS-6962. [Snapshot] Background Service to delete irrelevant SST files in a snapshot.

Posted by GitBox <gi...@apache.org>.
sadanand48 commented on code in PR #3883:
URL: https://github.com/apache/ozone/pull/3883#discussion_r1017830561


##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -1002,4 +1053,20 @@ public boolean debugEnabled(Integer level) {
   public static Logger getLog() {
     return LOG;
   }
+
+  private String constructBucketKey(String keyName) {
+    if (!keyName.startsWith("/")) {

Review Comment:
   Done used OM_KEY_PREFIX instead.



##########
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestOMSnapshotDAG.java:
##########
@@ -136,7 +138,24 @@ private DifferSnapshotInfo getDifferSnapshotInfo(
     // Use RocksDB transaction sequence number in SnapshotInfo, which is
     // persisted at the time of snapshot creation, as the snapshot generation
     return new DifferSnapshotInfo(checkpointPath, snapshotInfo.getSnapshotID(),
-        snapshotInfo.getDbTxSequenceNumber());
+        snapshotInfo.getDbTxSequenceNumber(),
+        getTablePrefixes(omMetadataManager, volumeName, bucketName));
+  }
+
+  private HashMap<String, String> getTablePrefixes(
+      OMMetadataManager omMetadataManager, String volumeName, String bucketName)
+      throws IOException {
+    HashMap<String, String> tablePrefixes = new HashMap<>();
+    String volumeId = String.valueOf(omMetadataManager.getVolumeId(volumeName));
+    String bucketId =
+        String.valueOf(omMetadataManager.getBucketId(volumeName, bucketName));
+    tablePrefixes.put(OmMetadataManagerImpl.KEY_TABLE,
+        "/" + volumeName + "/" + bucketName);

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org


[GitHub] [ozone] sadanand48 commented on a diff in pull request #3883: HDDS-6962. [Snapshot] Background Service to delete irrelevant SST files in a snapshot.

Posted by GitBox <gi...@apache.org>.
sadanand48 commented on code in PR #3883:
URL: https://github.com/apache/ozone/pull/3883#discussion_r1017829926


##########
hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java:
##########
@@ -523,4 +591,64 @@ public String toString() {
     return name;
   }
 
+  @VisibleForTesting
+  public List<LiveFileMetaData> getSstFileList() {
+    return db.get().getLiveFilesMetaData();
+  }
+
+  /**
+   * return the max compaction level of sst files in the db.
+   * @return level
+   */
+  private int getLastLevel() {
+    return getSstFileList().stream()
+        .max(Comparator.comparing(LiveFileMetaData::level)).get().level();
+  }
+
+  /**
+   * Deletes sst files which do not correspond to prefix
+   * for given table.
+   * @param prefixPairs , a list of pairs (TableName,prefixUsed).
+   * @throws RocksDBException
+   */
+  public void deleteFilesNotMatchingPrefix(

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org


[GitHub] [ozone] sadanand48 commented on a diff in pull request #3883: HDDS-6962. [Snapshot] Background Service to delete irrelevant SST files in a snapshot.

Posted by GitBox <gi...@apache.org>.
sadanand48 commented on code in PR #3883:
URL: https://github.com/apache/ozone/pull/3883#discussion_r1017836623


##########
hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java:
##########
@@ -523,4 +591,64 @@ public String toString() {
     return name;
   }
 
+  @VisibleForTesting
+  public List<LiveFileMetaData> getSstFileList() {
+    return db.get().getLiveFilesMetaData();
+  }
+
+  /**
+   * return the max compaction level of sst files in the db.
+   * @return level
+   */
+  private int getLastLevel() {
+    return getSstFileList().stream()
+        .max(Comparator.comparing(LiveFileMetaData::level)).get().level();
+  }
+
+  /**
+   * Deletes sst files which do not correspond to prefix
+   * for given table.
+   * @param prefixPairs , a list of pairs (TableName,prefixUsed).
+   * @throws RocksDBException
+   */
+  public void deleteFilesNotMatchingPrefix(
+      List<Pair<String, String>> prefixPairs,
+      BooleanTriFunction<String, String, String, Boolean> filterFunction)
+      throws RocksDBException {
+    for (LiveFileMetaData liveFileMetaData : getSstFileList()) {
+      String sstFileColumnFamily =
+          new String(liveFileMetaData.columnFamilyName(),
+              StandardCharsets.UTF_8);
+      int lastLevel = getLastLevel();
+      for (Pair<String, String> prefixPair : prefixPairs) {
+        String columnFamily = prefixPair.getKey();
+        String prefixForColumnFamily = prefixPair.getValue();
+        if (sstFileColumnFamily.equals(columnFamily)) {
+          // RocksDB #deleteFile API allows only to delete the last level of
+          // SST Files. Any level < last level won't get deleted and
+          // only last file of level 0 can be deleted
+          // and will throw warning in the rocksdb manifest.
+          // Instead perform the level check here
+          // itself to avoid failed delete attempts for lower level files.
+          if (liveFileMetaData.level() == lastLevel && lastLevel != 0) {
+            String firstDbKey = new String(liveFileMetaData.smallestKey(),
+                StandardCharsets.UTF_8);
+            String lastDbKey = new String(liveFileMetaData.largestKey(),
+                StandardCharsets.UTF_8);
+            boolean isKeyWithPrefixPresent = filterFunction

Review Comment:
   I did it this way because RocksDatabase is a generic class and we cant add OM Keytable related logic i.e to extract bucketKey from the DBKey etc in this class . Instead the method deletKeysMatchingPrefix takes an argument as a functional interface which any component OM,SCM,DN can leverage by just defining the filter rule by writing own BooleanTriFunction. Not every table would have "/" as delim so adding it in RocksDatabase would make it specific usage. Please let me know your thoughts if this sounds good or if any other approach.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org


[GitHub] [ozone] GeorgeJahad commented on a diff in pull request #3883: HDDS-6962. [Snapshot] Background Service to delete irrelevant SST files in a snapshot.

Posted by GitBox <gi...@apache.org>.
GeorgeJahad commented on code in PR #3883:
URL: https://github.com/apache/ozone/pull/3883#discussion_r1013176361


##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SstFilteringService.java:
##########
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.hadoop.ozone.om;
+
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.utils.BackgroundService;
+import org.apache.hadoop.hdds.utils.BackgroundTask;
+import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
+import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
+import org.apache.hadoop.hdds.utils.db.RDBStore;
+import org.apache.hadoop.hdds.utils.db.RocksDatabase;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.rocksdb.RocksDBException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.hadoop.ozone.OzoneConsts.FILTERED_SNAPSHOTS;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_DB_NAME;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_DIR;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.SNAPSHOT_SST_DELETING_LIMIT_PER_TASK;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.SNAPSHOT_SST_DELETING_LIMIT_PER_TASK_DEFAULT;
+
+/**
+ * When snapshots are taken, an entire snapshot of the
+ * OM RocksDB is captured and it will contain SST files corresponding
+ * to all volumes/buckets and keys and also have data from
+ * all the tables (columnFamilies) defined in the rocksdb
+ * This is a background service which will cleanup and filter out
+ * all the irrelevant and safe to delete sst files that don't correspond
+ * to the bucket on which the snapshot was taken.
+ */
+public class SstFilteringService extends BackgroundService {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SstFilteringService.class);
+
+  // Use only a single thread for SST deletion. Multiple threads would read
+  // or write to same snapshots and can send deletion requests for same sst
+  // multiple times.
+  private static final int SST_FILTERING_CORE_POOL_SIZE = 1;
+
+  private final OzoneManager ozoneManager;
+
+  // Number of files to be batched in an iteration.
+  private final long snapshotLimitPerTask;
+
+  private AtomicLong snapshotFilteredCount;
+
+  public SstFilteringService(long interval, TimeUnit unit, long serviceTimeout,
+      OzoneManager ozoneManager, OzoneConfiguration configuration) {
+    super("SstFilteringService", interval, unit, SST_FILTERING_CORE_POOL_SIZE,
+        serviceTimeout);
+    this.ozoneManager = ozoneManager;
+    this.snapshotLimitPerTask = configuration
+        .getLong(SNAPSHOT_SST_DELETING_LIMIT_PER_TASK,
+            SNAPSHOT_SST_DELETING_LIMIT_PER_TASK_DEFAULT);
+    snapshotFilteredCount = new AtomicLong(0);
+  }
+
+  private class SSTFilteringTask implements BackgroundTask {
+
+    @Override
+    public BackgroundTaskResult call() throws Exception {
+
+      Table<String, SnapshotInfo> snapshotInfoTable =
+          ozoneManager.getMetadataManager().getSnapshotInfoTable();
+      try (
+          TableIterator<String, ? extends Table.KeyValue
+              <String, SnapshotInfo>> iterator = snapshotInfoTable
+              .iterator()) {
+        iterator.seekToFirst();
+
+        long snapshotLimit = snapshotLimitPerTask;
+
+        while (iterator.hasNext() && snapshotLimit > 0) {
+          Table.KeyValue<String, SnapshotInfo> keyValue = iterator.next();
+          String snapShotTableKey = keyValue.getKey();
+          SnapshotInfo snapshotInfo = keyValue.getValue();
+
+          File omMetadataDir =
+              OMStorage.getOmDbDir(ozoneManager.getConfiguration());
+          String snapshotDir = omMetadataDir + OM_KEY_PREFIX + OM_SNAPSHOT_DIR;
+          Path filePath =
+              Paths.get(snapshotDir + OM_KEY_PREFIX + FILTERED_SNAPSHOTS);
+
+          boolean isSnapshotFiltered = false;
+
+          // If entry for the snapshotID is present in this file,
+          // it has already undergone filtering.
+          if (Files.exists(filePath)) {
+            List<String> processedSnapshotIds = Files.readAllLines(filePath);
+            if (processedSnapshotIds.contains(snapshotInfo.getSnapshotID())) {
+              isSnapshotFiltered = true;
+            }
+          }
+
+          if (!isSnapshotFiltered) {
+            LOG.debug("Processing snapshot {} to filter relevant SST Files",
+                snapShotTableKey);
+
+            List<Pair<String, String>> prefixPairs =
+                constructPrefixPairs(snapshotInfo);
+
+            String dbName = OM_DB_NAME + snapshotInfo.getCheckpointDirName();
+
+            RDBStore rdbStore = (RDBStore) OmMetadataManagerImpl
+                .loadDB(ozoneManager.getConfiguration(), new File(snapshotDir),
+                    dbName, true);
+            RocksDatabase db = rdbStore.getDb();
+            db.deleteFilesNotMatchingPrefix(prefixPairs);
+
+            // mark the snapshot as filtered by writing to the file
+            String content = snapshotInfo.getSnapshotID() + "\n";
+            Files.write(filePath, content.getBytes(StandardCharsets.UTF_8),
+                StandardOpenOption.CREATE, StandardOpenOption.APPEND);
+            snapshotLimit--;
+            snapshotFilteredCount.getAndIncrement();
+          }
+        }
+      } catch (RocksDBException | IOException e) {
+        LOG.error("Error during Snapshot sst filtering ", e);
+      }
+
+      // nothing to return here
+      return BackgroundTaskResult.EmptyTaskResult.newResult();
+    }
+
+    /**
+     * @param snapshotInfo
+     * @return a list of pairs (tableName,keyPrefix).
+     * @throws IOException
+     */
+    private List<Pair<String, String>> constructPrefixPairs(
+        SnapshotInfo snapshotInfo) throws IOException {
+      String volumeName = snapshotInfo.getVolumeName();
+      String bucketName = snapshotInfo.getBucketName();
+
+      long volumeId = ozoneManager.getMetadataManager().getVolumeId(volumeName);
+      // TODO : buckets can be deleted via ofs , handle deletion of bucket case.

Review Comment:
   I totally misunderstood, thanks for clarifying



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org


[GitHub] [ozone] prashantpogde commented on a diff in pull request #3883: HDDS-6962. [Snapshot] Background Service to delete irrelevant SST files in a snapshot.

Posted by GitBox <gi...@apache.org>.
prashantpogde commented on code in PR #3883:
URL: https://github.com/apache/ozone/pull/3883#discussion_r1024400623


##########
hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestSstFilteringService.java:
##########
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.hadoop.ozone.om;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.server.ServerUtils;
+import org.apache.hadoop.hdds.utils.db.DBConfigFromFile;
+import org.apache.hadoop.hdds.utils.db.DBProfile;
+import org.apache.hadoop.hdds.utils.db.RDBStore;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
+import org.apache.hadoop.security.authentication.client.AuthenticationException;
+import org.apache.ozone.test.GenericTestUtils;
+import org.apache.ratis.util.ExitUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.rocksdb.LiveFileMetaData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_DB_PROFILE;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_DB_NAME;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_DIR;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL;
+
+/**
+ * Test SST Filtering Service.
+ */
+public class TestSstFilteringService {
+  @Rule
+  public TemporaryFolder folder = new TemporaryFolder();
+  private OzoneManagerProtocol writeClient;
+  private OzoneManager om;
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestSstFilteringService.class);
+
+  @BeforeClass
+  public static void setup() {
+    ExitUtils.disableSystemExit();
+  }
+
+  private OzoneConfiguration createConfAndInitValues() throws IOException {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    File newFolder = folder.newFolder();
+    if (!newFolder.exists()) {
+      Assert.assertTrue(newFolder.mkdirs());
+    }
+    System.setProperty(DBConfigFromFile.CONFIG_DIR, "/");
+    ServerUtils.setOzoneMetaDirPath(conf, newFolder.toString());
+    conf.setTimeDuration(HDDS_CONTAINER_REPORT_INTERVAL, 200,
+        TimeUnit.MILLISECONDS);
+    conf.setTimeDuration(OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL, 100,
+        TimeUnit.MILLISECONDS);
+    conf.setEnum(HDDS_DB_PROFILE, DBProfile.TEST);
+    conf.setQuietMode(false);
+
+    return conf;
+  }
+
+  @After
+  public void cleanup() throws Exception {
+    om.stop();
+  }
+
+  /**
+   * Test checks whether for existing snapshots
+   * the checkpoint should not have any sst files that do not correspond to
+   * the bucket on which create snapshot command was issued.
+   *
+   * The SSTFiltering service deletes only the last level of
+   * sst file (rocksdb behaviour).
+   *
+   * 1. Create Keys for vol1/buck1 (L0 ssts will be created for vol1/buck1)
+   * 2. compact the db (new level SSTS will be created for vol1/buck1)
+   * 3. Create keys for vol1/buck2 (L0 ssts will be created for vol1/buck2)
+   * 4. Take snapshot on vol1/buck2.
+   * 5. The snapshot will contain compacted sst files pertaining to vol1/buck1
+   *    Wait till the BG service deletes these.
+   *
+   * @throws IOException - on Failure.
+   */
+
+  @Test
+  public void testIrrelevantSstFileDeletion()

Review Comment:
   why didn't this test catch the bug where we were deleting valid keys ? Can we add more validation tests around this service here  ? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org


[GitHub] [ozone] sadanand48 commented on a diff in pull request #3883: HDDS-6962. [Snapshot] Background Service to delete irrelevant SST files in a snapshot.

Posted by GitBox <gi...@apache.org>.
sadanand48 commented on code in PR #3883:
URL: https://github.com/apache/ozone/pull/3883#discussion_r1027685519


##########
hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestSstFilteringService.java:
##########
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.hadoop.ozone.om;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.server.ServerUtils;
+import org.apache.hadoop.hdds.utils.db.DBConfigFromFile;
+import org.apache.hadoop.hdds.utils.db.DBProfile;
+import org.apache.hadoop.hdds.utils.db.RDBStore;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
+import org.apache.hadoop.security.authentication.client.AuthenticationException;
+import org.apache.ozone.test.GenericTestUtils;
+import org.apache.ratis.util.ExitUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.rocksdb.LiveFileMetaData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_DB_PROFILE;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_DB_NAME;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_DIR;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL;
+
+/**
+ * Test SST Filtering Service.
+ */
+public class TestSstFilteringService {
+  @Rule
+  public TemporaryFolder folder = new TemporaryFolder();
+  private OzoneManagerProtocol writeClient;
+  private OzoneManager om;
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestSstFilteringService.class);
+
+  @BeforeClass
+  public static void setup() {
+    ExitUtils.disableSystemExit();
+  }
+
+  private OzoneConfiguration createConfAndInitValues() throws IOException {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    File newFolder = folder.newFolder();
+    if (!newFolder.exists()) {
+      Assert.assertTrue(newFolder.mkdirs());
+    }
+    System.setProperty(DBConfigFromFile.CONFIG_DIR, "/");
+    ServerUtils.setOzoneMetaDirPath(conf, newFolder.toString());
+    conf.setTimeDuration(HDDS_CONTAINER_REPORT_INTERVAL, 200,
+        TimeUnit.MILLISECONDS);
+    conf.setTimeDuration(OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL, 100,
+        TimeUnit.MILLISECONDS);
+    conf.setEnum(HDDS_DB_PROFILE, DBProfile.TEST);
+    conf.setQuietMode(false);
+
+    return conf;
+  }
+
+  @After
+  public void cleanup() throws Exception {
+    om.stop();
+  }
+
+  /**
+   * Test checks whether for existing snapshots
+   * the checkpoint should not have any sst files that do not correspond to
+   * the bucket on which create snapshot command was issued.
+   *
+   * The SSTFiltering service deletes only the last level of
+   * sst file (rocksdb behaviour).
+   *
+   * 1. Create Keys for vol1/buck1 (L0 ssts will be created for vol1/buck1)
+   * 2. compact the db (new level SSTS will be created for vol1/buck1)
+   * 3. Create keys for vol1/buck2 (L0 ssts will be created for vol1/buck2)
+   * 4. Take snapshot on vol1/buck2.
+   * 5. The snapshot will contain compacted sst files pertaining to vol1/buck1
+   *    Wait till the BG service deletes these.
+   *
+   * @throws IOException - on Failure.
+   */
+
+  @Test
+  public void testIrrelevantSstFileDeletion()

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org


[GitHub] [ozone] GeorgeJahad commented on pull request #3883: HDDS-6962. [Snapshot] Background Service to delete irrelevant SST files in a snapshot.

Posted by GitBox <gi...@apache.org>.
GeorgeJahad commented on PR #3883:
URL: https://github.com/apache/ozone/pull/3883#issuecomment-1301393913

   > This is still beneficial as the last level SST's are the bulkiest
   
   This surprises me.  I always thought the first level was the bulkiest.  Is there any documentation you can point me to?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org


[GitHub] [ozone] sadanand48 commented on a diff in pull request #3883: HDDS-6962. [Snapshot] Background Service to delete irrelevant SST files in a snapshot.

Posted by GitBox <gi...@apache.org>.
sadanand48 commented on code in PR #3883:
URL: https://github.com/apache/ozone/pull/3883#discussion_r1017830188


##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -638,11 +643,14 @@ public static class DifferSnapshotInfo {
     private final String dbPath;
     private final String snapshotID;
     private final long snapshotGeneration;
+    private final HashMap<String, String> tablePrefixes;

Review Comment:
   Done, Thanks!



##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -706,9 +718,48 @@ public synchronized List<String> getSSTDiffList(
       LOG.debug("{}", logSB);
     }
 
+/*    if (src.getTablePrefixes() != null && !src.getTablePrefixes().isEmpty()) {
+    //  filterRelevantSstFiles(fwdDAGDifferentFiles, src.getTablePrefixes());
+    }*/
+
     return new ArrayList<>(fwdDAGDifferentFiles);
   }
 
+  public void filterRelevantSstFiles(Set<String> inputFiles,
+      HashMap<String, String> tableToPrefixMap) {
+    for (String filename : inputFiles) {
+      String filepath = getAbsoluteSstFilePath(filename);
+      try (SstFileReader sstFileReader = new SstFileReader(new Options())) {
+        sstFileReader.open(filepath);
+        TableProperties properties = sstFileReader.getTableProperties();
+        String tableName = new String(properties.getColumnFamilyName());
+        if (tableToPrefixMap.containsKey(tableName)) {
+          String prefix = tableToPrefixMap.get(tableName);
+          SstFileReaderIterator iterator =
+              sstFileReader.newIterator(new ReadOptions());
+          iterator.seekToFirst();
+          String firstKey = constructBucketKey(new String(iterator.key()));
+          iterator.seekToLast();
+          String lastKey = constructBucketKey(new String(iterator.key()));
+          if (!isKeyWithPrefixPresent(prefix, firstKey, lastKey)) {
+            inputFiles.remove(filename);
+          }
+        } else {
+          // entry from other tables
+          inputFiles.remove(filename);
+        }
+      } catch (RocksDBException e) {
+        e.printStackTrace();
+      }
+    }
+  }
+
+  boolean isKeyWithPrefixPresent(String prefixForColumnFamily,

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org


[GitHub] [ozone] hemantk-12 commented on a diff in pull request #3883: HDDS-6962. [Snapshot] Background Service to delete irrelevant SST files in a snapshot.

Posted by GitBox <gi...@apache.org>.
hemantk-12 commented on code in PR #3883:
URL: https://github.com/apache/ozone/pull/3883#discussion_r1017102223


##########
hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java:
##########
@@ -523,4 +591,64 @@ public String toString() {
     return name;
   }
 
+  @VisibleForTesting
+  public List<LiveFileMetaData> getSstFileList() {
+    return db.get().getLiveFilesMetaData();
+  }
+
+  /**
+   * return the max compaction level of sst files in the db.
+   * @return level
+   */
+  private int getLastLevel() {
+    return getSstFileList().stream()
+        .max(Comparator.comparing(LiveFileMetaData::level)).get().level();
+  }
+
+  /**
+   * Deletes sst files which do not correspond to prefix
+   * for given table.
+   * @param prefixPairs , a list of pairs (TableName,prefixUsed).
+   * @throws RocksDBException
+   */
+  public void deleteFilesNotMatchingPrefix(
+      List<Pair<String, String>> prefixPairs,
+      BooleanTriFunction<String, String, String, Boolean> filterFunction)
+      throws RocksDBException {
+    for (LiveFileMetaData liveFileMetaData : getSstFileList()) {
+      String sstFileColumnFamily =
+          new String(liveFileMetaData.columnFamilyName(),
+              StandardCharsets.UTF_8);
+      int lastLevel = getLastLevel();
+      for (Pair<String, String> prefixPair : prefixPairs) {
+        String columnFamily = prefixPair.getKey();
+        String prefixForColumnFamily = prefixPair.getValue();
+        if (sstFileColumnFamily.equals(columnFamily)) {
+          // RocksDB #deleteFile API allows only to delete the last level of
+          // SST Files. Any level < last level won't get deleted and
+          // only last file of level 0 can be deleted
+          // and will throw warning in the rocksdb manifest.
+          // Instead perform the level check here
+          // itself to avoid failed delete attempts for lower level files.
+          if (liveFileMetaData.level() == lastLevel && lastLevel != 0) {
+            String firstDbKey = new String(liveFileMetaData.smallestKey(),
+                StandardCharsets.UTF_8);
+            String lastDbKey = new String(liveFileMetaData.largestKey(),
+                StandardCharsets.UTF_8);
+            boolean isKeyWithPrefixPresent = filterFunction

Review Comment:
   IMO, passing `filterFunction` is increasing code read complex. Now, reader has to go to multiple place to look for an actual filtering logic. I also didn't understand why you created functional interface instead of simple helper function.



##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -638,11 +643,14 @@ public static class DifferSnapshotInfo {
     private final String dbPath;
     private final String snapshotID;
     private final long snapshotGeneration;
+    private final HashMap<String, String> tablePrefixes;

Review Comment:
   Is there an specific reason to use `HashMap` and not use `Map`? If not, please use `Map`.
   
   Good read [Code Against Interfaces, Not Implementations](https://betterprogramming.pub/code-against-interfaces-not-implementations-37b30e7ab992)



##########
hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java:
##########
@@ -377,6 +407,43 @@ public void compactRange() throws IOException {
     }
   }
 
+  /**
+   * @param cfName columnFamily on which compaction will run.
+   * @throws IOException
+   */
+  public void compactRange(String cfName) throws IOException {
+    ColumnFamilyHandle handle = null;
+    handle = getColumnFamilyHandle(cfName, handle);
+    try {
+      if (handle != null) {
+        db.get().compactRange(handle);
+      } else {
+        LOG.error("Provided column family doesn't exist."
+            + " Calling compactRange on null columnFamily");
+        db.get().compactRange();
+      }
+    } catch (RocksDBException e) {
+      closeOnError(e);
+      throw toIOException(this, "compactRange", e);
+    }
+  }
+
+  private ColumnFamilyHandle getColumnFamilyHandle(String cfName,

Review Comment:
   ```suggestion
     private ColumnFamilyHandle getColumnFamilyHandle(String cfName)
         throws IOException {
       for (ColumnFamilyHandle cf : getColumnFamilyHandles()) {
         try {
           if (cfName.equals(new String(cf.getName(), StandardCharsets.UTF_8))) {
             return cf;
           }
         } catch (RocksDBException e) {
           closeOnError(e);
           throw toIOException(this, "columnFamilyHandle.getName", e);
         }
       }
       return null;
     }
   ```



##########
hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java:
##########
@@ -359,6 +367,28 @@ public void flush() throws IOException {
     }
   }
 
+  /**
+   * @param cfName columnFamily on which flush will run.
+   * @throws IOException
+   */
+  public void flush(String cfName) throws IOException {
+    ColumnFamilyHandle handle = null;
+    handle = getColumnFamilyHandle(cfName, handle);

Review Comment:
   ```suggestion
       ColumnFamilyHandle handle = getColumnFamilyHandle(cfName);
   ```
   
   If you change `getColumnFamilyHandle` to suggested one.



##########
hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java:
##########
@@ -523,4 +591,64 @@ public String toString() {
     return name;
   }
 
+  @VisibleForTesting
+  public List<LiveFileMetaData> getSstFileList() {
+    return db.get().getLiveFilesMetaData();
+  }
+
+  /**
+   * return the max compaction level of sst files in the db.
+   * @return level
+   */
+  private int getLastLevel() {
+    return getSstFileList().stream()
+        .max(Comparator.comparing(LiveFileMetaData::level)).get().level();
+  }
+
+  /**
+   * Deletes sst files which do not correspond to prefix
+   * for given table.
+   * @param prefixPairs , a list of pairs (TableName,prefixUsed).
+   * @throws RocksDBException
+   */
+  public void deleteFilesNotMatchingPrefix(

Review Comment:
   This function has too much nesting and could have been avoided if written like
   ```suggestion
     public void deleteFilesNotMatchingPrefix(
         List<Pair<String, String>> prefixPairs,
         BooleanTriFunction<String, String, String, Boolean> filterFunction)
         throws RocksDBException {
       for (LiveFileMetaData liveFileMetaData : getSstFileList()) {
         String sstFileColumnFamily =
             new String(liveFileMetaData.columnFamilyName(),
                 StandardCharsets.UTF_8);
         int lastLevel = getLastLevel();
   
         for (Pair<String, String> prefixPair : prefixPairs) {
           String columnFamily = prefixPair.getKey();
           String prefixForColumnFamily = prefixPair.getValue();
           if (!sstFileColumnFamily.equals(columnFamily)) {
             continue;
           }
           // RocksDB #deleteFile API allows only to delete the last level of
           // SST Files. Any level < last level won't get deleted and
           // only last file of level 0 can be deleted
           // and will throw warning in the rocksdb manifest.
           // Instead, perform the level check here
           // itself to avoid failed delete attempts for lower level files.
           if (liveFileMetaData.level() != lastLevel || lastLevel == 0) {
             continue;
           }
           String firstDbKey = new String(liveFileMetaData.smallestKey(),
               StandardCharsets.UTF_8);
           String lastDbKey = new String(liveFileMetaData.largestKey(),
               StandardCharsets.UTF_8);
           boolean isKeyWithPrefixPresent = filterFunction
               .apply(firstDbKey, lastDbKey, prefixForColumnFamily);
           if (!isKeyWithPrefixPresent) {
             db.get().deleteFile(liveFileMetaData.fileName());
           }
         }
       }
     }
   ```



##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -1002,4 +1053,20 @@ public boolean debugEnabled(Integer level) {
   public static Logger getLog() {
     return LOG;
   }
+
+  private String constructBucketKey(String keyName) {
+    if (!keyName.startsWith("/")) {

Review Comment:
   May be use [OZONE_URI_DELIMITER](https://github.com/apache/ozone/blob/4dba9020ed16b87ca96f0d517d7e33c48cd81f01/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java#L101) instead of `/`.



##########
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestOMSnapshotDAG.java:
##########
@@ -136,7 +138,24 @@ private DifferSnapshotInfo getDifferSnapshotInfo(
     // Use RocksDB transaction sequence number in SnapshotInfo, which is
     // persisted at the time of snapshot creation, as the snapshot generation
     return new DifferSnapshotInfo(checkpointPath, snapshotInfo.getSnapshotID(),
-        snapshotInfo.getDbTxSequenceNumber());
+        snapshotInfo.getDbTxSequenceNumber(),
+        getTablePrefixes(omMetadataManager, volumeName, bucketName));
+  }
+
+  private HashMap<String, String> getTablePrefixes(
+      OMMetadataManager omMetadataManager, String volumeName, String bucketName)
+      throws IOException {
+    HashMap<String, String> tablePrefixes = new HashMap<>();
+    String volumeId = String.valueOf(omMetadataManager.getVolumeId(volumeName));
+    String bucketId =
+        String.valueOf(omMetadataManager.getBucketId(volumeName, bucketName));
+    tablePrefixes.put(OmMetadataManagerImpl.KEY_TABLE,
+        "/" + volumeName + "/" + bucketName);

Review Comment:
   Same as previous.



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SstFilteringService.java:
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.hadoop.ozone.om;
+
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.utils.BackgroundService;
+import org.apache.hadoop.hdds.utils.BackgroundTask;
+import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
+import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
+import org.apache.hadoop.hdds.utils.BooleanTriFunction;
+import org.apache.hadoop.hdds.utils.db.RDBStore;
+import org.apache.hadoop.hdds.utils.db.RocksDatabase;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.rocksdb.RocksDBException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.hadoop.ozone.OzoneConsts.FILTERED_SNAPSHOTS;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_DB_NAME;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_DIR;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.SNAPSHOT_SST_DELETING_LIMIT_PER_TASK;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.SNAPSHOT_SST_DELETING_LIMIT_PER_TASK_DEFAULT;
+
+/**
+ * When snapshots are taken, an entire snapshot of the
+ * OM RocksDB is captured and it will contain SST files corresponding
+ * to all volumes/buckets and keys and also have data from
+ * all the tables (columnFamilies) defined in the rocksdb
+ * This is a background service which will cleanup and filter out
+ * all the irrelevant and safe to delete sst files that don't correspond
+ * to the bucket on which the snapshot was taken.
+ */
+public class SstFilteringService extends BackgroundService {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SstFilteringService.class);
+
+  // Use only a single thread for SST deletion. Multiple threads would read
+  // or write to same snapshots and can send deletion requests for same sst
+  // multiple times.
+  private static final int SST_FILTERING_CORE_POOL_SIZE = 1;
+
+  private final OzoneManager ozoneManager;
+
+  // Number of files to be batched in an iteration.
+  private final long snapshotLimitPerTask;
+
+  private AtomicLong snapshotFilteredCount;
+
+  private BooleanTriFunction<String, String, String, Boolean> filterFunction =
+      (first, last, prefix) -> {
+        String firstBucketKey = constructBucketKey(first);
+        String lastBucketKey = constructBucketKey(last);
+        return firstBucketKey.compareTo(prefix) <= 0
+            && prefix.compareTo(lastBucketKey) <= 0;
+      };
+
+  public SstFilteringService(long interval, TimeUnit unit, long serviceTimeout,
+      OzoneManager ozoneManager, OzoneConfiguration configuration) {
+    super("SstFilteringService", interval, unit, SST_FILTERING_CORE_POOL_SIZE,
+        serviceTimeout);
+    this.ozoneManager = ozoneManager;
+    this.snapshotLimitPerTask = configuration
+        .getLong(SNAPSHOT_SST_DELETING_LIMIT_PER_TASK,
+            SNAPSHOT_SST_DELETING_LIMIT_PER_TASK_DEFAULT);
+    snapshotFilteredCount = new AtomicLong(0);
+  }
+
+  private class SSTFilteringTask implements BackgroundTask {
+
+    @Override
+    public BackgroundTaskResult call() throws Exception {
+
+      Table<String, SnapshotInfo> snapshotInfoTable =
+          ozoneManager.getMetadataManager().getSnapshotInfoTable();
+      try (
+          TableIterator<String, ? extends Table.KeyValue
+              <String, SnapshotInfo>> iterator = snapshotInfoTable
+              .iterator()) {
+        iterator.seekToFirst();
+
+        long snapshotLimit = snapshotLimitPerTask;
+
+        while (iterator.hasNext() && snapshotLimit > 0) {

Review Comment:
   Use `continue` to short-circuit the loop.
   
   ```suggestion
           while (iterator.hasNext() && snapshotLimit > 0) {
             Table.KeyValue<String, SnapshotInfo> keyValue = iterator.next();
             String snapShotTableKey = keyValue.getKey();
             SnapshotInfo snapshotInfo = keyValue.getValue();
   
             File omMetadataDir =
                 OMStorage.getOmDbDir(ozoneManager.getConfiguration());
             String snapshotDir = omMetadataDir + OM_KEY_PREFIX + OM_SNAPSHOT_DIR;
             Path filePath =
                 Paths.get(snapshotDir + OM_KEY_PREFIX + FILTERED_SNAPSHOTS);
   
             // If entry for the snapshotId is present in this file,
             // it has already undergone filtering.
             if (Files.exists(filePath)) {
               List<String> processedSnapshotIds = Files.readAllLines(filePath);
               if (processedSnapshotIds.contains(snapshotInfo.getSnapshotID())) {
                 continue;
               }
             }
   
             LOG.debug("Processing snapshot {} to filter relevant SST Files",
                 snapShotTableKey);
   
             List<Pair<String, String>> prefixPairs =
                 constructPrefixPairs(snapshotInfo);
   
             String dbName = OM_DB_NAME + snapshotInfo.getCheckpointDirName();
   
             RDBStore rdbStore = (RDBStore) OmMetadataManagerImpl
                 .loadDB(ozoneManager.getConfiguration(), new File(snapshotDir),
                     dbName, true);
             RocksDatabase db = rdbStore.getDb();
             db.deleteFilesNotMatchingPrefix(prefixPairs, filterFunction);
   
             // mark the snapshot as filtered by writing to the file
             String content = snapshotInfo.getSnapshotID() + "\n";
             Files.write(filePath, content.getBytes(StandardCharsets.UTF_8),
                 StandardOpenOption.CREATE, StandardOpenOption.APPEND);
             snapshotLimit--;
             snapshotFilteredCount.getAndIncrement();
           }
   ```



##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -706,9 +718,48 @@ public synchronized List<String> getSSTDiffList(
       LOG.debug("{}", logSB);
     }
 
+/*    if (src.getTablePrefixes() != null && !src.getTablePrefixes().isEmpty()) {
+    //  filterRelevantSstFiles(fwdDAGDifferentFiles, src.getTablePrefixes());
+    }*/
+
     return new ArrayList<>(fwdDAGDifferentFiles);
   }
 
+  public void filterRelevantSstFiles(Set<String> inputFiles,
+      HashMap<String, String> tableToPrefixMap) {
+    for (String filename : inputFiles) {
+      String filepath = getAbsoluteSstFilePath(filename);
+      try (SstFileReader sstFileReader = new SstFileReader(new Options())) {
+        sstFileReader.open(filepath);
+        TableProperties properties = sstFileReader.getTableProperties();
+        String tableName = new String(properties.getColumnFamilyName());
+        if (tableToPrefixMap.containsKey(tableName)) {
+          String prefix = tableToPrefixMap.get(tableName);
+          SstFileReaderIterator iterator =
+              sstFileReader.newIterator(new ReadOptions());
+          iterator.seekToFirst();
+          String firstKey = constructBucketKey(new String(iterator.key()));
+          iterator.seekToLast();
+          String lastKey = constructBucketKey(new String(iterator.key()));
+          if (!isKeyWithPrefixPresent(prefix, firstKey, lastKey)) {
+            inputFiles.remove(filename);
+          }
+        } else {
+          // entry from other tables
+          inputFiles.remove(filename);
+        }
+      } catch (RocksDBException e) {
+        e.printStackTrace();
+      }
+    }
+  }
+
+  boolean isKeyWithPrefixPresent(String prefixForColumnFamily,

Review Comment:
   This function and line #741 & #743 is duplicate code. Why not create a helper function and reuse it.



##########
hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java:
##########
@@ -523,4 +590,62 @@ public String toString() {
     return name;
   }
 
+  @VisibleForTesting
+  public List<LiveFileMetaData> getSstFileList() {
+    return db.get().getLiveFilesMetaData();
+  }
+
+  /**
+   * return the max compaction level of sst files in the db.
+   * @return level
+   */
+  private int getLastLevel() {
+    return getSstFileList().stream()
+        .max(Comparator.comparing(LiveFileMetaData::level)).get().level();
+  }
+
+  /**
+   * Deletes sst files which do not correspond to prefix
+   * for given table.
+   * @param prefixPairs , a list of pairs (TableName,prefixUsed).

Review Comment:
   ```suggestion
      * @param prefixPairs, a list of pair (TableName, prefixUsed).
   ```



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SstFilteringService.java:
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.hadoop.ozone.om;
+
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.utils.BackgroundService;
+import org.apache.hadoop.hdds.utils.BackgroundTask;
+import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
+import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
+import org.apache.hadoop.hdds.utils.BooleanTriFunction;
+import org.apache.hadoop.hdds.utils.db.RDBStore;
+import org.apache.hadoop.hdds.utils.db.RocksDatabase;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.rocksdb.RocksDBException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.hadoop.ozone.OzoneConsts.FILTERED_SNAPSHOTS;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_DB_NAME;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_DIR;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.SNAPSHOT_SST_DELETING_LIMIT_PER_TASK;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.SNAPSHOT_SST_DELETING_LIMIT_PER_TASK_DEFAULT;
+
+/**
+ * When snapshots are taken, an entire snapshot of the
+ * OM RocksDB is captured and it will contain SST files corresponding
+ * to all volumes/buckets and keys and also have data from
+ * all the tables (columnFamilies) defined in the rocksdb
+ * This is a background service which will cleanup and filter out
+ * all the irrelevant and safe to delete sst files that don't correspond
+ * to the bucket on which the snapshot was taken.
+ */
+public class SstFilteringService extends BackgroundService {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SstFilteringService.class);
+
+  // Use only a single thread for SST deletion. Multiple threads would read
+  // or write to same snapshots and can send deletion requests for same sst
+  // multiple times.
+  private static final int SST_FILTERING_CORE_POOL_SIZE = 1;
+
+  private final OzoneManager ozoneManager;
+
+  // Number of files to be batched in an iteration.
+  private final long snapshotLimitPerTask;
+
+  private AtomicLong snapshotFilteredCount;
+
+  private BooleanTriFunction<String, String, String, Boolean> filterFunction =
+      (first, last, prefix) -> {
+        String firstBucketKey = constructBucketKey(first);
+        String lastBucketKey = constructBucketKey(last);
+        return firstBucketKey.compareTo(prefix) <= 0
+            && prefix.compareTo(lastBucketKey) <= 0;
+      };
+
+  public SstFilteringService(long interval, TimeUnit unit, long serviceTimeout,
+      OzoneManager ozoneManager, OzoneConfiguration configuration) {
+    super("SstFilteringService", interval, unit, SST_FILTERING_CORE_POOL_SIZE,
+        serviceTimeout);
+    this.ozoneManager = ozoneManager;
+    this.snapshotLimitPerTask = configuration
+        .getLong(SNAPSHOT_SST_DELETING_LIMIT_PER_TASK,
+            SNAPSHOT_SST_DELETING_LIMIT_PER_TASK_DEFAULT);
+    snapshotFilteredCount = new AtomicLong(0);
+  }
+
+  private class SSTFilteringTask implements BackgroundTask {
+
+    @Override
+    public BackgroundTaskResult call() throws Exception {
+
+      Table<String, SnapshotInfo> snapshotInfoTable =
+          ozoneManager.getMetadataManager().getSnapshotInfoTable();
+      try (
+          TableIterator<String, ? extends Table.KeyValue
+              <String, SnapshotInfo>> iterator = snapshotInfoTable
+              .iterator()) {
+        iterator.seekToFirst();
+
+        long snapshotLimit = snapshotLimitPerTask;
+
+        while (iterator.hasNext() && snapshotLimit > 0) {
+          Table.KeyValue<String, SnapshotInfo> keyValue = iterator.next();
+          String snapShotTableKey = keyValue.getKey();
+          SnapshotInfo snapshotInfo = keyValue.getValue();
+
+          File omMetadataDir =
+              OMStorage.getOmDbDir(ozoneManager.getConfiguration());
+          String snapshotDir = omMetadataDir + OM_KEY_PREFIX + OM_SNAPSHOT_DIR;
+          Path filePath =
+              Paths.get(snapshotDir + OM_KEY_PREFIX + FILTERED_SNAPSHOTS);
+
+          boolean isSnapshotFiltered = false;
+
+          // If entry for the snapshotID is present in this file,
+          // it has already undergone filtering.
+          if (Files.exists(filePath)) {
+            List<String> processedSnapshotIds = Files.readAllLines(filePath);
+            if (processedSnapshotIds.contains(snapshotInfo.getSnapshotID())) {
+              isSnapshotFiltered = true;
+            }
+          }
+
+          if (!isSnapshotFiltered) {
+            LOG.debug("Processing snapshot {} to filter relevant SST Files",
+                snapShotTableKey);
+
+            List<Pair<String, String>> prefixPairs =
+                constructPrefixPairs(snapshotInfo);
+
+            String dbName = OM_DB_NAME + snapshotInfo.getCheckpointDirName();
+
+            RDBStore rdbStore = (RDBStore) OmMetadataManagerImpl
+                .loadDB(ozoneManager.getConfiguration(), new File(snapshotDir),
+                    dbName, true);
+            RocksDatabase db = rdbStore.getDb();
+            db.deleteFilesNotMatchingPrefix(prefixPairs, filterFunction);
+
+            // mark the snapshot as filtered by writing to the file
+            String content = snapshotInfo.getSnapshotID() + "\n";
+            Files.write(filePath, content.getBytes(StandardCharsets.UTF_8),
+                StandardOpenOption.CREATE, StandardOpenOption.APPEND);
+            snapshotLimit--;
+            snapshotFilteredCount.getAndIncrement();
+          }
+        }
+      } catch (RocksDBException | IOException e) {
+        LOG.error("Error during Snapshot sst filtering ", e);
+      }
+
+      // nothing to return here
+      return BackgroundTaskResult.EmptyTaskResult.newResult();
+    }
+
+    /**
+     * @param snapshotInfo
+     * @return a list of pairs (tableName,keyPrefix).
+     * @throws IOException
+     */
+    private List<Pair<String, String>> constructPrefixPairs(
+        SnapshotInfo snapshotInfo) throws IOException {
+      String volumeName = snapshotInfo.getVolumeName();
+      String bucketName = snapshotInfo.getBucketName();
+
+      long volumeId = ozoneManager.getMetadataManager().getVolumeId(volumeName);
+      // TODO : buckets can be deleted via ofs , handle deletion of bucket case.
+      long bucketId =
+          ozoneManager.getMetadataManager().getBucketId(volumeName, bucketName);
+
+      String filterPrefix =
+          OM_KEY_PREFIX + volumeName + OM_KEY_PREFIX + bucketName;
+
+      String filterPrefixFSO =
+          OM_KEY_PREFIX + volumeId + OM_KEY_PREFIX + bucketId;
+
+      List<Pair<String, String>> prefixPairs = new ArrayList<>();
+      prefixPairs
+          .add(Pair.of(OmMetadataManagerImpl.KEY_TABLE, filterPrefix));
+      prefixPairs.add(
+          Pair.of(OmMetadataManagerImpl.DIRECTORY_TABLE, filterPrefixFSO));
+      prefixPairs
+          .add(Pair.of(OmMetadataManagerImpl.FILE_TABLE, filterPrefixFSO));
+      return prefixPairs;
+    }
+  }
+
+
+  @Override
+  public BackgroundTaskQueue getTasks() {
+    BackgroundTaskQueue queue = new BackgroundTaskQueue();
+    queue.add(new SSTFilteringTask());
+    return queue;
+  }
+
+  public AtomicLong getSnapshotFilteredCount() {
+    return snapshotFilteredCount;
+  }
+
+  String constructBucketKey(String keyName) {

Review Comment:
   1. Why is it not private?
   2. We have similar code in `RocksDBCheckpointDiffer`. May be create a helper function and reuse the code.



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SstFilteringService.java:
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.hadoop.ozone.om;
+
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.utils.BackgroundService;
+import org.apache.hadoop.hdds.utils.BackgroundTask;
+import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
+import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
+import org.apache.hadoop.hdds.utils.BooleanTriFunction;
+import org.apache.hadoop.hdds.utils.db.RDBStore;
+import org.apache.hadoop.hdds.utils.db.RocksDatabase;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.rocksdb.RocksDBException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.hadoop.ozone.OzoneConsts.FILTERED_SNAPSHOTS;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_DB_NAME;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_DIR;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.SNAPSHOT_SST_DELETING_LIMIT_PER_TASK;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.SNAPSHOT_SST_DELETING_LIMIT_PER_TASK_DEFAULT;
+
+/**
+ * When snapshots are taken, an entire snapshot of the
+ * OM RocksDB is captured and it will contain SST files corresponding
+ * to all volumes/buckets and keys and also have data from
+ * all the tables (columnFamilies) defined in the rocksdb
+ * This is a background service which will cleanup and filter out
+ * all the irrelevant and safe to delete sst files that don't correspond
+ * to the bucket on which the snapshot was taken.
+ */
+public class SstFilteringService extends BackgroundService {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SstFilteringService.class);
+
+  // Use only a single thread for SST deletion. Multiple threads would read
+  // or write to same snapshots and can send deletion requests for same sst
+  // multiple times.
+  private static final int SST_FILTERING_CORE_POOL_SIZE = 1;
+
+  private final OzoneManager ozoneManager;
+
+  // Number of files to be batched in an iteration.
+  private final long snapshotLimitPerTask;
+
+  private AtomicLong snapshotFilteredCount;
+
+  private BooleanTriFunction<String, String, String, Boolean> filterFunction =
+      (first, last, prefix) -> {
+        String firstBucketKey = constructBucketKey(first);
+        String lastBucketKey = constructBucketKey(last);
+        return firstBucketKey.compareTo(prefix) <= 0
+            && prefix.compareTo(lastBucketKey) <= 0;
+      };
+
+  public SstFilteringService(long interval, TimeUnit unit, long serviceTimeout,
+      OzoneManager ozoneManager, OzoneConfiguration configuration) {
+    super("SstFilteringService", interval, unit, SST_FILTERING_CORE_POOL_SIZE,
+        serviceTimeout);
+    this.ozoneManager = ozoneManager;
+    this.snapshotLimitPerTask = configuration
+        .getLong(SNAPSHOT_SST_DELETING_LIMIT_PER_TASK,
+            SNAPSHOT_SST_DELETING_LIMIT_PER_TASK_DEFAULT);
+    snapshotFilteredCount = new AtomicLong(0);
+  }
+
+  private class SSTFilteringTask implements BackgroundTask {

Review Comment:
   nit: `SstFilteringTask`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org


[GitHub] [ozone] prashantpogde commented on a diff in pull request #3883: HDDS-6962. [Snapshot] Background Service to delete irrelevant SST files in a snapshot.

Posted by GitBox <gi...@apache.org>.
prashantpogde commented on code in PR #3883:
URL: https://github.com/apache/ozone/pull/3883#discussion_r1035326342


##########
hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java:
##########
@@ -523,4 +591,64 @@ public String toString() {
     return name;
   }
 
+  @VisibleForTesting
+  public List<LiveFileMetaData> getSstFileList() {
+    return db.get().getLiveFilesMetaData();
+  }
+
+  /**
+   * return the max compaction level of sst files in the db.
+   * @return level
+   */
+  private int getLastLevel() {
+    return getSstFileList().stream()
+        .max(Comparator.comparing(LiveFileMetaData::level)).get().level();
+  }
+
+  /**
+   * Deletes sst files which do not correspond to prefix
+   * for given table.
+   * @param prefixPairs , a list of pairs (TableName,prefixUsed).
+   * @throws RocksDBException
+   */
+  public void deleteFilesNotMatchingPrefix(
+      List<Pair<String, String>> prefixPairs,
+      BooleanTriFunction<String, String, String, Boolean> filterFunction)
+      throws RocksDBException {
+    for (LiveFileMetaData liveFileMetaData : getSstFileList()) {
+      String sstFileColumnFamily =
+          new String(liveFileMetaData.columnFamilyName(),
+              StandardCharsets.UTF_8);
+      int lastLevel = getLastLevel();
+      for (Pair<String, String> prefixPair : prefixPairs) {
+        String columnFamily = prefixPair.getKey();
+        String prefixForColumnFamily = prefixPair.getValue();
+        if (sstFileColumnFamily.equals(columnFamily)) {
+          // RocksDB #deleteFile API allows only to delete the last level of
+          // SST Files. Any level < last level won't get deleted and
+          // only last file of level 0 can be deleted
+          // and will throw warning in the rocksdb manifest.
+          // Instead perform the level check here
+          // itself to avoid failed delete attempts for lower level files.
+          if (liveFileMetaData.level() == lastLevel && lastLevel != 0) {
+            String firstDbKey = new String(liveFileMetaData.smallestKey(),
+                StandardCharsets.UTF_8);
+            String lastDbKey = new String(liveFileMetaData.largestKey(),
+                StandardCharsets.UTF_8);
+            boolean isKeyWithPrefixPresent = filterFunction
+                .apply(firstDbKey, lastDbKey, prefixForColumnFamily);
+            if (!isKeyWithPrefixPresent) {
+              db.get().deleteFile(liveFileMetaData.fileName());

Review Comment:
   Every time we delete a file, we should log the name of the file and the key range we are deleting. It can be very useful for debugging purpose.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org


[GitHub] [ozone] sadanand48 commented on a diff in pull request #3883: HDDS-6962. [Snapshot] Background Service to delete irrelevant SST files in a snapshot.

Posted by GitBox <gi...@apache.org>.
sadanand48 commented on code in PR #3883:
URL: https://github.com/apache/ozone/pull/3883#discussion_r1012961384


##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SstFilteringService.java:
##########
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.hadoop.ozone.om;
+
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.utils.BackgroundService;
+import org.apache.hadoop.hdds.utils.BackgroundTask;
+import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
+import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
+import org.apache.hadoop.hdds.utils.db.RDBStore;
+import org.apache.hadoop.hdds.utils.db.RocksDatabase;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.rocksdb.RocksDBException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.hadoop.ozone.OzoneConsts.FILTERED_SNAPSHOTS;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_DB_NAME;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_DIR;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.SNAPSHOT_SST_DELETING_LIMIT_PER_TASK;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.SNAPSHOT_SST_DELETING_LIMIT_PER_TASK_DEFAULT;
+
+/**
+ * When snapshots are taken, an entire snapshot of the
+ * OM RocksDB is captured and it will contain SST files corresponding
+ * to all volumes/buckets and keys and also have data from
+ * all the tables (columnFamilies) defined in the rocksdb
+ * This is a background service which will cleanup and filter out
+ * all the irrelevant and safe to delete sst files that don't correspond
+ * to the bucket on which the snapshot was taken.
+ */
+public class SstFilteringService extends BackgroundService {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SstFilteringService.class);
+
+  // Use only a single thread for SST deletion. Multiple threads would read
+  // or write to same snapshots and can send deletion requests for same sst
+  // multiple times.
+  private static final int SST_FILTERING_CORE_POOL_SIZE = 1;
+
+  private final OzoneManager ozoneManager;
+
+  // Number of files to be batched in an iteration.
+  private final long snapshotLimitPerTask;
+
+  private AtomicLong snapshotFilteredCount;
+
+  public SstFilteringService(long interval, TimeUnit unit, long serviceTimeout,
+      OzoneManager ozoneManager, OzoneConfiguration configuration) {
+    super("SstFilteringService", interval, unit, SST_FILTERING_CORE_POOL_SIZE,
+        serviceTimeout);
+    this.ozoneManager = ozoneManager;
+    this.snapshotLimitPerTask = configuration
+        .getLong(SNAPSHOT_SST_DELETING_LIMIT_PER_TASK,
+            SNAPSHOT_SST_DELETING_LIMIT_PER_TASK_DEFAULT);
+    snapshotFilteredCount = new AtomicLong(0);
+  }
+
+  private class SSTFilteringTask implements BackgroundTask {
+
+    @Override
+    public BackgroundTaskResult call() throws Exception {
+
+      Table<String, SnapshotInfo> snapshotInfoTable =
+          ozoneManager.getMetadataManager().getSnapshotInfoTable();
+      try (
+          TableIterator<String, ? extends Table.KeyValue
+              <String, SnapshotInfo>> iterator = snapshotInfoTable
+              .iterator()) {
+        iterator.seekToFirst();
+
+        long snapshotLimit = snapshotLimitPerTask;
+
+        while (iterator.hasNext() && snapshotLimit > 0) {
+          Table.KeyValue<String, SnapshotInfo> keyValue = iterator.next();
+          String snapShotTableKey = keyValue.getKey();
+          SnapshotInfo snapshotInfo = keyValue.getValue();
+
+          File omMetadataDir =
+              OMStorage.getOmDbDir(ozoneManager.getConfiguration());
+          String snapshotDir = omMetadataDir + OM_KEY_PREFIX + OM_SNAPSHOT_DIR;
+          Path filePath =
+              Paths.get(snapshotDir + OM_KEY_PREFIX + FILTERED_SNAPSHOTS);
+
+          boolean isSnapshotFiltered = false;
+
+          // If entry for the snapshotID is present in this file,
+          // it has already undergone filtering.
+          if (Files.exists(filePath)) {
+            List<String> processedSnapshotIds = Files.readAllLines(filePath);
+            if (processedSnapshotIds.contains(snapshotInfo.getSnapshotID())) {
+              isSnapshotFiltered = true;
+            }
+          }
+
+          if (!isSnapshotFiltered) {
+            LOG.debug("Processing snapshot {} to filter relevant SST Files",
+                snapShotTableKey);
+
+            List<Pair<String, String>> prefixPairs =
+                constructPrefixPairs(snapshotInfo);
+
+            String dbName = OM_DB_NAME + snapshotInfo.getCheckpointDirName();
+
+            RDBStore rdbStore = (RDBStore) OmMetadataManagerImpl
+                .loadDB(ozoneManager.getConfiguration(), new File(snapshotDir),
+                    dbName, true);
+            RocksDatabase db = rdbStore.getDb();
+            db.deleteFilesNotMatchingPrefix(prefixPairs);
+
+            // mark the snapshot as filtered by writing to the file
+            String content = snapshotInfo.getSnapshotID() + "\n";
+            Files.write(filePath, content.getBytes(StandardCharsets.UTF_8),
+                StandardOpenOption.CREATE, StandardOpenOption.APPEND);
+            snapshotLimit--;
+            snapshotFilteredCount.getAndIncrement();
+          }
+        }
+      } catch (RocksDBException | IOException e) {
+        LOG.error("Error during Snapshot sst filtering ", e);
+      }
+
+      // nothing to return here
+      return BackgroundTaskResult.EmptyTaskResult.newResult();
+    }
+
+    /**
+     * @param snapshotInfo
+     * @return a list of pairs (tableName,keyPrefix).
+     * @throws IOException
+     */
+    private List<Pair<String, String>> constructPrefixPairs(
+        SnapshotInfo snapshotInfo) throws IOException {
+      String volumeName = snapshotInfo.getVolumeName();
+      String bucketName = snapshotInfo.getBucketName();
+
+      long volumeId = ozoneManager.getMetadataManager().getVolumeId(volumeName);
+      // TODO : buckets can be deleted via ofs , handle deletion of bucket case.

Review Comment:
   What I meant here is that since the background service doesn't acquire any bucket lock, and some one deletes it , it might fail to compute ` ozoneManager.getMetadataManager().getBucketId(volumeName, bucketName);` since it reads from active db bucket table currently in the code. Probably we should read it from the snapshot instead of active db .



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org


[GitHub] [ozone] sadanand48 commented on pull request #3883: HDDS-6962. [Snapshot] Background Service to delete irrelevant SST files in a snapshot.

Posted by GitBox <gi...@apache.org>.
sadanand48 commented on PR #3883:
URL: https://github.com/apache/ozone/pull/3883#issuecomment-1302182942

   Included HDDS-7281 now just for test purpose , will revert change from PR  once its merged.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org


[GitHub] [ozone] sadanand48 commented on a diff in pull request #3883: HDDS-6962. [Snapshot] Background Service to delete irrelevant SST files in a snapshot.

Posted by GitBox <gi...@apache.org>.
sadanand48 commented on code in PR #3883:
URL: https://github.com/apache/ozone/pull/3883#discussion_r1017088131


##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -706,9 +718,48 @@ public synchronized List<String> getSSTDiffList(
       LOG.debug("{}", logSB);
     }
 
+/*    if (src.getTablePrefixes() != null && !src.getTablePrefixes().isEmpty()) {
+    //  filterRelevantSstFiles(fwdDAGDifferentFiles, src.getTablePrefixes());

Review Comment:
   Currently disabling filtering from DAG output, will uncomment once thoroughly tested.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org


[GitHub] [ozone] GeorgeJahad commented on a diff in pull request #3883: HDDS-6962. [Snapshot] Background Service to delete irrelevant SST files in a snapshot.

Posted by GitBox <gi...@apache.org>.
GeorgeJahad commented on code in PR #3883:
URL: https://github.com/apache/ozone/pull/3883#discussion_r1012350630


##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SstFilteringService.java:
##########
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.hadoop.ozone.om;
+
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.utils.BackgroundService;
+import org.apache.hadoop.hdds.utils.BackgroundTask;
+import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
+import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
+import org.apache.hadoop.hdds.utils.db.RDBStore;
+import org.apache.hadoop.hdds.utils.db.RocksDatabase;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.rocksdb.RocksDBException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.hadoop.ozone.OzoneConsts.FILTERED_SNAPSHOTS;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_DB_NAME;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_DIR;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.SNAPSHOT_SST_DELETING_LIMIT_PER_TASK;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.SNAPSHOT_SST_DELETING_LIMIT_PER_TASK_DEFAULT;
+
+/**
+ * When snapshots are taken, an entire snapshot of the
+ * OM RocksDB is captured and it will contain SST files corresponding
+ * to all volumes/buckets and keys and also have data from
+ * all the tables (columnFamilies) defined in the rocksdb
+ * This is a background service which will cleanup and filter out
+ * all the irrelevant and safe to delete sst files that don't correspond
+ * to the bucket on which the snapshot was taken.
+ */
+public class SstFilteringService extends BackgroundService {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SstFilteringService.class);
+
+  // Use only a single thread for SST deletion. Multiple threads would read
+  // or write to same snapshots and can send deletion requests for same sst
+  // multiple times.
+  private static final int SST_FILTERING_CORE_POOL_SIZE = 1;
+
+  private final OzoneManager ozoneManager;
+
+  // Number of files to be batched in an iteration.
+  private final long snapshotLimitPerTask;
+
+  private AtomicLong snapshotFilteredCount;
+
+  public SstFilteringService(long interval, TimeUnit unit, long serviceTimeout,
+      OzoneManager ozoneManager, OzoneConfiguration configuration) {
+    super("SstFilteringService", interval, unit, SST_FILTERING_CORE_POOL_SIZE,
+        serviceTimeout);
+    this.ozoneManager = ozoneManager;
+    this.snapshotLimitPerTask = configuration
+        .getLong(SNAPSHOT_SST_DELETING_LIMIT_PER_TASK,
+            SNAPSHOT_SST_DELETING_LIMIT_PER_TASK_DEFAULT);
+    snapshotFilteredCount = new AtomicLong(0);
+  }
+
+  private class SSTFilteringTask implements BackgroundTask {
+
+    @Override
+    public BackgroundTaskResult call() throws Exception {
+
+      Table<String, SnapshotInfo> snapshotInfoTable =
+          ozoneManager.getMetadataManager().getSnapshotInfoTable();
+      try (
+          TableIterator<String, ? extends Table.KeyValue
+              <String, SnapshotInfo>> iterator = snapshotInfoTable
+              .iterator()) {
+        iterator.seekToFirst();
+
+        long snapshotLimit = snapshotLimitPerTask;
+
+        while (iterator.hasNext() && snapshotLimit > 0) {
+          Table.KeyValue<String, SnapshotInfo> keyValue = iterator.next();
+          String snapShotTableKey = keyValue.getKey();
+          SnapshotInfo snapshotInfo = keyValue.getValue();
+
+          File omMetadataDir =
+              OMStorage.getOmDbDir(ozoneManager.getConfiguration());
+          String snapshotDir = omMetadataDir + OM_KEY_PREFIX + OM_SNAPSHOT_DIR;
+          Path filePath =
+              Paths.get(snapshotDir + OM_KEY_PREFIX + FILTERED_SNAPSHOTS);
+
+          boolean isSnapshotFiltered = false;
+
+          // If entry for the snapshotID is present in this file,
+          // it has already undergone filtering.
+          if (Files.exists(filePath)) {
+            List<String> processedSnapshotIds = Files.readAllLines(filePath);
+            if (processedSnapshotIds.contains(snapshotInfo.getSnapshotID())) {
+              isSnapshotFiltered = true;
+            }
+          }
+
+          if (!isSnapshotFiltered) {
+            LOG.debug("Processing snapshot {} to filter relevant SST Files",
+                snapShotTableKey);
+
+            List<Pair<String, String>> prefixPairs =
+                constructPrefixPairs(snapshotInfo);
+
+            String dbName = OM_DB_NAME + snapshotInfo.getCheckpointDirName();
+
+            RDBStore rdbStore = (RDBStore) OmMetadataManagerImpl
+                .loadDB(ozoneManager.getConfiguration(), new File(snapshotDir),
+                    dbName, true);
+            RocksDatabase db = rdbStore.getDb();
+            db.deleteFilesNotMatchingPrefix(prefixPairs);
+
+            // mark the snapshot as filtered by writing to the file
+            String content = snapshotInfo.getSnapshotID() + "\n";
+            Files.write(filePath, content.getBytes(StandardCharsets.UTF_8),
+                StandardOpenOption.CREATE, StandardOpenOption.APPEND);
+            snapshotLimit--;
+            snapshotFilteredCount.getAndIncrement();
+          }
+        }
+      } catch (RocksDBException | IOException e) {
+        LOG.error("Error during Snapshot sst filtering ", e);
+      }
+
+      // nothing to return here
+      return BackgroundTaskResult.EmptyTaskResult.newResult();
+    }
+
+    /**
+     * @param snapshotInfo
+     * @return a list of pairs (tableName,keyPrefix).
+     * @throws IOException
+     */
+    private List<Pair<String, String>> constructPrefixPairs(
+        SnapshotInfo snapshotInfo) throws IOException {
+      String volumeName = snapshotInfo.getVolumeName();
+      String bucketName = snapshotInfo.getBucketName();
+
+      long volumeId = ozoneManager.getMetadataManager().getVolumeId(volumeName);
+      // TODO : buckets can be deleted via ofs , handle deletion of bucket case.

Review Comment:
   I know it is not part of this PR, but I'm curious about bucket deletion.  Wouldn't that require the creation of new sst files?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org