You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ww...@apache.org on 2017/08/11 10:46:23 UTC
hadoop git commit: HDFS-12196. Ozone: DeleteKey-2: Implement block
deleting service to delete stale blocks at background. Contributed by Weiwei
Yang.
Repository: hadoop
Updated Branches:
refs/heads/HDFS-7240 0e32bf179 -> 482c09462
HDFS-12196. Ozone: DeleteKey-2: Implement block deleting service to delete stale blocks at background. Contributed by Weiwei Yang.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/482c0946
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/482c0946
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/482c0946
Branch: refs/heads/HDFS-7240
Commit: 482c0946221a56e8020fb6ec1a1ff27ceea9ff00
Parents: 0e32bf1
Author: Weiwei Yang <ww...@apache.org>
Authored: Fri Aug 11 18:45:55 2017 +0800
Committer: Weiwei Yang <ww...@apache.org>
Committed: Fri Aug 11 18:45:55 2017 +0800
----------------------------------------------------------------------
.../apache/hadoop/ozone/OzoneConfigKeys.java | 18 ++
.../common/impl/ContainerManagerImpl.java | 2 +-
.../background/BlockDeletingService.java | 211 +++++++++++++
.../statemachine/background/package-info.java | 18 ++
.../container/ozoneimpl/OzoneContainer.java | 14 +
.../apache/hadoop/utils/BackgroundService.java | 147 +++++++++
.../org/apache/hadoop/utils/BackgroundTask.java | 28 ++
.../hadoop/utils/BackgroundTaskQueue.java | 64 ++++
.../hadoop/utils/BackgroundTaskResult.java | 29 ++
.../src/main/resources/ozone-default.xml | 31 ++
.../TestUtils/BlockDeletingServiceTestImpl.java | 99 ++++++
.../common/TestBlockDeletingService.java | 312 +++++++++++++++++++
12 files changed, 972 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/482c0946/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index 55b5f88..92017a0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -94,6 +94,24 @@ public final class OzoneConfigKeys {
"ozone.client.connection.timeout.ms";
public static final int OZONE_CLIENT_CONNECTION_TIMEOUT_MS_DEFAULT = 5000;
+ /**
+ * Configuration properties for Ozone Block Deleting Service.
+ */
+ public static final String OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS =
+ "ozone.block.deleting.service.interval.ms";
+ public static final int OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS_DEFAULT
+ = 60000;
+
+ public static final String OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER =
+ "ozone.block.deleting.limit.per.task";
+ public static final int OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER_DEFAULT
+ = 1000;
+
+ public static final String OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL
+ = "ozone.block.deleting.container.limit.per.interval";
+ public static final int
+ OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL_DEFAULT = 10;
+
public static final String DFS_CONTAINER_RATIS_ENABLED_KEY
= ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY;
public static final boolean DFS_CONTAINER_RATIS_ENABLED_DEFAULT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/482c0946/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java
index b77ac55..aa6946c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java
@@ -658,7 +658,7 @@ public class ContainerManagerImpl implements ContainerManager {
@VisibleForTesting
- ConcurrentSkipListMap<String, ContainerStatus> getContainerMap() {
+ public ConcurrentSkipListMap<String, ContainerStatus> getContainerMap() {
return containerMap;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/482c0946/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/background/BlockDeletingService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/background/BlockDeletingService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/background/BlockDeletingService.java
new file mode 100644
index 0000000..618fa42
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/background/BlockDeletingService.java
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.statemachine.background;
+
+import com.google.common.collect.Lists;
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
+import org.apache.hadoop.ozone.container.common.helpers.KeyUtils;
+import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager;
+import org.apache.hadoop.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.utils.BackgroundService;
+import org.apache.hadoop.utils.BackgroundTaskResult;
+import org.apache.hadoop.utils.BackgroundTaskQueue;
+import org.apache.hadoop.utils.BackgroundTask;
+import org.apache.hadoop.utils.BatchOperation;
+import org.apache.hadoop.utils.MetadataStore;
+import org.apache.hadoop.utils.MetadataKeyFilters.KeyPrefixFilter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.List;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+ .OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER;
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+ .OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER_DEFAULT;
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+ .OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL;
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+ .OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL_DEFAULT;
+
+/**
+ * A per-datanode container block deleting service takes in charge
+ * of deleting staled ozone blocks.
+ */
+public class BlockDeletingService extends BackgroundService{
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(BlockDeletingService.class);
+
+ private final ContainerManager containerManager;
+ private final Configuration conf;
+
+ // Throttle number of blocks to delete per task,
+ // set to 1 for testing
+ private final int blockLimitPerTask;
+
+ // Throttle the number of containers to process concurrently at a time,
+ private final int containerLimitPerInterval;
+
+ // Task priority is useful when a to-delete block has weight.
+ private final static int TASK_PRIORITY_DEFAULT = 1;
+ // Core pool size for container tasks
+ private final static int BLOCK_DELETING_SERVICE_CORE_POOL_SIZE = 10;
+
+ public BlockDeletingService(ContainerManager containerManager,
+ int serviceInterval, Configuration conf) {
+ super("BlockDeletingService", serviceInterval,
+ TimeUnit.MILLISECONDS, BLOCK_DELETING_SERVICE_CORE_POOL_SIZE);
+ this.containerManager = containerManager;
+ this.conf = conf;
+ this.blockLimitPerTask = conf.getInt(
+ OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER,
+ OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER_DEFAULT);
+ this.containerLimitPerInterval = conf.getInt(
+ OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL,
+ OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL_DEFAULT);
+ }
+
+ @Override
+ public BackgroundTaskQueue getTasks() {
+ BackgroundTaskQueue queue = new BackgroundTaskQueue();
+ List<ContainerData> containers = Lists.newArrayList();
+ try {
+ // We at most list a number of containers a time,
+ // in case there are too many containers and start too many workers.
+ // We must ensure there is no empty container in this result.
+ containerManager.listContainer(null, containerLimitPerInterval,
+ null, containers);
+
+ // TODO
+ // in case we always fetch a few same containers,
+ // should we list some more containers a time and shuffle them?
+ for(ContainerData container : containers) {
+ BlockDeletingTask containerTask =
+ new BlockDeletingTask(container, TASK_PRIORITY_DEFAULT);
+ queue.add(containerTask);
+ }
+ } catch (StorageContainerException e) {
+ LOG.warn("Failed to initiate block deleting tasks, "
+ + "caused by unable to get containers info. "
+ + "Retry in next interval. ", e);
+ }
+ return queue;
+ }
+
+ private static class ContainerBackgroundTaskResult
+ implements BackgroundTaskResult {
+ private List<String> deletedBlockIds;
+
+ ContainerBackgroundTaskResult() {
+ deletedBlockIds = new LinkedList<>();
+ }
+
+ public void addBlockId(String blockId) {
+ deletedBlockIds.add(blockId);
+ }
+
+ public void addAll(List<String> blockIds) {
+ deletedBlockIds.addAll(blockIds);
+ }
+
+ public List<String> getDeletedBlocks() {
+ return deletedBlockIds;
+ }
+
+ @Override
+ public int getSize() {
+ return deletedBlockIds.size();
+ }
+ }
+
+ private class BlockDeletingTask
+ implements BackgroundTask<BackgroundTaskResult> {
+
+ private final int priority;
+ private final ContainerData containerData;
+
+ BlockDeletingTask(ContainerData containerName, int priority) {
+ this.priority = priority;
+ this.containerData = containerName;
+ }
+
+ @Override
+ public BackgroundTaskResult call() throws Exception {
+ // Scan container's db and get list of under deletion blocks
+ MetadataStore meta = KeyUtils.getDB(containerData, conf);
+ // # of blocks to delete is throttled
+ KeyPrefixFilter filter = new KeyPrefixFilter(
+ OzoneConsts.DELETING_KEY_PREFIX);
+ List<Map.Entry<byte[], byte[]>> toDeleteBlocks =
+ meta.getRangeKVs(null, blockLimitPerTask, filter);
+ if (toDeleteBlocks.isEmpty()) {
+ LOG.info("No under deletion block found in container : {}",
+ containerData.getContainerName());
+ }
+
+ List<String> succeedBlocks = new LinkedList<>();
+ LOG.info("Container : {}, To-Delete blocks : {}",
+ containerData.getContainerName(), toDeleteBlocks.size());
+ toDeleteBlocks.forEach(entry -> {
+ String blockName = DFSUtil.bytes2String(entry.getKey());
+ LOG.info("Deleting block {}", blockName);
+ try {
+ ContainerProtos.KeyData data =
+ ContainerProtos.KeyData.parseFrom(entry.getValue());
+
+ for (ContainerProtos.ChunkInfo chunkInfo : data.getChunksList()) {
+ File chunkFile = new File(chunkInfo.getChunkName());
+ if (FileUtils.deleteQuietly(chunkFile)) {
+ LOG.info("block {} chunk {} deleted", blockName,
+ chunkFile.getAbsolutePath());
+ }
+ }
+ succeedBlocks.add(blockName);
+ } catch (InvalidProtocolBufferException e) {
+ LOG.error("Failed to parse block info for block {}", blockName, e);
+ }
+ });
+
+ // Once files are deleted ... clean up DB
+ BatchOperation batch = new BatchOperation();
+ succeedBlocks.forEach(entry ->
+ batch.delete(DFSUtil.string2Bytes(entry)));
+ meta.writeBatch(batch);
+
+ ContainerBackgroundTaskResult crr = new ContainerBackgroundTaskResult();
+ crr.addAll(succeedBlocks);
+ return crr;
+ }
+
+ @Override
+ public int getPriority() {
+ return priority;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/482c0946/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/background/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/background/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/background/package-info.java
new file mode 100644
index 0000000..a9e202e
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/background/package-info.java
@@ -0,0 +1,18 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.common.statemachine.background;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/482c0946/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
index f7caf5a..792c132 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.ozone.container.common.interfaces.ChunkManager;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager;
import org.apache.hadoop.ozone.container.common.interfaces.KeyManager;
+import org.apache.hadoop.ozone.container.common.statemachine.background.BlockDeletingService;
import org.apache.hadoop.ozone.container.common.transport.server.XceiverServer;
import org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis;
import org.apache.hadoop.ozone.protocol.proto
@@ -45,6 +46,10 @@ import java.util.List;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_ROOT_PREFIX;
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+ .OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS;
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+ .OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS_DEFAULT;
/**
* Ozone main class sets up the network server and initializes the container
@@ -60,6 +65,7 @@ public class OzoneContainer {
private final XceiverServerSpi server;
private final ChunkManager chunkManager;
private final KeyManager keyManager;
+ private final BlockDeletingService blockDeletingService;
/**
* Creates a network endpoint and enables Ozone container.
@@ -90,6 +96,12 @@ public class OzoneContainer {
this.keyManager = new KeyManagerImpl(manager, ozoneConfig);
manager.setKeyManager(this.keyManager);
+ int svcInterval = ozoneConfig.getInt(
+ OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS,
+ OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS_DEFAULT);
+ this.blockDeletingService = new BlockDeletingService(manager,
+ svcInterval, ozoneConfig);
+
this.dispatcher = new Dispatcher(manager, this.ozoneConfig);
final boolean useRatis = ozoneConfig.getBoolean(
@@ -107,6 +119,7 @@ public class OzoneContainer {
*/
public void start() throws IOException {
server.start();
+ blockDeletingService.start();
dispatcher.init();
}
@@ -152,6 +165,7 @@ public class OzoneContainer {
this.chunkManager.shutdown();
this.keyManager.shutdown();
this.manager.shutdown();
+ this.blockDeletingService.shutdown();
LOG.info("container services shutdown complete.");
} catch (IOException ex) {
LOG.warn("container service shutdown error:", ex);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/482c0946/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/BackgroundService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/BackgroundService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/BackgroundService.java
new file mode 100644
index 0000000..b057533
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/BackgroundService.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.utils;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.Future;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * An abstract class for a background service in ozone.
+ * A background service schedules multiple child tasks in parallel
+ * in a certain period. In each interval, it waits until all the tasks
+ * finish execution and then schedule next interval.
+ */
+public abstract class BackgroundService {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(BackgroundService.class);
+
+ // Executor to launch child tasks
+ private final ScheduledExecutorService exec;
+ private final ThreadGroup threadGroup;
+ private final ThreadFactory threadFactory;
+ private final String serviceName;
+ private final int interval;
+ private final TimeUnit unit;
+
+ public BackgroundService(String serviceName, int interval,
+ TimeUnit unit, int threadPoolSize) {
+ this.interval = interval;
+ this.unit = unit;
+ this.serviceName = serviceName;
+ threadGroup = new ThreadGroup(serviceName);
+ ThreadFactory tf = r -> new Thread(threadGroup, r);
+ threadFactory = new ThreadFactoryBuilder()
+ .setThreadFactory(tf)
+ .setDaemon(true)
+ .setNameFormat(serviceName + "#%d")
+ .build();
+ exec = Executors.newScheduledThreadPool(threadPoolSize, threadFactory);
+ }
+
+ protected ExecutorService getExecutorService() {
+ return this.exec;
+ }
+
+ @VisibleForTesting
+ public int getThreadCount() {
+ return threadGroup.activeCount();
+ }
+
+
+ // start service
+ public void start() {
+ exec.scheduleWithFixedDelay(new PeriodicalTask(), 0, interval, unit);
+ }
+
+ public abstract BackgroundTaskQueue getTasks();
+
+ /**
+ * Run one or more background tasks concurrently.
+ * Wait until all tasks to return the result.
+ */
+ public class PeriodicalTask implements Runnable {
+ @Override
+ public void run() {
+ LOG.info("Running background service : {}", serviceName);
+ BackgroundTaskQueue tasks = getTasks();
+ if (tasks.isEmpty()) {
+ // No task found, or some problems to init tasks
+ // return and retry in next interval.
+ return;
+ }
+
+ LOG.info("Number of background tasks to execute : {}", tasks.size());
+ CompletionService<BackgroundTaskResult> taskCompletionService =
+ new ExecutorCompletionService<>(exec);
+
+ List<Future<BackgroundTaskResult>> results = Lists.newArrayList();
+ while (tasks.size() > 0) {
+ BackgroundTask task = tasks.poll();
+ Future<BackgroundTaskResult> result =
+ taskCompletionService.submit(task);
+ results.add(result);
+ }
+
+ results.parallelStream().forEach(taskResultFuture -> {
+ try {
+ // Collect task results
+ // TODO timeout in case task hangs
+ BackgroundTaskResult result = taskResultFuture.get();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("task execution result size {}", result.getSize());
+ }
+ } catch (InterruptedException | ExecutionException e) {
+ LOG.warn(
+ "Background task fails to execute, "
+ + "retrying in next interval", e);
+ }
+ });
+ }
+ }
+
+ // shutdown and make sure all threads are properly released.
+ public void shutdown() {
+ LOG.info("Shutting down service {}", this.serviceName);
+ exec.shutdown();
+ try {
+ if (!exec.awaitTermination(60, TimeUnit.SECONDS)) {
+ exec.shutdownNow();
+ }
+ } catch (InterruptedException e) {
+ exec.shutdownNow();
+ }
+ if (threadGroup.activeCount() == 0 && !threadGroup.isDestroyed()) {
+ threadGroup.destroy();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/482c0946/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/BackgroundTask.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/BackgroundTask.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/BackgroundTask.java
new file mode 100644
index 0000000..47e8ebc
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/BackgroundTask.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.utils;
+
+import java.util.concurrent.Callable;
+
+/**
+ * A task thread to run by {@link BackgroundService}.
+ */
+public interface BackgroundTask<T> extends Callable<T> {
+
+ int getPriority();
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/482c0946/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/BackgroundTaskQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/BackgroundTaskQueue.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/BackgroundTaskQueue.java
new file mode 100644
index 0000000..b56ef0c
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/BackgroundTaskQueue.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.utils;
+
+import java.util.PriorityQueue;
+
+/**
+ * A priority queue that stores a number of {@link BackgroundTask}.
+ */
+public class BackgroundTaskQueue {
+
+ private final PriorityQueue<BackgroundTask> tasks;
+
+ public BackgroundTaskQueue() {
+ tasks = new PriorityQueue<>((task1, task2)
+ -> task1.getPriority() - task2.getPriority());
+ }
+
+ /**
+ * @return the head task in this queue.
+ */
+ public synchronized BackgroundTask poll() {
+ return tasks.poll();
+ }
+
+ /**
+ * Add a {@link BackgroundTask} to the queue,
+ * the task will be sorted by its priority.
+ *
+ * @param task
+ */
+ public synchronized void add(BackgroundTask task) {
+ tasks.add(task);
+ }
+
+ /**
+ * @return true if the queue contains no task, false otherwise.
+ */
+ public synchronized boolean isEmpty() {
+ return tasks.isEmpty();
+ }
+
+ /**
+ * @return the size of the queue.
+ */
+ public synchronized int size() {
+ return tasks.size();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/482c0946/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/BackgroundTaskResult.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/BackgroundTaskResult.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/BackgroundTaskResult.java
new file mode 100644
index 0000000..b37a5db
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/BackgroundTaskResult.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.utils;
+
+/**
+ * Result of a {@link BackgroundTask}.
+ */
+public interface BackgroundTaskResult {
+
+ /**
+ * Returns the size of entries included in this result.
+ */
+ int getSize();
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/482c0946/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml
index be9a48a..17a127d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml
@@ -360,6 +360,37 @@
</property>
<property>
+ <name>ozone.block.deleting.service.interval.ms</name>
+ <value>60000</value>
+ <description>
+ Time interval in milliseconds of the block deleting service.
+ The block deleting service runs on each datanode to scan staled
+ blocks and delete them asynchronously.
+ </description>
+ </property>
+
+ <property>
+ <name>ozone.block.deleting.limit.per.task</name>
+ <value>1000</value>
+ <description>
+ Maximum number of blocks to be deleted by block deleting service
+ per time interval. This property is used to throttle the actual number
+ of block deletions on a datanode per container.
+ </description>
+ </property>
+
+ <property>
+ <name>ozone.block.deleting.container.limit.per.interval</name>
+ <value>10</value>
+ <description>
+ Maximum number of containers to be scanned by block deleting service
+ per time interval. The block deleting service spawns a thread to handle
+ block deletions in a container. This property is used to throttle
+ the number of threads spawned for block deletions.
+ </description>
+ </property>
+
+ <property>
<name>dfs.container.ipc</name>
<value>50011</value>
<description>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/482c0946/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/TestUtils/BlockDeletingServiceTestImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/TestUtils/BlockDeletingServiceTestImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/TestUtils/BlockDeletingServiceTestImpl.java
new file mode 100644
index 0000000..0fde964
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/TestUtils/BlockDeletingServiceTestImpl.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.TestUtils;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager;
+import org.apache.hadoop.ozone.container.common.statemachine.background.BlockDeletingService;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * A test class implementation for {@link BlockDeletingService}.
+ */
+public class BlockDeletingServiceTestImpl
+ extends BlockDeletingService {
+
+ // tests only
+ private CountDownLatch latch;
+ private Thread testingThread;
+ private AtomicInteger numOfProcessed = new AtomicInteger(0);
+
+ public BlockDeletingServiceTestImpl(ContainerManager containerManager,
+ int serviceInterval, Configuration conf) {
+ super(containerManager, serviceInterval, conf);
+ }
+
+ @VisibleForTesting
+ public void runDeletingTasks() {
+ if (latch.getCount() > 0) {
+ this.latch.countDown();
+ } else {
+ throw new IllegalStateException("Count already reaches zero");
+ }
+ }
+
+ @VisibleForTesting
+ public boolean isStarted() {
+ return latch != null && testingThread.isAlive();
+ }
+
+ public int getTimesOfProcessed() {
+ return numOfProcessed.get();
+ }
+
+ // Override the implementation to start a single on-call control thread.
+ @Override public void start() {
+ PeriodicalTask svc = new PeriodicalTask();
+ // In test mode, relies on a latch countdown to runDeletingTasks tasks.
+ Runnable r = () -> {
+ while (true) {
+ latch = new CountDownLatch(1);
+ try {
+ latch.await();
+ } catch (InterruptedException e) {
+ break;
+ }
+ Future<?> future = this.getExecutorService().submit(svc);
+ try {
+ // for tests, we only wait for 3s for completion
+ future.get(3, TimeUnit.SECONDS);
+ numOfProcessed.incrementAndGet();
+ } catch (Exception e) {
+ return;
+ }
+ }
+ };
+
+ testingThread = new ThreadFactoryBuilder()
+ .setDaemon(true)
+ .build()
+ .newThread(r);
+ testingThread.start();
+ }
+
+ @Override
+ public void shutdown() {
+ testingThread.interrupt();
+ super.shutdown();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/482c0946/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java
new file mode 100644
index 0000000..2d4b5b2
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java
@@ -0,0 +1,312 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.container.common;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
+import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.OzoneConfiguration;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.container.TestUtils.BlockDeletingServiceTestImpl;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
+import org.apache.hadoop.ozone.container.common.helpers.KeyData;
+import org.apache.hadoop.ozone.container.common.helpers.KeyUtils;
+import org.apache.hadoop.ozone.container.common.impl.ContainerManagerImpl;
+import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager;
+import org.apache.hadoop.ozone.web.utils.OzoneUtils;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.utils.MetadataKeyFilters;
+import org.apache.hadoop.utils.MetadataStore;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.BeforeClass;
+import org.junit.Before;
+import org.junit.After;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+ .OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS;
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+ .OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER;
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+ .OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL;
+import static org.apache.hadoop.ozone.container
+ .ContainerTestHelper.createSingleNodePipeline;
+
+/**
+ * Tests to test block deleting service.
+ */
+public class TestBlockDeletingService {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestBlockDeletingService.class);
+
+ private static File testRoot;
+ private static File containersDir;
+ private static File chunksDir;
+
+ @BeforeClass
+ public static void init() {
+ testRoot = GenericTestUtils
+ .getTestDir(TestBlockDeletingService.class.getSimpleName());
+ chunksDir = new File(testRoot, "chunks");
+ containersDir = new File(testRoot, "containers");
+ }
+
+ @Before
+ public void setup() throws IOException {
+ if (chunksDir.exists()) {
+ FileUtils.deleteDirectory(chunksDir);
+ }
+ }
+
+ @After
+ public void cleanup() throws IOException {
+ FileUtils.deleteDirectory(chunksDir);
+ FileUtils.deleteDirectory(containersDir);
+ FileUtils.deleteDirectory(testRoot);
+ }
+
+ private ContainerManager createContainerManager(Configuration conf)
+ throws Exception {
+ conf.set(OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT,
+ containersDir.getAbsolutePath());
+ if (containersDir.exists()) {
+ FileUtils.deleteDirectory(containersDir);
+ }
+ ContainerManager containerManager = new ContainerManagerImpl();
+ List<StorageLocation> pathLists = new LinkedList<>();
+ pathLists.add(StorageLocation.parse(containersDir.getAbsolutePath()));
+ containerManager.init(conf, pathLists);
+ return containerManager;
+ }
+
+ /**
+ * A helper method to create some blocks and put them under deletion
+ * state for testing. This method directly updates container.db and
+ * creates some fake chunk files for testing.
+ */
+ private void createToDeleteBlocks(ContainerManager mgr,
+ Configuration conf, int numOfContainers, int numOfBlocksPerContainer,
+ int numOfChunksPerBlock, File chunkDir) throws IOException {
+ for (int x = 0; x < numOfContainers; x++) {
+ String containerName = OzoneUtils.getRequestID();
+ ContainerData data = new ContainerData(containerName);
+ mgr.createContainer(createSingleNodePipeline(containerName), data);
+ data = mgr.readContainer(containerName);
+ MetadataStore metadata = KeyUtils.getDB(data, conf);
+ for (int j = 0; j<numOfBlocksPerContainer; j++) {
+ String blockName = containerName + "b" + j;
+ String deleteStateName = OzoneConsts.DELETING_KEY_PREFIX + blockName;
+ KeyData kd = new KeyData(containerName, deleteStateName);
+ List<ContainerProtos.ChunkInfo> chunks = Lists.newArrayList();
+ for (int k = 0; k<numOfChunksPerBlock; k++) {
+ // offset doesn't matter here
+ String chunkName = blockName + "_chunk_" + k;
+ File chunk = new File(chunkDir, chunkName);
+ FileUtils.writeStringToFile(chunk, "a chunk");
+ LOG.info("Creating file {}", chunk.getAbsolutePath());
+ // make sure file exists
+ Assert.assertTrue(chunk.isFile() && chunk.exists());
+ ContainerProtos.ChunkInfo info =
+ ContainerProtos.ChunkInfo.newBuilder()
+ .setChunkName(chunk.getAbsolutePath())
+ .setLen(0)
+ .setOffset(0)
+ .setChecksum("")
+ .build();
+ chunks.add(info);
+ }
+ kd.setChunks(chunks);
+ metadata.put(DFSUtil.string2Bytes(deleteStateName),
+ kd.getProtoBufMessage().toByteArray());
+ }
+ }
+ }
+
+ /**
+ * Run service runDeletingTasks and wait for it's been processed.
+ */
+ private void deleteAndWait(BlockDeletingServiceTestImpl service,
+ int timesOfProcessed) throws TimeoutException, InterruptedException {
+ service.runDeletingTasks();
+ GenericTestUtils.waitFor(()
+ -> service.getTimesOfProcessed() == timesOfProcessed, 100, 3000);
+ }
+
+ /**
+ * Get under deletion blocks count from DB,
+ * note this info is parsed from container.db.
+ */
+ private int getUnderDeletionBlocksCount(MetadataStore meta)
+ throws IOException {
+ List<Map.Entry<byte[], byte[]>> underDeletionBlocks =
+ meta.getRangeKVs(null, 100, new MetadataKeyFilters.KeyPrefixFilter(
+ OzoneConsts.DELETING_KEY_PREFIX));
+ return underDeletionBlocks.size();
+ }
+
+ @Test
+ public void testBlockDeletion() throws Exception {
+ Configuration conf = new OzoneConfiguration();
+ conf.setInt(OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL, 10);
+ conf.setInt(OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER, 2);
+ ContainerManager containerManager = createContainerManager(conf);
+ createToDeleteBlocks(containerManager, conf, 1, 3, 1, chunksDir);
+
+ BlockDeletingServiceTestImpl svc =
+ new BlockDeletingServiceTestImpl(containerManager, 1000, conf);
+ svc.start();
+ GenericTestUtils.waitFor(() -> svc.isStarted(), 100, 3000);
+
+ // Ensure 1 container was created
+ List<ContainerData> containerData = Lists.newArrayList();
+ containerManager.listContainer(null, 1, "", containerData);
+ Assert.assertEquals(1, containerData.size());
+ MetadataStore meta = KeyUtils.getDB(containerData.get(0), conf);
+
+ // Ensure there is 100 blocks under deletion
+ Assert.assertEquals(3, getUnderDeletionBlocksCount(meta));
+
+ // An interval will delete 1 * 2 blocks
+ deleteAndWait(svc, 1);
+ Assert.assertEquals(1, getUnderDeletionBlocksCount(meta));
+
+ deleteAndWait(svc, 2);
+ Assert.assertEquals(0, getUnderDeletionBlocksCount(meta));
+
+ deleteAndWait(svc, 3);
+ Assert.assertEquals(0, getUnderDeletionBlocksCount(meta));
+
+ svc.shutdown();
+ }
+
+ @Test
+ public void testShutdownService() throws Exception {
+ Configuration conf = new OzoneConfiguration();
+ conf.setInt(OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS, 500);
+ conf.setInt(OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL, 10);
+ conf.setInt(OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER, 10);
+ ContainerManager containerManager = createContainerManager(conf);
+ // Create 1 container with 100 blocks
+ createToDeleteBlocks(containerManager, conf, 1, 100, 1, chunksDir);
+
+ BlockDeletingServiceTestImpl service =
+ new BlockDeletingServiceTestImpl(containerManager, 1000, conf);
+ service.start();
+ GenericTestUtils.waitFor(() -> service.isStarted(), 100, 3000);
+
+ // Run some deleting tasks and verify there are threads running
+ service.runDeletingTasks();
+ GenericTestUtils.waitFor(() -> service.getThreadCount() > 0, 100, 1000);
+
+ // Wait for 1 or 2 intervals
+ Thread.sleep(1000);
+
+ // Shutdown service and verify all threads are stopped
+ service.shutdown();
+ GenericTestUtils.waitFor(() -> service.getThreadCount() == 0, 100, 1000);
+ }
+
+ @Test(timeout = 30000)
+ public void testContainerThrottle() throws Exception {
+ // Properties :
+ // - Number of containers : 2
+ // - Number of blocks per container : 1
+ // - Number of chunks per block : 10
+ // - Container limit per interval : 1
+ // - Block limit per container : 1
+ //
+ // Each time only 1 container can be processed, so each time
+ // 1 block from 1 container can be deleted.
+ Configuration conf = new OzoneConfiguration();
+ // Process 1 container per interval
+ conf.setInt(OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL, 1);
+ conf.setInt(OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER, 1);
+ ContainerManager containerManager = createContainerManager(conf);
+ createToDeleteBlocks(containerManager, conf, 2, 1, 10, chunksDir);
+
+ BlockDeletingServiceTestImpl service =
+ new BlockDeletingServiceTestImpl(containerManager, 1000, conf);
+ service.start();
+
+ try {
+ GenericTestUtils.waitFor(() -> service.isStarted(), 100, 3000);
+ // 1st interval processes 1 container 1 block and 10 chunks
+ deleteAndWait(service, 1);
+ Assert.assertEquals(10, chunksDir.listFiles().length);
+ } finally {
+ service.shutdown();
+ }
+ }
+
+
+ @Test(timeout = 30000)
+ public void testBlockThrottle() throws Exception {
+ // Properties :
+ // - Number of containers : 5
+ // - Number of blocks per container : 3
+ // - Number of chunks per block : 1
+ // - Container limit per interval : 10
+ // - Block limit per container : 2
+ //
+ // Each time containers can be all scanned, but only 2 blocks
+ // per container can be actually deleted. So it requires 2 waves
+ // to cleanup all blocks.
+ Configuration conf = new OzoneConfiguration();
+ conf.setInt(OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL, 10);
+ conf.setInt(OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER, 2);
+ ContainerManager containerManager = createContainerManager(conf);
+ createToDeleteBlocks(containerManager, conf, 5, 3, 1, chunksDir);
+
+ // Make sure chunks are created
+ Assert.assertEquals(15, chunksDir.listFiles().length);
+
+ BlockDeletingServiceTestImpl service =
+ new BlockDeletingServiceTestImpl(containerManager, 1000, conf);
+ service.start();
+
+ try {
+ GenericTestUtils.waitFor(() -> service.isStarted(), 100, 3000);
+ // Total blocks = 3 * 5 = 15
+ // block per task = 2
+ // number of containers = 5
+ // each interval will at most runDeletingTasks 5 * 2 = 10 blocks
+ deleteAndWait(service, 1);
+ Assert.assertEquals(5, chunksDir.listFiles().length);
+
+ // There is only 5 blocks left to runDeletingTasks
+ deleteAndWait(service, 2);
+ Assert.assertEquals(0, chunksDir.listFiles().length);
+ } finally {
+ service.shutdown();
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org