Posted to commits@ozone.apache.org by we...@apache.org on 2023/02/23 15:35:58 UTC

[ozone] branch master updated: HDDS-7782. OM lease recovery for hsync'ed files. (#4255)

This is an automated email from the ASF dual-hosted git repository.

weichiu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new dabe58a2bc HDDS-7782. OM lease recovery for hsync'ed files. (#4255)
dabe58a2bc is described below

commit dabe58a2bcbb09deaeb9bc2f26ef4e9ab5a417c0
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Thu Feb 23 07:35:50 2023 -0800

    HDDS-7782. OM lease recovery for hsync'ed files. (#4255)
---
 .../java/org/apache/hadoop/ozone/OzoneConsts.java  |  3 +
 ...OzoneManagerProtocolClientSideTranslatorPB.java | 35 +++++------
 .../java/org/apache/hadoop/fs/ozone/TestHSync.java | 54 +++++++++++++++--
 .../apache/hadoop/ozone/om/ExpiredOpenKeys.java    | 68 ++++++++++++++++++++++
 .../apache/hadoop/ozone/om/OMMetadataManager.java  |  5 +-
 .../org/apache/hadoop/ozone/om/KeyManager.java     |  5 +-
 .../org/apache/hadoop/ozone/om/KeyManagerImpl.java |  3 +-
 .../hadoop/ozone/om/OmMetadataManagerImpl.java     | 61 +++++++++++++------
 .../ozone/om/request/key/OMKeyCommitRequest.java   |  4 ++
 .../om/request/key/OMKeyCommitRequestWithFSO.java  |  6 ++
 .../ozone/om/service/OpenKeyCleanupService.java    | 67 +++++++++++++++------
 .../hadoop/ozone/om/TestOmMetadataManager.java     | 16 ++---
 .../om/service/TestOpenKeyCleanupService.java      | 60 ++++++++++++-------
 13 files changed, 289 insertions(+), 98 deletions(-)
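
For context, the lease recovery added here targets keys that were persisted with hsync() but whose output stream was never closed. Below is a minimal writer sketch modeled on runTestHSync in TestHSync further down in this diff; the class name is illustrative, and the FileSystem instance is assumed to already point at an Ozone bucket (e.g. through ofs:// or o3fs://):

    import java.io.IOException;
    import java.util.concurrent.ThreadLocalRandom;

    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public final class HsyncWriterSketch {
      /** Write random data and hsync it without closing the stream. */
      static void writeAndHsync(FileSystem fs, Path file) throws IOException {
        final byte[] data = new byte[1 << 20];
        ThreadLocalRandom.current().nextBytes(data);
        try (FSDataOutputStream out = fs.create(file, true)) {
          out.write(data);
          // hsync() commits the key on the OM while keeping the file open;
          // with this change the OM tags the committed key's metadata with
          // hsyncClientId (see OMKeyCommitRequest below).
          out.hsync();
        }
      }
    }

On the server side, such a key stays in the open key table but its committed counterpart carries hsyncClientId, so once the open key expires OpenKeyCleanupService submits a CommitKey request for it (recovering the lease) instead of deleting it, as shown in the OpenKeyCleanupService and OmMetadataManagerImpl changes below.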

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
index aeaca8350d..13a3319a52 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
@@ -394,6 +394,9 @@ public final class OzoneConsts {
 
   public static final int S3_REQUEST_HEADER_METADATA_SIZE_LIMIT_KB = 2;
 
+  /** Metadata stored in OmKeyInfo. */
+  public static final String HSYNC_CLIENT_ID = "hsyncClientId";
+
   //GDPR
   public static final String GDPR_FLAG = "gdprEnabled";
   public static final String GDPR_ALGORITHM_NAME = "AES";
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
index 538583187e..0820e4a280 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
@@ -756,6 +756,19 @@ public final class OzoneManagerProtocolClientSideTranslatorPB
     updateKey(args, clientId, false);
   }
 
+  public static void setReplicationConfig(ReplicationConfig replication,
+      KeyArgs.Builder b) {
+    if (replication == null) {
+      return;
+    }
+    if (replication instanceof ECReplicationConfig) {
+      b.setEcReplicationConfig(((ECReplicationConfig) replication).toProto());
+    } else {
+      b.setFactor(ReplicationConfig.getLegacyFactor(replication));
+    }
+    b.setType(replication.getReplicationType());
+  }
+
   private void updateKey(OmKeyArgs args, long clientId, boolean hsync)
       throws IOException {
     CommitKeyRequest.Builder req = CommitKeyRequest.newBuilder();
@@ -771,16 +784,7 @@ public final class OzoneManagerProtocolClientSideTranslatorPB
             .map(info -> info.getProtobuf(ClientVersion.CURRENT_VERSION))
             .collect(Collectors.toList()));
 
-    if (args.getReplicationConfig() != null) {
-      if (args.getReplicationConfig() instanceof ECReplicationConfig) {
-        keyArgsBuilder.setEcReplicationConfig(
-            ((ECReplicationConfig) args.getReplicationConfig()).toProto());
-      } else {
-        keyArgsBuilder.setFactor(
-            ReplicationConfig.getLegacyFactor(args.getReplicationConfig()));
-      }
-      keyArgsBuilder.setType(args.getReplicationConfig().getReplicationType());
-    }
+    setReplicationConfig(args.getReplicationConfig(), keyArgsBuilder);
 
     req.setKeyArgs(keyArgsBuilder.build());
     req.setClientID(clientId);
@@ -1351,16 +1355,7 @@ public final class OzoneManagerProtocolClientSideTranslatorPB
         .addAllAcls(omKeyArgs.getAcls().stream().map(a ->
             OzoneAcl.toProtobuf(a)).collect(Collectors.toList()));
 
-    if (omKeyArgs.getReplicationConfig() != null) {
-      if (omKeyArgs.getReplicationConfig() instanceof ECReplicationConfig) {
-        keyArgs.setEcReplicationConfig(
-            ((ECReplicationConfig) omKeyArgs.getReplicationConfig()).toProto());
-      } else {
-        keyArgs.setFactor(ReplicationConfig
-            .getLegacyFactor(omKeyArgs.getReplicationConfig()));
-      }
-      keyArgs.setType(omKeyArgs.getReplicationConfig().getReplicationType());
-    }
+    setReplicationConfig(omKeyArgs.getReplicationConfig(), keyArgs);
 
     multipartInfoInitiateRequest.setKeyArgs(keyArgs.build());
 
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
index e55c20c513..d430f322aa 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.fs.ozone;
 
+import java.io.Closeable;
+import java.io.IOException;
 import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 import org.apache.hadoop.conf.StorageUnit;
@@ -39,6 +41,7 @@ import org.apache.hadoop.ozone.client.OzoneBucket;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
 
 import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
@@ -130,18 +133,57 @@ public class TestHSync {
     }
   }
 
-  private void runTestHSync(FileSystem fs, Path file) throws Exception {
-    final byte[] data = new byte[1 << 20];
-    ThreadLocalRandom.current().nextBytes(data);
+  static void runTestHSync(FileSystem fs, Path file) throws Exception {
+    try (StreamWithLength out = new StreamWithLength(
+        fs.create(file, true))) {
+      runTestHSync(fs, file, out, 1);
+      for (int i = 1; i < 5; i++) {
+        for (int j = -1; j <= 1; j++) {
+          int dataSize = (1 << (i * 5)) + j;
+          runTestHSync(fs, file, out, dataSize);
+        }
+      }
+    }
+  }
+
+  private static class StreamWithLength implements Closeable {
+    private final FSDataOutputStream out;
+    private long length = 0;
+
+    StreamWithLength(FSDataOutputStream out) {
+      this.out = out;
+    }
 
-    try (FSDataOutputStream stream = fs.create(file, true)) {
-      stream.write(data);
-      stream.hsync();
+    long getLength() {
+      return length;
     }
 
+    void writeAndHsync(byte[] data) throws IOException {
+      out.write(data);
+      out.hsync();
+      length += data.length;
+    }
+
+    @Override
+    public void close() throws IOException {
+      out.close();
+    }
+  }
+
+  static void runTestHSync(FileSystem fs, Path file,
+      StreamWithLength out, int dataSize)
+      throws Exception {
+    final long length = out.getLength();
+    final byte[] data = new byte[dataSize];
+    ThreadLocalRandom.current().nextBytes(data);
+    out.writeAndHsync(data);
+
     final byte[] buffer = new byte[4 << 10];
     int offset = 0;
     try (FSDataInputStream in = fs.open(file)) {
+      final long skipped = in.skip(length);
+      Assertions.assertEquals(length, skipped);
+
       for (; ;) {
         final int n = in.read(buffer, 0, buffer.length);
         if (n <= 0) {
diff --git a/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/ExpiredOpenKeys.java b/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/ExpiredOpenKeys.java
new file mode 100644
index 0000000000..aba290b38c
--- /dev/null
+++ b/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/ExpiredOpenKeys.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.om;
+
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CommitKeyRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyArgs;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OpenKey;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OpenKeyBucket;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
+
+/**
+ * The expired open keys.
+ *
+ * @see OMConfigKeys#OZONE_OM_OPEN_KEY_EXPIRE_THRESHOLD_DEFAULT
+ */
+public class ExpiredOpenKeys {
+  private final Map<String, OpenKeyBucket.Builder> openKeyBuckets
+      = new HashMap<>();
+  private final List<CommitKeyRequest.Builder> hsyncKeys
+      = new ArrayList<>();
+
+  /** @return non-hsync'ed open keys. */
+  public Collection<OpenKeyBucket.Builder> getOpenKeyBuckets() {
+    return openKeyBuckets.values();
+  }
+
+  /** @return hsync'ed open keys. */
+  public List<CommitKeyRequest.Builder> getHsyncKeys() {
+    return hsyncKeys;
+  }
+
+  void addOpenKey(OmKeyInfo info, String dbOpenKeyName) {
+    final String mapKey = info.getVolumeName() + OM_KEY_PREFIX
+        + info.getBucketName();
+    openKeyBuckets.computeIfAbsent(mapKey, k -> OpenKeyBucket.newBuilder()
+            .setVolumeName(info.getVolumeName())
+            .setBucketName(info.getBucketName()))
+        .addKeys(OpenKey.newBuilder().setName(dbOpenKeyName));
+  }
+
+  void addHsyncKey(KeyArgs.Builder keyArgs, long clientId) {
+    hsyncKeys.add(CommitKeyRequest.newBuilder()
+        .setKeyArgs(keyArgs)
+        .setClientID(clientId));
+  }
+}
diff --git a/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java b/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
index 2343532ac4..0229e86dda 100644
--- a/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
+++ b/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
@@ -43,7 +43,6 @@ import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
 import org.apache.hadoop.ozone.om.lock.IOzoneManagerLock;
 import org.apache.hadoop.hdds.utils.TransactionInfo;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OpenKeyBucket;
 import org.apache.hadoop.ozone.storage.proto.
     OzoneManagerStorageProtos.PersistedUserVolumeInfo;
 import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
@@ -276,10 +275,10 @@ public interface OMMetadataManager extends DBStoreHAManager {
    * @param count The maximum number of open keys to return.
    * @param expireThreshold The threshold of open key expire age.
    * @param bucketLayout The type of open keys to get (e.g. DEFAULT or FSO).
-   * @return a {@link List} of {@link OpenKeyBucket}, the expired open keys.
+   * @return the expired open keys.
    * @throws IOException
    */
-  List<OpenKeyBucket> getExpiredOpenKeys(Duration expireThreshold, int count,
+  ExpiredOpenKeys getExpiredOpenKeys(Duration expireThreshold, int count,
       BucketLayout bucketLayout) throws IOException;
 
   /**
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
index c14e97ea2b..6b7eb26ee1 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
@@ -28,7 +28,6 @@ import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadListParts;
 import org.apache.hadoop.ozone.om.fs.OzoneManagerFS;
 import org.apache.hadoop.hdds.utils.BackgroundService;
 import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OpenKeyBucket;
 
 import java.io.IOException;
 import java.time.Duration;
@@ -138,10 +137,10 @@ public interface KeyManager extends OzoneManagerFS, IOzoneAcl {
    * @param count The maximum number of expired open keys to return.
    * @param expireThreshold The threshold of open key expiration age.
    * @param bucketLayout The type of open keys to get (e.g. DEFAULT or FSO).
-   * @return a {@link List} of {@link OpenKeyBucket}, the expired open keys.
+   * @return the expired open keys.
    * @throws IOException
    */
-  List<OpenKeyBucket> getExpiredOpenKeys(Duration expireThreshold, int count,
+  ExpiredOpenKeys getExpiredOpenKeys(Duration expireThreshold, int count,
       BucketLayout bucketLayout) throws IOException;
 
   /**
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
index a6896e4d36..4ff5ad64d1 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
@@ -84,7 +84,6 @@ import org.apache.hadoop.ozone.om.request.file.OMFileRequest;
 import org.apache.hadoop.ozone.om.service.DirectoryDeletingService;
 import org.apache.hadoop.ozone.om.service.KeyDeletingService;
 import org.apache.hadoop.ozone.om.service.OpenKeyCleanupService;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OpenKeyBucket;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PartKeyInfo;
 import org.apache.hadoop.hdds.security.token.OzoneBlockTokenSecretManager;
 import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
@@ -593,7 +592,7 @@ public class KeyManagerImpl implements KeyManager {
   }
 
   @Override
-  public List<OpenKeyBucket> getExpiredOpenKeys(Duration expireThreshold,
+  public ExpiredOpenKeys getExpiredOpenKeys(Duration expireThreshold,
       int count, BucketLayout bucketLayout) throws IOException {
     return metadataManager.getExpiredOpenKeys(expireThreshold, count,
         bucketLayout);
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
index 180e5b5e6c..2a0241e3b3 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.nio.file.Paths;
 import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -30,6 +31,7 @@ import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import com.google.common.base.Optional;
 import org.apache.hadoop.hdds.client.BlockID;
@@ -47,6 +49,7 @@ import org.apache.hadoop.hdds.utils.db.TypedTable;
 import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
 import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
 import org.apache.hadoop.hdds.utils.db.cache.TableCache.CacheType;
+import org.apache.hadoop.ozone.ClientVersion;
 import org.apache.hadoop.ozone.OmUtils;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.common.BlockGroup;
@@ -83,12 +86,13 @@ import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
 import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
+import org.apache.hadoop.ozone.om.helpers.WithMetadata;
 import org.apache.hadoop.ozone.om.lock.IOzoneManagerLock;
 import org.apache.hadoop.ozone.om.lock.OmReadOnlyLock;
 import org.apache.hadoop.ozone.om.lock.OzoneManagerLock;
 import org.apache.hadoop.hdds.utils.TransactionInfo;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OpenKey;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OpenKeyBucket;
+import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolClientSideTranslatorPB;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyArgs;
 import org.apache.hadoop.ozone.storage.proto
     .OzoneManagerStorageProtos.PersistedUserVolumeInfo;
 import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
@@ -1391,10 +1395,11 @@ public class OmMetadataManagerImpl implements OMMetadataManager,
   }
 
   @Override
-  public List<OpenKeyBucket> getExpiredOpenKeys(Duration expireThreshold,
+  public ExpiredOpenKeys getExpiredOpenKeys(Duration expireThreshold,
       int count, BucketLayout bucketLayout) throws IOException {
-    Map<String, OpenKeyBucket.Builder> expiredKeys = new HashMap<>();
+    final ExpiredOpenKeys expiredKeys = new ExpiredOpenKeys();
 
+    final Table<String, OmKeyInfo> kt = getKeyTable(bucketLayout);
     // Only check for expired keys in the open key table, not its cache.
     // If a key expires while it is in the cache, it will be cleaned
     // up after the cache is flushed.
@@ -1404,33 +1409,55 @@ public class OmMetadataManagerImpl implements OMMetadataManager,
       final long expiredCreationTimestamp =
           expireThreshold.negated().plusMillis(Time.now()).toMillis();
 
-      OpenKey.Builder builder = OpenKey.newBuilder();
 
       int num = 0;
       while (num < count && keyValueTableIterator.hasNext()) {
         KeyValue<String, OmKeyInfo> openKeyValue = keyValueTableIterator.next();
         String dbOpenKeyName = openKeyValue.getKey();
+
+        final int lastPrefix = dbOpenKeyName.lastIndexOf(OM_KEY_PREFIX);
+        final String dbKeyName = dbOpenKeyName.substring(0, lastPrefix);
         OmKeyInfo openKeyInfo = openKeyValue.getValue();
 
         if (openKeyInfo.getCreationTime() <= expiredCreationTimestamp) {
-          final String volume = openKeyInfo.getVolumeName();
-          final String bucket = openKeyInfo.getBucketName();
-          final String mapKey = volume + OM_KEY_PREFIX + bucket;
-          if (!expiredKeys.containsKey(mapKey)) {
-            expiredKeys.put(mapKey,
-                OpenKeyBucket.newBuilder()
-                    .setVolumeName(volume)
-                    .setBucketName(bucket));
+          final String clientIdString
+              = dbOpenKeyName.substring(lastPrefix + 1);
+
+          final OmKeyInfo info = kt.get(dbKeyName);
+          final boolean isHsync = java.util.Optional.ofNullable(info)
+              .map(WithMetadata::getMetadata)
+              .map(meta -> meta.get(OzoneConsts.HSYNC_CLIENT_ID))
+              .filter(id -> id.equals(clientIdString))
+              .isPresent();
+
+          if (!isHsync) {
+            // add non-hsync'ed keys
+            expiredKeys.addOpenKey(openKeyInfo, dbOpenKeyName);
+          } else {
+            // add hsync'ed keys
+            final KeyArgs.Builder keyArgs = KeyArgs.newBuilder()
+                .setVolumeName(info.getVolumeName())
+                .setBucketName(info.getBucketName())
+                .setKeyName(info.getKeyName())
+                .setDataSize(info.getDataSize());
+            java.util.Optional.ofNullable(info.getLatestVersionLocations())
+                .map(OmKeyLocationInfoGroup::getLocationList)
+                .map(Collection::stream)
+                .orElseGet(Stream::empty)
+                .map(loc -> loc.getProtobuf(ClientVersion.CURRENT_VERSION))
+                .forEach(keyArgs::addKeyLocations);
+
+            OzoneManagerProtocolClientSideTranslatorPB.setReplicationConfig(
+                info.getReplicationConfig(), keyArgs);
+
+            expiredKeys.addHsyncKey(keyArgs, Long.parseLong(clientIdString));
           }
-          expiredKeys.get(mapKey)
-              .addKeys(builder.setName(dbOpenKeyName).build());
           num++;
         }
       }
     }
 
-    return expiredKeys.values().stream().map(OpenKeyBucket.Builder::build)
-        .collect(Collectors.toList());
+    return expiredKeys;
   }
 
   @Override
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java
index 674c4ecfb3..ecc85abd09 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java
@@ -208,6 +208,10 @@ public class OMKeyCommitRequest extends OMKeyRequest {
         throw new OMException("Failed to " + action + " key, as " + dbOpenKey +
             "entry is not found in the OpenKey table", KEY_NOT_FOUND);
       }
+      if (isHSync) {
+        omKeyInfo.getMetadata().put(OzoneConsts.HSYNC_CLIENT_ID,
+            String.valueOf(commitKeyRequest.getClientID()));
+      }
       omBucketInfo = getBucketInfo(omMetadataManager, volumeName, bucketName);
       omKeyInfo.setDataSize(commitKeyArgs.getDataSize());
       omKeyInfo.setModificationTime(commitKeyArgs.getModificationTime());
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequestWithFSO.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequestWithFSO.java
index 3a3473943a..909104f99e 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequestWithFSO.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequestWithFSO.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.ozone.om.request.key;
 
+import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.audit.AuditLogger;
 import org.apache.hadoop.ozone.audit.OMAction;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
@@ -150,6 +151,11 @@ public class OMKeyCommitRequestWithFSO extends OMKeyCommitRequest {
                 dbOpenFileKey + "entry is not found in the OpenKey table",
                 KEY_NOT_FOUND);
       }
+      if (isHSync) {
+        omKeyInfo.getMetadata().put(OzoneConsts.HSYNC_CLIENT_ID,
+            String.valueOf(commitKeyRequest.getClientID()));
+      }
+
       omKeyInfo.setDataSize(commitKeyArgs.getDataSize());
 
       omKeyInfo.setModificationTime(commitKeyArgs.getModificationTime());
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/OpenKeyCleanupService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/OpenKeyCleanupService.java
index cda1f5ac4f..f4d09875e1 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/OpenKeyCleanupService.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/OpenKeyCleanupService.java
@@ -25,12 +25,14 @@ import org.apache.hadoop.hdds.utils.BackgroundService;
 import org.apache.hadoop.hdds.utils.BackgroundTask;
 import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
 import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
+import org.apache.hadoop.ozone.om.ExpiredOpenKeys;
 import org.apache.hadoop.ozone.om.KeyManager;
 import org.apache.hadoop.ozone.om.OMConfigKeys;
 import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
 import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
 import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CommitKeyRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeleteOpenKeysRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OpenKeyBucket;
@@ -44,10 +46,12 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.time.Duration;
+import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Stream;
 
 /**
  * This is the background service to delete hanging open keys.
@@ -174,34 +178,60 @@ public class OpenKeyCleanupService extends BackgroundService {
 
       runCount.incrementAndGet();
       long startTime = Time.monotonicNow();
-      List<OpenKeyBucket> openKeyBuckets = null;
+      final ExpiredOpenKeys expiredOpenKeys;
       try {
-        openKeyBuckets = keyManager.getExpiredOpenKeys(expireThreshold,
+        expiredOpenKeys = keyManager.getExpiredOpenKeys(expireThreshold,
             cleanupLimitPerTask, bucketLayout);
       } catch (IOException e) {
         LOG.error("Unable to get hanging open keys, retry in next interval", e);
+        return BackgroundTaskResult.EmptyTaskResult.newResult();
       }
 
-      if (openKeyBuckets != null && !openKeyBuckets.isEmpty()) {
-        int numOpenKeys = openKeyBuckets.stream()
-            .mapToInt(OpenKeyBucket::getKeysCount).sum();
-
-        OMRequest omRequest = createRequest(openKeyBuckets);
+      final Collection<OpenKeyBucket.Builder> openKeyBuckets
+          = expiredOpenKeys.getOpenKeyBuckets();
+      final int numOpenKeys = openKeyBuckets.stream()
+          .mapToInt(OpenKeyBucket.Builder::getKeysCount)
+          .sum();
+      if (!openKeyBuckets.isEmpty()) {
+        // delete non-hsync'ed keys
+        final OMRequest omRequest = createDeleteOpenKeysRequest(
+            openKeyBuckets.stream());
         submitRequest(omRequest);
+      }
+
+      final List<CommitKeyRequest.Builder> hsyncKeys
+          = expiredOpenKeys.getHsyncKeys();
+      final int numHsyncKeys = hsyncKeys.size();
+      if (!hsyncKeys.isEmpty()) {
+        // commit hsync'ed keys
+        hsyncKeys.forEach(b -> submitRequest(createCommitKeyRequest(b)));
+      }
 
-        LOG.debug("Number of expired keys submitted for deletion: {}, elapsed"
-            + " time: {}ms", numOpenKeys, Time.monotonicNow() - startTime);
-        submittedOpenKeyCount.addAndGet(numOpenKeys);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Number of expired open keys submitted for deletion: {},"
+            + " for commit: {}, elapsed time: {}ms",
+            numOpenKeys, numHsyncKeys, Time.monotonicNow() - startTime);
       }
-      return BackgroundTaskResult.EmptyTaskResult.newResult();
+      final int numKeys = numOpenKeys + numHsyncKeys;
+      submittedOpenKeyCount.addAndGet(numKeys);
+      return () -> numKeys;
     }
 
-    private OMRequest createRequest(List<OpenKeyBucket> openKeyBuckets) {
-      DeleteOpenKeysRequest request =
-          DeleteOpenKeysRequest.newBuilder()
-              .addAllOpenKeysPerBucket(openKeyBuckets)
-              .setBucketLayout(bucketLayout.toProto())
-              .build();
+    private OMRequest createCommitKeyRequest(
+        CommitKeyRequest.Builder request) {
+      return OMRequest.newBuilder()
+          .setCmdType(Type.CommitKey)
+          .setCommitKeyRequest(request)
+          .setClientId(clientId.toString())
+          .build();
+    }
+
+    private OMRequest createDeleteOpenKeysRequest(
+        Stream<OpenKeyBucket.Builder> openKeyBuckets) {
+      final DeleteOpenKeysRequest.Builder request
+          = DeleteOpenKeysRequest.newBuilder()
+          .setBucketLayout(bucketLayout.toProto());
+      openKeyBuckets.forEach(request::addOpenKeysPerBucket);
 
       OMRequest omRequest = OMRequest.newBuilder()
           .setCmdType(Type.DeleteOpenKeys)
@@ -232,7 +262,8 @@ public class OpenKeyCleanupService extends BackgroundService {
           ozoneManager.getOmServerProtocol().submitRequest(null, omRequest);
         }
       } catch (ServiceException e) {
-        LOG.error("Open key delete request failed. Will retry at next run.", e);
+        LOG.error("Open key " + omRequest.getCmdType()
+            + " request failed. Will retry at next run.", e);
       }
     }
   }
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmMetadataManager.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmMetadataManager.java
index 815a1b4baf..fa97c34a13 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmMetadataManager.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmMetadataManager.java
@@ -45,6 +45,7 @@ import org.junit.jupiter.params.provider.MethodSource;
 import java.io.File;
 import java.time.Duration;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -624,17 +625,17 @@ public class TestOmMetadataManager {
     }
 
     // Test retrieving fewer expired keys than actually exist.
-    List<OpenKeyBucket> someExpiredKeys =
+    final Collection<OpenKeyBucket.Builder> someExpiredKeys =
         omMetadataManager.getExpiredOpenKeys(expireThreshold,
-            numExpiredOpenKeys - 1, bucketLayout);
+            numExpiredOpenKeys - 1, bucketLayout).getOpenKeyBuckets();
     List<String> names = getOpenKeyNames(someExpiredKeys);
     assertEquals(numExpiredOpenKeys - 1, names.size());
     assertTrue(expiredKeys.containsAll(names));
 
     // Test attempting to retrieving more expired keys than actually exist.
-    List<OpenKeyBucket> allExpiredKeys =
+    Collection<OpenKeyBucket.Builder> allExpiredKeys =
         omMetadataManager.getExpiredOpenKeys(expireThreshold,
-            numExpiredOpenKeys + 1, bucketLayout);
+            numExpiredOpenKeys + 1, bucketLayout).getOpenKeyBuckets();
     names = getOpenKeyNames(allExpiredKeys);
     assertEquals(numExpiredOpenKeys, names.size());
     assertTrue(expiredKeys.containsAll(names));
@@ -642,15 +643,16 @@ public class TestOmMetadataManager {
     // Test retrieving exact amount of expired keys that exist.
     allExpiredKeys =
         omMetadataManager.getExpiredOpenKeys(expireThreshold,
-            numExpiredOpenKeys, bucketLayout);
+            numExpiredOpenKeys, bucketLayout).getOpenKeyBuckets();
     names = getOpenKeyNames(allExpiredKeys);
     assertEquals(numExpiredOpenKeys, names.size());
     assertTrue(expiredKeys.containsAll(names));
   }
 
-  private List<String> getOpenKeyNames(List<OpenKeyBucket> openKeyBuckets) {
+  private List<String> getOpenKeyNames(
+      Collection<OpenKeyBucket.Builder> openKeyBuckets) {
     return openKeyBuckets.stream()
-        .map(OpenKeyBucket::getKeysList)
+        .map(OpenKeyBucket.Builder::getKeysList)
         .flatMap(List::stream)
         .map(OpenKey::getName)
         .collect(Collectors.toList());
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestOpenKeyCleanupService.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestOpenKeyCleanupService.java
index 0fbdeef6c2..54939f956b 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestOpenKeyCleanupService.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestOpenKeyCleanupService.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 import org.apache.hadoop.hdds.server.ServerUtils;
 import org.apache.hadoop.hdds.utils.db.DBConfigFromFile;
+import org.apache.hadoop.ozone.om.ExpiredOpenKeys;
 import org.apache.hadoop.ozone.om.KeyManager;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.OmTestManagers;
@@ -77,8 +78,8 @@ public class TestOpenKeyCleanupService {
   private static final Logger LOG =
       LoggerFactory.getLogger(TestOpenKeyCleanupService.class);
 
-  private static final Duration SERVICE_INTERVAL = Duration.ofMillis(500);
-  private static final Duration EXPIRE_THRESHOLD = Duration.ofMillis(1000);
+  private static final Duration SERVICE_INTERVAL = Duration.ofMillis(100);
+  private static final Duration EXPIRE_THRESHOLD = Duration.ofMillis(200);
   private KeyManager keyManager;
   private OMMetadataManager omMetadataManager;
 
@@ -119,13 +120,16 @@ public class TestOpenKeyCleanupService {
    */
   @ParameterizedTest
   @CsvSource({
-      "99, 0",
-      "0, 88",
-      "66, 77"
+      "9, 0, true",
+      "0, 8, true",
+      "6, 7, true",
+      "99, 0, false",
+      "0, 88, false",
+      "66, 77, false"
   })
   @Timeout(300)
-  public void checkIfCleanupServiceIsDeletingExpiredOpenKeys(
-      int numDEFKeys, int numFSOKeys) throws Exception {
+  public void testCleanupExpiredOpenKeys(
+      int numDEFKeys, int numFSOKeys, boolean hsync) throws Exception {
 
     OpenKeyCleanupService openKeyCleanupService =
         (OpenKeyCleanupService) keyManager.getOpenKeyCleanupService();
@@ -137,16 +141,15 @@ public class TestOpenKeyCleanupService {
     final long oldrunCount = openKeyCleanupService.getRunCount();
 
     final int keyCount = numDEFKeys + numFSOKeys;
-    createOpenKeys(numDEFKeys, BucketLayout.DEFAULT);
-    createOpenKeys(numFSOKeys, BucketLayout.FILE_SYSTEM_OPTIMIZED);
+    createOpenKeys(numDEFKeys, hsync, BucketLayout.DEFAULT);
+    createOpenKeys(numFSOKeys, hsync, BucketLayout.FILE_SYSTEM_OPTIMIZED);
 
     // wait for open keys to expire
     Thread.sleep(EXPIRE_THRESHOLD.toMillis());
 
-    assertEquals(numDEFKeys == 0, keyManager.getExpiredOpenKeys(
-        EXPIRE_THRESHOLD, 1, BucketLayout.DEFAULT).isEmpty());
-    assertEquals(numFSOKeys == 0, keyManager.getExpiredOpenKeys(
-        EXPIRE_THRESHOLD, 1, BucketLayout.FILE_SYSTEM_OPTIMIZED).isEmpty());
+    assertExpiredOpenKeys(numDEFKeys == 0, hsync, BucketLayout.DEFAULT);
+    assertExpiredOpenKeys(numFSOKeys == 0, hsync,
+        BucketLayout.FILE_SYSTEM_OPTIMIZED);
 
     openKeyCleanupService.resume();
 
@@ -156,18 +159,28 @@ public class TestOpenKeyCleanupService {
         5 * (int) SERVICE_INTERVAL.toMillis());
 
     // wait for requests to complete
-    Thread.sleep(SERVICE_INTERVAL.toMillis());
+    final int n = hsync ? numDEFKeys + numFSOKeys : 1;
+    Thread.sleep(n * SERVICE_INTERVAL.toMillis());
 
     assertTrue(openKeyCleanupService.getSubmittedOpenKeyCount() >=
         oldkeyCount + keyCount);
-    assertTrue(keyManager.getExpiredOpenKeys(EXPIRE_THRESHOLD,
-        1, BucketLayout.DEFAULT).isEmpty());
-    assertTrue(keyManager.getExpiredOpenKeys(EXPIRE_THRESHOLD,
-        1, BucketLayout.FILE_SYSTEM_OPTIMIZED).isEmpty());
+    assertExpiredOpenKeys(true, hsync, BucketLayout.DEFAULT);
+    assertExpiredOpenKeys(true, hsync,
+        BucketLayout.FILE_SYSTEM_OPTIMIZED);
+  }
+
+  void assertExpiredOpenKeys(boolean expectedToEmpty, boolean hsync,
+      BucketLayout layout) throws IOException {
+    final ExpiredOpenKeys expired = keyManager.getExpiredOpenKeys(
+        EXPIRE_THRESHOLD, 100, layout);
+    final int size = (hsync ? expired.getHsyncKeys()
+        : expired.getOpenKeyBuckets()).size();
+    assertEquals(expectedToEmpty, size == 0,
+        () -> "size=" + size + ", layout=" + layout);
   }
 
-  private void createOpenKeys(int keyCount, BucketLayout bucketLayout)
-      throws IOException {
+  private void createOpenKeys(int keyCount, boolean hsync,
+      BucketLayout bucketLayout) throws IOException {
     String volume = UUID.randomUUID().toString();
     String bucket = UUID.randomUUID().toString();
     for (int x = 0; x < keyCount; x++) {
@@ -182,7 +195,7 @@ public class TestOpenKeyCleanupService {
 
       final int numBlocks = RandomUtils.nextInt(0, 3);
       // Create the key
-      createOpenKey(volume, bucket, key, numBlocks);
+      createOpenKey(volume, bucket, key, numBlocks, hsync);
     }
   }
 
@@ -206,7 +219,7 @@ public class TestOpenKeyCleanupService {
   }
 
   private void createOpenKey(String volumeName, String bucketName,
-      String keyName, int numBlocks) throws IOException {
+      String keyName, int numBlocks, boolean hsync) throws IOException {
     OmKeyArgs keyArg =
         new OmKeyArgs.Builder()
             .setVolumeName(volumeName)
@@ -224,5 +237,8 @@ public class TestOpenKeyCleanupService {
       keyArg.addLocationInfo(writeClient.allocateBlock(keyArg, session.getId(),
           new ExcludeList()));
     }
+    if (hsync) {
+      writeClient.hsyncKey(keyArg, session.getId());
+    }
   }
 }

