You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ww...@apache.org on 2017/08/11 10:46:23 UTC

hadoop git commit: HDFS-12196. Ozone: DeleteKey-2: Implement block deleting service to delete stale blocks at background. Contributed by Weiwei Yang.

Repository: hadoop
Updated Branches:
  refs/heads/HDFS-7240 0e32bf179 -> 482c09462


HDFS-12196. Ozone: DeleteKey-2: Implement block deleting service to delete stale blocks at background. Contributed by Weiwei Yang.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/482c0946
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/482c0946
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/482c0946

Branch: refs/heads/HDFS-7240
Commit: 482c0946221a56e8020fb6ec1a1ff27ceea9ff00
Parents: 0e32bf1
Author: Weiwei Yang <ww...@apache.org>
Authored: Fri Aug 11 18:45:55 2017 +0800
Committer: Weiwei Yang <ww...@apache.org>
Committed: Fri Aug 11 18:45:55 2017 +0800

----------------------------------------------------------------------
 .../apache/hadoop/ozone/OzoneConfigKeys.java    |  18 ++
 .../common/impl/ContainerManagerImpl.java       |   2 +-
 .../background/BlockDeletingService.java        | 211 +++++++++++++
 .../statemachine/background/package-info.java   |  18 ++
 .../container/ozoneimpl/OzoneContainer.java     |  14 +
 .../apache/hadoop/utils/BackgroundService.java  | 147 +++++++++
 .../org/apache/hadoop/utils/BackgroundTask.java |  28 ++
 .../hadoop/utils/BackgroundTaskQueue.java       |  64 ++++
 .../hadoop/utils/BackgroundTaskResult.java      |  29 ++
 .../src/main/resources/ozone-default.xml        |  31 ++
 .../TestUtils/BlockDeletingServiceTestImpl.java |  99 ++++++
 .../common/TestBlockDeletingService.java        | 312 +++++++++++++++++++
 12 files changed, 972 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/482c0946/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index 55b5f88..92017a0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -94,6 +94,24 @@ public final class OzoneConfigKeys {
       "ozone.client.connection.timeout.ms";
   public static final int OZONE_CLIENT_CONNECTION_TIMEOUT_MS_DEFAULT = 5000;
 
+  /**
+   * Configuration properties for Ozone Block Deleting Service.
+   */
+  public static final String OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS =
+      "ozone.block.deleting.service.interval.ms";
+  public static final int OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS_DEFAULT
+      = 60000;
+
+  public static final String OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER =
+      "ozone.block.deleting.limit.per.task";
+  public static final int OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER_DEFAULT
+      = 1000;
+
+  public static final String OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL
+      = "ozone.block.deleting.container.limit.per.interval";
+  public static final int
+      OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL_DEFAULT = 10;
+
   public static final String DFS_CONTAINER_RATIS_ENABLED_KEY
       = ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY;
   public static final boolean DFS_CONTAINER_RATIS_ENABLED_DEFAULT

http://git-wip-us.apache.org/repos/asf/hadoop/blob/482c0946/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java
index b77ac55..aa6946c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java
@@ -658,7 +658,7 @@ public class ContainerManagerImpl implements ContainerManager {
 
 
   @VisibleForTesting
-  ConcurrentSkipListMap<String, ContainerStatus> getContainerMap() {
+  public ConcurrentSkipListMap<String, ContainerStatus> getContainerMap() {
     return containerMap;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/482c0946/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/background/BlockDeletingService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/background/BlockDeletingService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/background/BlockDeletingService.java
new file mode 100644
index 0000000..618fa42
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/background/BlockDeletingService.java
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.statemachine.background;
+
+import com.google.common.collect.Lists;
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
+import org.apache.hadoop.ozone.container.common.helpers.KeyUtils;
+import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager;
+import org.apache.hadoop.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.utils.BackgroundService;
+import org.apache.hadoop.utils.BackgroundTaskResult;
+import org.apache.hadoop.utils.BackgroundTaskQueue;
+import org.apache.hadoop.utils.BackgroundTask;
+import org.apache.hadoop.utils.BatchOperation;
+import org.apache.hadoop.utils.MetadataStore;
+import org.apache.hadoop.utils.MetadataKeyFilters.KeyPrefixFilter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.List;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+    .OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER;
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+    .OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER_DEFAULT;
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+    .OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL;
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+    .OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL_DEFAULT;
+
+/**
+ * A per-datanode container block deleting service takes in charge
+ * of deleting staled ozone blocks.
+ */
+public class BlockDeletingService extends BackgroundService{
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(BlockDeletingService.class);
+
+  private final ContainerManager containerManager;
+  private final Configuration conf;
+
+  // Throttle number of blocks to delete per task,
+  // set to 1 for testing
+  private final int blockLimitPerTask;
+
+  // Throttle the number of containers to process concurrently at a time,
+  private final int containerLimitPerInterval;
+
+  // Task priority is useful when a to-delete block has weight.
+  private final static int TASK_PRIORITY_DEFAULT = 1;
+  // Core pool size for container tasks
+  private final static int BLOCK_DELETING_SERVICE_CORE_POOL_SIZE = 10;
+
+  public BlockDeletingService(ContainerManager containerManager,
+      int serviceInterval, Configuration conf) {
+    super("BlockDeletingService", serviceInterval,
+        TimeUnit.MILLISECONDS, BLOCK_DELETING_SERVICE_CORE_POOL_SIZE);
+    this.containerManager = containerManager;
+    this.conf = conf;
+    this.blockLimitPerTask = conf.getInt(
+        OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER,
+        OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER_DEFAULT);
+    this.containerLimitPerInterval = conf.getInt(
+        OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL,
+        OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL_DEFAULT);
+  }
+
+  @Override
+  public BackgroundTaskQueue getTasks() {
+    BackgroundTaskQueue queue = new BackgroundTaskQueue();
+    List<ContainerData> containers = Lists.newArrayList();
+    try {
+      // We at most list a number of containers a time,
+      // in case there are too many containers and start too many workers.
+      // We must ensure there is no empty container in this result.
+      containerManager.listContainer(null, containerLimitPerInterval,
+          null, containers);
+
+      // TODO
+      // in case we always fetch a few same containers,
+      // should we list some more containers a time and shuffle them?
+      for(ContainerData container : containers) {
+        BlockDeletingTask containerTask =
+            new BlockDeletingTask(container, TASK_PRIORITY_DEFAULT);
+        queue.add(containerTask);
+      }
+    } catch (StorageContainerException e) {
+      LOG.warn("Failed to initiate block deleting tasks, "
+          + "caused by unable to get containers info. "
+          + "Retry in next interval. ", e);
+    }
+    return queue;
+  }
+
+  private static class ContainerBackgroundTaskResult
+      implements BackgroundTaskResult {
+    private List<String> deletedBlockIds;
+
+    ContainerBackgroundTaskResult() {
+      deletedBlockIds = new LinkedList<>();
+    }
+
+    public void addBlockId(String blockId) {
+      deletedBlockIds.add(blockId);
+    }
+
+    public void addAll(List<String> blockIds) {
+      deletedBlockIds.addAll(blockIds);
+    }
+
+    public List<String> getDeletedBlocks() {
+      return deletedBlockIds;
+    }
+
+    @Override
+    public int getSize() {
+      return deletedBlockIds.size();
+    }
+  }
+
+  private class BlockDeletingTask
+      implements BackgroundTask<BackgroundTaskResult> {
+
+    private final int priority;
+    private final ContainerData containerData;
+
+    BlockDeletingTask(ContainerData containerName, int priority) {
+      this.priority = priority;
+      this.containerData = containerName;
+    }
+
+    @Override
+    public BackgroundTaskResult call() throws Exception {
+      // Scan container's db and get list of under deletion blocks
+      MetadataStore meta = KeyUtils.getDB(containerData, conf);
+      // # of blocks to delete is throttled
+      KeyPrefixFilter filter = new KeyPrefixFilter(
+          OzoneConsts.DELETING_KEY_PREFIX);
+      List<Map.Entry<byte[], byte[]>> toDeleteBlocks =
+          meta.getRangeKVs(null, blockLimitPerTask, filter);
+      if (toDeleteBlocks.isEmpty()) {
+        LOG.info("No under deletion block found in container : {}",
+            containerData.getContainerName());
+      }
+
+      List<String> succeedBlocks = new LinkedList<>();
+      LOG.info("Container : {}, To-Delete blocks : {}",
+          containerData.getContainerName(), toDeleteBlocks.size());
+      toDeleteBlocks.forEach(entry -> {
+        String blockName = DFSUtil.bytes2String(entry.getKey());
+        LOG.info("Deleting block {}", blockName);
+        try {
+          ContainerProtos.KeyData data =
+              ContainerProtos.KeyData.parseFrom(entry.getValue());
+
+          for (ContainerProtos.ChunkInfo chunkInfo : data.getChunksList()) {
+            File chunkFile = new File(chunkInfo.getChunkName());
+            if (FileUtils.deleteQuietly(chunkFile)) {
+              LOG.info("block {} chunk {} deleted", blockName,
+                  chunkFile.getAbsolutePath());
+            }
+          }
+          succeedBlocks.add(blockName);
+        } catch (InvalidProtocolBufferException e) {
+          LOG.error("Failed to parse block info for block {}", blockName, e);
+        }
+      });
+
+      // Once files are deleted ... clean up DB
+      BatchOperation batch = new BatchOperation();
+      succeedBlocks.forEach(entry ->
+          batch.delete(DFSUtil.string2Bytes(entry)));
+      meta.writeBatch(batch);
+
+      ContainerBackgroundTaskResult crr = new ContainerBackgroundTaskResult();
+      crr.addAll(succeedBlocks);
+      return crr;
+    }
+
+    @Override
+    public int getPriority() {
+      return priority;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/482c0946/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/background/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/background/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/background/package-info.java
new file mode 100644
index 0000000..a9e202e
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/background/package-info.java
@@ -0,0 +1,18 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.common.statemachine.background;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/482c0946/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
index f7caf5a..792c132 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.ozone.container.common.interfaces.ChunkManager;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager;
 import org.apache.hadoop.ozone.container.common.interfaces.KeyManager;
+import org.apache.hadoop.ozone.container.common.statemachine.background.BlockDeletingService;
 import org.apache.hadoop.ozone.container.common.transport.server.XceiverServer;
 import org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis;
 import org.apache.hadoop.ozone.protocol.proto
@@ -45,6 +46,10 @@ import java.util.List;
 
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
 import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_ROOT_PREFIX;
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+    .OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS;
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+    .OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS_DEFAULT;
 
 /**
  * Ozone main class sets up the network server and initializes the container
@@ -60,6 +65,7 @@ public class OzoneContainer {
   private final XceiverServerSpi server;
   private final ChunkManager chunkManager;
   private final KeyManager keyManager;
+  private final BlockDeletingService blockDeletingService;
 
   /**
    * Creates a network endpoint and enables Ozone container.
@@ -90,6 +96,12 @@ public class OzoneContainer {
     this.keyManager = new KeyManagerImpl(manager, ozoneConfig);
     manager.setKeyManager(this.keyManager);
 
+    int svcInterval = ozoneConfig.getInt(
+        OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS,
+        OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS_DEFAULT);
+    this.blockDeletingService = new BlockDeletingService(manager,
+        svcInterval, ozoneConfig);
+
     this.dispatcher = new Dispatcher(manager, this.ozoneConfig);
 
     final boolean useRatis = ozoneConfig.getBoolean(
@@ -107,6 +119,7 @@ public class OzoneContainer {
    */
   public void start() throws IOException {
     server.start();
+    blockDeletingService.start();
     dispatcher.init();
   }
 
@@ -152,6 +165,7 @@ public class OzoneContainer {
       this.chunkManager.shutdown();
       this.keyManager.shutdown();
       this.manager.shutdown();
+      this.blockDeletingService.shutdown();
       LOG.info("container services shutdown complete.");
     } catch (IOException ex) {
       LOG.warn("container service shutdown error:", ex);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/482c0946/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/BackgroundService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/BackgroundService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/BackgroundService.java
new file mode 100644
index 0000000..b057533
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/BackgroundService.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.utils;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.Future;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * An abstract class for a background service in ozone.
+ * A background service schedules multiple child tasks in parallel
+ * in a certain period. In each interval, it waits until all the tasks
+ * finish execution and then schedule next interval.
+ */
+public abstract class BackgroundService {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(BackgroundService.class);
+
+  // Executor to launch child tasks
+  private final ScheduledExecutorService exec;
+  private final ThreadGroup threadGroup;
+  private final ThreadFactory threadFactory;
+  private final String serviceName;
+  private final int interval;
+  private final TimeUnit unit;
+
+  public BackgroundService(String serviceName, int interval,
+      TimeUnit unit, int threadPoolSize) {
+    this.interval = interval;
+    this.unit = unit;
+    this.serviceName = serviceName;
+    threadGroup = new ThreadGroup(serviceName);
+    ThreadFactory tf = r -> new Thread(threadGroup, r);
+    threadFactory = new ThreadFactoryBuilder()
+        .setThreadFactory(tf)
+        .setDaemon(true)
+        .setNameFormat(serviceName + "#%d")
+        .build();
+    exec = Executors.newScheduledThreadPool(threadPoolSize, threadFactory);
+  }
+
+  protected ExecutorService getExecutorService() {
+    return this.exec;
+  }
+
+  @VisibleForTesting
+  public int getThreadCount() {
+    return threadGroup.activeCount();
+  }
+
+
+  // start service
+  public void start() {
+    exec.scheduleWithFixedDelay(new PeriodicalTask(), 0, interval, unit);
+  }
+
+  public abstract BackgroundTaskQueue getTasks();
+
+  /**
+   * Run one or more background tasks concurrently.
+   * Wait until all tasks to return the result.
+   */
+  public class PeriodicalTask implements Runnable {
+    @Override
+    public void run() {
+      LOG.info("Running background service : {}", serviceName);
+      BackgroundTaskQueue tasks = getTasks();
+      if (tasks.isEmpty()) {
+        // No task found, or some problems to init tasks
+        // return and retry in next interval.
+        return;
+      }
+
+      LOG.info("Number of background tasks to execute : {}", tasks.size());
+      CompletionService<BackgroundTaskResult> taskCompletionService =
+          new ExecutorCompletionService<>(exec);
+
+      List<Future<BackgroundTaskResult>> results = Lists.newArrayList();
+      while (tasks.size() > 0) {
+        BackgroundTask task = tasks.poll();
+        Future<BackgroundTaskResult> result =
+            taskCompletionService.submit(task);
+        results.add(result);
+      }
+
+      results.parallelStream().forEach(taskResultFuture -> {
+        try {
+          // Collect task results
+          // TODO timeout in case task hangs
+          BackgroundTaskResult result = taskResultFuture.get();
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("task execution result size {}", result.getSize());
+          }
+        } catch (InterruptedException | ExecutionException e) {
+          LOG.warn(
+              "Background task fails to execute, "
+                  + "retrying in next interval", e);
+        }
+      });
+    }
+  }
+
+  // shutdown and make sure all threads are properly released.
+  public void shutdown() {
+    LOG.info("Shutting down service {}", this.serviceName);
+    exec.shutdown();
+    try {
+      if (!exec.awaitTermination(60, TimeUnit.SECONDS)) {
+        exec.shutdownNow();
+      }
+    } catch (InterruptedException e) {
+      exec.shutdownNow();
+    }
+    if (threadGroup.activeCount() == 0 && !threadGroup.isDestroyed()) {
+      threadGroup.destroy();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/482c0946/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/BackgroundTask.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/BackgroundTask.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/BackgroundTask.java
new file mode 100644
index 0000000..47e8ebc
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/BackgroundTask.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.utils;
+
+import java.util.concurrent.Callable;
+
+/**
+ * A task thread to run by {@link BackgroundService}.
+ */
+public interface BackgroundTask<T> extends Callable<T> {
+
+  int getPriority();
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/482c0946/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/BackgroundTaskQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/BackgroundTaskQueue.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/BackgroundTaskQueue.java
new file mode 100644
index 0000000..b56ef0c
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/BackgroundTaskQueue.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.utils;
+
+import java.util.PriorityQueue;
+
+/**
+ * A priority queue that stores a number of {@link BackgroundTask}.
+ */
+public class BackgroundTaskQueue {
+
+  private final PriorityQueue<BackgroundTask> tasks;
+
+  public BackgroundTaskQueue() {
+    tasks = new PriorityQueue<>((task1, task2)
+        -> task1.getPriority() - task2.getPriority());
+  }
+
+  /**
+   * @return the head task in this queue.
+   */
+  public synchronized BackgroundTask poll() {
+    return tasks.poll();
+  }
+
+  /**
+   * Add a {@link BackgroundTask} to the queue,
+   * the task will be sorted by its priority.
+   *
+   * @param task
+   */
+  public synchronized void add(BackgroundTask task) {
+    tasks.add(task);
+  }
+
+  /**
+   * @return true if the queue contains no task, false otherwise.
+   */
+  public synchronized boolean isEmpty() {
+    return tasks.isEmpty();
+  }
+
+  /**
+   * @return the size of the queue.
+   */
+  public synchronized int size() {
+    return tasks.size();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/482c0946/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/BackgroundTaskResult.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/BackgroundTaskResult.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/BackgroundTaskResult.java
new file mode 100644
index 0000000..b37a5db
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/BackgroundTaskResult.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.utils;
+
+/**
+ * Result of a {@link BackgroundTask}.
+ */
+public interface BackgroundTaskResult {
+
+  /**
+   *   Returns the size of entries included in this result.
+   */
+  int getSize();
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/482c0946/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml
index be9a48a..17a127d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml
@@ -360,6 +360,37 @@
   </property>
 
   <property>
+    <name>ozone.block.deleting.service.interval.ms</name>
+    <value>60000</value>
+    <description>
+      Time interval in milliseconds of the block deleting service.
+      The block deleting service runs on each datanode to scan staled
+      blocks and delete them asynchronously.
+    </description>
+  </property>
+
+  <property>
+    <name>ozone.block.deleting.limit.per.task</name>
+    <value>1000</value>
+    <description>
+      Maximum number of blocks to be deleted by block deleting service
+      per time interval. This property is used to throttle the actual number
+      of block deletions on a datanode per container.
+    </description>
+  </property>
+
+  <property>
+    <name>ozone.block.deleting.container.limit.per.interval</name>
+    <value>10</value>
+    <description>
+      Maximum number of containers to be scanned by block deleting service
+      per time interval. The block deleting service spawns a thread to handle
+      block deletions in a container. This property is used to throttle
+      the number of threads spawned for block deletions.
+    </description>
+  </property>
+
+  <property>
     <name>dfs.container.ipc</name>
     <value>50011</value>
     <description>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/482c0946/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/TestUtils/BlockDeletingServiceTestImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/TestUtils/BlockDeletingServiceTestImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/TestUtils/BlockDeletingServiceTestImpl.java
new file mode 100644
index 0000000..0fde964
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/TestUtils/BlockDeletingServiceTestImpl.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.TestUtils;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager;
+import org.apache.hadoop.ozone.container.common.statemachine.background.BlockDeletingService;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * A test class implementation for {@link BlockDeletingService}.
+ */
+public class BlockDeletingServiceTestImpl
+    extends BlockDeletingService {
+
+  // tests only
+  private CountDownLatch latch;
+  private Thread testingThread;
+  private AtomicInteger numOfProcessed = new AtomicInteger(0);
+
+  public BlockDeletingServiceTestImpl(ContainerManager containerManager,
+      int serviceInterval, Configuration conf) {
+    super(containerManager, serviceInterval, conf);
+  }
+
+  @VisibleForTesting
+  public void runDeletingTasks() {
+    if (latch.getCount() > 0) {
+      this.latch.countDown();
+    } else {
+      throw new IllegalStateException("Count already reaches zero");
+    }
+  }
+
+  @VisibleForTesting
+  public boolean isStarted() {
+    return latch != null && testingThread.isAlive();
+  }
+
+  public int getTimesOfProcessed() {
+    return numOfProcessed.get();
+  }
+
+  // Override the implementation to start a single on-call control thread.
+  @Override public void start() {
+    PeriodicalTask svc = new PeriodicalTask();
+    // In test mode, relies on a latch countdown to runDeletingTasks tasks.
+    Runnable r = () -> {
+      while (true) {
+        latch = new CountDownLatch(1);
+        try {
+          latch.await();
+        } catch (InterruptedException e) {
+          break;
+        }
+        Future<?> future = this.getExecutorService().submit(svc);
+        try {
+          // for tests, we only wait for 3s for completion
+          future.get(3, TimeUnit.SECONDS);
+          numOfProcessed.incrementAndGet();
+        } catch (Exception e) {
+          return;
+        }
+      }
+    };
+
+    testingThread = new ThreadFactoryBuilder()
+        .setDaemon(true)
+        .build()
+        .newThread(r);
+    testingThread.start();
+  }
+
+  @Override
+  public void shutdown() {
+    testingThread.interrupt();
+    super.shutdown();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/482c0946/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java
new file mode 100644
index 0000000..2d4b5b2
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java
@@ -0,0 +1,312 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.container.common;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
+import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.OzoneConfiguration;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.container.TestUtils.BlockDeletingServiceTestImpl;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
+import org.apache.hadoop.ozone.container.common.helpers.KeyData;
+import org.apache.hadoop.ozone.container.common.helpers.KeyUtils;
+import org.apache.hadoop.ozone.container.common.impl.ContainerManagerImpl;
+import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager;
+import org.apache.hadoop.ozone.web.utils.OzoneUtils;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.utils.MetadataKeyFilters;
+import org.apache.hadoop.utils.MetadataStore;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.BeforeClass;
+import org.junit.Before;
+import org.junit.After;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+    .OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS;
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+    .OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER;
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+    .OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL;
+import static org.apache.hadoop.ozone.container
+    .ContainerTestHelper.createSingleNodePipeline;
+
+/**
+ * Tests to test block deleting service.
+ */
+public class TestBlockDeletingService {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestBlockDeletingService.class);
+
+  private static File testRoot;
+  private static File containersDir;
+  private static File chunksDir;
+
+  @BeforeClass
+  public static void init() {
+    testRoot = GenericTestUtils
+        .getTestDir(TestBlockDeletingService.class.getSimpleName());
+    chunksDir = new File(testRoot, "chunks");
+    containersDir = new File(testRoot, "containers");
+  }
+
+  @Before
+  public void setup() throws IOException {
+    if (chunksDir.exists()) {
+      FileUtils.deleteDirectory(chunksDir);
+    }
+  }
+
+  @After
+  public void cleanup() throws IOException {
+    FileUtils.deleteDirectory(chunksDir);
+    FileUtils.deleteDirectory(containersDir);
+    FileUtils.deleteDirectory(testRoot);
+  }
+
+  private ContainerManager createContainerManager(Configuration conf)
+      throws Exception {
+    conf.set(OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT,
+        containersDir.getAbsolutePath());
+    if (containersDir.exists()) {
+      FileUtils.deleteDirectory(containersDir);
+    }
+    ContainerManager containerManager = new ContainerManagerImpl();
+    List<StorageLocation> pathLists = new LinkedList<>();
+    pathLists.add(StorageLocation.parse(containersDir.getAbsolutePath()));
+    containerManager.init(conf, pathLists);
+    return containerManager;
+  }
+
+  /**
+   * A helper method to create some blocks and put them under deletion
+   * state for testing. This method directly updates container.db and
+   * creates some fake chunk files for testing.
+   */
+  private void createToDeleteBlocks(ContainerManager mgr,
+      Configuration conf, int numOfContainers, int numOfBlocksPerContainer,
+      int numOfChunksPerBlock, File chunkDir) throws IOException {
+    for (int x = 0; x < numOfContainers; x++) {
+      String containerName = OzoneUtils.getRequestID();
+      ContainerData data = new ContainerData(containerName);
+      mgr.createContainer(createSingleNodePipeline(containerName), data);
+      data = mgr.readContainer(containerName);
+      MetadataStore metadata = KeyUtils.getDB(data, conf);
+      for (int j = 0; j<numOfBlocksPerContainer; j++) {
+        String blockName = containerName + "b" + j;
+        String deleteStateName = OzoneConsts.DELETING_KEY_PREFIX + blockName;
+        KeyData kd = new KeyData(containerName, deleteStateName);
+        List<ContainerProtos.ChunkInfo> chunks = Lists.newArrayList();
+        for (int k = 0; k<numOfChunksPerBlock; k++) {
+          // offset doesn't matter here
+          String chunkName = blockName + "_chunk_" + k;
+          File chunk = new File(chunkDir, chunkName);
+          FileUtils.writeStringToFile(chunk, "a chunk");
+          LOG.info("Creating file {}", chunk.getAbsolutePath());
+          // make sure file exists
+          Assert.assertTrue(chunk.isFile() && chunk.exists());
+          ContainerProtos.ChunkInfo info =
+              ContainerProtos.ChunkInfo.newBuilder()
+                  .setChunkName(chunk.getAbsolutePath())
+                  .setLen(0)
+                  .setOffset(0)
+                  .setChecksum("")
+                  .build();
+          chunks.add(info);
+        }
+        kd.setChunks(chunks);
+        metadata.put(DFSUtil.string2Bytes(deleteStateName),
+            kd.getProtoBufMessage().toByteArray());
+      }
+    }
+  }
+
+  /**
+   *  Run service runDeletingTasks and wait for it's been processed.
+   */
+  private void deleteAndWait(BlockDeletingServiceTestImpl service,
+      int timesOfProcessed) throws TimeoutException, InterruptedException {
+    service.runDeletingTasks();
+    GenericTestUtils.waitFor(()
+        -> service.getTimesOfProcessed() == timesOfProcessed, 100, 3000);
+  }
+
+  /**
+   * Get under deletion blocks count from DB,
+   * note this info is parsed from container.db.
+   */
+  private int getUnderDeletionBlocksCount(MetadataStore meta)
+      throws IOException {
+    List<Map.Entry<byte[], byte[]>> underDeletionBlocks =
+        meta.getRangeKVs(null, 100, new MetadataKeyFilters.KeyPrefixFilter(
+            OzoneConsts.DELETING_KEY_PREFIX));
+    return underDeletionBlocks.size();
+  }
+
+  @Test
+  public void testBlockDeletion() throws Exception {
+    Configuration conf = new OzoneConfiguration();
+    conf.setInt(OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL, 10);
+    conf.setInt(OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER, 2);
+    ContainerManager containerManager = createContainerManager(conf);
+    createToDeleteBlocks(containerManager, conf, 1, 3, 1, chunksDir);
+
+    BlockDeletingServiceTestImpl svc =
+        new BlockDeletingServiceTestImpl(containerManager, 1000, conf);
+    svc.start();
+    GenericTestUtils.waitFor(() -> svc.isStarted(), 100, 3000);
+
+    // Ensure 1 container was created
+    List<ContainerData> containerData = Lists.newArrayList();
+    containerManager.listContainer(null, 1, "", containerData);
+    Assert.assertEquals(1, containerData.size());
+    MetadataStore meta = KeyUtils.getDB(containerData.get(0), conf);
+
+    // Ensure there is 100 blocks under deletion
+    Assert.assertEquals(3, getUnderDeletionBlocksCount(meta));
+
+    // An interval will delete 1 * 2 blocks
+    deleteAndWait(svc, 1);
+    Assert.assertEquals(1, getUnderDeletionBlocksCount(meta));
+
+    deleteAndWait(svc, 2);
+    Assert.assertEquals(0, getUnderDeletionBlocksCount(meta));
+
+    deleteAndWait(svc, 3);
+    Assert.assertEquals(0, getUnderDeletionBlocksCount(meta));
+
+    svc.shutdown();
+  }
+
+  @Test
+  public void testShutdownService() throws Exception {
+    Configuration conf = new OzoneConfiguration();
+    conf.setInt(OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS, 500);
+    conf.setInt(OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL, 10);
+    conf.setInt(OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER, 10);
+    ContainerManager containerManager = createContainerManager(conf);
+    // Create 1 container with 100 blocks
+    createToDeleteBlocks(containerManager, conf, 1, 100, 1, chunksDir);
+
+    BlockDeletingServiceTestImpl service =
+        new BlockDeletingServiceTestImpl(containerManager, 1000, conf);
+    service.start();
+    GenericTestUtils.waitFor(() -> service.isStarted(), 100, 3000);
+
+    // Run some deleting tasks and verify there are threads running
+    service.runDeletingTasks();
+    GenericTestUtils.waitFor(() -> service.getThreadCount() > 0, 100, 1000);
+
+    // Wait for 1 or 2 intervals
+    Thread.sleep(1000);
+
+    // Shutdown service and verify all threads are stopped
+    service.shutdown();
+    GenericTestUtils.waitFor(() -> service.getThreadCount() == 0, 100, 1000);
+  }
+
+  @Test(timeout = 30000)
+  public void testContainerThrottle() throws Exception {
+    // Properties :
+    //  - Number of containers : 2
+    //  - Number of blocks per container : 1
+    //  - Number of chunks per block : 10
+    //  - Container limit per interval : 1
+    //  - Block limit per container : 1
+    //
+    // Each time only 1 container can be processed, so each time
+    // 1 block from 1 container can be deleted.
+    Configuration conf = new OzoneConfiguration();
+    // Process 1 container per interval
+    conf.setInt(OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL, 1);
+    conf.setInt(OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER, 1);
+    ContainerManager containerManager = createContainerManager(conf);
+    createToDeleteBlocks(containerManager, conf, 2, 1, 10, chunksDir);
+
+    BlockDeletingServiceTestImpl service =
+        new BlockDeletingServiceTestImpl(containerManager, 1000, conf);
+    service.start();
+
+    try {
+      GenericTestUtils.waitFor(() -> service.isStarted(), 100, 3000);
+      // 1st interval processes 1 container 1 block and 10 chunks
+      deleteAndWait(service, 1);
+      Assert.assertEquals(10, chunksDir.listFiles().length);
+    } finally {
+      service.shutdown();
+    }
+  }
+
+
+  @Test(timeout = 30000)
+  public void testBlockThrottle() throws Exception {
+    // Properties :
+    //  - Number of containers : 5
+    //  - Number of blocks per container : 3
+    //  - Number of chunks per block : 1
+    //  - Container limit per interval : 10
+    //  - Block limit per container : 2
+    //
+    // Each time containers can be all scanned, but only 2 blocks
+    // per container can be actually deleted. So it requires 2 waves
+    // to cleanup all blocks.
+    Configuration conf = new OzoneConfiguration();
+    conf.setInt(OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL, 10);
+    conf.setInt(OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER, 2);
+    ContainerManager containerManager = createContainerManager(conf);
+    createToDeleteBlocks(containerManager, conf, 5, 3, 1, chunksDir);
+
+    // Make sure chunks are created
+    Assert.assertEquals(15, chunksDir.listFiles().length);
+
+    BlockDeletingServiceTestImpl service =
+        new BlockDeletingServiceTestImpl(containerManager, 1000, conf);
+    service.start();
+
+    try {
+      GenericTestUtils.waitFor(() -> service.isStarted(), 100, 3000);
+      // Total blocks = 3 * 5 = 15
+      // block per task = 2
+      // number of containers = 5
+      // each interval will at most runDeletingTasks 5 * 2 = 10 blocks
+      deleteAndWait(service, 1);
+      Assert.assertEquals(5, chunksDir.listFiles().length);
+
+      // There is only 5 blocks left to runDeletingTasks
+      deleteAndWait(service, 2);
+      Assert.assertEquals(0, chunksDir.listFiles().length);
+    } finally {
+      service.shutdown();
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org