You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by na...@apache.org on 2018/09/05 13:05:53 UTC

hadoop git commit: HDDS-358. Use DBStore and TableStore for DeleteKeyService. Contributed by Anu Engineer

Repository: hadoop
Updated Branches:
  refs/heads/trunk dffb7bfe6 -> df0d61e3a


HDDS-358. Use DBStore and TableStore for DeleteKeyService. Contributed by Anu Engineer


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/df0d61e3
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/df0d61e3
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/df0d61e3

Branch: refs/heads/trunk
Commit: df0d61e3a07a958fc6d71a910d928c5639011cd7
Parents: dffb7bf
Author: Nanda kumar <na...@apache.org>
Authored: Wed Sep 5 18:35:11 2018 +0530
Committer: Nanda kumar <na...@apache.org>
Committed: Wed Sep 5 18:35:33 2018 +0530

----------------------------------------------------------------------
 .../hadoop/ozone/om/helpers/OmKeyArgs.java      |  10 ++
 .../hadoop/ozone/om/KeyDeletingService.java     | 148 +++++++++------
 .../org/apache/hadoop/ozone/om/KeyManager.java  |  24 +--
 .../apache/hadoop/ozone/om/KeyManagerImpl.java  |  40 ++++-
 .../hadoop/ozone/om/OmMetadataManagerImpl.java  |  48 +++--
 .../ozone/om/ScmBlockLocationTestIngClient.java | 178 +++++++++++++++++++
 .../hadoop/ozone/om/TestKeyDeletingService.java | 164 +++++++++++++++++
 7 files changed, 516 insertions(+), 96 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/df0d61e3/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyArgs.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyArgs.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyArgs.java
index d8d41d5..e56ad7f 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyArgs.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyArgs.java
@@ -16,11 +16,13 @@
  * limitations under the License.
  */
 package org.apache.hadoop.ozone.om.helpers;
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.audit.Auditable;
 
+import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -102,6 +104,14 @@ public final class OmKeyArgs implements Auditable {
     return auditMap;
   }
 
+  @VisibleForTesting
+  public void addLocationInfo(OmKeyLocationInfo locationInfo) {
+    if (this.locationInfoList == null) {
+      locationInfoList = new ArrayList<>();
+    }
+    locationInfoList.add(locationInfo);
+  }
+
   /**
    * Builder class of OmKeyArgs.
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/df0d61e3/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyDeletingService.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyDeletingService.java
index ee23fe0..41a876b 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyDeletingService.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyDeletingService.java
@@ -1,52 +1,54 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
  * <p>
  * http://www.apache.org/licenses/LICENSE-2.0
  * <p>
  * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
  */
 package org.apache.hadoop.ozone.om;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
-import org.apache.hadoop.ozone.common.BlockGroup;
 import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
-import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.ozone.common.BlockGroup;
+import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.utils.BackgroundService;
 import org.apache.hadoop.utils.BackgroundTask;
 import org.apache.hadoop.utils.BackgroundTaskQueue;
 import org.apache.hadoop.utils.BackgroundTaskResult;
 import org.apache.hadoop.utils.BackgroundTaskResult.EmptyTaskResult;
+import org.apache.hadoop.utils.db.Table;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK_DEFAULT;
 
 /**
- * This is the background service to delete keys.
- * Scan the metadata of om periodically to get
- * the keys with prefix "#deleting" and ask scm to
- * delete metadata accordingly, if scm returns
- * success for keys, then clean up those keys.
+ * This is the background service to delete keys. Scan the metadata of om
+ * periodically to get the keys from DeletedTable and ask scm to delete
+ * metadata accordingly, if scm returns success for keys, then clean up those
+ * keys.
  */
 public class KeyDeletingService extends BackgroundService {
-
   private static final Logger LOG =
       LoggerFactory.getLogger(KeyDeletingService.class);
 
@@ -56,6 +58,8 @@ public class KeyDeletingService extends BackgroundService {
   private final ScmBlockLocationProtocol scmClient;
   private final KeyManager manager;
   private final int keyLimitPerTask;
+  private final AtomicLong deletedKeyCount;
+  private final AtomicLong runCount;
 
   public KeyDeletingService(ScmBlockLocationProtocol scmClient,
       KeyManager manager, long serviceInterval,
@@ -66,6 +70,28 @@ public class KeyDeletingService extends BackgroundService {
     this.manager = manager;
     this.keyLimitPerTask = conf.getInt(OZONE_KEY_DELETING_LIMIT_PER_TASK,
         OZONE_KEY_DELETING_LIMIT_PER_TASK_DEFAULT);
+    this.deletedKeyCount = new AtomicLong(0);
+    this.runCount = new AtomicLong(0);
+  }
+
+  /**
+   * Returns the number of times this Background service has run.
+   *
+   * @return Long, run count.
+   */
+  @VisibleForTesting
+  public AtomicLong getRunCount() {
+    return runCount;
+  }
+
+  /**
+   * Returns the number of keys deleted by the background service.
+   *
+   * @return Long count.
+   */
+  @VisibleForTesting
+  public AtomicLong getDeletedKeyCount() {
+    return deletedKeyCount;
   }
 
   @Override
@@ -76,11 +102,11 @@ public class KeyDeletingService extends BackgroundService {
   }
 
   /**
-   * A key deleting task scans OM DB and looking for a certain number
-   * of pending-deletion keys, sends these keys along with their associated
-   * blocks to SCM for deletion. Once SCM confirms keys are deleted (once
-   * SCM persisted the blocks info in its deletedBlockLog), it removes
-   * these keys from the DB.
+   * A key deleting task scans OM DB and looking for a certain number of
+   * pending-deletion keys, sends these keys along with their associated blocks
+   * to SCM for deletion. Once SCM confirms keys are deleted (once SCM persisted
+   * the blocks info in its deletedBlockLog), it removes these keys from the
+   * DB.
    */
   private class KeyDeletingTask implements
       BackgroundTask<BackgroundTaskResult> {
@@ -92,51 +118,55 @@ public class KeyDeletingService extends BackgroundService {
 
     @Override
     public BackgroundTaskResult call() throws Exception {
+      runCount.incrementAndGet();
       try {
         long startTime = Time.monotonicNow();
         List<BlockGroup> keyBlocksList = manager
             .getPendingDeletionKeys(keyLimitPerTask);
-        if (keyBlocksList.size() > 0) {
-          LOG.info("Found {} to-delete keys in OM", keyBlocksList.size());
+        if (keyBlocksList != null && keyBlocksList.size() > 0) {
           List<DeleteBlockGroupResult> results =
               scmClient.deleteKeyBlocks(keyBlocksList);
-          for (DeleteBlockGroupResult result : results) {
-            if (result.isSuccess()) {
-              try {
-                // Purge key from OM DB.
-                manager.deletePendingDeletionKey(result.getObjectKey());
-                LOG.debug("Key {} deleted from OM DB", result.getObjectKey());
-              } catch (IOException e) {
-                // if a pending deletion key is failed to delete,
-                // print a warning here and retain it in this state,
-                // so that it can be attempt to delete next time.
-                LOG.warn("Failed to delete pending-deletion key {}",
-                    result.getObjectKey(), e);
-              }
-            } else {
-              // Key deletion failed, retry in next interval.
-              LOG.warn("Key {} deletion failed because some of the blocks"
-                  + " were failed to delete, failed blocks: {}",
-                  result.getObjectKey(),
-                  StringUtils.join(",", result.getFailedBlocks()));
-            }
+          if (results != null) {
+            int delCount = deleteAllKeys(results);
+            LOG.debug("Number of keys deleted: {}, elapsed time: {}ms",
+                delCount, Time.monotonicNow() - startTime);
+            deletedKeyCount.addAndGet(delCount);
           }
-
-          if (!results.isEmpty()) {
-            LOG.info("Number of key deleted from OM DB: {},"
-                + " task elapsed time: {}ms",
-                results.size(), Time.monotonicNow() - startTime);
-          }
-
-          return results::size;
-        } else {
-          LOG.debug("No pending deletion key found in OM");
         }
       } catch (IOException e) {
-        LOG.error("Unable to get pending deletion keys, retry in"
-            + " next interval", e);
+        LOG.error("Error while running delete keys background task. Will " +
+            "retry at next run.", e);
       }
+      // By desing, no one cares about the results of this call back.
       return EmptyTaskResult.newResult();
     }
+
+    /**
+     * Deletes all the keys that SCM has acknowledged and queued for delete.
+     *
+     * @param results DeleteBlockGroups returned by SCM.
+     * @throws RocksDBException on Error.
+     * @throws IOException      on Error
+     */
+    private int deleteAllKeys(List<DeleteBlockGroupResult> results)
+        throws RocksDBException, IOException {
+      Table deletedTable = manager.getMetadataManager().getDeletedTable();
+      // Put all keys to delete in a single transaction and call for delete.
+      int deletedCount = 0;
+      try (WriteBatch writeBatch = new WriteBatch()) {
+        for (DeleteBlockGroupResult result : results) {
+          if (result.isSuccess()) {
+            // Purge key from OM DB.
+            writeBatch.delete(deletedTable.getHandle(),
+                DFSUtil.string2Bytes(result.getObjectKey()));
+            LOG.debug("Key {} deleted from OM DB", result.getObjectKey());
+            deletedCount++;
+          }
+        }
+        // Write a single transaction for delete.
+        manager.getMetadataManager().getStore().write(writeBatch);
+      }
+      return deletedCount;
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/df0d61e3/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
index a512d7b..83363e7 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
+import org.apache.hadoop.utils.BackgroundService;
 
 import java.io.IOException;
 import java.util.List;
@@ -144,16 +145,6 @@ public interface KeyManager {
   List<BlockGroup> getPendingDeletionKeys(int count) throws IOException;
 
   /**
-   * Deletes a pending deletion key by its name. This is often called when
-   * key can be safely deleted from this layer. Once called, all footprints
-   * of the key will be purged from OM DB.
-   *
-   * @param objectKeyName object key name with #deleting# prefix.
-   * @throws IOException if specified key doesn't exist or other I/O errors.
-   */
-  void deletePendingDeletionKey(String objectKeyName) throws IOException;
-
-  /**
    * Returns a list of all still open key info. Which contains the info about
    * the key name and all its associated block IDs. A pending open key has
    * prefix #open# in OM DB.
@@ -172,4 +163,17 @@ public interface KeyManager {
    * @throws IOException if specified key doesn't exist or other I/O errors.
    */
   void deleteExpiredOpenKey(String objectKeyName) throws IOException;
+
+  /**
+   * Returns the metadataManager.
+   * @return OMMetadataManager.
+   */
+  OMMetadataManager getMetadataManager();
+
+  /**
+   * Returns the instance of Deleting Service.
+   * @return Background service.
+   */
+  BackgroundService getDeletingService();
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/df0d61e3/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
index d585523..06d2587 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
 import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyInfo;
 import org.apache.hadoop.util.Time;
+import org.apache.hadoop.utils.BackgroundService;
 import org.rocksdb.RocksDBException;
 import org.rocksdb.WriteBatch;
 import org.slf4j.Logger;
@@ -43,9 +44,14 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_RATIS_ENABLED_DEFAULT;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL_DEFAULT;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_KEY_PREALLOCATION_MAXSIZE;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_KEY_PREALLOCATION_MAXSIZE_DEFAULT;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT;
@@ -69,6 +75,8 @@ public class KeyManagerImpl implements KeyManager {
   private final long preallocateMax;
   private final String omId;
 
+  private final BackgroundService keyDeletingService;
+
   public KeyManagerImpl(ScmBlockLocationProtocol scmBlockClient,
       OMMetadataManager metadataManager,
       OzoneConfiguration conf,
@@ -82,15 +90,28 @@ public class KeyManagerImpl implements KeyManager {
     this.preallocateMax = conf.getLong(
         OZONE_KEY_PREALLOCATION_MAXSIZE,
         OZONE_KEY_PREALLOCATION_MAXSIZE_DEFAULT);
+    long blockDeleteInterval = conf.getTimeDuration(
+        OZONE_BLOCK_DELETING_SERVICE_INTERVAL,
+        OZONE_BLOCK_DELETING_SERVICE_INTERVAL_DEFAULT,
+        TimeUnit.MILLISECONDS);
+    long serviceTimeout = conf.getTimeDuration(
+        OZONE_BLOCK_DELETING_SERVICE_TIMEOUT,
+        OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT,
+        TimeUnit.MILLISECONDS);
+    keyDeletingService = new KeyDeletingService(
+        scmBlockClient, this, blockDeleteInterval, serviceTimeout, conf);
+
     this.omId = omId;
   }
 
   @Override
   public void start() {
+    keyDeletingService.start();
   }
 
   @Override
   public void stop() throws IOException {
+    keyDeletingService.shutdown();
   }
 
   private void validateBucket(String volumeName, String bucketName)
@@ -460,14 +481,7 @@ public class KeyManagerImpl implements KeyManager {
   @Override
   public List<BlockGroup> getPendingDeletionKeys(final int count)
       throws IOException {
-    //TODO: Fix this in later patches.
-    return null;
-  }
-
-  @Override
-  public void deletePendingDeletionKey(String objectKeyName)
-      throws IOException {
-    // TODO : Fix in later patches.
+    return  metadataManager.getPendingDeletionKeys(count);
   }
 
   @Override
@@ -485,4 +499,14 @@ public class KeyManagerImpl implements KeyManager {
     Preconditions.checkNotNull(objectKeyName);
     // TODO: Fix this in later patches.
   }
+
+  @Override
+  public OMMetadataManager getMetadataManager() {
+    return metadataManager;
+  }
+
+  @Override
+  public BackgroundService getDeletingService() {
+    return keyDeletingService;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/df0d61e3/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
index 151fddf..16625dc 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
@@ -273,7 +273,7 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
 
   @Override
   public byte[] getOpenKeyBytes(String volume, String bucket,
-                                    String key, long id) {
+      String key, long id) {
     String openKey = OM_KEY_PREFIX + volume + OM_KEY_PREFIX + bucket +
         OM_KEY_PREFIX + key + OM_KEY_PREFIX + id;
     return DFSUtil.string2Bytes(openKey);
@@ -573,27 +573,37 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
   }
 
   @Override
-  public List<BlockGroup> getPendingDeletionKeys(final int count)
+  public List<BlockGroup> getPendingDeletionKeys(final int keyCount)
       throws IOException {
     List<BlockGroup> keyBlocksList = Lists.newArrayList();
-    // TODO: Fix this later, Not part of this patch.
-    List<Map.Entry<byte[], byte[]>> rangeResult = Collections.emptyList();
-    for (Map.Entry<byte[], byte[]> entry : rangeResult) {
-      OmKeyInfo info =
-          OmKeyInfo.getFromProtobuf(KeyInfo.parseFrom(entry.getValue()));
-      // Get block keys as a list.
-      OmKeyLocationInfoGroup latest = info.getLatestVersionLocations();
-      if (latest == null) {
-        return Collections.emptyList();
+    try (TableIterator<Table.KeyValue> keyIter = getDeletedTable().iterator()) {
+      int currentCount = 0;
+      while (keyIter.hasNext() && currentCount < keyCount) {
+        Table.KeyValue kv = keyIter.next();
+        if (kv != null) {
+          OmKeyInfo info =
+              OmKeyInfo.getFromProtobuf(KeyInfo.parseFrom(kv.getValue()));
+          // Get block keys as a list.
+          OmKeyLocationInfoGroup latest = info.getLatestVersionLocations();
+          if (latest == null) {
+            // This means that we have a key without any blocks.
+            // BUG-BUG: if this happens the key will never be deleted.
+            // TODO: Right thing to do is to remove this key right here.
+            LOG.warn("Found a key without blocks: {}, skipping for now.",
+                DFSUtil.bytes2String(kv.getKey()));
+            continue;
+          }
+          List<BlockID> item = latest.getLocationList().stream()
+              .map(b -> new BlockID(b.getContainerID(), b.getLocalID()))
+              .collect(Collectors.toList());
+          BlockGroup keyBlocks = BlockGroup.newBuilder()
+              .setKeyName(DFSUtil.bytes2String(kv.getKey()))
+              .addAllBlockIDs(item)
+              .build();
+          keyBlocksList.add(keyBlocks);
+          currentCount++;
+        }
       }
-      List<BlockID> item = latest.getLocationList().stream()
-          .map(b -> new BlockID(b.getContainerID(), b.getLocalID()))
-          .collect(Collectors.toList());
-      BlockGroup keyBlocks = BlockGroup.newBuilder()
-          .setKeyName(DFSUtil.bytes2String(entry.getKey()))
-          .addAllBlockIDs(item)
-          .build();
-      keyBlocksList.add(keyBlocks);
     }
     return keyBlocksList;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/df0d61e3/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ScmBlockLocationTestIngClient.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ScmBlockLocationTestIngClient.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ScmBlockLocationTestIngClient.java
new file mode 100644
index 0000000..2da60de
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ScmBlockLocationTestIngClient.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.ozone.om;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.ScmInfo;
+import org.apache.hadoop.hdds.scm.TestUtils;
+import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
+import org.apache.hadoop.hdds.scm.container.common.helpers.DeleteBlockResult;
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
+import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
+import org.apache.hadoop.ozone.common.BlockGroup;
+import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import static org.apache.hadoop.hdds.protocol.proto
+    .ScmBlockLocationProtocolProtos.DeleteScmBlockResult.Result;
+import static org.apache.hadoop.hdds.protocol.proto
+    .ScmBlockLocationProtocolProtos.DeleteScmBlockResult.Result.success;
+import static org.apache.hadoop.hdds.protocol.proto
+    .ScmBlockLocationProtocolProtos.DeleteScmBlockResult.Result.unknownFailure;
+
+/**
+ * This is a testing client that allows us to intercept calls from OzoneManager
+ * to SCM.
+ * <p>
+ * TODO: OzoneManager#getScmBlockClient -- so that we can load this class up via
+ * config setting into OzoneManager. Right now, we just pass this to
+ * KeyDeletingService only.
+ * <p>
+ * TODO: Move this class to a generic test utils so we can use this class in
+ * other Ozone Manager tests.
+ */
+public class ScmBlockLocationTestIngClient implements ScmBlockLocationProtocol {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ScmBlockLocationTestIngClient.class);
+  private final String clusterID;
+  private final String scmId;
+
+  // 0 means no calls will fail, +1 means all calls will fail, +2 means every
+  // second call will fail, +3 means every third and so on.
+  private final int failCallsFrequency;
+  private int currentCall = 0;
+
+  /**
+   * If ClusterID or SCMID is blank a per instance ID is generated.
+   *
+   * @param clusterID - String or blank.
+   * @param scmId - String or Blank.
+   * @param failCallsFrequency - Set to 0 for no failures, 1 for always to fail,
+   * a positive number for that frequency of failure.
+   */
+  public ScmBlockLocationTestIngClient(String clusterID, String scmId,
+      int failCallsFrequency) {
+    this.clusterID = StringUtils.isNotBlank(clusterID) ? clusterID :
+        UUID.randomUUID().toString();
+    this.scmId = StringUtils.isNotBlank(scmId) ? scmId :
+        UUID.randomUUID().toString();
+    this.failCallsFrequency = Math.abs(failCallsFrequency);
+    switch (this.failCallsFrequency) {
+    case 0:
+      LOG.debug("Set to no failure mode, all delete block calls will " +
+          "succeed.");
+      break;
+    case 1:
+      LOG.debug("Set to all failure mode. All delete block calls to SCM" +
+          " will fail.");
+      break;
+    default:
+      LOG.debug("Set to Mix mode, every {} -th call will fail",
+          this.failCallsFrequency);
+    }
+
+  }
+
+  /**
+   * Returns Fake blocks to the KeyManager so we get blocks in the Database.
+   * @param size - size of the block.
+   * @param type Replication Type
+   * @param factor - Replication factor
+   * @param owner - String owner.
+   * @return
+   * @throws IOException
+   */
+  @Override
+  public AllocatedBlock allocateBlock(long size,
+      HddsProtos.ReplicationType type, HddsProtos.ReplicationFactor factor,
+      String owner) throws IOException {
+    DatanodeDetails datanodeDetails = TestUtils.randomDatanodeDetails();
+    Pipeline pipeline = createPipeline(datanodeDetails);
+    long containerID = Time.monotonicNow();
+    long localID = Time.monotonicNow();
+    AllocatedBlock.Builder abb =
+        new AllocatedBlock.Builder()
+            .setBlockID(new BlockID(containerID, localID))
+            .setPipeline(pipeline)
+            .setShouldCreateContainer(false);
+    return abb.build();
+  }
+
+  private Pipeline createPipeline(DatanodeDetails datanode) {
+    final Pipeline pipeline =
+        new Pipeline(datanode.getUuidString(), HddsProtos.LifeCycleState.OPEN,
+            HddsProtos.ReplicationType.STAND_ALONE,
+            HddsProtos.ReplicationFactor.ONE,
+            PipelineID.randomId());
+    pipeline.addMember(datanode);
+    return pipeline;
+  }
+
+  @Override
+  public List<DeleteBlockGroupResult> deleteKeyBlocks(
+      List<BlockGroup> keyBlocksInfoList) throws IOException {
+    List<DeleteBlockGroupResult> results = new ArrayList<>();
+    List<DeleteBlockResult> blockResultList = new ArrayList<>();
+    Result result;
+    for (BlockGroup keyBlocks : keyBlocksInfoList) {
+      for (BlockID blockKey : keyBlocks.getBlockIDList()) {
+        currentCall++;
+        switch (this.failCallsFrequency) {
+        case 0:
+          result = success;
+          break;
+        case 1:
+          result = unknownFailure;
+          break;
+        default:
+          if (currentCall % this.failCallsFrequency == 0) {
+            result = unknownFailure;
+          } else {
+            result = success;
+          }
+        }
+        blockResultList.add(new DeleteBlockResult(blockKey, result));
+      }
+      results.add(new DeleteBlockGroupResult(keyBlocks.getGroupID(),
+          blockResultList));
+    }
+    return results;
+  }
+
+  @Override
+  public ScmInfo getScmInfo() throws IOException {
+    ScmInfo.Builder builder =
+        new ScmInfo.Builder()
+            .setClusterId(clusterID)
+            .setScmId(scmId);
+    return builder.build();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/df0d61e3/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyDeletingService.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyDeletingService.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyDeletingService.java
new file mode 100644
index 0000000..44e3bdf
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyDeletingService.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.ozone.om;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.server.ServerUtils;
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
+import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.utils.db.DBConfigFromFile;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.hadoop.hdds.HddsConfigKeys
+    .HDDS_CONTAINER_REPORT_INTERVAL;
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+    .OZONE_BLOCK_DELETING_SERVICE_INTERVAL;
+
+/**
+ * Test Key Deleting Service.
+ * <p>
+ * This test does the following things.
+ * <p>
+ * 1. Creates a bunch of keys. 2. Then executes delete key directly using
+ * Metadata Manager. 3. Waits for a while for the KeyDeleting Service to pick up
+ * and call into SCM. 4. Confirms that calls have been successful.
+ */
+public class TestKeyDeletingService {
+  @Rule
+  public TemporaryFolder folder = new TemporaryFolder();
+
+  private OzoneConfiguration createConfAndInitValues() throws IOException {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    File newFolder = folder.newFolder();
+    if (!newFolder.exists()) {
+      Assert.assertTrue(newFolder.mkdirs());
+    }
+    System.setProperty(DBConfigFromFile.CONFIG_DIR, "/");
+    ServerUtils.setOzoneMetaDirPath(conf, newFolder.toString());
+    conf.setTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL, 100,
+        TimeUnit.MILLISECONDS);
+    conf.setTimeDuration(HDDS_CONTAINER_REPORT_INTERVAL, 200,
+        TimeUnit.MILLISECONDS);
+    conf.setQuietMode(false);
+
+    return conf;
+  }
+
+  /**
+   * In this test, we create a bunch of keys and delete them. Then we start the
+   * KeyDeletingService and pass a SCMClient which does not fail. We make sure
+   * that all the keys that we deleted is picked up and deleted by
+   * OzoneManager.
+   *
+   * @throws IOException - on Failure.
+   */
+
+  @Test(timeout = 30000)
+  public void checkIfDeleteServiceisDeletingKeys()
+      throws IOException, TimeoutException, InterruptedException {
+    OzoneConfiguration conf = createConfAndInitValues();
+    OmMetadataManagerImpl metaMgr = new OmMetadataManagerImpl(conf);
+    KeyManager keyManager =
+        new KeyManagerImpl(
+            new ScmBlockLocationTestIngClient(null, null, 0),
+            metaMgr, conf, UUID.randomUUID().toString());
+    final int keyCount = 100;
+    createAndDeleteKeys(keyManager, keyCount);
+    KeyDeletingService keyDeletingService =
+        (KeyDeletingService) keyManager.getDeletingService();
+    keyManager.start();
+    GenericTestUtils.waitFor(
+        () -> keyDeletingService.getDeletedKeyCount().get() >= keyCount,
+        1000, 10000);
+    Assert.assertTrue(keyDeletingService.getRunCount().get() > 1);
+  }
+
+  @Test(timeout = 30000)
+  public void checkIfDeleteServiceWithFailingSCM()
+      throws IOException, TimeoutException, InterruptedException {
+    OzoneConfiguration conf = createConfAndInitValues();
+    OmMetadataManagerImpl metaMgr = new OmMetadataManagerImpl(conf);
+    //failCallsFrequency = 1 , means all calls fail.
+    KeyManager keyManager =
+        new KeyManagerImpl(
+            new ScmBlockLocationTestIngClient(null, null, 1),
+            metaMgr, conf, UUID.randomUUID().toString());
+    final int keyCount = 100;
+    createAndDeleteKeys(keyManager, keyCount);
+    KeyDeletingService keyDeletingService =
+        (KeyDeletingService) keyManager.getDeletingService();
+    keyManager.start();
+    // Make sure that we have run the background thread 5 times more
+    GenericTestUtils.waitFor(
+        () -> keyDeletingService.getRunCount().get() >= 5,
+        100, 1000);
+    // Since SCM calls are failing, deletedKeyCount should be zero.
+    Assert.assertEquals(keyDeletingService.getDeletedKeyCount().get(), 0);
+
+
+  }
+
+  private void createAndDeleteKeys(KeyManager keyManager, int keyCount)
+      throws IOException {
+    for (int x = 0; x < keyCount; x++) {
+      String volumeName = String.format("volume%s",
+          RandomStringUtils.randomAlphanumeric(5));
+      String bucketName = String.format("bucket%s",
+          RandomStringUtils.randomAlphanumeric(5));
+      String keyName = String.format("key%s",
+          RandomStringUtils.randomAlphanumeric(5));
+      byte[] volumeBytes =
+          keyManager.getMetadataManager().getVolumeKey(volumeName);
+      byte[] bucketBytes =
+          keyManager.getMetadataManager().getBucketKey(volumeName, bucketName);
+      // cheat here, just create a volume and bucket entry so that we can
+      // create the keys, we put the same data for key and value since the
+      // system does not decode the object
+      keyManager.getMetadataManager().getVolumeTable().put(volumeBytes,
+          volumeBytes);
+
+      keyManager.getMetadataManager().getBucketTable().put(bucketBytes,
+          bucketBytes);
+
+      OmKeyArgs arg =
+          new OmKeyArgs.Builder()
+              .setVolumeName(volumeName)
+              .setBucketName(bucketName)
+              .setKeyName(keyName)
+              .build();
+      //Open, Commit and Delete the Keys in the Key Manager.
+      OpenKeySession session = keyManager.openKey(arg);
+      arg.addLocationInfo(keyManager.allocateBlock(arg, session.getId()));
+      keyManager.commitKey(arg, session.getId());
+      keyManager.deleteKey(arg);
+    }
+  }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org