You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by ca...@apache.org on 2022/06/20 02:14:33 UTC

[ozone] branch master updated: HDDS-4123. Integrate OM Open Key Cleanup Service Into Existing Code (#3319)

This is an automated email from the ASF dual-hosted git repository.

captainzmc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 321f5856e1 HDDS-4123. Integrate OM Open Key Cleanup Service Into Existing Code (#3319)
321f5856e1 is described below

commit 321f5856e10be89408616ff67fd7e0382cdde5e2
Author: Kaijie Chen <ch...@kaijie.org>
AuthorDate: Mon Jun 20 10:14:27 2022 +0800

    HDDS-4123. Integrate OM Open Key Cleanup Service Into Existing Code (#3319)
    
    * Enable open key cleanup service
---
 .../common/src/main/resources/ozone-default.xml    |  13 ++
 .../org/apache/hadoop/ozone/om/OMConfigKeys.java   |   5 +
 .../hdds/scm/TestSCMDbCheckpointServlet.java       |   4 -
 .../hadoop/ozone/TestMiniOzoneOMHACluster.java     |   4 -
 .../org/apache/hadoop/ozone/om/TestOmInit.java     |   5 -
 .../hadoop/ozone/om/TestSecureOzoneManager.java    |   4 -
 .../src/main/proto/OmClientProtocol.proto          |   1 +
 hadoop-ozone/ozone-manager/pom.xml                 |   4 +
 .../org/apache/hadoop/ozone/om/KeyManager.java     |   6 +
 .../org/apache/hadoop/ozone/om/KeyManagerImpl.java |  28 +++
 .../hadoop/ozone/om/OpenKeyCleanupService.java     | 190 +++++++++++++++--
 .../om/ratis/utils/OzoneManagerRatisUtils.java     |   8 +
 .../hadoop/ozone/om/TestOpenKeyCleanupService.java | 224 +++++++++++++++++++++
 13 files changed, 457 insertions(+), 39 deletions(-)

diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 21c580b8cd..bb0e677a8c 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -1249,6 +1249,19 @@
     </description>
   </property>
 
+  <property>
+    <name>ozone.om.open.key.cleanup.service.timeout</name>
+    <value>300s</value>
+    <tag>OZONE, OM, PERFORMANCE</tag>
+    <description>Timeout for the open key cleanup service. If this is set
+      greater than 0, the service stops waiting for open key deletion to
+      complete after this time. If a large proportion of open key deletions
+      time out, either increase this value or decrease
+      ozone.om.open.key.cleanup.limit.per.task.
+      The unit can be specified with a suffix (ns,ms,s,m,h,d).
+    </description>
+  </property>
+
   <property>
     <name>ozone.om.open.key.expire.threshold</name>
     <value>7d</value>
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
index aa34f049f4..99d56d2b02 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
@@ -91,6 +91,11 @@ public final class OMConfigKeys {
   public static final String
       OZONE_OM_OPEN_KEY_CLEANUP_SERVICE_INTERVAL_DEFAULT = "24h";
 
+  public static final String OZONE_OM_OPEN_KEY_CLEANUP_SERVICE_TIMEOUT =
+      "ozone.om.open.key.cleanup.service.timeout";
+  public static final String OZONE_OM_OPEN_KEY_CLEANUP_SERVICE_TIMEOUT_DEFAULT
+      = "300s";
+
   public static final String OZONE_OM_OPEN_KEY_EXPIRE_THRESHOLD =
       "ozone.om.open.key.expire.threshold";
   public static final String OZONE_OM_OPEN_KEY_EXPIRE_THRESHOLD_DEFAULT =
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMDbCheckpointServlet.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMDbCheckpointServlet.java
index ca8e4799ee..b2576fefa4 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMDbCheckpointServlet.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMDbCheckpointServlet.java
@@ -28,7 +28,6 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.UUID;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMMetrics;
@@ -40,7 +39,6 @@ import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.commons.io.FileUtils;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED;
 import static org.apache.hadoop.ozone.OzoneConsts.OZONE_DB_CHECKPOINT_REQUEST_FLUSH;
-import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_OPEN_KEY_EXPIRE_THRESHOLD;
 
 import org.junit.After;
 import org.junit.Assert;
@@ -86,8 +84,6 @@ public class TestSCMDbCheckpointServlet {
     scmId = UUID.randomUUID().toString();
     omId = UUID.randomUUID().toString();
     conf.setBoolean(OZONE_ACL_ENABLED, true);
-    conf.setTimeDuration(OZONE_OM_OPEN_KEY_EXPIRE_THRESHOLD,
-        2, TimeUnit.SECONDS);
     cluster = MiniOzoneCluster.newBuilder(conf)
         .setClusterId(clusterId)
         .setScmId(scmId)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneOMHACluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneOMHACluster.java
index 2aaf78e13a..65c57c5717 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneOMHACluster.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneOMHACluster.java
@@ -31,13 +31,11 @@ import org.junit.rules.Timeout;
 
 import java.io.IOException;
 import java.util.UUID;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ADMINISTRATORS_WILDCARD;
-import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_OPEN_KEY_EXPIRE_THRESHOLD;
 
 /**
  * This class tests MiniOzoneHAClusterImpl.
@@ -71,8 +69,6 @@ public class TestMiniOzoneOMHACluster {
     conf.setBoolean(OZONE_ACL_ENABLED, true);
     conf.set(OzoneConfigKeys.OZONE_ADMINISTRATORS,
         OZONE_ADMINISTRATORS_WILDCARD);
-    conf.setTimeDuration(OZONE_OM_OPEN_KEY_EXPIRE_THRESHOLD,
-        2, TimeUnit.SECONDS);
     cluster = (MiniOzoneHAClusterImpl) MiniOzoneCluster.newOMHABuilder(conf)
         .setClusterId(clusterId)
         .setScmId(scmId)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmInit.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmInit.java
index aaf07e8b7d..a93e3ce98e 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmInit.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmInit.java
@@ -18,14 +18,11 @@ package org.apache.hadoop.ozone.om;
 
 import java.io.IOException;
 import java.util.UUID;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.security.authentication.client.AuthenticationException;
 
-import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_OPEN_KEY_EXPIRE_THRESHOLD;
-
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -67,8 +64,6 @@ public class TestOmInit {
     clusterId = UUID.randomUUID().toString();
     scmId = UUID.randomUUID().toString();
     omId = UUID.randomUUID().toString();
-    conf.setTimeDuration(OZONE_OM_OPEN_KEY_EXPIRE_THRESHOLD,
-        2, TimeUnit.SECONDS);
     cluster =  MiniOzoneCluster.newBuilder(conf)
         .setClusterId(clusterId)
         .setScmId(scmId)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestSecureOzoneManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestSecureOzoneManager.java
index c94970cad4..32cd7beb17 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestSecureOzoneManager.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestSecureOzoneManager.java
@@ -43,7 +43,6 @@ import java.security.PrivateKey;
 import java.security.PublicKey;
 import java.security.cert.X509Certificate;
 import java.util.UUID;
-import java.util.concurrent.TimeUnit;
 
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY;
@@ -51,7 +50,6 @@ import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_NAMES;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SECURITY_ENABLED_KEY;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
-import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_OPEN_KEY_EXPIRE_THRESHOLD;
 import static org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod.KERBEROS;
 import static org.apache.ozone.test.GenericTestUtils.LogCapturer;
 import static org.apache.ozone.test.GenericTestUtils.getTempPath;
@@ -85,8 +83,6 @@ public class TestSecureOzoneManager {
     omId = UUID.randomUUID().toString();
     conf.setBoolean(OZONE_ACL_ENABLED, true);
     conf.setBoolean(OZONE_SECURITY_ENABLED_KEY, true);
-    conf.setTimeDuration(OZONE_OM_OPEN_KEY_EXPIRE_THRESHOLD,
-        2, TimeUnit.SECONDS);
     conf.set(HADOOP_SECURITY_AUTHENTICATION, KERBEROS.toString());
     conf.setInt(IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 2);
     conf.set(OZONE_SCM_NAMES, "localhost");
diff --git a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
index 77968815df..4b6a8dd342 100644
--- a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
+++ b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
@@ -1151,6 +1151,7 @@ message PurgePathRequest {
 
 message DeleteOpenKeysRequest {
   repeated OpenKeyBucket openKeysPerBucket = 1;
+  optional BucketLayoutProto bucketLayout = 2;
 }
 
 message OpenKeyBucket {
diff --git a/hadoop-ozone/ozone-manager/pom.xml b/hadoop-ozone/ozone-manager/pom.xml
index 0201397b4a..55b426e48e 100644
--- a/hadoop-ozone/ozone-manager/pom.xml
+++ b/hadoop-ozone/ozone-manager/pom.xml
@@ -194,6 +194,10 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
         </exclusion>
       </exclusions>
     </dependency>
+    <dependency>
+      <groupId>org.junit.jupiter</groupId>
+      <artifactId>junit-jupiter-params</artifactId>
+    </dependency>
   </dependencies>
   <build>
     <plugins>
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
index 7bcb00fafb..8bcffa60d2 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
@@ -225,4 +225,10 @@ public interface KeyManager extends OzoneManagerFS, IOzoneAcl {
    * @return Background service.
    */
   BackgroundService getDirDeletingService();
+
+  /**
+   * Returns the instance of Open Key Cleanup Service.
+   * @return Background service.
+   */
+  BackgroundService getOpenKeyCleanupService();
 }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
index 93c59a524a..b592d25036 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
@@ -121,6 +121,10 @@ import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
 import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DIR_DELETING_SERVICE_INTERVAL;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DIR_DELETING_SERVICE_INTERVAL_DEFAULT;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_OPEN_KEY_CLEANUP_SERVICE_INTERVAL;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_OPEN_KEY_CLEANUP_SERVICE_INTERVAL_DEFAULT;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_OPEN_KEY_CLEANUP_SERVICE_TIMEOUT;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_OPEN_KEY_CLEANUP_SERVICE_TIMEOUT_DEFAULT;
 import static org.apache.hadoop.ozone.om.OzoneManagerUtils.getBucketLayout;
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.BUCKET_NOT_FOUND;
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.DIRECTORY_NOT_FOUND;
@@ -167,6 +171,8 @@ public class KeyManagerImpl implements KeyManager {
   private final boolean enableFileSystemPaths;
   private BackgroundService dirDeletingService;
 
+  private BackgroundService openKeyCleanupService;
+
   @VisibleForTesting
   public KeyManagerImpl(ScmBlockLocationProtocol scmBlockClient,
       OMMetadataManager metadataManager, OzoneConfiguration conf, String omId,
@@ -254,6 +260,20 @@ public class KeyManagerImpl implements KeyManager {
           TimeUnit.SECONDS, serviceTimeout, ozoneManager, configuration);
       dirDeletingService.start();
     }
+
+    if (openKeyCleanupService == null) {
+      long serviceInterval = configuration.getTimeDuration(
+          OZONE_OM_OPEN_KEY_CLEANUP_SERVICE_INTERVAL,
+          OZONE_OM_OPEN_KEY_CLEANUP_SERVICE_INTERVAL_DEFAULT,
+          TimeUnit.MILLISECONDS);
+      long serviceTimeout = configuration.getTimeDuration(
+          OZONE_OM_OPEN_KEY_CLEANUP_SERVICE_TIMEOUT,
+          OZONE_OM_OPEN_KEY_CLEANUP_SERVICE_TIMEOUT_DEFAULT,
+          TimeUnit.MILLISECONDS);
+      openKeyCleanupService = new OpenKeyCleanupService(serviceInterval,
+          TimeUnit.MILLISECONDS, serviceTimeout, ozoneManager, configuration);
+      openKeyCleanupService.start();
+    }
   }
 
   KeyProviderCryptoExtension getKMSProvider() {
@@ -270,6 +290,10 @@ public class KeyManagerImpl implements KeyManager {
       dirDeletingService.shutdown();
       dirDeletingService = null;
     }
+    if (openKeyCleanupService != null) {
+      openKeyCleanupService.shutdown();
+      openKeyCleanupService = null;
+    }
   }
 
   private OmBucketInfo getBucketInfo(String volumeName, String bucketName)
@@ -604,6 +628,10 @@ public class KeyManagerImpl implements KeyManager {
     return dirDeletingService;
   }
 
+  public BackgroundService getOpenKeyCleanupService() {
+    return openKeyCleanupService;
+  }
+
   @Override
   public OmMultipartUploadList listMultipartUploads(String volumeName,
       String bucketName, String prefix) throws OMException {
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OpenKeyCleanupService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OpenKeyCleanupService.java
index 1a4e17c96f..3cb9035149 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OpenKeyCleanupService.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OpenKeyCleanupService.java
@@ -18,18 +18,33 @@
 
 package org.apache.hadoop.ozone.om;
 
-import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.ServiceException;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.utils.BackgroundService;
 import org.apache.hadoop.hdds.utils.BackgroundTask;
 import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
 import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
+import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
+import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeleteOpenKeysRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OpenKeyBucket;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
+import org.apache.hadoop.util.Time;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientRequest;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.time.Duration;
+import java.util.List;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * This is the background service to delete hanging open keys.
@@ -39,32 +54,109 @@ import java.util.concurrent.TimeUnit;
  * success for keys, then clean up those keys.
  */
 public class OpenKeyCleanupService extends BackgroundService {
-
   private static final Logger LOG =
       LoggerFactory.getLogger(OpenKeyCleanupService.class);
 
-  private static final int OPEN_KEY_DELETING_CORE_POOL_SIZE = 2;
+  // Use only a single thread for OpenKeyCleanup. Multiple threads would read
+  // from the same table and could send deletion requests for the same key
+  // multiple times.
+  private static final int OPEN_KEY_DELETING_CORE_POOL_SIZE = 1;
 
+  private final OzoneManager ozoneManager;
   private final KeyManager keyManager;
-  private final ScmBlockLocationProtocol scmClient;
-
-  public OpenKeyCleanupService(ScmBlockLocationProtocol scmClient,
-      KeyManager keyManager, int serviceInterval,
-      long serviceTimeout) {
-    super("OpenKeyCleanupService", serviceInterval, TimeUnit.SECONDS,
-        OPEN_KEY_DELETING_CORE_POOL_SIZE, serviceTimeout);
-    this.keyManager = keyManager;
-    this.scmClient = scmClient;
+  // Dummy client ID to use for response, since this is triggered by a
+  // service, not the client.
+  private final ClientId clientId = ClientId.randomId();
+  private final Duration expireThreshold;
+  private final int cleanupLimitPerTask;
+  private final AtomicLong submittedOpenKeyCount;
+  private final AtomicLong runCount;
+  private final AtomicBoolean suspended;
+
+  public OpenKeyCleanupService(long interval, TimeUnit unit, long timeout,
+      OzoneManager ozoneManager, ConfigurationSource conf) {
+    super("OpenKeyCleanupService", interval, unit,
+        OPEN_KEY_DELETING_CORE_POOL_SIZE, timeout);
+    this.ozoneManager = ozoneManager;
+    this.keyManager = ozoneManager.getKeyManager();
+
+    long expireMillis = conf.getTimeDuration(
+        OMConfigKeys.OZONE_OM_OPEN_KEY_EXPIRE_THRESHOLD,
+        OMConfigKeys.OZONE_OM_OPEN_KEY_EXPIRE_THRESHOLD_DEFAULT,
+        TimeUnit.MILLISECONDS);
+    this.expireThreshold = Duration.ofMillis(expireMillis);
+
+    this.cleanupLimitPerTask = conf.getInt(
+        OMConfigKeys.OZONE_OM_OPEN_KEY_CLEANUP_LIMIT_PER_TASK,
+        OMConfigKeys.OZONE_OM_OPEN_KEY_CLEANUP_LIMIT_PER_TASK_DEFAULT);
+
+    this.submittedOpenKeyCount = new AtomicLong(0);
+    this.runCount = new AtomicLong(0);
+    this.suspended = new AtomicBoolean(false);
+  }
+
+  /**
+   * Returns the number of times this Background service has run.
+   *
+   * @return Long, run count.
+   */
+  @VisibleForTesting
+  public long getRunCount() {
+    return runCount.get();
+  }
+
+  /**
+   * Suspend the service (for testing).
+   */
+  @VisibleForTesting
+  public void suspend() {
+    suspended.set(true);
+  }
+
+  /**
+   * Resume the service if suspended (for testing).
+   */
+  @VisibleForTesting
+  public void resume() {
+    suspended.set(false);
+  }
+
+  /**
+   * Returns the number of open keys that were submitted for deletion by this
+   * service. If any of these keys were committed, and thus left the open key
+   * table, between being submitted for deletion and the actual delete
+   * operation, they will not be deleted.
+   *
+   * @return long count.
+   */
+  @VisibleForTesting
+  public long getSubmittedOpenKeyCount() {
+    return submittedOpenKeyCount.get();
   }
 
   @Override
   public BackgroundTaskQueue getTasks() {
     BackgroundTaskQueue queue = new BackgroundTaskQueue();
-    queue.add(new OpenKeyDeletingTask());
+    queue.add(new OpenKeyCleanupTask(BucketLayout.DEFAULT));
+    queue.add(new OpenKeyCleanupTask(BucketLayout.FILE_SYSTEM_OPTIMIZED));
     return queue;
   }
 
-  private class OpenKeyDeletingTask implements BackgroundTask {
+  private boolean shouldRun() {
+    return !suspended.get() && ozoneManager.isLeaderReady();
+  }
+
+  private boolean isRatisEnabled() {
+    return ozoneManager.isRatisEnabled();
+  }
+
+  private class OpenKeyCleanupTask implements BackgroundTask {
+
+    private final BucketLayout bucketLayout;
+
+    OpenKeyCleanupTask(BucketLayout bucketLayout) {
+      this.bucketLayout = bucketLayout;
+    }
 
     @Override
     public int getPriority() {
@@ -73,18 +165,72 @@ public class OpenKeyCleanupService extends BackgroundService {
 
     @Override
     public BackgroundTaskResult call() throws Exception {
-      // This method is currently never used. It will be implemented in
-      // HDDS-4122, and integrated into the rest of the code base in HDDS-4123.
+      if (!shouldRun()) {
+        return BackgroundTaskResult.EmptyTaskResult.newResult();
+      }
+
+      runCount.incrementAndGet();
+      long startTime = Time.monotonicNow();
+      List<OpenKeyBucket> openKeyBuckets = null;
       try {
-        // The new API for deleting expired open keys in OM HA will differ
-        // significantly from the old implementation.
-        // The old implementation has been removed so the code compiles.
-        keyManager.getExpiredOpenKeys(Duration.ZERO, 0, BucketLayout.DEFAULT);
+        openKeyBuckets = keyManager.getExpiredOpenKeys(expireThreshold,
+            cleanupLimitPerTask, bucketLayout);
       } catch (IOException e) {
-        LOG.error("Unable to get hanging open keys, retry in"
-            + " next interval", e);
+        LOG.error("Unable to get hanging open keys, retry in next interval", e);
+      }
+
+      if (openKeyBuckets != null && !openKeyBuckets.isEmpty()) {
+        int numOpenKeys = openKeyBuckets.stream()
+            .mapToInt(OpenKeyBucket::getKeysCount).sum();
+
+        OMRequest omRequest = createRequest(openKeyBuckets);
+        submitRequest(omRequest);
+
+        LOG.debug("Number of expired keys submitted for deletion: {}, elapsed"
+            + " time: {}ms", numOpenKeys, Time.monotonicNow() - startTime);
+        submittedOpenKeyCount.addAndGet(numOpenKeys);
       }
       return BackgroundTaskResult.EmptyTaskResult.newResult();
     }
+
+    private OMRequest createRequest(List<OpenKeyBucket> openKeyBuckets) {
+      DeleteOpenKeysRequest request =
+          DeleteOpenKeysRequest.newBuilder()
+              .addAllOpenKeysPerBucket(openKeyBuckets)
+              .setBucketLayout(bucketLayout.toProto())
+              .build();
+
+      OMRequest omRequest = OMRequest.newBuilder()
+          .setCmdType(Type.DeleteOpenKeys)
+          .setDeleteOpenKeysRequest(request)
+          .setClientId(clientId.toString())
+          .build();
+
+      return omRequest;
+    }
+
+    private void submitRequest(OMRequest omRequest) {
+      try {
+        if (isRatisEnabled()) {
+          OzoneManagerRatisServer server = ozoneManager.getOmRatisServer();
+
+          RaftClientRequest raftClientRequest = RaftClientRequest.newBuilder()
+              .setClientId(clientId)
+              .setServerId(server.getRaftPeerId())
+              .setGroupId(server.getRaftGroupId())
+              .setCallId(runCount.get())
+              .setMessage(Message.valueOf(
+                  OMRatisHelper.convertRequestToByteString(omRequest)))
+              .setType(RaftClientRequest.writeRequestType())
+              .build();
+
+          server.submitRequest(omRequest, raftClientRequest);
+        } else {
+          ozoneManager.getOmServerProtocol().submitRequest(null, omRequest);
+        }
+      } catch (ServiceException e) {
+        LOG.error("Open key delete request failed. Will retry at next run.", e);
+      }
+    }
   }
 }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
index 3c1c59644b..cd52bd7143 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.ozone.om.request.bucket.acl.OMBucketRemoveAclRequest;
 import org.apache.hadoop.ozone.om.request.bucket.acl.OMBucketSetAclRequest;
 import org.apache.hadoop.ozone.om.request.key.OMKeyPurgeRequest;
 import org.apache.hadoop.ozone.om.request.key.OMDirectoriesPurgeRequestWithFSO;
+import org.apache.hadoop.ozone.om.request.key.OMOpenKeysDeleteRequest;
 import org.apache.hadoop.ozone.om.request.key.OMTrashRecoverRequest;
 import org.apache.hadoop.ozone.om.request.key.acl.OMKeyAddAclRequest;
 import org.apache.hadoop.ozone.om.request.key.acl.OMKeyAddAclRequestWithFSO;
@@ -208,6 +209,13 @@ public final class OzoneManagerRatisUtils {
       return new OMTenantRevokeAdminRequest(omRequest);
     case SetRangerServiceVersion:
       return new OMSetRangerServiceVersionRequest(omRequest);
+    case DeleteOpenKeys:
+      BucketLayout bktLayout = BucketLayout.DEFAULT;
+      if (omRequest.getDeleteOpenKeysRequest().hasBucketLayout()) {
+        bktLayout = BucketLayout.fromProto(
+            omRequest.getDeleteOpenKeysRequest().getBucketLayout());
+      }
+      return new OMOpenKeysDeleteRequest(omRequest, bktLayout);
 
     /*
      * Key requests that can have multiple variants based on the bucket layout
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOpenKeyCleanupService.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOpenKeyCleanupService.java
new file mode 100644
index 0000000000..0fa4114e4c
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOpenKeyCleanupService.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.ozone.om;
+
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
+import org.apache.hadoop.hdds.server.ServerUtils;
+import org.apache.hadoop.hdds.utils.db.DBConfigFromFile;
+import org.apache.hadoop.ozone.om.helpers.BucketLayout;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
+import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
+import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
+import org.apache.hadoop.ozone.om.request.OMRequestTestUtils;
+import org.apache.ozone.test.GenericTestUtils;
+import org.apache.ratis.util.ExitUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_OPEN_KEY_CLEANUP_SERVICE_INTERVAL;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_OPEN_KEY_EXPIRE_THRESHOLD;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test Open Key Cleanup Service.
+ * <p>
+ * This test does the following things.
+ * <p>
+ * 1. Suspends the service and creates a bunch of open (uncommitted) keys.
+ * 2. Waits for the open keys to expire. 3. Resumes the service and waits for
+ * it to run. 4. Confirms that no expired open keys are left.
+ */
+public class TestOpenKeyCleanupService {
+  private OzoneManagerProtocol writeClient;
+  private OzoneManager om;
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestOpenKeyCleanupService.class);
+
+  private static final Duration SERVICE_INTERVAL = Duration.ofMillis(500);
+  private static final Duration EXPIRE_THRESHOLD = Duration.ofMillis(1000);
+  private KeyManager keyManager;
+  private OMMetadataManager omMetadataManager;
+
+  @BeforeAll
+  public static void setup() {
+    ExitUtils.disableSystemExit();
+  }
+
+  @BeforeEach
+  public void createConfAndInitValues(@TempDir Path tempDir) throws Exception {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    System.setProperty(DBConfigFromFile.CONFIG_DIR, "/");
+    ServerUtils.setOzoneMetaDirPath(conf, tempDir.toString());
+    conf.setTimeDuration(OZONE_OM_OPEN_KEY_CLEANUP_SERVICE_INTERVAL,
+        SERVICE_INTERVAL.toMillis(), TimeUnit.MILLISECONDS);
+    conf.setTimeDuration(OZONE_OM_OPEN_KEY_EXPIRE_THRESHOLD,
+        EXPIRE_THRESHOLD.toMillis(), TimeUnit.MILLISECONDS);
+    conf.setQuietMode(false);
+    OmTestManagers omTestManagers = new OmTestManagers(conf);
+    keyManager = omTestManagers.getKeyManager();
+    omMetadataManager = omTestManagers.getMetadataManager();
+    writeClient = omTestManagers.getWriteClient();
+    om = omTestManagers.getOzoneManager();
+  }
+
+  @AfterEach
+  public void cleanup() throws Exception {
+    om.stop();
+  }
+
+  /**
+   * In this test, we create a bunch of open keys in DEFAULT and FSO buckets
+   * and let them expire while the OpenKeyCleanupService is suspended. Then we
+   * resume the service and make sure that all the expired open keys are
+   * submitted for deletion and removed by OzoneManager.
+   *
+   * @throws Exception - on Failure.
+   */
+  @ParameterizedTest
+  @CsvSource({
+      "99, 0",
+      "0, 88",
+      "66, 77"
+  })
+  @Timeout(300)
+  public void checkIfCleanupServiceIsDeletingExpiredOpenKeys(
+      int numDEFKeys, int numFSOKeys) throws Exception {
+
+    OpenKeyCleanupService openKeyCleanupService =
+        (OpenKeyCleanupService) keyManager.getOpenKeyCleanupService();
+
+    openKeyCleanupService.suspend();
+    // wait for submitted tasks to complete
+    Thread.sleep(SERVICE_INTERVAL.toMillis());
+    final long oldkeyCount = openKeyCleanupService.getSubmittedOpenKeyCount();
+    final long oldrunCount = openKeyCleanupService.getRunCount();
+
+    final int keyCount = numDEFKeys + numFSOKeys;
+    createOpenKeys(numDEFKeys, BucketLayout.DEFAULT);
+    createOpenKeys(numFSOKeys, BucketLayout.FILE_SYSTEM_OPTIMIZED);
+
+    // wait for open keys to expire
+    Thread.sleep(EXPIRE_THRESHOLD.toMillis());
+
+    assertEquals(numDEFKeys == 0, keyManager.getExpiredOpenKeys(
+        EXPIRE_THRESHOLD, 1, BucketLayout.DEFAULT).isEmpty());
+    assertEquals(numFSOKeys == 0, keyManager.getExpiredOpenKeys(
+        EXPIRE_THRESHOLD, 1, BucketLayout.FILE_SYSTEM_OPTIMIZED).isEmpty());
+
+    openKeyCleanupService.resume();
+
+    GenericTestUtils.waitFor(() -> openKeyCleanupService
+            .getRunCount() > oldrunCount,
+        (int) SERVICE_INTERVAL.toMillis(),
+        5 * (int) SERVICE_INTERVAL.toMillis());
+
+    // wait for requests to complete
+    Thread.sleep(SERVICE_INTERVAL.toMillis());
+
+    assertTrue(openKeyCleanupService.getSubmittedOpenKeyCount() >=
+        oldkeyCount + keyCount);
+    assertTrue(keyManager.getExpiredOpenKeys(EXPIRE_THRESHOLD,
+        1, BucketLayout.DEFAULT).isEmpty());
+    assertTrue(keyManager.getExpiredOpenKeys(EXPIRE_THRESHOLD,
+        1, BucketLayout.FILE_SYSTEM_OPTIMIZED).isEmpty());
+  }
+
+  private void createOpenKeys(int keyCount, BucketLayout bucketLayout)
+      throws IOException {
+    String volume = UUID.randomUUID().toString();
+    String bucket = UUID.randomUUID().toString();
+    for (int x = 0; x < keyCount; x++) {
+      if (RandomUtils.nextBoolean()) {
+        bucket = UUID.randomUUID().toString();
+        if (RandomUtils.nextBoolean()) {
+          volume = UUID.randomUUID().toString();
+        }
+      }
+      String key = UUID.randomUUID().toString();
+      createVolumeAndBucket(volume, bucket, bucketLayout);
+
+      final int numBlocks = RandomUtils.nextInt(0, 3);
+      // Create the key
+      createOpenKey(volume, bucket, key, numBlocks);
+    }
+  }
+
+  private void createVolumeAndBucket(String volumeName, String bucketName,
+      BucketLayout bucketLayout) throws IOException {
+    // cheat here, just add volume and bucket entries through the metadata
+    // manager (via OMRequestTestUtils) instead of the write client, so that
+    // we can create the open keys
+    OMRequestTestUtils.addVolumeToOM(omMetadataManager,
+        OmVolumeArgs.newBuilder()
+            .setOwnerName("o")
+            .setAdminName("a")
+            .setVolume(volumeName)
+            .build());
+
+    OMRequestTestUtils.addBucketToOM(omMetadataManager,
+        OmBucketInfo.newBuilder().setVolumeName(volumeName)
+            .setBucketName(bucketName)
+            .setBucketLayout(bucketLayout)
+            .build());
+  }
+
+  private void createOpenKey(String volumeName, String bucketName,
+      String keyName, int numBlocks) throws IOException {
+    OmKeyArgs keyArg =
+        new OmKeyArgs.Builder()
+            .setVolumeName(volumeName)
+            .setBucketName(bucketName)
+            .setKeyName(keyName)
+            .setAcls(Collections.emptyList())
+            .setReplicationConfig(StandaloneReplicationConfig.getInstance(
+                HddsProtos.ReplicationFactor.ONE))
+            .setLocationInfoList(new ArrayList<>())
+            .build();
+
+    // Open the key and allocate blocks for it without committing it.
+    OpenKeySession session = writeClient.openKey(keyArg);
+    for (int i = 0; i < numBlocks; i++) {
+      keyArg.addLocationInfo(writeClient.allocateBlock(keyArg, session.getId(),
+          new ExcludeList()));
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org