You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by we...@apache.org on 2023/02/23 15:35:58 UTC
[ozone] branch master updated: HDDS-7782. OM lease recovery for hsync'ed files. (#4255)
This is an automated email from the ASF dual-hosted git repository.
weichiu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new dabe58a2bc HDDS-7782. OM lease recovery for hsync'ed files. (#4255)
dabe58a2bc is described below
commit dabe58a2bcbb09deaeb9bc2f26ef4e9ab5a417c0
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Thu Feb 23 07:35:50 2023 -0800
HDDS-7782. OM lease recovery for hsync'ed files. (#4255)
---
.../java/org/apache/hadoop/ozone/OzoneConsts.java | 3 +
...OzoneManagerProtocolClientSideTranslatorPB.java | 35 +++++------
.../java/org/apache/hadoop/fs/ozone/TestHSync.java | 54 +++++++++++++++--
.../apache/hadoop/ozone/om/ExpiredOpenKeys.java | 68 ++++++++++++++++++++++
.../apache/hadoop/ozone/om/OMMetadataManager.java | 5 +-
.../org/apache/hadoop/ozone/om/KeyManager.java | 5 +-
.../org/apache/hadoop/ozone/om/KeyManagerImpl.java | 3 +-
.../hadoop/ozone/om/OmMetadataManagerImpl.java | 61 +++++++++++++------
.../ozone/om/request/key/OMKeyCommitRequest.java | 4 ++
.../om/request/key/OMKeyCommitRequestWithFSO.java | 6 ++
.../ozone/om/service/OpenKeyCleanupService.java | 67 +++++++++++++++------
.../hadoop/ozone/om/TestOmMetadataManager.java | 16 ++---
.../om/service/TestOpenKeyCleanupService.java | 60 ++++++++++++-------
13 files changed, 289 insertions(+), 98 deletions(-)
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
index aeaca8350d..13a3319a52 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
@@ -394,6 +394,9 @@ public final class OzoneConsts {
public static final int S3_REQUEST_HEADER_METADATA_SIZE_LIMIT_KB = 2;
+ /** Metadata stored in OmKeyInfo. */
+ public static final String HSYNC_CLIENT_ID = "hsyncClientId";
+
//GDPR
public static final String GDPR_FLAG = "gdprEnabled";
public static final String GDPR_ALGORITHM_NAME = "AES";
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
index 538583187e..0820e4a280 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
@@ -756,6 +756,19 @@ public final class OzoneManagerProtocolClientSideTranslatorPB
updateKey(args, clientId, false);
}
+ public static void setReplicationConfig(ReplicationConfig replication,
+ KeyArgs.Builder b) {
+ if (replication == null) {
+ return;
+ }
+ if (replication instanceof ECReplicationConfig) {
+ b.setEcReplicationConfig(((ECReplicationConfig) replication).toProto());
+ } else {
+ b.setFactor(ReplicationConfig.getLegacyFactor(replication));
+ }
+ b.setType(replication.getReplicationType());
+ }
+
private void updateKey(OmKeyArgs args, long clientId, boolean hsync)
throws IOException {
CommitKeyRequest.Builder req = CommitKeyRequest.newBuilder();
@@ -771,16 +784,7 @@ public final class OzoneManagerProtocolClientSideTranslatorPB
.map(info -> info.getProtobuf(ClientVersion.CURRENT_VERSION))
.collect(Collectors.toList()));
- if (args.getReplicationConfig() != null) {
- if (args.getReplicationConfig() instanceof ECReplicationConfig) {
- keyArgsBuilder.setEcReplicationConfig(
- ((ECReplicationConfig) args.getReplicationConfig()).toProto());
- } else {
- keyArgsBuilder.setFactor(
- ReplicationConfig.getLegacyFactor(args.getReplicationConfig()));
- }
- keyArgsBuilder.setType(args.getReplicationConfig().getReplicationType());
- }
+ setReplicationConfig(args.getReplicationConfig(), keyArgsBuilder);
req.setKeyArgs(keyArgsBuilder.build());
req.setClientID(clientId);
@@ -1351,16 +1355,7 @@ public final class OzoneManagerProtocolClientSideTranslatorPB
.addAllAcls(omKeyArgs.getAcls().stream().map(a ->
OzoneAcl.toProtobuf(a)).collect(Collectors.toList()));
- if (omKeyArgs.getReplicationConfig() != null) {
- if (omKeyArgs.getReplicationConfig() instanceof ECReplicationConfig) {
- keyArgs.setEcReplicationConfig(
- ((ECReplicationConfig) omKeyArgs.getReplicationConfig()).toProto());
- } else {
- keyArgs.setFactor(ReplicationConfig
- .getLegacyFactor(omKeyArgs.getReplicationConfig()));
- }
- keyArgs.setType(omKeyArgs.getReplicationConfig().getReplicationType());
- }
+ setReplicationConfig(omKeyArgs.getReplicationConfig(), keyArgs);
multipartInfoInitiateRequest.setKeyArgs(keyArgs.build());
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
index e55c20c513..d430f322aa 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.fs.ozone;
+import java.io.Closeable;
+import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.conf.StorageUnit;
@@ -39,6 +41,7 @@ import org.apache.hadoop.ozone.client.OzoneBucket;
import org.apache.hadoop.ozone.om.helpers.BucketLayout;
import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
@@ -130,18 +133,57 @@ public class TestHSync {
}
}
- private void runTestHSync(FileSystem fs, Path file) throws Exception {
- final byte[] data = new byte[1 << 20];
- ThreadLocalRandom.current().nextBytes(data);
+ static void runTestHSync(FileSystem fs, Path file) throws Exception {
+ try (StreamWithLength out = new StreamWithLength(
+ fs.create(file, true))) {
+ runTestHSync(fs, file, out, 1);
+ for (int i = 1; i < 5; i++) {
+ for (int j = -1; j <= 1; j++) {
+ int dataSize = (1 << (i * 5)) + j;
+ runTestHSync(fs, file, out, dataSize);
+ }
+ }
+ }
+ }
+
+ private static class StreamWithLength implements Closeable {
+ private final FSDataOutputStream out;
+ private long length = 0;
+
+ StreamWithLength(FSDataOutputStream out) {
+ this.out = out;
+ }
- try (FSDataOutputStream stream = fs.create(file, true)) {
- stream.write(data);
- stream.hsync();
+ long getLength() {
+ return length;
}
+ void writeAndHsync(byte[] data) throws IOException {
+ out.write(data);
+ out.hsync();
+ length += data.length;
+ }
+
+ @Override
+ public void close() throws IOException {
+ out.close();
+ }
+ }
+
+ static void runTestHSync(FileSystem fs, Path file,
+ StreamWithLength out, int dataSize)
+ throws Exception {
+ final long length = out.getLength();
+ final byte[] data = new byte[dataSize];
+ ThreadLocalRandom.current().nextBytes(data);
+ out.writeAndHsync(data);
+
final byte[] buffer = new byte[4 << 10];
int offset = 0;
try (FSDataInputStream in = fs.open(file)) {
+ final long skipped = in.skip(length);
+ Assertions.assertEquals(length, skipped);
+
for (; ;) {
final int n = in.read(buffer, 0, buffer.length);
if (n <= 0) {
diff --git a/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/ExpiredOpenKeys.java b/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/ExpiredOpenKeys.java
new file mode 100644
index 0000000000..aba290b38c
--- /dev/null
+++ b/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/ExpiredOpenKeys.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.om;
+
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CommitKeyRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyArgs;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OpenKey;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OpenKeyBucket;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
+
+/**
+ * The expired open keys, split into non-hsync'ed keys and hsync'ed keys.
+ *
+ * @see OMConfigKeys#OZONE_OM_OPEN_KEY_EXPIRE_THRESHOLD_DEFAULT
+ */
+public class ExpiredOpenKeys {
+ private final Map<String, OpenKeyBucket.Builder> openKeyBuckets
+ = new HashMap<>();
+ private final List<CommitKeyRequest.Builder> hsyncKeys
+ = new ArrayList<>();
+
+ /** @return non-hsync'ed open keys. */
+ public Collection<OpenKeyBucket.Builder> getOpenKeyBuckets() {
+ return openKeyBuckets.values();
+ }
+
+ /** @return hsync'ed open keys. */
+ public List<CommitKeyRequest.Builder> getHsyncKeys() {
+ return hsyncKeys;
+ }
+
+ void addOpenKey(OmKeyInfo info, String dbOpenKeyName) {
+ final String mapKey = info.getVolumeName() + OM_KEY_PREFIX
+ + info.getBucketName();
+ openKeyBuckets.computeIfAbsent(mapKey, k -> OpenKeyBucket.newBuilder()
+ .setVolumeName(info.getVolumeName())
+ .setBucketName(info.getBucketName()))
+ .addKeys(OpenKey.newBuilder().setName(dbOpenKeyName));
+ }
+
+ void addHsyncKey(KeyArgs.Builder keyArgs, long clientId) {
+ hsyncKeys.add(CommitKeyRequest.newBuilder()
+ .setKeyArgs(keyArgs)
+ .setClientID(clientId));
+ }
+}
diff --git a/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java b/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
index 2343532ac4..0229e86dda 100644
--- a/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
+++ b/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
@@ -43,7 +43,6 @@ import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
import org.apache.hadoop.ozone.om.helpers.BucketLayout;
import org.apache.hadoop.ozone.om.lock.IOzoneManagerLock;
import org.apache.hadoop.hdds.utils.TransactionInfo;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OpenKeyBucket;
import org.apache.hadoop.ozone.storage.proto.
OzoneManagerStorageProtos.PersistedUserVolumeInfo;
import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
@@ -276,10 +275,10 @@ public interface OMMetadataManager extends DBStoreHAManager {
* @param count The maximum number of open keys to return.
* @param expireThreshold The threshold of open key expire age.
* @param bucketLayout The type of open keys to get (e.g. DEFAULT or FSO).
- * @return a {@link List} of {@link OpenKeyBucket}, the expired open keys.
+ * @return the expired open keys.
* @throws IOException
*/
- List<OpenKeyBucket> getExpiredOpenKeys(Duration expireThreshold, int count,
+ ExpiredOpenKeys getExpiredOpenKeys(Duration expireThreshold, int count,
BucketLayout bucketLayout) throws IOException;
/**
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
index c14e97ea2b..6b7eb26ee1 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
@@ -28,7 +28,6 @@ import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadListParts;
import org.apache.hadoop.ozone.om.fs.OzoneManagerFS;
import org.apache.hadoop.hdds.utils.BackgroundService;
import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OpenKeyBucket;
import java.io.IOException;
import java.time.Duration;
@@ -138,10 +137,10 @@ public interface KeyManager extends OzoneManagerFS, IOzoneAcl {
* @param count The maximum number of expired open keys to return.
* @param expireThreshold The threshold of open key expiration age.
* @param bucketLayout The type of open keys to get (e.g. DEFAULT or FSO).
- * @return a {@link List} of {@link OpenKeyBucket}, the expired open keys.
+ * @return the expired open keys.
* @throws IOException
*/
- List<OpenKeyBucket> getExpiredOpenKeys(Duration expireThreshold, int count,
+ ExpiredOpenKeys getExpiredOpenKeys(Duration expireThreshold, int count,
BucketLayout bucketLayout) throws IOException;
/**
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
index a6896e4d36..4ff5ad64d1 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
@@ -84,7 +84,6 @@ import org.apache.hadoop.ozone.om.request.file.OMFileRequest;
import org.apache.hadoop.ozone.om.service.DirectoryDeletingService;
import org.apache.hadoop.ozone.om.service.KeyDeletingService;
import org.apache.hadoop.ozone.om.service.OpenKeyCleanupService;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OpenKeyBucket;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PartKeyInfo;
import org.apache.hadoop.hdds.security.token.OzoneBlockTokenSecretManager;
import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
@@ -593,7 +592,7 @@ public class KeyManagerImpl implements KeyManager {
}
@Override
- public List<OpenKeyBucket> getExpiredOpenKeys(Duration expireThreshold,
+ public ExpiredOpenKeys getExpiredOpenKeys(Duration expireThreshold,
int count, BucketLayout bucketLayout) throws IOException {
return metadataManager.getExpiredOpenKeys(expireThreshold, count,
bucketLayout);
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
index 180e5b5e6c..2a0241e3b3 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
@@ -30,6 +31,7 @@ import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import com.google.common.base.Optional;
import org.apache.hadoop.hdds.client.BlockID;
@@ -47,6 +49,7 @@ import org.apache.hadoop.hdds.utils.db.TypedTable;
import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
import org.apache.hadoop.hdds.utils.db.cache.TableCache.CacheType;
+import org.apache.hadoop.ozone.ClientVersion;
import org.apache.hadoop.ozone.OmUtils;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.common.BlockGroup;
@@ -83,12 +86,13 @@ import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
import org.apache.hadoop.ozone.om.helpers.BucketLayout;
+import org.apache.hadoop.ozone.om.helpers.WithMetadata;
import org.apache.hadoop.ozone.om.lock.IOzoneManagerLock;
import org.apache.hadoop.ozone.om.lock.OmReadOnlyLock;
import org.apache.hadoop.ozone.om.lock.OzoneManagerLock;
import org.apache.hadoop.hdds.utils.TransactionInfo;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OpenKey;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OpenKeyBucket;
+import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolClientSideTranslatorPB;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyArgs;
import org.apache.hadoop.ozone.storage.proto
.OzoneManagerStorageProtos.PersistedUserVolumeInfo;
import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
@@ -1391,10 +1395,11 @@ public class OmMetadataManagerImpl implements OMMetadataManager,
}
@Override
- public List<OpenKeyBucket> getExpiredOpenKeys(Duration expireThreshold,
+ public ExpiredOpenKeys getExpiredOpenKeys(Duration expireThreshold,
int count, BucketLayout bucketLayout) throws IOException {
- Map<String, OpenKeyBucket.Builder> expiredKeys = new HashMap<>();
+ final ExpiredOpenKeys expiredKeys = new ExpiredOpenKeys();
+ final Table<String, OmKeyInfo> kt = getKeyTable(bucketLayout);
// Only check for expired keys in the open key table, not its cache.
// If a key expires while it is in the cache, it will be cleaned
// up after the cache is flushed.
@@ -1404,33 +1409,55 @@ public class OmMetadataManagerImpl implements OMMetadataManager,
final long expiredCreationTimestamp =
expireThreshold.negated().plusMillis(Time.now()).toMillis();
- OpenKey.Builder builder = OpenKey.newBuilder();
int num = 0;
while (num < count && keyValueTableIterator.hasNext()) {
KeyValue<String, OmKeyInfo> openKeyValue = keyValueTableIterator.next();
String dbOpenKeyName = openKeyValue.getKey();
+
+ final int lastPrefix = dbOpenKeyName.lastIndexOf(OM_KEY_PREFIX);
+ final String dbKeyName = dbOpenKeyName.substring(0, lastPrefix);
OmKeyInfo openKeyInfo = openKeyValue.getValue();
if (openKeyInfo.getCreationTime() <= expiredCreationTimestamp) {
- final String volume = openKeyInfo.getVolumeName();
- final String bucket = openKeyInfo.getBucketName();
- final String mapKey = volume + OM_KEY_PREFIX + bucket;
- if (!expiredKeys.containsKey(mapKey)) {
- expiredKeys.put(mapKey,
- OpenKeyBucket.newBuilder()
- .setVolumeName(volume)
- .setBucketName(bucket));
+ final String clientIdString
+ = dbOpenKeyName.substring(lastPrefix + 1);
+
+ final OmKeyInfo info = kt.get(dbKeyName);
+ final boolean isHsync = java.util.Optional.ofNullable(info)
+ .map(WithMetadata::getMetadata)
+ .map(meta -> meta.get(OzoneConsts.HSYNC_CLIENT_ID))
+ .filter(id -> id.equals(clientIdString))
+ .isPresent();
+
+ if (!isHsync) {
+ // add non-hsync'ed keys
+ expiredKeys.addOpenKey(openKeyInfo, dbOpenKeyName);
+ } else {
+ // add hsync'ed keys
+ final KeyArgs.Builder keyArgs = KeyArgs.newBuilder()
+ .setVolumeName(info.getVolumeName())
+ .setBucketName(info.getBucketName())
+ .setKeyName(info.getKeyName())
+ .setDataSize(info.getDataSize());
+ java.util.Optional.ofNullable(info.getLatestVersionLocations())
+ .map(OmKeyLocationInfoGroup::getLocationList)
+ .map(Collection::stream)
+ .orElseGet(Stream::empty)
+ .map(loc -> loc.getProtobuf(ClientVersion.CURRENT_VERSION))
+ .forEach(keyArgs::addKeyLocations);
+
+ OzoneManagerProtocolClientSideTranslatorPB.setReplicationConfig(
+ info.getReplicationConfig(), keyArgs);
+
+ expiredKeys.addHsyncKey(keyArgs, Long.parseLong(clientIdString));
}
- expiredKeys.get(mapKey)
- .addKeys(builder.setName(dbOpenKeyName).build());
num++;
}
}
}
- return expiredKeys.values().stream().map(OpenKeyBucket.Builder::build)
- .collect(Collectors.toList());
+ return expiredKeys;
}
@Override
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java
index 674c4ecfb3..ecc85abd09 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java
@@ -208,6 +208,10 @@ public class OMKeyCommitRequest extends OMKeyRequest {
throw new OMException("Failed to " + action + " key, as " + dbOpenKey +
"entry is not found in the OpenKey table", KEY_NOT_FOUND);
}
+ if (isHSync) {
+ omKeyInfo.getMetadata().put(OzoneConsts.HSYNC_CLIENT_ID,
+ String.valueOf(commitKeyRequest.getClientID()));
+ }
omBucketInfo = getBucketInfo(omMetadataManager, volumeName, bucketName);
omKeyInfo.setDataSize(commitKeyArgs.getDataSize());
omKeyInfo.setModificationTime(commitKeyArgs.getModificationTime());
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequestWithFSO.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequestWithFSO.java
index 3a3473943a..909104f99e 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequestWithFSO.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequestWithFSO.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.ozone.om.request.key;
+import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.audit.AuditLogger;
import org.apache.hadoop.ozone.audit.OMAction;
import org.apache.hadoop.ozone.om.OMMetadataManager;
@@ -150,6 +151,11 @@ public class OMKeyCommitRequestWithFSO extends OMKeyCommitRequest {
dbOpenFileKey + "entry is not found in the OpenKey table",
KEY_NOT_FOUND);
}
+ if (isHSync) {
+ omKeyInfo.getMetadata().put(OzoneConsts.HSYNC_CLIENT_ID,
+ String.valueOf(commitKeyRequest.getClientID()));
+ }
+
omKeyInfo.setDataSize(commitKeyArgs.getDataSize());
omKeyInfo.setModificationTime(commitKeyArgs.getModificationTime());
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/OpenKeyCleanupService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/OpenKeyCleanupService.java
index cda1f5ac4f..f4d09875e1 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/OpenKeyCleanupService.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/OpenKeyCleanupService.java
@@ -25,12 +25,14 @@ import org.apache.hadoop.hdds.utils.BackgroundService;
import org.apache.hadoop.hdds.utils.BackgroundTask;
import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
+import org.apache.hadoop.ozone.om.ExpiredOpenKeys;
import org.apache.hadoop.ozone.om.KeyManager;
import org.apache.hadoop.ozone.om.OMConfigKeys;
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.helpers.BucketLayout;
import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CommitKeyRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeleteOpenKeysRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OpenKeyBucket;
@@ -44,10 +46,12 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.time.Duration;
+import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Stream;
/**
* This is the background service to delete hanging open keys.
@@ -174,34 +178,60 @@ public class OpenKeyCleanupService extends BackgroundService {
runCount.incrementAndGet();
long startTime = Time.monotonicNow();
- List<OpenKeyBucket> openKeyBuckets = null;
+ final ExpiredOpenKeys expiredOpenKeys;
try {
- openKeyBuckets = keyManager.getExpiredOpenKeys(expireThreshold,
+ expiredOpenKeys = keyManager.getExpiredOpenKeys(expireThreshold,
cleanupLimitPerTask, bucketLayout);
} catch (IOException e) {
LOG.error("Unable to get hanging open keys, retry in next interval", e);
+ return BackgroundTaskResult.EmptyTaskResult.newResult();
}
- if (openKeyBuckets != null && !openKeyBuckets.isEmpty()) {
- int numOpenKeys = openKeyBuckets.stream()
- .mapToInt(OpenKeyBucket::getKeysCount).sum();
-
- OMRequest omRequest = createRequest(openKeyBuckets);
+ final Collection<OpenKeyBucket.Builder> openKeyBuckets
+ = expiredOpenKeys.getOpenKeyBuckets();
+ final int numOpenKeys = openKeyBuckets.stream()
+ .mapToInt(OpenKeyBucket.Builder::getKeysCount)
+ .sum();
+ if (!openKeyBuckets.isEmpty()) {
+ // delete non-hsync'ed keys
+ final OMRequest omRequest = createDeleteOpenKeysRequest(
+ openKeyBuckets.stream());
submitRequest(omRequest);
+ }
+
+ final List<CommitKeyRequest.Builder> hsyncKeys
+ = expiredOpenKeys.getHsyncKeys();
+ final int numHsyncKeys = hsyncKeys.size();
+ if (!hsyncKeys.isEmpty()) {
+ // commit hsync'ed keys
+ hsyncKeys.forEach(b -> submitRequest(createCommitKeyRequest(b)));
+ }
- LOG.debug("Number of expired keys submitted for deletion: {}, elapsed"
- + " time: {}ms", numOpenKeys, Time.monotonicNow() - startTime);
- submittedOpenKeyCount.addAndGet(numOpenKeys);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Number of expired open keys submitted for deletion: {},"
+ + " for commit: {}, elapsed time: {}ms",
+ numOpenKeys, numHsyncKeys, Time.monotonicNow() - startTime);
}
- return BackgroundTaskResult.EmptyTaskResult.newResult();
+ final int numKeys = numOpenKeys + numHsyncKeys;
+ submittedOpenKeyCount.addAndGet(numKeys);
+ return () -> numKeys;
}
- private OMRequest createRequest(List<OpenKeyBucket> openKeyBuckets) {
- DeleteOpenKeysRequest request =
- DeleteOpenKeysRequest.newBuilder()
- .addAllOpenKeysPerBucket(openKeyBuckets)
- .setBucketLayout(bucketLayout.toProto())
- .build();
+ private OMRequest createCommitKeyRequest(
+ CommitKeyRequest.Builder request) {
+ return OMRequest.newBuilder()
+ .setCmdType(Type.CommitKey)
+ .setCommitKeyRequest(request)
+ .setClientId(clientId.toString())
+ .build();
+ }
+
+ private OMRequest createDeleteOpenKeysRequest(
+ Stream<OpenKeyBucket.Builder> openKeyBuckets) {
+ final DeleteOpenKeysRequest.Builder request
+ = DeleteOpenKeysRequest.newBuilder()
+ .setBucketLayout(bucketLayout.toProto());
+ openKeyBuckets.forEach(request::addOpenKeysPerBucket);
OMRequest omRequest = OMRequest.newBuilder()
.setCmdType(Type.DeleteOpenKeys)
@@ -232,7 +262,8 @@ public class OpenKeyCleanupService extends BackgroundService {
ozoneManager.getOmServerProtocol().submitRequest(null, omRequest);
}
} catch (ServiceException e) {
- LOG.error("Open key delete request failed. Will retry at next run.", e);
+ LOG.error("Open key " + omRequest.getCmdType()
+ + " request failed. Will retry at next run.", e);
}
}
}
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmMetadataManager.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmMetadataManager.java
index 815a1b4baf..fa97c34a13 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmMetadataManager.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmMetadataManager.java
@@ -45,6 +45,7 @@ import org.junit.jupiter.params.provider.MethodSource;
import java.io.File;
import java.time.Duration;
import java.util.Arrays;
+import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -624,17 +625,17 @@ public class TestOmMetadataManager {
}
// Test retrieving fewer expired keys than actually exist.
- List<OpenKeyBucket> someExpiredKeys =
+ final Collection<OpenKeyBucket.Builder> someExpiredKeys =
omMetadataManager.getExpiredOpenKeys(expireThreshold,
- numExpiredOpenKeys - 1, bucketLayout);
+ numExpiredOpenKeys - 1, bucketLayout).getOpenKeyBuckets();
List<String> names = getOpenKeyNames(someExpiredKeys);
assertEquals(numExpiredOpenKeys - 1, names.size());
assertTrue(expiredKeys.containsAll(names));
// Test attempting to retrieving more expired keys than actually exist.
- List<OpenKeyBucket> allExpiredKeys =
+ Collection<OpenKeyBucket.Builder> allExpiredKeys =
omMetadataManager.getExpiredOpenKeys(expireThreshold,
- numExpiredOpenKeys + 1, bucketLayout);
+ numExpiredOpenKeys + 1, bucketLayout).getOpenKeyBuckets();
names = getOpenKeyNames(allExpiredKeys);
assertEquals(numExpiredOpenKeys, names.size());
assertTrue(expiredKeys.containsAll(names));
@@ -642,15 +643,16 @@ public class TestOmMetadataManager {
// Test retrieving exact amount of expired keys that exist.
allExpiredKeys =
omMetadataManager.getExpiredOpenKeys(expireThreshold,
- numExpiredOpenKeys, bucketLayout);
+ numExpiredOpenKeys, bucketLayout).getOpenKeyBuckets();
names = getOpenKeyNames(allExpiredKeys);
assertEquals(numExpiredOpenKeys, names.size());
assertTrue(expiredKeys.containsAll(names));
}
- private List<String> getOpenKeyNames(List<OpenKeyBucket> openKeyBuckets) {
+ private List<String> getOpenKeyNames(
+ Collection<OpenKeyBucket.Builder> openKeyBuckets) {
return openKeyBuckets.stream()
- .map(OpenKeyBucket::getKeysList)
+ .map(OpenKeyBucket.Builder::getKeysList)
.flatMap(List::stream)
.map(OpenKey::getName)
.collect(Collectors.toList());
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestOpenKeyCleanupService.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestOpenKeyCleanupService.java
index 0fbdeef6c2..54939f956b 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestOpenKeyCleanupService.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestOpenKeyCleanupService.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
import org.apache.hadoop.hdds.server.ServerUtils;
import org.apache.hadoop.hdds.utils.db.DBConfigFromFile;
+import org.apache.hadoop.ozone.om.ExpiredOpenKeys;
import org.apache.hadoop.ozone.om.KeyManager;
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.OmTestManagers;
@@ -77,8 +78,8 @@ public class TestOpenKeyCleanupService {
private static final Logger LOG =
LoggerFactory.getLogger(TestOpenKeyCleanupService.class);
- private static final Duration SERVICE_INTERVAL = Duration.ofMillis(500);
- private static final Duration EXPIRE_THRESHOLD = Duration.ofMillis(1000);
+ private static final Duration SERVICE_INTERVAL = Duration.ofMillis(100);
+ private static final Duration EXPIRE_THRESHOLD = Duration.ofMillis(200);
private KeyManager keyManager;
private OMMetadataManager omMetadataManager;
@@ -119,13 +120,16 @@ public class TestOpenKeyCleanupService {
*/
@ParameterizedTest
@CsvSource({
- "99, 0",
- "0, 88",
- "66, 77"
+ "9, 0, true",
+ "0, 8, true",
+ "6, 7, true",
+ "99, 0, false",
+ "0, 88, false",
+ "66, 77, false"
})
@Timeout(300)
- public void checkIfCleanupServiceIsDeletingExpiredOpenKeys(
- int numDEFKeys, int numFSOKeys) throws Exception {
+ public void testCleanupExpiredOpenKeys(
+ int numDEFKeys, int numFSOKeys, boolean hsync) throws Exception {
OpenKeyCleanupService openKeyCleanupService =
(OpenKeyCleanupService) keyManager.getOpenKeyCleanupService();
@@ -137,16 +141,15 @@ public class TestOpenKeyCleanupService {
final long oldrunCount = openKeyCleanupService.getRunCount();
final int keyCount = numDEFKeys + numFSOKeys;
- createOpenKeys(numDEFKeys, BucketLayout.DEFAULT);
- createOpenKeys(numFSOKeys, BucketLayout.FILE_SYSTEM_OPTIMIZED);
+ createOpenKeys(numDEFKeys, hsync, BucketLayout.DEFAULT);
+ createOpenKeys(numFSOKeys, hsync, BucketLayout.FILE_SYSTEM_OPTIMIZED);
// wait for open keys to expire
Thread.sleep(EXPIRE_THRESHOLD.toMillis());
- assertEquals(numDEFKeys == 0, keyManager.getExpiredOpenKeys(
- EXPIRE_THRESHOLD, 1, BucketLayout.DEFAULT).isEmpty());
- assertEquals(numFSOKeys == 0, keyManager.getExpiredOpenKeys(
- EXPIRE_THRESHOLD, 1, BucketLayout.FILE_SYSTEM_OPTIMIZED).isEmpty());
+ assertExpiredOpenKeys(numDEFKeys == 0, hsync, BucketLayout.DEFAULT);
+ assertExpiredOpenKeys(numFSOKeys == 0, hsync,
+ BucketLayout.FILE_SYSTEM_OPTIMIZED);
openKeyCleanupService.resume();
@@ -156,18 +159,28 @@ public class TestOpenKeyCleanupService {
5 * (int) SERVICE_INTERVAL.toMillis());
// wait for requests to complete
- Thread.sleep(SERVICE_INTERVAL.toMillis());
+ final int n = hsync ? numDEFKeys + numFSOKeys : 1;
+ Thread.sleep(n * SERVICE_INTERVAL.toMillis());
assertTrue(openKeyCleanupService.getSubmittedOpenKeyCount() >=
oldkeyCount + keyCount);
- assertTrue(keyManager.getExpiredOpenKeys(EXPIRE_THRESHOLD,
- 1, BucketLayout.DEFAULT).isEmpty());
- assertTrue(keyManager.getExpiredOpenKeys(EXPIRE_THRESHOLD,
- 1, BucketLayout.FILE_SYSTEM_OPTIMIZED).isEmpty());
+ assertExpiredOpenKeys(true, hsync, BucketLayout.DEFAULT);
+ assertExpiredOpenKeys(true, hsync,
+ BucketLayout.FILE_SYSTEM_OPTIMIZED);
+ }
+
+ void assertExpiredOpenKeys(boolean expectedToEmpty, boolean hsync,
+ BucketLayout layout) throws IOException {
+ final ExpiredOpenKeys expired = keyManager.getExpiredOpenKeys(
+ EXPIRE_THRESHOLD, 100, layout);
+ final int size = (hsync ? expired.getHsyncKeys()
+ : expired.getOpenKeyBuckets()).size();
+ assertEquals(expectedToEmpty, size == 0,
+ () -> "size=" + size + ", layout=" + layout);
}
- private void createOpenKeys(int keyCount, BucketLayout bucketLayout)
- throws IOException {
+ private void createOpenKeys(int keyCount, boolean hsync,
+ BucketLayout bucketLayout) throws IOException {
String volume = UUID.randomUUID().toString();
String bucket = UUID.randomUUID().toString();
for (int x = 0; x < keyCount; x++) {
@@ -182,7 +195,7 @@ public class TestOpenKeyCleanupService {
final int numBlocks = RandomUtils.nextInt(0, 3);
// Create the key
- createOpenKey(volume, bucket, key, numBlocks);
+ createOpenKey(volume, bucket, key, numBlocks, hsync);
}
}
@@ -206,7 +219,7 @@ public class TestOpenKeyCleanupService {
}
private void createOpenKey(String volumeName, String bucketName,
- String keyName, int numBlocks) throws IOException {
+ String keyName, int numBlocks, boolean hsync) throws IOException {
OmKeyArgs keyArg =
new OmKeyArgs.Builder()
.setVolumeName(volumeName)
@@ -224,5 +237,8 @@ public class TestOpenKeyCleanupService {
keyArg.addLocationInfo(writeClient.allocateBlock(keyArg, session.getId(),
new ExcludeList()));
}
+ if (hsync) {
+ writeClient.hsyncKey(keyArg, session.getId());
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org