You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by ca...@apache.org on 2022/06/20 02:14:33 UTC
[ozone] branch master updated: HDDS-4123. Integrate OM Open Key Cleanup Service Into Existing Code (#3319)
This is an automated email from the ASF dual-hosted git repository.
captainzmc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 321f5856e1 HDDS-4123. Integrate OM Open Key Cleanup Service Into Existing Code (#3319)
321f5856e1 is described below
commit 321f5856e10be89408616ff67fd7e0382cdde5e2
Author: Kaijie Chen <ch...@kaijie.org>
AuthorDate: Mon Jun 20 10:14:27 2022 +0800
HDDS-4123. Integrate OM Open Key Cleanup Service Into Existing Code (#3319)
* Enable open key cleanup service
---
.../common/src/main/resources/ozone-default.xml | 13 ++
.../org/apache/hadoop/ozone/om/OMConfigKeys.java | 5 +
.../hdds/scm/TestSCMDbCheckpointServlet.java | 4 -
.../hadoop/ozone/TestMiniOzoneOMHACluster.java | 4 -
.../org/apache/hadoop/ozone/om/TestOmInit.java | 5 -
.../hadoop/ozone/om/TestSecureOzoneManager.java | 4 -
.../src/main/proto/OmClientProtocol.proto | 1 +
hadoop-ozone/ozone-manager/pom.xml | 4 +
.../org/apache/hadoop/ozone/om/KeyManager.java | 6 +
.../org/apache/hadoop/ozone/om/KeyManagerImpl.java | 28 +++
.../hadoop/ozone/om/OpenKeyCleanupService.java | 190 +++++++++++++++--
.../om/ratis/utils/OzoneManagerRatisUtils.java | 8 +
.../hadoop/ozone/om/TestOpenKeyCleanupService.java | 224 +++++++++++++++++++++
13 files changed, 457 insertions(+), 39 deletions(-)
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 21c580b8cd..bb0e677a8c 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -1249,6 +1249,19 @@
</description>
</property>
+ <property>
+ <name>ozone.om.open.key.cleanup.service.timeout</name>
+ <value>300s</value>
+ <tag>OZONE, OM, PERFORMANCE</tag>
+ <description>Timeout for the open key cleanup service. If this is set
+ to a value greater than 0, the service stops waiting for open key
+ deletion to complete after this time. If a large proportion of open key
+ deletions time out, either increase this value or decrease
+ ozone.om.open.key.cleanup.limit.per.task.
+ The unit can be specified with a suffix (ns,ms,s,m,h,d).
+ </description>
+ </property>
+
<property>
<name>ozone.om.open.key.expire.threshold</name>
<value>7d</value>
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
index aa34f049f4..99d56d2b02 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
@@ -91,6 +91,11 @@ public final class OMConfigKeys {
public static final String
OZONE_OM_OPEN_KEY_CLEANUP_SERVICE_INTERVAL_DEFAULT = "24h";
+ public static final String OZONE_OM_OPEN_KEY_CLEANUP_SERVICE_TIMEOUT =
+ "ozone.om.open.key.cleanup.service.timeout";
+ public static final String OZONE_OM_OPEN_KEY_CLEANUP_SERVICE_TIMEOUT_DEFAULT
+ = "300s";
+
public static final String OZONE_OM_OPEN_KEY_EXPIRE_THRESHOLD =
"ozone.om.open.key.expire.threshold";
public static final String OZONE_OM_OPEN_KEY_EXPIRE_THRESHOLD_DEFAULT =
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMDbCheckpointServlet.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMDbCheckpointServlet.java
index ca8e4799ee..b2576fefa4 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMDbCheckpointServlet.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMDbCheckpointServlet.java
@@ -28,7 +28,6 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Collections;
import java.util.UUID;
-import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMMetrics;
@@ -40,7 +39,6 @@ import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.commons.io.FileUtils;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED;
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_DB_CHECKPOINT_REQUEST_FLUSH;
-import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_OPEN_KEY_EXPIRE_THRESHOLD;
import org.junit.After;
import org.junit.Assert;
@@ -86,8 +84,6 @@ public class TestSCMDbCheckpointServlet {
scmId = UUID.randomUUID().toString();
omId = UUID.randomUUID().toString();
conf.setBoolean(OZONE_ACL_ENABLED, true);
- conf.setTimeDuration(OZONE_OM_OPEN_KEY_EXPIRE_THRESHOLD,
- 2, TimeUnit.SECONDS);
cluster = MiniOzoneCluster.newBuilder(conf)
.setClusterId(clusterId)
.setScmId(scmId)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneOMHACluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneOMHACluster.java
index 2aaf78e13a..65c57c5717 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneOMHACluster.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneOMHACluster.java
@@ -31,13 +31,11 @@ import org.junit.rules.Timeout;
import java.io.IOException;
import java.util.UUID;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ADMINISTRATORS_WILDCARD;
-import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_OPEN_KEY_EXPIRE_THRESHOLD;
/**
* This class tests MiniOzoneHAClusterImpl.
@@ -71,8 +69,6 @@ public class TestMiniOzoneOMHACluster {
conf.setBoolean(OZONE_ACL_ENABLED, true);
conf.set(OzoneConfigKeys.OZONE_ADMINISTRATORS,
OZONE_ADMINISTRATORS_WILDCARD);
- conf.setTimeDuration(OZONE_OM_OPEN_KEY_EXPIRE_THRESHOLD,
- 2, TimeUnit.SECONDS);
cluster = (MiniOzoneHAClusterImpl) MiniOzoneCluster.newOMHABuilder(conf)
.setClusterId(clusterId)
.setScmId(scmId)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmInit.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmInit.java
index aaf07e8b7d..a93e3ce98e 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmInit.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmInit.java
@@ -18,14 +18,11 @@ package org.apache.hadoop.ozone.om;
import java.io.IOException;
import java.util.UUID;
-import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
-import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_OPEN_KEY_EXPIRE_THRESHOLD;
-
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
@@ -67,8 +64,6 @@ public class TestOmInit {
clusterId = UUID.randomUUID().toString();
scmId = UUID.randomUUID().toString();
omId = UUID.randomUUID().toString();
- conf.setTimeDuration(OZONE_OM_OPEN_KEY_EXPIRE_THRESHOLD,
- 2, TimeUnit.SECONDS);
cluster = MiniOzoneCluster.newBuilder(conf)
.setClusterId(clusterId)
.setScmId(scmId)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestSecureOzoneManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestSecureOzoneManager.java
index c94970cad4..32cd7beb17 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestSecureOzoneManager.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestSecureOzoneManager.java
@@ -43,7 +43,6 @@ import java.security.PrivateKey;
import java.security.PublicKey;
import java.security.cert.X509Certificate;
import java.util.UUID;
-import java.util.concurrent.TimeUnit;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY;
@@ -51,7 +50,6 @@ import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_NAMES;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SECURITY_ENABLED_KEY;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
-import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_OPEN_KEY_EXPIRE_THRESHOLD;
import static org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod.KERBEROS;
import static org.apache.ozone.test.GenericTestUtils.LogCapturer;
import static org.apache.ozone.test.GenericTestUtils.getTempPath;
@@ -85,8 +83,6 @@ public class TestSecureOzoneManager {
omId = UUID.randomUUID().toString();
conf.setBoolean(OZONE_ACL_ENABLED, true);
conf.setBoolean(OZONE_SECURITY_ENABLED_KEY, true);
- conf.setTimeDuration(OZONE_OM_OPEN_KEY_EXPIRE_THRESHOLD,
- 2, TimeUnit.SECONDS);
conf.set(HADOOP_SECURITY_AUTHENTICATION, KERBEROS.toString());
conf.setInt(IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 2);
conf.set(OZONE_SCM_NAMES, "localhost");
diff --git a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
index 77968815df..4b6a8dd342 100644
--- a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
+++ b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
@@ -1151,6 +1151,7 @@ message PurgePathRequest {
message DeleteOpenKeysRequest {
repeated OpenKeyBucket openKeysPerBucket = 1;
+ optional BucketLayoutProto bucketLayout = 2;
}
message OpenKeyBucket {
diff --git a/hadoop-ozone/ozone-manager/pom.xml b/hadoop-ozone/ozone-manager/pom.xml
index 0201397b4a..55b426e48e 100644
--- a/hadoop-ozone/ozone-manager/pom.xml
+++ b/hadoop-ozone/ozone-manager/pom.xml
@@ -194,6 +194,10 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
</exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-params</artifactId>
+ </dependency>
</dependencies>
<build>
<plugins>
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
index 7bcb00fafb..8bcffa60d2 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
@@ -225,4 +225,10 @@ public interface KeyManager extends OzoneManagerFS, IOzoneAcl {
* @return Background service.
*/
BackgroundService getDirDeletingService();
+
+ /**
+ * Returns the instance of Open Key Cleanup Service.
+ * @return Background service.
+ */
+ BackgroundService getOpenKeyCleanupService();
}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
index 93c59a524a..b592d25036 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
@@ -121,6 +121,10 @@ import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DIR_DELETING_SERVICE_INTERVAL;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DIR_DELETING_SERVICE_INTERVAL_DEFAULT;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_OPEN_KEY_CLEANUP_SERVICE_INTERVAL;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_OPEN_KEY_CLEANUP_SERVICE_INTERVAL_DEFAULT;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_OPEN_KEY_CLEANUP_SERVICE_TIMEOUT;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_OPEN_KEY_CLEANUP_SERVICE_TIMEOUT_DEFAULT;
import static org.apache.hadoop.ozone.om.OzoneManagerUtils.getBucketLayout;
import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.BUCKET_NOT_FOUND;
import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.DIRECTORY_NOT_FOUND;
@@ -167,6 +171,8 @@ public class KeyManagerImpl implements KeyManager {
private final boolean enableFileSystemPaths;
private BackgroundService dirDeletingService;
+ private BackgroundService openKeyCleanupService;
+
@VisibleForTesting
public KeyManagerImpl(ScmBlockLocationProtocol scmBlockClient,
OMMetadataManager metadataManager, OzoneConfiguration conf, String omId,
@@ -254,6 +260,20 @@ public class KeyManagerImpl implements KeyManager {
TimeUnit.SECONDS, serviceTimeout, ozoneManager, configuration);
dirDeletingService.start();
}
+
+ if (openKeyCleanupService == null) {
+ long serviceInterval = configuration.getTimeDuration(
+ OZONE_OM_OPEN_KEY_CLEANUP_SERVICE_INTERVAL,
+ OZONE_OM_OPEN_KEY_CLEANUP_SERVICE_INTERVAL_DEFAULT,
+ TimeUnit.MILLISECONDS);
+ long serviceTimeout = configuration.getTimeDuration(
+ OZONE_OM_OPEN_KEY_CLEANUP_SERVICE_TIMEOUT,
+ OZONE_OM_OPEN_KEY_CLEANUP_SERVICE_TIMEOUT_DEFAULT,
+ TimeUnit.MILLISECONDS);
+ openKeyCleanupService = new OpenKeyCleanupService(serviceInterval,
+ TimeUnit.MILLISECONDS, serviceTimeout, ozoneManager, configuration);
+ openKeyCleanupService.start();
+ }
}
KeyProviderCryptoExtension getKMSProvider() {
@@ -270,6 +290,10 @@ public class KeyManagerImpl implements KeyManager {
dirDeletingService.shutdown();
dirDeletingService = null;
}
+ if (openKeyCleanupService != null) {
+ openKeyCleanupService.shutdown();
+ openKeyCleanupService = null;
+ }
}
private OmBucketInfo getBucketInfo(String volumeName, String bucketName)
@@ -604,6 +628,10 @@ public class KeyManagerImpl implements KeyManager {
return dirDeletingService;
}
+ public BackgroundService getOpenKeyCleanupService() {
+ return openKeyCleanupService;
+ }
+
@Override
public OmMultipartUploadList listMultipartUploads(String volumeName,
String bucketName, String prefix) throws OMException {
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OpenKeyCleanupService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OpenKeyCleanupService.java
index 1a4e17c96f..3cb9035149 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OpenKeyCleanupService.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OpenKeyCleanupService.java
@@ -18,18 +18,33 @@
package org.apache.hadoop.ozone.om;
-import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.ServiceException;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.utils.BackgroundService;
import org.apache.hadoop.hdds.utils.BackgroundTask;
import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
import org.apache.hadoop.ozone.om.helpers.BucketLayout;
+import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
+import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeleteOpenKeysRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OpenKeyBucket;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
+import org.apache.hadoop.util.Time;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.time.Duration;
+import java.util.List;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
/**
* This is the background service to delete hanging open keys.
@@ -39,32 +54,109 @@ import java.util.concurrent.TimeUnit;
* success for keys, then clean up those keys.
*/
public class OpenKeyCleanupService extends BackgroundService {
-
private static final Logger LOG =
LoggerFactory.getLogger(OpenKeyCleanupService.class);
- private static final int OPEN_KEY_DELETING_CORE_POOL_SIZE = 2;
+ // Use only a single thread for OpenKeyCleanup. Multiple threads would read
+ // from the same table and could send deletion requests for the same key
+ // multiple times.
+ private static final int OPEN_KEY_DELETING_CORE_POOL_SIZE = 1;
+ private final OzoneManager ozoneManager;
private final KeyManager keyManager;
- private final ScmBlockLocationProtocol scmClient;
-
- public OpenKeyCleanupService(ScmBlockLocationProtocol scmClient,
- KeyManager keyManager, int serviceInterval,
- long serviceTimeout) {
- super("OpenKeyCleanupService", serviceInterval, TimeUnit.SECONDS,
- OPEN_KEY_DELETING_CORE_POOL_SIZE, serviceTimeout);
- this.keyManager = keyManager;
- this.scmClient = scmClient;
+ // Dummy client ID to use for response, since this is triggered by a
+ // service, not the client.
+ private final ClientId clientId = ClientId.randomId();
+ private final Duration expireThreshold;
+ private final int cleanupLimitPerTask;
+ private final AtomicLong submittedOpenKeyCount;
+ private final AtomicLong runCount;
+ private final AtomicBoolean suspended;
+
+ public OpenKeyCleanupService(long interval, TimeUnit unit, long timeout,
+ OzoneManager ozoneManager, ConfigurationSource conf) {
+ super("OpenKeyCleanupService", interval, unit,
+ OPEN_KEY_DELETING_CORE_POOL_SIZE, timeout);
+ this.ozoneManager = ozoneManager;
+ this.keyManager = ozoneManager.getKeyManager();
+
+ long expireMillis = conf.getTimeDuration(
+ OMConfigKeys.OZONE_OM_OPEN_KEY_EXPIRE_THRESHOLD,
+ OMConfigKeys.OZONE_OM_OPEN_KEY_EXPIRE_THRESHOLD_DEFAULT,
+ TimeUnit.MILLISECONDS);
+ this.expireThreshold = Duration.ofMillis(expireMillis);
+
+ this.cleanupLimitPerTask = conf.getInt(
+ OMConfigKeys.OZONE_OM_OPEN_KEY_CLEANUP_LIMIT_PER_TASK,
+ OMConfigKeys.OZONE_OM_OPEN_KEY_CLEANUP_LIMIT_PER_TASK_DEFAULT);
+
+ this.submittedOpenKeyCount = new AtomicLong(0);
+ this.runCount = new AtomicLong(0);
+ this.suspended = new AtomicBoolean(false);
+ }
+
+ /**
+ * Returns the number of times this Background service has run.
+ *
+ * @return Long, run count.
+ */
+ @VisibleForTesting
+ public long getRunCount() {
+ return runCount.get();
+ }
+
+ /**
+ * Suspend the service (for testing).
+ */
+ @VisibleForTesting
+ public void suspend() {
+ suspended.set(true);
+ }
+
+ /**
+ * Resume the service if suspended (for testing).
+ */
+ @VisibleForTesting
+ public void resume() {
+ suspended.set(false);
+ }
+
+ /**
+ * Returns the number of open keys that were submitted for deletion by this
+ * service. If these keys were committed from the open key table between
+ * being submitted for deletion and the actual delete operation, they will
+ * not be deleted.
+ *
+ * @return long count.
+ */
+ @VisibleForTesting
+ public long getSubmittedOpenKeyCount() {
+ return submittedOpenKeyCount.get();
}
@Override
public BackgroundTaskQueue getTasks() {
BackgroundTaskQueue queue = new BackgroundTaskQueue();
- queue.add(new OpenKeyDeletingTask());
+ queue.add(new OpenKeyCleanupTask(BucketLayout.DEFAULT));
+ queue.add(new OpenKeyCleanupTask(BucketLayout.FILE_SYSTEM_OPTIMIZED));
return queue;
}
- private class OpenKeyDeletingTask implements BackgroundTask {
+ private boolean shouldRun() {
+ return !suspended.get() && ozoneManager.isLeaderReady();
+ }
+
+ private boolean isRatisEnabled() {
+ return ozoneManager.isRatisEnabled();
+ }
+
+ private class OpenKeyCleanupTask implements BackgroundTask {
+
+ private final BucketLayout bucketLayout;
+
+ OpenKeyCleanupTask(BucketLayout bucketLayout) {
+ this.bucketLayout = bucketLayout;
+ }
@Override
public int getPriority() {
@@ -73,18 +165,72 @@ public class OpenKeyCleanupService extends BackgroundService {
@Override
public BackgroundTaskResult call() throws Exception {
- // This method is currently never used. It will be implemented in
- // HDDS-4122, and integrated into the rest of the code base in HDDS-4123.
+ if (!shouldRun()) {
+ return BackgroundTaskResult.EmptyTaskResult.newResult();
+ }
+
+ runCount.incrementAndGet();
+ long startTime = Time.monotonicNow();
+ List<OpenKeyBucket> openKeyBuckets = null;
try {
- // The new API for deleting expired open keys in OM HA will differ
- // significantly from the old implementation.
- // The old implementation has been removed so the code compiles.
- keyManager.getExpiredOpenKeys(Duration.ZERO, 0, BucketLayout.DEFAULT);
+ openKeyBuckets = keyManager.getExpiredOpenKeys(expireThreshold,
+ cleanupLimitPerTask, bucketLayout);
} catch (IOException e) {
- LOG.error("Unable to get hanging open keys, retry in"
- + " next interval", e);
+ LOG.error("Unable to get hanging open keys, retry in next interval", e);
+ }
+
+ if (openKeyBuckets != null && !openKeyBuckets.isEmpty()) {
+ int numOpenKeys = openKeyBuckets.stream()
+ .mapToInt(OpenKeyBucket::getKeysCount).sum();
+
+ OMRequest omRequest = createRequest(openKeyBuckets);
+ submitRequest(omRequest);
+
+ LOG.debug("Number of expired keys submitted for deletion: {}, elapsed"
+ + " time: {}ms", numOpenKeys, Time.monotonicNow() - startTime);
+ submittedOpenKeyCount.addAndGet(numOpenKeys);
}
return BackgroundTaskResult.EmptyTaskResult.newResult();
}
+
+ private OMRequest createRequest(List<OpenKeyBucket> openKeyBuckets) {
+ DeleteOpenKeysRequest request =
+ DeleteOpenKeysRequest.newBuilder()
+ .addAllOpenKeysPerBucket(openKeyBuckets)
+ .setBucketLayout(bucketLayout.toProto())
+ .build();
+
+ OMRequest omRequest = OMRequest.newBuilder()
+ .setCmdType(Type.DeleteOpenKeys)
+ .setDeleteOpenKeysRequest(request)
+ .setClientId(clientId.toString())
+ .build();
+
+ return omRequest;
+ }
+
+ private void submitRequest(OMRequest omRequest) {
+ try {
+ if (isRatisEnabled()) {
+ OzoneManagerRatisServer server = ozoneManager.getOmRatisServer();
+
+ RaftClientRequest raftClientRequest = RaftClientRequest.newBuilder()
+ .setClientId(clientId)
+ .setServerId(server.getRaftPeerId())
+ .setGroupId(server.getRaftGroupId())
+ .setCallId(runCount.get())
+ .setMessage(Message.valueOf(
+ OMRatisHelper.convertRequestToByteString(omRequest)))
+ .setType(RaftClientRequest.writeRequestType())
+ .build();
+
+ server.submitRequest(omRequest, raftClientRequest);
+ } else {
+ ozoneManager.getOmServerProtocol().submitRequest(null, omRequest);
+ }
+ } catch (ServiceException e) {
+ LOG.error("Open key delete request failed. Will retry at next run.", e);
+ }
+ }
}
}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
index 3c1c59644b..cd52bd7143 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.ozone.om.request.bucket.acl.OMBucketRemoveAclRequest;
import org.apache.hadoop.ozone.om.request.bucket.acl.OMBucketSetAclRequest;
import org.apache.hadoop.ozone.om.request.key.OMKeyPurgeRequest;
import org.apache.hadoop.ozone.om.request.key.OMDirectoriesPurgeRequestWithFSO;
+import org.apache.hadoop.ozone.om.request.key.OMOpenKeysDeleteRequest;
import org.apache.hadoop.ozone.om.request.key.OMTrashRecoverRequest;
import org.apache.hadoop.ozone.om.request.key.acl.OMKeyAddAclRequest;
import org.apache.hadoop.ozone.om.request.key.acl.OMKeyAddAclRequestWithFSO;
@@ -208,6 +209,13 @@ public final class OzoneManagerRatisUtils {
return new OMTenantRevokeAdminRequest(omRequest);
case SetRangerServiceVersion:
return new OMSetRangerServiceVersionRequest(omRequest);
+ case DeleteOpenKeys:
+ BucketLayout bktLayout = BucketLayout.DEFAULT;
+ if (omRequest.getDeleteOpenKeysRequest().hasBucketLayout()) {
+ bktLayout = BucketLayout.fromProto(
+ omRequest.getDeleteOpenKeysRequest().getBucketLayout());
+ }
+ return new OMOpenKeysDeleteRequest(omRequest, bktLayout);
/*
* Key requests that can have multiple variants based on the bucket layout
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOpenKeyCleanupService.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOpenKeyCleanupService.java
new file mode 100644
index 0000000000..0fa4114e4c
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOpenKeyCleanupService.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.ozone.om;
+
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
+import org.apache.hadoop.hdds.server.ServerUtils;
+import org.apache.hadoop.hdds.utils.db.DBConfigFromFile;
+import org.apache.hadoop.ozone.om.helpers.BucketLayout;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
+import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
+import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
+import org.apache.hadoop.ozone.om.request.OMRequestTestUtils;
+import org.apache.ozone.test.GenericTestUtils;
+import org.apache.ratis.util.ExitUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_OPEN_KEY_CLEANUP_SERVICE_INTERVAL;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_OPEN_KEY_EXPIRE_THRESHOLD;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test Open Key Cleanup Service.
+ * <p>
+ * This test does the following things.
+ * <p>
+ * 1. Creates a bunch of open keys and never commits them. 2. Waits for them
+ * to pass the expire threshold. 3. Waits for the OpenKeyCleanupService to
+ * submit them for deletion. 4. Confirms that no expired open keys remain.
+ */
+public class TestOpenKeyCleanupService {
+ private OzoneManagerProtocol writeClient;
+ private OzoneManager om;
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestOpenKeyCleanupService.class);
+
+ private static final Duration SERVICE_INTERVAL = Duration.ofMillis(500);
+ private static final Duration EXPIRE_THRESHOLD = Duration.ofMillis(1000);
+ private KeyManager keyManager;
+ private OMMetadataManager omMetadataManager;
+
+ @BeforeAll
+ public static void setup() {
+ ExitUtils.disableSystemExit();
+ }
+
+ @BeforeEach
+ public void createConfAndInitValues(@TempDir Path tempDir) throws Exception {
+ OzoneConfiguration conf = new OzoneConfiguration();
+ System.setProperty(DBConfigFromFile.CONFIG_DIR, "/");
+ ServerUtils.setOzoneMetaDirPath(conf, tempDir.toString());
+ conf.setTimeDuration(OZONE_OM_OPEN_KEY_CLEANUP_SERVICE_INTERVAL,
+ SERVICE_INTERVAL.toMillis(), TimeUnit.MILLISECONDS);
+ conf.setTimeDuration(OZONE_OM_OPEN_KEY_EXPIRE_THRESHOLD,
+ EXPIRE_THRESHOLD.toMillis(), TimeUnit.MILLISECONDS);
+ conf.setQuietMode(false);
+ OmTestManagers omTestManagers = new OmTestManagers(conf);
+ keyManager = omTestManagers.getKeyManager();
+ omMetadataManager = omTestManagers.getMetadataManager();
+ writeClient = omTestManagers.getWriteClient();
+ om = omTestManagers.getOzoneManager();
+ }
+
+ @AfterEach
+ public void cleanup() throws Exception {
+ om.stop();
+ }
+
+ /**
+ * In this test, we suspend the OpenKeyCleanupService, create a bunch of
+ * open keys in DEFAULT and FILE_SYSTEM_OPTIMIZED buckets, and wait for them
+ * to expire. Then we resume the service and make sure that all the expired
+ * open keys are submitted for deletion and no longer listed as expired.
+ *
+ * @throws Exception - on Failure.
+ */
+ @ParameterizedTest
+ @CsvSource({
+ "99, 0",
+ "0, 88",
+ "66, 77"
+ })
+ @Timeout(300)
+ public void checkIfCleanupServiceIsDeletingExpiredOpenKeys(
+ int numDEFKeys, int numFSOKeys) throws Exception {
+
+ OpenKeyCleanupService openKeyCleanupService =
+ (OpenKeyCleanupService) keyManager.getOpenKeyCleanupService();
+
+ openKeyCleanupService.suspend();
+ // wait for submitted tasks to complete
+ Thread.sleep(SERVICE_INTERVAL.toMillis());
+ final long oldkeyCount = openKeyCleanupService.getSubmittedOpenKeyCount();
+ final long oldrunCount = openKeyCleanupService.getRunCount();
+
+ final int keyCount = numDEFKeys + numFSOKeys;
+ createOpenKeys(numDEFKeys, BucketLayout.DEFAULT);
+ createOpenKeys(numFSOKeys, BucketLayout.FILE_SYSTEM_OPTIMIZED);
+
+ // wait for open keys to expire
+ Thread.sleep(EXPIRE_THRESHOLD.toMillis());
+
+ assertEquals(numDEFKeys == 0, keyManager.getExpiredOpenKeys(
+ EXPIRE_THRESHOLD, 1, BucketLayout.DEFAULT).isEmpty());
+ assertEquals(numFSOKeys == 0, keyManager.getExpiredOpenKeys(
+ EXPIRE_THRESHOLD, 1, BucketLayout.FILE_SYSTEM_OPTIMIZED).isEmpty());
+
+ openKeyCleanupService.resume();
+
+ GenericTestUtils.waitFor(() -> openKeyCleanupService
+ .getRunCount() > oldrunCount,
+ (int) SERVICE_INTERVAL.toMillis(),
+ 5 * (int) SERVICE_INTERVAL.toMillis());
+
+ // wait for requests to complete
+ Thread.sleep(SERVICE_INTERVAL.toMillis());
+
+ assertTrue(openKeyCleanupService.getSubmittedOpenKeyCount() >=
+ oldkeyCount + keyCount);
+ assertTrue(keyManager.getExpiredOpenKeys(EXPIRE_THRESHOLD,
+ 1, BucketLayout.DEFAULT).isEmpty());
+ assertTrue(keyManager.getExpiredOpenKeys(EXPIRE_THRESHOLD,
+ 1, BucketLayout.FILE_SYSTEM_OPTIMIZED).isEmpty());
+ }
+
+ private void createOpenKeys(int keyCount, BucketLayout bucketLayout)
+ throws IOException {
+ String volume = UUID.randomUUID().toString();
+ String bucket = UUID.randomUUID().toString();
+ for (int x = 0; x < keyCount; x++) {
+ if (RandomUtils.nextBoolean()) {
+ bucket = UUID.randomUUID().toString();
+ if (RandomUtils.nextBoolean()) {
+ volume = UUID.randomUUID().toString();
+ }
+ }
+ String key = UUID.randomUUID().toString();
+ createVolumeAndBucket(volume, bucket, bucketLayout);
+
+ final int numBlocks = RandomUtils.nextInt(0, 3);
+ // Create the key
+ createOpenKey(volume, bucket, key, numBlocks);
+ }
+ }
+
+ private void createVolumeAndBucket(String volumeName, String bucketName,
+ BucketLayout bucketLayout) throws IOException {
+ // cheat here, just add a volume and bucket entry via the metadata
+ // manager so that we can create the keys; only the fields set below
+ // are populated
+ OMRequestTestUtils.addVolumeToOM(omMetadataManager,
+ OmVolumeArgs.newBuilder()
+ .setOwnerName("o")
+ .setAdminName("a")
+ .setVolume(volumeName)
+ .build());
+
+ OMRequestTestUtils.addBucketToOM(omMetadataManager,
+ OmBucketInfo.newBuilder().setVolumeName(volumeName)
+ .setBucketName(bucketName)
+ .setBucketLayout(bucketLayout)
+ .build());
+ }
+
+ private void createOpenKey(String volumeName, String bucketName,
+ String keyName, int numBlocks) throws IOException {
+ OmKeyArgs keyArg =
+ new OmKeyArgs.Builder()
+ .setVolumeName(volumeName)
+ .setBucketName(bucketName)
+ .setKeyName(keyName)
+ .setAcls(Collections.emptyList())
+ .setReplicationConfig(StandaloneReplicationConfig.getInstance(
+ HddsProtos.ReplicationFactor.ONE))
+ .setLocationInfoList(new ArrayList<>())
+ .build();
+
+ // Open the key and allocate blocks without committing it.
+ OpenKeySession session = writeClient.openKey(keyArg);
+ for (int i = 0; i < numBlocks; i++) {
+ keyArg.addLocationInfo(writeClient.allocateBlock(keyArg, session.getId(),
+ new ExcludeList()));
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org