You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by xy...@apache.org on 2017/12/12 23:53:39 UTC

hadoop git commit: HDFS-12626. Ozone : delete open key entries that will no longer be closed. Contributed by Chen Liang.

Repository: hadoop
Updated Branches:
  refs/heads/HDFS-7240 5f16cdbd2 -> 4541ead5e


HDFS-12626. Ozone : delete open key entries that will no longer be closed. Contributed by Chen Liang.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4541ead5
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4541ead5
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4541ead5

Branch: refs/heads/HDFS-7240
Commit: 4541ead5e3db6775b097015bb0b777be8bd4cd54
Parents: 5f16cdb
Author: Xiaoyu Yao <xy...@apache.org>
Authored: Tue Dec 12 15:46:31 2017 -0800
Committer: Xiaoyu Yao <xy...@apache.org>
Committed: Tue Dec 12 15:46:31 2017 -0800

----------------------------------------------------------------------
 .../apache/hadoop/ozone/OzoneConfigKeys.java    |  17 +++
 .../hadoop/ozone/ksm/helpers/KsmKeyInfo.java    |   5 +
 .../hadoop/ozone/ksm/KSMMetadataManager.java    |  10 ++
 .../ozone/ksm/KSMMetadataManagerImpl.java       |  38 +++++-
 .../org/apache/hadoop/ozone/ksm/KeyManager.java |  20 ++++
 .../apache/hadoop/ozone/ksm/KeyManagerImpl.java |  50 +++++++-
 .../hadoop/ozone/ksm/OpenKeyCleanupService.java | 116 +++++++++++++++++++
 .../src/main/resources/ozone-default.xml        |  21 ++++
 .../hadoop/ozone/ksm/TestKeySpaceManager.java   |  79 ++++++++++++-
 9 files changed, 352 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/4541ead5/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index 9474ba3..d23af37 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -171,6 +171,23 @@ public final class OzoneConfigKeys {
   public static final int OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS_DEFAULT
       = 60000;
 
+  /**
+   * Interval, in seconds, between runs of the open key cleanup service.
+   */
+  public static final String OZONE_OPEN_KEY_CLEANUP_SERVICE_INTERVAL_SECONDS =
+      "ozone.open.key.cleanup.service.interval.seconds";
+  public static final int
+      OZONE_OPEN_KEY_CLEANUP_SERVICE_INTERVAL_SECONDS_DEFAULT
+      = 24 * 3600; // 24 hours
+
+  /**
+   * Seconds an open key may go unmodified before it is treated as expired.
+   */
+  public static final String OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS =
+      "ozone.open.key.expire.threshold"; // NOTE(review): no ".seconds" suffix
+  public static final int OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS_DEFAULT =
+      24 * 3600; // 24 hours
+
   public static final String OZONE_BLOCK_DELETING_SERVICE_TIMEOUT =
       "ozone.block.deleting.service.timeout";
   public static final int OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4541ead5/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/ksm/helpers/KsmKeyInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/ksm/helpers/KsmKeyInfo.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/ksm/helpers/KsmKeyInfo.java
index 3b532fe..b6054eb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/ksm/helpers/KsmKeyInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/ksm/helpers/KsmKeyInfo.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.ozone.ksm.helpers;
 
 import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.KeyInfo;
+import org.apache.hadoop.util.Time;
 
 import java.util.List;
 import java.util.stream.Collectors;
@@ -73,6 +74,10 @@ public final class KsmKeyInfo {
     return keyLocationList;
   }
 
+  public void updateModifcationTime() {
+    this.modificationTime = Time.now(); // epoch ms, compared to Time.now()
+  }
+
   public void appendKeyLocation(KsmKeyLocationInfo newLocation) {
     keyLocationList.add(newLocation);
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4541ead5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetadataManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetadataManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetadataManager.java
index bada9bf..f5a2d5b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetadataManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetadataManager.java
@@ -240,4 +240,14 @@ public interface KSMMetadataManager {
    * @throws IOException
    */
   List<BlockGroup> getPendingDeletionKeys(int count) throws IOException;
+
+  /**
+   * Returns the still-open keys whose last modification is older than the
+   * open key expire threshold, with each key's name and block IDs. Open
+   * keys are stored with the #open# prefix in the KSM DB.
+   *
+   * @return a list of {@link BlockGroup} representing keys and blocks.
+   * @throws IOException if the open key entries cannot be read.
+   */
+  List<BlockGroup> getExpiredOpenKeys() throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4541ead5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetadataManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetadataManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetadataManagerImpl.java
index cb04668..fbc1131 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetadataManagerImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetadataManagerImpl.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.KeyI
 import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.VolumeInfo;
 import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.VolumeList;
 import org.apache.hadoop.ozone.web.utils.OzoneUtils;
+import org.apache.hadoop.util.Time;
 import org.apache.hadoop.utils.BatchOperation;
 import org.apache.hadoop.utils.MetadataKeyFilters;
 import org.apache.hadoop.utils.MetadataKeyFilters.KeyPrefixFilter;
@@ -52,6 +53,8 @@ import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
 
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS_DEFAULT;
 import static org.apache.hadoop.ozone.OzoneConsts.DELETING_KEY_PREFIX;
 import static org.apache.hadoop.ozone.OzoneConsts.KSM_DB_NAME;
 import static org.apache.hadoop.ozone.OzoneConsts.OPEN_KEY_ID_DELIMINATOR;
@@ -68,7 +71,7 @@ public class KSMMetadataManagerImpl implements KSMMetadataManager {
 
   private final MetadataStore store;
   private final ReadWriteLock lock;
-
+  private final long openKeyExpireThresholdMS;
 
   public KSMMetadataManagerImpl(OzoneConfiguration conf) throws IOException {
     File metaDir = OzoneUtils.getScmMetadirPath(conf);
@@ -81,6 +84,9 @@ public class KSMMetadataManagerImpl implements KSMMetadataManager {
         .setCacheSize(cacheSize * OzoneConsts.MB)
         .build();
     this.lock = new ReentrantReadWriteLock();
+    this.openKeyExpireThresholdMS = 1000L * conf.getInt( // avoid int overflow
+        OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS,
+        OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS_DEFAULT);
   }
 
   /**
@@ -478,4 +484,34 @@ public class KSMMetadataManagerImpl implements KSMMetadataManager {
     }
     return keyBlocksList;
   }
+
+  @Override
+  public List<BlockGroup> getExpiredOpenKeys() throws IOException {
+    List<BlockGroup> keyBlocksList = Lists.newArrayList();
+    long now = Time.now(); // must match the clock that set modificationTime
+    final MetadataKeyFilter openKeyFilter =
+        new KeyPrefixFilter(OPEN_KEY_PREFIX);
+    List<Map.Entry<byte[], byte[]>> rangeResult = // every #open# entry
+        store.getSequentialRangeKVs(null, Integer.MAX_VALUE,
+            openKeyFilter);
+    for (Map.Entry<byte[], byte[]> entry : rangeResult) {
+      KsmKeyInfo info =
+          KsmKeyInfo.getFromProtobuf(KeyInfo.parseFrom(entry.getValue()));
+      long lastModify = info.getModificationTime();
+      if (now - lastModify < this.openKeyExpireThresholdMS) {
+        // Modified within the threshold: the writer may still be active.
+        continue;
+      }
+      // Collect the IDs of all blocks allocated to this key.
+      List<String> item = info.getKeyLocationList().stream()
+          .map(KsmKeyLocationInfo::getBlockID)
+          .collect(Collectors.toList());
+      BlockGroup keyBlocks = BlockGroup.newBuilder() // name keeps prefix
+          .setKeyName(DFSUtil.bytes2String(entry.getKey()))
+          .addAllBlockIDs(item)
+          .build();
+      keyBlocksList.add(keyBlocks);
+    }
+    return keyBlocksList;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4541ead5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManager.java
index ccc97aa..e71ce5f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManager.java
@@ -142,4 +142,24 @@ public interface KeyManager {
    * @throws IOException if specified key doesn't exist or other I/O errors.
    */
   void deletePendingDeletionKey(String objectKeyName) throws IOException;
+
+  /**
+   * Returns the still-open keys whose last modification is older than the
+   * open key expire threshold, with each key's name and block IDs. Open
+   * keys are stored with the #open# prefix in the KSM DB.
+   *
+   * @return a list of {@link BlockGroup} representing keys and blocks.
+   * @throws IOException if the open key entries cannot be read.
+   */
+  List<BlockGroup> getExpiredOpenKeys() throws IOException;
+
+  /**
+   * Deletes an expired open key by name, removing its open key entry from
+   * the KSM metadata. Intended for keys that have been left open too long.
+   *
+   * @param objectKeyName object key name with #open# prefix.
+   * @throws IOException if specified key doesn't exist or other I/O errors.
+   * @throws IllegalArgumentException if the name lacks the #open# prefix.
+   */
+  void deleteExpiredOpenKey(String objectKeyName) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4541ead5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManagerImpl.java
index 1f2af95..620816a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManagerImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManagerImpl.java
@@ -60,6 +60,10 @@ import static org.apache.hadoop.ozone
     .OzoneConfigKeys.OZONE_KEY_PREALLOCATION_MAXSIZE;
 import static org.apache.hadoop.ozone
     .OzoneConfigKeys.OZONE_KEY_PREALLOCATION_MAXSIZE_DEFAULT;
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+    .OZONE_OPEN_KEY_CLEANUP_SERVICE_INTERVAL_SECONDS;
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+    .OZONE_OPEN_KEY_CLEANUP_SERVICE_INTERVAL_SECONDS_DEFAULT;
 import static org.apache.hadoop.ozone
     .OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT;
 import static org.apache.hadoop.ozone
@@ -85,6 +89,7 @@ public class KeyManagerImpl implements KeyManager {
   private final long scmBlockSize;
   private final boolean useRatis;
   private final BackgroundService keyDeletingService;
+  private final BackgroundService openKeyCleanupService;
 
   private final long preallocateMax;
   private final Random random;
@@ -97,7 +102,7 @@ public class KeyManagerImpl implements KeyManager {
         OZONE_SCM_BLOCK_SIZE_DEFAULT) * OzoneConsts.MB;
     this.useRatis = conf.getBoolean(DFS_CONTAINER_RATIS_ENABLED_KEY,
         DFS_CONTAINER_RATIS_ENABLED_DEFAULT);
-    int svcInterval = conf.getInt(
+    int blockDeleteInterval = conf.getInt(
         OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS,
         OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS_DEFAULT);
     long serviceTimeout = conf.getTimeDuration(
@@ -107,18 +112,25 @@ public class KeyManagerImpl implements KeyManager {
         OZONE_KEY_PREALLOCATION_MAXSIZE,
         OZONE_KEY_PREALLOCATION_MAXSIZE_DEFAULT);
     keyDeletingService = new KeyDeletingService(
-        scmBlockClient, this, svcInterval, serviceTimeout, conf);
+        scmBlockClient, this, blockDeleteInterval, serviceTimeout, conf);
+    int openkeyCheckInterval = conf.getInt(
+        OZONE_OPEN_KEY_CLEANUP_SERVICE_INTERVAL_SECONDS,
+        OZONE_OPEN_KEY_CLEANUP_SERVICE_INTERVAL_SECONDS_DEFAULT);
+    openKeyCleanupService = new OpenKeyCleanupService(
+        scmBlockClient, this, openkeyCheckInterval, serviceTimeout);
     random = new Random();
   }
 
   @Override
   public void start() {
     keyDeletingService.start();
+    openKeyCleanupService.start(); // scans for and removes expired open keys
   }
 
   @Override
   public void stop() throws IOException {
     keyDeletingService.shutdown();
+    openKeyCleanupService.shutdown(); // stop the expired open key scan
   }
 
   private void validateBucket(String volumeName, String bucketName)
@@ -186,6 +198,7 @@ public class KeyManagerImpl implements KeyManager {
           .setIndex(keyInfo.getKeyLocationList().size())
           .build();
       keyInfo.appendKeyLocation(info);
+      keyInfo.updateModifcationTime();
       metadataManager.put(openKey, keyInfo.getProtobuf().toByteArray());
       return info;
     } finally {
@@ -435,4 +448,37 @@ public class KeyManagerImpl implements KeyManager {
       metadataManager.writeLock().unlock();
     }
   }
+
+  @Override
+  public List<BlockGroup> getExpiredOpenKeys() throws IOException {
+    metadataManager.readLock().lock(); // excludes writeLock holders during scan
+    try {
+      return metadataManager.getExpiredOpenKeys();
+    } finally {
+      metadataManager.readLock().unlock();
+    }
+  }
+
+  @Override
+  public void deleteExpiredOpenKey(String objectKeyName) throws IOException {
+    Preconditions.checkNotNull(objectKeyName);
+    if (!objectKeyName.startsWith(OzoneConsts.OPEN_KEY_PREFIX)) {
+      throw new IllegalArgumentException("Invalid key name,"
+          + " the name should be the key name with open key prefix");
+    }
+
+    // Removes only the KSM DB entry; block deletion is the caller's job.
+    metadataManager.writeLock().lock();
+    try {
+      byte[] openKey = DFSUtil.string2Bytes(objectKeyName);
+      byte[] delKeyValue = metadataManager.get(openKey);
+      if (delKeyValue == null) {
+        throw new IOException("Failed to delete key " + objectKeyName
+            + " because it is not found in DB");
+      }
+      metadataManager.delete(openKey); // NOTE(review): expiry not re-checked
+    } finally {
+      metadataManager.writeLock().unlock();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4541ead5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/OpenKeyCleanupService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/OpenKeyCleanupService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/OpenKeyCleanupService.java
new file mode 100644
index 0000000..7f60bf8
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/OpenKeyCleanupService.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.ksm;
+
+import org.apache.hadoop.ozone.common.BlockGroup;
+import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
+import org.apache.hadoop.scm.protocol.ScmBlockLocationProtocol;
+import org.apache.hadoop.utils.BackgroundService;
+import org.apache.hadoop.utils.BackgroundTask;
+import org.apache.hadoop.utils.BackgroundTaskQueue;
+import org.apache.hadoop.utils.BackgroundTaskResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Background service that cleans up hanging open keys. It periodically
+ * scans KSM metadata for expired keys with the "#open#" prefix, asks SCM
+ * to delete their blocks, and removes from the KSM DB each key whose
+ * blocks SCM reports as deleted. Keys whose block deletion fails stay
+ * in the DB and are picked up again on a later run.
+ */
+public class OpenKeyCleanupService extends BackgroundService {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(OpenKeyCleanupService.class);
+
+  private static final int OPEN_KEY_DELETING_CORE_POOL_SIZE = 2;
+
+  private final KeyManager keyManager;
+  private final ScmBlockLocationProtocol scmClient;
+
+  public OpenKeyCleanupService(ScmBlockLocationProtocol scmClient,
+      KeyManager keyManager, int serviceInterval,
+      long serviceTimeout) {
+    super("OpenKeyCleanupService", serviceInterval, TimeUnit.SECONDS,
+        OPEN_KEY_DELETING_CORE_POOL_SIZE, serviceTimeout);
+    this.keyManager = keyManager;
+    this.scmClient = scmClient;
+  }
+
+  @Override
+  public BackgroundTaskQueue getTasks() {
+    BackgroundTaskQueue queue = new BackgroundTaskQueue();
+    queue.add(new OpenKeyDeletingTask()); // a single scan task per run
+    return queue;
+  }
+
+  private class OpenKeyDeletingTask
+      implements BackgroundTask<BackgroundTaskResult> {
+
+    @Override
+    public int getPriority() {
+      return 0;
+    }
+
+    @Override
+    public BackgroundTaskResult call() throws Exception {
+      try {
+        List<BlockGroup> keyBlocksList = keyManager.getExpiredOpenKeys();
+        if (!keyBlocksList.isEmpty()) {
+          int toDeleteSize = keyBlocksList.size();
+          LOG.debug("Found {} to-delete open keys in KSM", toDeleteSize);
+          List<DeleteBlockGroupResult> results =
+              scmClient.deleteKeyBlocks(keyBlocksList); // SCM first
+          int deletedSize = 0;
+          for (DeleteBlockGroupResult result : results) {
+            if (result.isSuccess()) {
+              try {
+                keyManager.deleteExpiredOpenKey(result.getObjectKey());
+                LOG.debug("Key {} deleted from KSM DB", result.getObjectKey());
+                deletedSize += 1;
+              } catch (IOException e) {
+                LOG.warn("Failed to delete hanging-open key {}",
+                    result.getObjectKey(), e);
+              }
+            } else { // keep the DB entry; retried on a later run
+              LOG.warn("Deleting open Key {} failed because some of the blocks"
+                      + " were failed to delete, failed blocks: {}",
+                  result.getObjectKey(),
+                  String.join(",", result.getFailedBlocks()));
+            }
+          }
+          LOG.info("Found {} expired open key entries, successfully " +
+              "cleaned up {} entries", toDeleteSize, deletedSize);
+          return results::size;
+        } else {
+          LOG.debug("No hanging open key found in KSM");
+        }
+      } catch (IOException e) {
+        LOG.error("Unable to get hanging open keys, retry in"
+            + " next interval", e);
+      }
+      return BackgroundTaskResult.EmptyTaskResult.newResult();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4541ead5/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml
index c7c78b3..0f928bd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml
@@ -1196,4 +1196,25 @@
       connection is limited by ozone.rest.client.http.connection.max property.
     </description>
   </property>
+
+  <property>
+    <name>ozone.open.key.cleanup.service.interval.seconds</name>
+    <value>86400</value>
+    <tag>OZONE, KSM, PERFORMANCE</tag>
+    <description>
+      A background job periodically checks open key entries and deletes the expired ones. This entry controls the
+      interval, in seconds, of this cleanup check.
+    </description>
+  </property>
+
+  <property>
+    <name>ozone.open.key.expire.threshold</name>
+    <value>86400</value>
+    <tag>OZONE, KSM, PERFORMANCE</tag>
+    <description>
+      Controls how long, in seconds, an open key is considered active. If a key has stayed
+      open without modification for longer than this value, it is considered expired
+      (e.g. due to a client crash) and is cleaned up. Defaults to 86400 seconds (24 hours).
+    </description>
+  </property>
 </configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4541ead5/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestKeySpaceManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestKeySpaceManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestKeySpaceManager.java
index 09cf38f..3ff753c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestKeySpaceManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestKeySpaceManager.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.conf.OzoneConfiguration;
 import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.common.BlockGroup;
 import org.apache.hadoop.ozone.client.rest.OzoneException;
 import org.apache.hadoop.ozone.web.handlers.BucketArgs;
 import org.apache.hadoop.ozone.web.handlers.KeyArgs;
@@ -69,7 +70,10 @@ import java.util.Set;
 import java.util.List;
 import java.util.UUID;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OPEN_KEY_CLEANUP_SERVICE_INTERVAL_SECONDS;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS;
 import static org.apache.hadoop.ozone.OzoneConsts.DELETING_KEY_PREFIX;
 /**
  * Test Key Space Manager operation in distributed handler scenario.
@@ -101,6 +105,8 @@ public class TestKeySpaceManager {
     scmId = UUID.randomUUID().toString();
     conf.set(OzoneConfigKeys.OZONE_HANDLER_TYPE_KEY,
         OzoneConsts.OZONE_HANDLER_DISTRIBUTED);
+    conf.setInt(OZONE_OPEN_KEY_CLEANUP_SERVICE_INTERVAL_SECONDS, 2);
+    conf.setInt(OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS, 2);
     cluster = new MiniOzoneClassicCluster.Builder(conf)
         .setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED)
         .setClusterId(clusterId)
@@ -955,7 +961,7 @@ public class TestKeySpaceManager {
    */
   @Test
   public void testGetKeyInfo() throws IOException,
-                          OzoneException, ParseException {
+      OzoneException, ParseException {
     String userName = "user" + RandomStringUtils.randomNumeric(5);
     String adminName = "admin" + RandomStringUtils.randomNumeric(5);
     String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
@@ -1053,4 +1059,75 @@ public class TestKeySpaceManager {
     Assert.assertEquals(clusterId, info.getClusterId());
     Assert.assertEquals(scmId, info.getScmId());
   }
+
+
+  @Test
+  public void testExpiredOpenKey() throws Exception {
+    String userName = "user" + RandomStringUtils.randomNumeric(5);
+    String adminName = "admin" + RandomStringUtils.randomNumeric(5);
+    String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
+    String bucketName = "bucket" + RandomStringUtils.randomNumeric(5);
+
+    VolumeArgs createVolumeArgs = new VolumeArgs(volumeName, userArgs);
+    createVolumeArgs.setUserName(userName);
+    createVolumeArgs.setAdminName(adminName);
+    storageHandler.createVolume(createVolumeArgs);
+
+    BucketArgs bucketArgs = new BucketArgs(bucketName, createVolumeArgs);
+    bucketArgs.setAddAcls(new LinkedList<>());
+    bucketArgs.setRemoveAcls(new LinkedList<>());
+    bucketArgs.setStorageType(StorageType.DISK);
+    storageHandler.createBucket(bucketArgs);
+
+    // Open some keys below, after a short pause.
+
+    Thread.sleep(1000);
+
+    KeyArgs keyArgs1 = new KeyArgs("testKey1", bucketArgs);
+    KeyArgs keyArgs2 = new KeyArgs("testKey2", bucketArgs);
+    KeyArgs keyArgs3 = new KeyArgs("testKey3", bucketArgs);
+    KeyArgs keyArgs4 = new KeyArgs("testKey4", bucketArgs);
+    List<BlockGroup> openKeys;
+    try (OutputStream s1 = storageHandler.newKeyWriter(keyArgs1);
+         OutputStream s2 = storageHandler.newKeyWriter(keyArgs2)) {
+      storageHandler.newKeyWriter(keyArgs3); // left open on purpose
+      storageHandler.newKeyWriter(keyArgs4); // left open on purpose
+      // k1-k4 are all open, but none has passed the 2s threshold yet.
+      openKeys = cluster.getKeySpaceManager()
+          .getMetadataManager().getExpiredOpenKeys();
+      Assert.assertEquals(0, openKeys.size());
+
+      Thread.sleep(2000); // NOTE(review): may race the 2s cleanup run
+
+      openKeys = cluster.getKeySpaceManager().getMetadataManager()
+          .getExpiredOpenKeys();
+      Assert.assertEquals(4, openKeys.size());
+
+      Set<String> expected = Stream.of(
+          "testKey1", "testKey2", "testKey3", "testKey4")
+          .collect(Collectors.toSet());
+      openKeys =
+          cluster.getKeySpaceManager().getMetadataManager().getExpiredOpenKeys();
+      for (BlockGroup bg : openKeys) {
+        String[] subs = bg.getGroupID().split("/");
+        String keyName = subs[subs.length - 1];
+        Assert.assertTrue(expected.remove(keyName));
+      }
+      Assert.assertEquals(0, expected.size());
+    }
+
+    KeyArgs keyArgs5 = new KeyArgs("testKey5", bucketArgs);
+    storageHandler.newKeyWriter(keyArgs5);
+
+    // k1 and k2 were closed; k3 and k4 are expected to have been removed,
+    // presumably by the cleanup service, leaving only k5 as expired.
+    Thread.sleep(2000); // NOTE(review): timing-sensitive
+
+    openKeys =
+        cluster.getKeySpaceManager().getMetadataManager().getExpiredOpenKeys();
+    Assert.assertEquals(1, openKeys.size());
+    String[] subs = openKeys.get(0).getGroupID().split("/");
+    String keyName = subs[subs.length - 1];
+    Assert.assertEquals("testKey5", keyName);
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org