You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ww...@apache.org on 2017/08/28 02:05:22 UTC
hadoop git commit: HDFS-12282. Ozone: DeleteKey-4: Block delete via
HB between SCM and DN. Contributed by Weiwei Yang.
Repository: hadoop
Updated Branches:
refs/heads/HDFS-7240 1586f20fc -> 76a156ebc
HDFS-12282. Ozone: DeleteKey-4: Block delete via HB between SCM and DN. Contributed by Weiwei Yang.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/76a156eb
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/76a156eb
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/76a156eb
Branch: refs/heads/HDFS-7240
Commit: 76a156ebce06291a4fbb4f2cf8e2a08528e1c2e2
Parents: 1586f20
Author: Weiwei Yang <ww...@apache.org>
Authored: Mon Aug 28 10:04:46 2017 +0800
Committer: Weiwei Yang <ww...@apache.org>
Committed: Mon Aug 28 10:04:46 2017 +0800
----------------------------------------------------------------------
.../helpers/DeletedContainerBlocksSummary.java | 103 +++++++++
.../statemachine/DatanodeStateMachine.java | 14 +-
.../background/BlockDeletingService.java | 30 ++-
.../DeleteBlocksCommandHandler.java | 198 +++++++++++++++++
.../states/endpoint/HeartbeatEndpointTask.java | 15 ++
.../StorageContainerDatanodeProtocol.java | 12 +-
.../protocol/commands/DeleteBlocksCommand.java | 63 ++++++
...rDatanodeProtocolClientSideTranslatorPB.java | 14 ++
...rDatanodeProtocolServerSideTranslatorPB.java | 13 ++
.../ozone/scm/StorageContainerManager.java | 66 +++++-
.../hadoop/ozone/scm/block/BlockManager.java | 17 ++
.../ozone/scm/block/BlockManagerImpl.java | 43 ++++
.../hadoop/ozone/scm/block/DeletedBlockLog.java | 21 +-
.../ozone/scm/block/DeletedBlockLogImpl.java | 53 ++++-
.../scm/block/SCMBlockDeletingService.java | 217 +++++++++++++++++++
.../hadoop/ozone/scm/node/NodeManager.java | 9 +
.../hadoop/ozone/scm/node/SCMNodeManager.java | 5 +
.../hadoop/utils/BackgroundTaskResult.java | 17 +-
.../StorageContainerDatanodeProtocol.proto | 27 ++-
.../ozone/TestStorageContainerManager.java | 116 +++++++++-
.../TestStorageContainerManagerHelper.java | 180 +++++++++++++++
.../ozone/container/common/ScmTestMock.java | 9 +
22 files changed, 1216 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a156eb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/DeletedContainerBlocksSummary.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/DeletedContainerBlocksSummary.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/DeletedContainerBlocksSummary.java
new file mode 100644
index 0000000..de5e2d0
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/DeletedContainerBlocksSummary.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.common.helpers;
+
+import com.google.common.collect.Maps;
+import org.apache.hadoop.ozone.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Summarizes block deletion TXs: per-TX counts and per-container block totals.
+ */
+public final class DeletedContainerBlocksSummary {
+
+ private final List<DeletedBlocksTransaction> blocks;
+ // key : txID
+ // value : times this TX has been processed (on a duplicate txID, last wins)
+ private final Map<Long, Integer> txSummary;
+ // key : container name
+ // value : the number of blocks to be deleted in this container;
+ // entries for the same container are summed, and duplicate block
+ // IDs are NOT de-duplicated
+ private final Map<String, Integer> blockSummary;
+ // total number of block IDs across all TXs (duplicates included)
+ private int numOfBlocks;
+
+ private DeletedContainerBlocksSummary(List<DeletedBlocksTransaction> blocks) {
+ this.blocks = blocks;
+ txSummary = Maps.newHashMap();
+ blockSummary = Maps.newHashMap();
+ blocks.forEach(entry -> {
+ txSummary.put(entry.getTxID(), entry.getCount());
+ if (blockSummary.containsKey(entry.getContainerName())) {
+ blockSummary.put(entry.getContainerName(),
+ blockSummary.get(entry.getContainerName())
+ + entry.getBlockIDCount());
+ } else {
+ blockSummary.put(entry.getContainerName(), entry.getBlockIDCount());
+ }
+ numOfBlocks += entry.getBlockIDCount();
+ });
+ }
+
+ public static DeletedContainerBlocksSummary getFrom(
+ List<DeletedBlocksTransaction> blocks) {
+ return new DeletedContainerBlocksSummary(blocks);
+ }
+
+ public int getNumOfBlocks() {
+ return numOfBlocks;
+ }
+
+ public int getNumOfContainers() {
+ return blockSummary.size();
+ }
+
+ public String getTXIDs() {
+ return String.join(",", txSummary.keySet()
+ .stream().map(String::valueOf).collect(Collectors.toList()));
+ }
+
+ public String getTxIDSummary() {
+ List<String> txSummaryEntry = txSummary.entrySet().stream()
+ .map(entry -> entry.getKey() + "(" + entry.getValue() + ")")
+ .collect(Collectors.toList());
+ return "[" + String.join(",", txSummaryEntry) + "]";
+ }
+
+ @Override public String toString() {
+ StringBuffer sb = new StringBuffer();
+ for (DeletedBlocksTransaction blks : blocks) {
+ sb.append(" ")
+ .append("TXID=")
+ .append(blks.getTxID())
+ .append(", ")
+ .append("TimesProceed=")
+ .append(blks.getCount())
+ .append(", ")
+ .append(blks.getContainerName())
+ .append(" : [")
+ .append(String.join(",", blks.getBlockIDList())).append("]")
+ .append("\n");
+ }
+ return sb.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a156eb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
index 66992af..45e47db 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.ozone.client.OzoneClientUtils;
import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.CommandDispatcher;
import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.ContainerReportHandler;
+import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.DeleteBlocksCommandHandler;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.util.Time;
@@ -77,15 +78,16 @@ public class DatanodeStateMachine implements Closeable {
this.datanodeID = datanodeID;
nextHB = new AtomicLong(Time.monotonicNow());
-
// When we add new handlers just adding a new handler here should do the
// trick.
commandDispatcher = CommandDispatcher.newBuilder()
- .addHandler(new ContainerReportHandler())
- .setConnectionManager(connectionManager)
- .setContainer(container)
- .setContext(context)
- .build();
+ .addHandler(new ContainerReportHandler())
+ .addHandler(new DeleteBlocksCommandHandler(
+ container.getContainerManager(), conf))
+ .setConnectionManager(connectionManager)
+ .setContainer(container)
+ .setContext(context)
+ .build();
}
public void setDatanodeID(DatanodeID datanodeID) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a156eb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/background/BlockDeletingService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/background/BlockDeletingService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/background/BlockDeletingService.java
index 498264e..48cf329 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/background/BlockDeletingService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/background/BlockDeletingService.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
import org.apache.hadoop.ozone.container.common.helpers.KeyUtils;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager;
import org.apache.hadoop.scm.container.common.helpers.StorageContainerException;
@@ -116,6 +117,11 @@ public class BlockDeletingService extends BackgroundService{
LOG.warn("Failed to initiate block deleting tasks, "
+ "caused by unable to get containers info. "
+ "Retry in next interval. ", e);
+ } catch (Exception e) {
+ // In case listContainer call throws any uncaught RuntimeException.
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Unexpected error occurs during deleting blocks.", e);
+ }
}
return queue;
}
@@ -159,6 +165,7 @@ public class BlockDeletingService extends BackgroundService{
@Override
public BackgroundTaskResult call() throws Exception {
+ ContainerBackgroundTaskResult crr = new ContainerBackgroundTaskResult();
long startTime = Time.monotonicNow();
// Scan container's db and get list of under deletion blocks
MetadataStore meta = KeyUtils.getDB(containerData, conf);
@@ -175,17 +182,24 @@ public class BlockDeletingService extends BackgroundService{
List<String> succeedBlocks = new LinkedList<>();
LOG.debug("Container : {}, To-Delete blocks : {}",
containerData.getContainerName(), toDeleteBlocks.size());
+ File dataDir = ContainerUtils.getDataDirectory(containerData).toFile();
+ if (!dataDir.exists() || !dataDir.isDirectory()) {
+ LOG.error("Invalid container data dir {} : "
+ + "not exist or not a directory", dataDir.getAbsolutePath());
+ return crr;
+ }
+
toDeleteBlocks.forEach(entry -> {
String blockName = DFSUtil.bytes2String(entry.getKey());
LOG.debug("Deleting block {}", blockName);
try {
ContainerProtos.KeyData data =
ContainerProtos.KeyData.parseFrom(entry.getValue());
-
for (ContainerProtos.ChunkInfo chunkInfo : data.getChunksList()) {
- File chunkFile = new File(chunkInfo.getChunkName());
+ File chunkFile = dataDir.toPath()
+ .resolve(chunkInfo.getChunkName()).toFile();
if (FileUtils.deleteQuietly(chunkFile)) {
- LOG.debug("block {} chunk {} deleted", blockName,
+ LOG.info("block {} chunk {} deleted", blockName,
chunkFile.getAbsolutePath());
}
}
@@ -201,11 +215,11 @@ public class BlockDeletingService extends BackgroundService{
batch.delete(DFSUtil.string2Bytes(entry)));
meta.writeBatch(batch);
- LOG.info("The elapsed time of task@{} for"
- + " deleting blocks: {}ms.",
- Integer.toHexString(this.hashCode()),
- Time.monotonicNow() - startTime);
- ContainerBackgroundTaskResult crr = new ContainerBackgroundTaskResult();
+ if (!succeedBlocks.isEmpty()) {
+ LOG.info("Container: {}, deleted blocks: {}, task elapsed time: {}ms",
+ containerData.getContainerName(), succeedBlocks.size(),
+ Time.monotonicNow() - startTime);
+ }
crr.addAll(succeedBlocks);
return crr;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a156eb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
new file mode 100644
index 0000000..a833cdf
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
+
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
+import org.apache.hadoop.ozone.container.common.helpers.DeletedContainerBlocksSummary;
+import org.apache.hadoop.ozone.container.common.helpers.KeyUtils;
+import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager;
+import org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine;
+import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager;
+import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto.DeleteBlockTransactionResult;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.Type;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.utils.BatchOperation;
+import org.apache.hadoop.utils.MetadataStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Handles deleteBlocksCommand: moves each TX's blocks to DELETING state, then ACKs SCM.
+ */
+public class DeleteBlocksCommandHandler implements CommandHandler {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(DeleteBlocksCommandHandler.class);
+
+ private ContainerManager containerManager;
+ private Configuration conf;
+ private int invocationCount;
+ private long totalTime;
+
+ public DeleteBlocksCommandHandler(ContainerManager containerManager,
+ Configuration conf) {
+ this.containerManager = containerManager;
+ this.conf = conf;
+ }
+
+ @Override
+ public void handle(SCMCommand command, OzoneContainer container,
+ StateContext context, SCMConnectionManager connectionManager) {
+ if (command.getType() != Type.deleteBlocksCommand) {
+ LOG.warn("Skipping handling command, expected command "
+ + "type {} but found {}",
+ Type.deleteBlocksCommand, command.getType());
+ return;
+ }
+ LOG.debug("Processing block deletion command.");
+ invocationCount++;
+ long startTime = Time.monotonicNow();
+
+ // move blocks to deleting state.
+ // this is a metadata update, the actual deletion happens in another
+ // recycling thread.
+ DeleteBlocksCommand cmd = (DeleteBlocksCommand) command;
+ List<DeletedBlocksTransaction> containerBlocks = cmd.blocksTobeDeleted();
+
+ // The summary below is used only for the INFO log line.
+ DeletedContainerBlocksSummary summary =
+ DeletedContainerBlocksSummary.getFrom(containerBlocks);
+ LOG.info("Start to delete container blocks, TXIDs={}, "
+ + "numOfContainers={}, numOfBlocks={}",
+ summary.getTxIDSummary(),
+ summary.getNumOfContainers(),
+ summary.getNumOfBlocks());
+
+ ContainerBlocksDeletionACKProto.Builder resultBuilder =
+ ContainerBlocksDeletionACKProto.newBuilder();
+ containerBlocks.forEach(entry -> {
+ DeleteBlockTransactionResult.Builder txResultBuilder =
+ DeleteBlockTransactionResult.newBuilder();
+ txResultBuilder.setTxID(entry.getTxID());
+ try {
+ deleteContainerBlocks(entry, conf);
+ txResultBuilder.setSuccess(true);
+ } catch (IOException e) {
+ LOG.warn("Failed to delete blocks for container={}, TXID={}",
+ entry.getContainerName(), entry.getTxID(), e);
+ txResultBuilder.setSuccess(false);
+ }
+ resultBuilder.addResults(txResultBuilder.build());
+ });
+ ContainerBlocksDeletionACKProto blockDeletionACK = resultBuilder.build();
+
+ // ACK every SCM endpoint with per-TX results (failed TXs included)
+ // once metadata is updated. TODO: wait until blocks are actually deleted?
+ if (!containerBlocks.isEmpty()) {
+ for (EndpointStateMachine endPoint : connectionManager.getValues()) {
+ try {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Sending following block deletion ACK to SCM");
+ for (DeleteBlockTransactionResult result :
+ blockDeletionACK.getResultsList()) {
+ LOG.debug(result.getTxID() + " : " + result.getSuccess());
+ }
+ }
+ endPoint.getEndPoint()
+ .sendContainerBlocksDeletionACK(blockDeletionACK);
+ } catch (IOException e) {
+ LOG.error("Unable to send block deletion ACK to SCM {}",
+ endPoint.getAddress().toString(), e);
+ }
+ }
+ }
+
+ long endTime = Time.monotonicNow();
+ totalTime += endTime - startTime;
+ }
+
+ /**
+ * Move a bunch of blocks from a container to deleting state.
+ * This is a meta update, the actual deletes happen in async mode.
+ *
+ * @param delTX a block deletion transaction.
+ * @param config configuration.
+ * @throws IOException if I/O error occurs, e.g. a container DB batch write fails.
+ */
+ private void deleteContainerBlocks(DeletedBlocksTransaction delTX,
+ Configuration config) throws IOException {
+ String containerId = delTX.getContainerName();
+ ContainerData containerInfo = containerManager.readContainer(containerId);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Processing Container : {}, DB path : {}", containerId,
+ containerInfo.getDBPath());
+ }
+ MetadataStore containerDB = KeyUtils.getDB(containerInfo, config);
+ for (String blk : delTX.getBlockIDList()) {
+ BatchOperation batch = new BatchOperation();
+ byte[] blkBytes = DFSUtil.string2Bytes(blk);
+ byte[] blkInfo = containerDB.get(blkBytes);
+ if (blkInfo != null) {
+ // Found the block in container db,
+ // use an atomic update to change its state to deleting.
+ batch.put(DFSUtil.string2Bytes(OzoneConsts.DELETING_KEY_PREFIX + blk),
+ blkInfo);
+ batch.delete(blkBytes);
+ try {
+ containerDB.writeBatch(batch);
+ LOG.info("Transited Block {} to DELETING state in container {}",
+ blk, containerId);
+ } catch (IOException e) {
+ // Fail the whole TX on the first failed write (blocks moved in
+ // earlier iterations stay moved); handle() still ACKs this TX to
+ // SCM with success=false, presumably so SCM can retry it.
+ throw new IOException(
+ "Failed to delete blocks for TXID = " + delTX.getTxID(), e);
+ }
+ } else {
+ LOG.info("Block {} not found or already under deletion in"
+ + " container {}, skip deleting it.", blk, containerId);
+ }
+ }
+ }
+
+ @Override
+ public Type getCommandType() {
+ return Type.deleteBlocksCommand;
+ }
+
+ @Override
+ public int getInvocationCount() {
+ return this.invocationCount;
+ }
+
+ @Override
+ public long getAverageRunTime() {
+ if (invocationCount > 0) {
+ return totalTime / invocationCount;
+ }
+ return 0;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a156eb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
index 764e6d2..d68351c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
@@ -21,11 +21,14 @@ package org.apache.hadoop.ozone.container.common.states.endpoint;
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.ozone.container.common.helpers
+ .DeletedContainerBlocksSummary;
import org.apache.hadoop.ozone.container.common.statemachine
.EndpointStateMachine;
import org.apache.hadoop.ozone.container.common.statemachine
.EndpointStateMachine.EndPointStates;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerNodeIDProto;
import org.apache.hadoop.ozone.protocol.commands.SendContainerCommand;
@@ -144,6 +147,18 @@ public class HeartbeatEndpointTask
}
}
break;
+ case deleteBlocksCommand:
+ DeleteBlocksCommand db = DeleteBlocksCommand
+ .getFromProtobuf(commandResponseProto.getDeleteBlocksProto());
+ if (!db.blocksTobeDeleted().isEmpty()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(DeletedContainerBlocksSummary
+ .getFrom(db.blocksTobeDeleted())
+ .toString());
+ }
+ this.context.addCommand(db);
+ }
+ break;
default:
throw new IllegalArgumentException("Unknown response : "
+ commandResponseProto.getCmdType().name());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a156eb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java
index 75bd771..a7b8717 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java
@@ -25,7 +25,8 @@ import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolPr
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMNodeReport;
-
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKResponseProto;
import java.io.IOException;
/**
@@ -70,4 +71,13 @@ public interface StorageContainerDatanodeProtocol {
*/
SCMHeartbeatResponseProto sendContainerReport(ContainerReportsProto reports)
throws IOException;
+
+ /**
+ * Used by datanode to send block deletion ACK to SCM.
+ * @param request block deletion transactions.
+ * @return block deletion transaction response.
+ * @throws IOException
+ */
+ ContainerBlocksDeletionACKResponseProto sendContainerBlocksDeletionACK(
+ ContainerBlocksDeletionACKProto request) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a156eb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/commands/DeleteBlocksCommand.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/commands/DeleteBlocksCommand.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/commands/DeleteBlocksCommand.java
new file mode 100644
index 0000000..8e3463d
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/commands/DeleteBlocksCommand.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.protocol.commands;
+
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMDeleteBlocksCmdResponseProto;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.Type;
+
+import java.util.List;
+
+/**
+ * An SCM command that asks a datanode to delete a number of blocks.
+ */
+public class DeleteBlocksCommand extends
+ SCMCommand<SCMDeleteBlocksCmdResponseProto> {
+
+ private List<DeletedBlocksTransaction> blocksTobeDeleted;
+
+ /** Wraps the given deletion TXs; the list is stored as-is, not copied. */
+ public DeleteBlocksCommand(List<DeletedBlocksTransaction> blocks) {
+ this.blocksTobeDeleted = blocks;
+ }
+
+ public List<DeletedBlocksTransaction> blocksTobeDeleted() {
+ return this.blocksTobeDeleted;
+ }
+
+ @Override
+ public Type getType() {
+ return Type.deleteBlocksCommand;
+ }
+
+ @Override
+ public byte[] getProtoBufMessage() {
+ return getProto().toByteArray();
+ }
+
+ public static DeleteBlocksCommand getFromProtobuf(
+ SCMDeleteBlocksCmdResponseProto deleteBlocksProto) {
+ return new DeleteBlocksCommand(deleteBlocksProto
+ .getDeletedBlocksTransactionsList());
+ }
+
+ public SCMDeleteBlocksCmdResponseProto getProto() {
+ return SCMDeleteBlocksCmdResponseProto.newBuilder()
+ .addAllDeletedBlocksTransactions(blocksTobeDeleted).build();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a156eb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java
index e0e3bee..033513d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java
@@ -38,6 +38,8 @@ import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKResponseProto;
import java.io.Closeable;
import java.io.IOException;
@@ -177,4 +179,16 @@ public class StorageContainerDatanodeProtocolClientSideTranslatorPB
return resp;
}
+ @Override
+ public ContainerBlocksDeletionACKResponseProto sendContainerBlocksDeletionACK(
+ ContainerBlocksDeletionACKProto deletedBlocks) throws IOException {
+ final ContainerBlocksDeletionACKResponseProto resp;
+ try {
+ resp = rpcProxy.sendContainerBlocksDeletionACK(NULL_RPC_CONTROLLER,
+ deletedBlocks);
+ } catch (ServiceException e) {
+ throw ProtobufHelper.getRemoteException(e);
+ }
+ return resp;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a156eb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java
index 116cc38..9808f3e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java
@@ -24,6 +24,8 @@ import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolPr
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKResponseProto;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
import java.io.IOException;
@@ -97,4 +99,15 @@ public class StorageContainerDatanodeProtocolServerSideTranslatorPB
throw new ServiceException(e);
}
}
+
+ /**
+ * Server-side entry point: delegates the ACK to the SCM implementation and
+ * wraps any {@link IOException} into a {@link ServiceException}.
+ */
+ @Override
+ public ContainerBlocksDeletionACKResponseProto sendContainerBlocksDeletionACK(
+ RpcController controller, ContainerBlocksDeletionACKProto request)
+ throws ServiceException {
+ final ContainerBlocksDeletionACKResponseProto response;
+ try {
+ response = impl.sendContainerBlocksDeletionACK(request);
+ } catch (IOException ioe) {
+ throw new ServiceException(ioe);
+ }
+ return response;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a156eb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java
index fa14ad0..5a97735 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.ozone.client.OzoneClientUtils;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
+import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
@@ -52,6 +53,9 @@ import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolPr
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SendContainerReportProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.Type;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto.DeleteBlockTransactionResult;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKResponseProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.NotifyObjectCreationStageRequestProto;
import org.apache.hadoop.ozone.protocolPB.ScmBlockLocationProtocolServerSideTranslatorPB;
@@ -91,6 +95,8 @@ import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
+import java.util.Collections;
+import java.util.stream.Collectors;
import static org.apache.hadoop.ozone.protocol.proto
.ScmBlockLocationProtocolProtos.DeleteScmBlockResult.Result;
@@ -322,8 +328,8 @@ public class StorageContainerManager extends ServiceRuntimeInfoImpl
* @throws InvalidProtocolBufferException
*/
@VisibleForTesting
- public static SCMCommandResponseProto getCommandResponse(SCMCommand cmd)
- throws InvalidProtocolBufferException {
+ public SCMCommandResponseProto getCommandResponse(SCMCommand cmd)
+ throws IOException {
Type type = cmd.getType();
SCMCommandResponseProto.Builder builder =
SCMCommandResponseProto.newBuilder();
@@ -346,6 +352,17 @@ public class StorageContainerManager extends ServiceRuntimeInfoImpl
.setReregisterProto(SCMReregisterCmdResponseProto
.getDefaultInstance())
.build();
+ case deleteBlocksCommand:
+ // Once SCM sends out the deletion message, increment the count.
+ // this is done here instead of when SCM receives the ACK, because
+ // DN might not be able to response the ACK for sometime. In case
+ // it times out, SCM needs to re-send the message some more times.
+ List<Long> txs = ((DeleteBlocksCommand) cmd).blocksTobeDeleted()
+ .stream().map(tx -> tx.getTxID()).collect(Collectors.toList());
+ this.getScmBlockManager().getDeletedBlockLog().incrementCount(txs);
+ return builder.setCmdType(Type.deleteBlocksCommand)
+ .setDeleteBlocksProto(((DeleteBlocksCommand) cmd).getProto())
+ .build();
default:
throw new IllegalArgumentException("Not implemented");
}
@@ -591,6 +608,7 @@ public class StorageContainerManager extends ServiceRuntimeInfoImpl
datanodeRpcAddress));
datanodeRpcServer.start();
httpServer.start();
+ scmBlockManager.start();
setStartTime();
@@ -628,6 +646,13 @@ public class StorageContainerManager extends ServiceRuntimeInfoImpl
LOG.error("Storage Container Manager HTTP server stop failed.", ex);
}
+ try {
+ LOG.info("Stopping Block Manager Service.");
+ scmBlockManager.stop();
+ } catch (Exception ex) {
+ LOG.error("SCM block manager service stop failed.", ex);
+ }
+
unregisterMXBean();
IOUtils.cleanupWithLogger(LOG, scmContainerManager);
IOUtils.cleanupWithLogger(LOG, scmBlockManager);
@@ -715,6 +740,38 @@ public class StorageContainerManager extends ServiceRuntimeInfoImpl
}
/**
+ * Handles the block deletion ACKs sent by datanodes. Once ACKs are
+ * received, SCM considers the blocks deleted and updates the metadata in
+ * the SCM DB. All successfully acknowledged transactions of one request are
+ * committed to the deleted block log in a single call; failed ones are not
+ * committed, so they stay in the log and are picked up again by a later scan.
+ *
+ * @param acks the per-transaction deletion results reported by a datanode.
+ * @return an empty response.
+ * @throws IOException if committing the transactions to the log fails.
+ */
+ @Override
+ public ContainerBlocksDeletionACKResponseProto sendContainerBlocksDeletionACK(
+ ContainerBlocksDeletionACKProto acks) throws IOException {
+ List<DeleteBlockTransactionResult> resultList = acks.getResultsList();
+ for (DeleteBlockTransactionResult result : resultList) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Got block deletion ACK from datanode, TXIDs={}, "
+ + "success={}", result.getTxID(), result.getSuccess());
+ }
+ if (result.getSuccess()) {
+ LOG.info("Purging TXID={} from block deletion log", result.getTxID());
+ } else {
+ LOG.warn("Got failed ACK for TXID={}, prepare to resend the "
+ + "TX in next interval", result.getTxID());
+ }
+ }
+ // Commit every successful TX with one call instead of one write per TX.
+ List<Long> succeededTXs = resultList.stream()
+ .filter(DeleteBlockTransactionResult::getSuccess)
+ .map(DeleteBlockTransactionResult::getTxID)
+ .collect(Collectors.toList());
+ if (!succeededTXs.isEmpty()) {
+ this.getScmBlockManager().getDeletedBlockLog()
+ .commitTransactions(succeededTXs);
+ }
+ return ContainerBlocksDeletionACKResponseProto.getDefaultInstance();
+ }
+
+ /**
* Returns the Number of Datanodes that are communicating with SCM.
*
* @param nodestate Healthy, Dead etc.
@@ -742,6 +799,11 @@ public class StorageContainerManager extends ServiceRuntimeInfoImpl
return scmNodeManager;
}
+ /**
+ * @return the block manager owned by this SCM instance.
+ */
+ @VisibleForTesting
+ public BlockManager getScmBlockManager() {
+ return this.scmBlockManager;
+ }
+
/**
* Get block locations.
* @param keys batch of block keys to retrieve.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a156eb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManager.java
index d1487fc..4672b33 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManager.java
@@ -51,4 +51,21 @@ public interface BlockManager extends Closeable {
* @throws IOException
*/
void deleteBlock(String key) throws IOException;
+
+ /**
+ * Returns the log of block deletion transactions maintained by SCM.
+ *
+ * @return the block deletion transaction log maintained by SCM.
+ */
+ DeletedBlockLog getDeletedBlockLog();
+
+ /**
+ * Starts the block manager background services (in BlockManagerImpl this
+ * is the SCM block deleting service).
+ * @throws IOException if a background service fails to start.
+ */
+ void start() throws IOException;
+
+ /**
+ * Shuts down the block manager background services. Implementations may
+ * also release other resources here (BlockManagerImpl closes itself).
+ * @throws IOException if shutting down or closing fails.
+ */
+ void stop() throws IOException;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a156eb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java
index 43ca21c..d920c42 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java
@@ -46,6 +46,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.List;
@@ -70,6 +71,14 @@ import static org.apache.hadoop.ozone.scm.exceptions.SCMException.ResultCodes.
FAILED_TO_LOAD_OPEN_CONTAINER;
import static org.apache.hadoop.ozone.scm.exceptions.SCMException.ResultCodes.
INVALID_BLOCK_SIZE;
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+ .OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS;
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+ .OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS_DEFAULT;
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+ .OZONE_BLOCK_DELETING_SERVICE_TIMEOUT;
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+ .OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT;
/**
* Block Manager manages the block access for SCM.
@@ -89,6 +98,7 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
// Track all containers owned by block service.
private final MetadataStore containerStore;
private final DeletedBlockLog deletedBlockLog;
+ private final SCMBlockDeletingService blockDeletingService;
private Map<OzoneProtos.LifeCycleState,
Map<String, BlockContainerInfo>> containers;
@@ -143,7 +153,34 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
this.lock = new ReentrantLock();
mxBean = MBeans.register("BlockManager", "BlockManagerImpl", this);
+
+ // SCM block deleting transaction log and deleting service.
deletedBlockLog = new DeletedBlockLogImpl(conf);
+ int svcInterval = conf.getInt(
+ OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS,
+ OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS_DEFAULT);
+ long serviceTimeout = conf.getTimeDuration(
+ OZONE_BLOCK_DELETING_SERVICE_TIMEOUT,
+ OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS);
+ blockDeletingService = new SCMBlockDeletingService(deletedBlockLog,
+ containerManager, nodeManager, svcInterval, serviceTimeout);
+ }
+
+ /**
+ * Starts the block manager services, currently the SCM block deleting
+ * service that periodically queues block deletion commands for datanodes.
+ * @throws IOException if a service fails to start.
+ */
+ @Override
+ public void start() throws IOException {
+ this.blockDeletingService.start();
+ }
+
+ /**
+ * Shuts down the block manager services. The block deleting service is
+ * stopped first so that a running scan does not race with the deleted
+ * block log being closed by {@link #close()}.
+ * @throws IOException if closing the underlying stores fails.
+ */
+ @Override
+ public void stop() throws IOException {
+ this.blockDeletingService.shutdown();
+ this.close();
}
// TODO: close full (or almost full) containers with a separate thread.
@@ -475,6 +512,11 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
}
}
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public DeletedBlockLog getDeletedBlockLog() {
+ return deletedBlockLog;
+ }
+
@VisibleForTesting
public String getDeletedKeyName(String key) {
return StringUtils.format(".Deleted/%s", key);
@@ -495,6 +537,7 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
if (deletedBlockLog != null) {
deletedBlockLog.close();
}
+ blockDeletingService.shutdown();
MBeans.unregister(mxBean);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a156eb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DeletedBlockLog.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DeletedBlockLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DeletedBlockLog.java
index 60d53af..9e268a6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DeletedBlockLog.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DeletedBlockLog.java
@@ -17,7 +17,6 @@
*/
package org.apache.hadoop.ozone.scm.block;
-
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
@@ -46,6 +45,17 @@ public interface DeletedBlockLog extends Closeable {
throws IOException;
/**
+ * Returns all failed transactions in the log. A transaction is considered
+ * to be failed if it has been sent more times than the MAX_RETRY limit and
+ * its count has therefore been reset to -1.
+ *
+ * @return a list of failed deleted block transactions.
+ * @throws IOException if the log cannot be read.
+ */
+ List<DeletedBlocksTransaction> getFailedTransactions()
+ throws IOException;
+
+ /**
* Increments count for given list of transactions by 1.
* The log maintains a valid range of counts for each transaction
* [0, MAX_RETRY]. If exceed this range, resets it to -1 to indicate
@@ -75,4 +85,13 @@ public interface DeletedBlockLog extends Closeable {
*/
void addTransaction(String containerName, List<String> blocks)
throws IOException;
+
+ /**
+ * Returns the total number of valid transactions. A transaction is
+ * considered to be valid as long as its count is in range [0, MAX_RETRY],
+ * i.e. it has not been marked as failed (count -1).
+ *
+ * @return number of valid transactions.
+ * @throws IOException if the log cannot be read.
+ */
+ int getNumOfValidTransactions() throws IOException;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a156eb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DeletedBlockLogImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DeletedBlockLogImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DeletedBlockLogImpl.java
index ef1a515..738157d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DeletedBlockLogImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DeletedBlockLogImpl.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.ozone.scm.block;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
import com.google.common.primitives.Longs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSUtil;
@@ -36,6 +37,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -147,6 +149,28 @@ public class DeletedBlockLogImpl implements DeletedBlockLog {
return result;
}
+ @Override
+ public List<DeletedBlocksTransaction> getFailedTransactions()
+ throws IOException {
+ // Collects every TX whose retry count has been reset to -1.
+ final List<DeletedBlocksTransaction> failed = Lists.newArrayList();
+ lock.lock();
+ try {
+ deletedStore.iterate(null, (key, value) -> {
+ // Skip the bookkeeping record that stores the latest TXID.
+ if (Arrays.equals(LATEST_TXID, key)) {
+ return true;
+ }
+ DeletedBlocksTransaction candidate =
+ DeletedBlocksTransaction.parseFrom(value);
+ if (candidate.getCount() == -1) {
+ failed.add(candidate);
+ }
+ return true;
+ });
+ } finally {
+ lock.unlock();
+ }
+ return failed;
+ }
+
/**
* {@inheritDoc}
*
@@ -163,13 +187,14 @@ public class DeletedBlockLogImpl implements DeletedBlockLog {
DeletedBlocksTransaction block = DeletedBlocksTransaction
.parseFrom(deletedStore.get(Longs.toByteArray(txID)));
DeletedBlocksTransaction.Builder builder = block.toBuilder();
- if (block.getCount() > -1) {
- builder.setCount(block.getCount() + 1);
+ int currentCount = block.getCount();
+ if (currentCount > -1) {
+ builder.setCount(++currentCount);
}
// if the retry time exceeds the maxRetry value
// then set the retry value to -1, stop retrying, admins can
// analyze those blocks and purge them manually by SCMCli.
- if (block.getCount() > maxRetry) {
+ if (currentCount > maxRetry) {
builder.setCount(-1);
}
deletedStore.put(Longs.toByteArray(txID),
@@ -238,6 +263,28 @@ public class DeletedBlockLogImpl implements DeletedBlockLog {
}
@Override
+ public int getNumOfValidTransactions() throws IOException {
+ // Counts TXs whose retry count is still in range, i.e. not reset to -1.
+ final AtomicInteger counter = new AtomicInteger();
+ lock.lock();
+ try {
+ deletedStore.iterate(null, (key, value) -> {
+ // Skip the bookkeeping record that stores the latest TXID.
+ if (Arrays.equals(LATEST_TXID, key)) {
+ return true;
+ }
+ DeletedBlocksTransaction candidate =
+ DeletedBlocksTransaction.parseFrom(value);
+ if (candidate.getCount() > -1) {
+ counter.incrementAndGet();
+ }
+ return true;
+ });
+ return counter.get();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
public void close() throws IOException {
if (deletedStore != null) {
deletedStore.close();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a156eb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/SCMBlockDeletingService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/SCMBlockDeletingService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/SCMBlockDeletingService.java
new file mode 100644
index 0000000..3ca1133
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/SCMBlockDeletingService.java
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.scm.block;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
+import org.apache.hadoop.ozone.scm.container.Mapping;
+import org.apache.hadoop.ozone.scm.node.NodeManager;
+import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
+import org.apache.hadoop.utils.BackgroundService;
+import org.apache.hadoop.utils.BackgroundTask;
+import org.apache.hadoop.utils.BackgroundTaskQueue;
+import org.apache.hadoop.utils.BackgroundTaskResult.EmptyTaskResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * A background service running in SCM to delete blocks. This service scans
+ * the block deletion log at a fixed interval and caches block deletion
+ * commands in {@link org.apache.hadoop.ozone.scm.node.CommandQueue}; the SCM
+ * HB thread asynchronously polls the cached commands and sends them to the
+ * datanodes for physical processing.
+ */
+public class SCMBlockDeletingService extends BackgroundService {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(SCMBlockDeletingService.class);
+
+ // ThreadPoolSize=2, 1 for scheduler and the other for the scanner.
+ private static final int BLOCK_DELETING_SERVICE_CORE_POOL_SIZE = 2;
+ private final DeletedBlockLog deletedBlockLog;
+ private final Mapping mappingService;
+ private final NodeManager nodeManager;
+
+ // Default container size is 5G and block size is 256MB, a full container
+ // at most contains 20 blocks. At most each TX contains 20 blocks.
+ // When SCM sends block deletion TXs to datanode, each command we allow
+ // at most 50 containers so that will limit number of to be deleted blocks
+ // less than 1000.
+ // TODO - a better throttle algorithm
+ // Note, this is not an accurate limit of blocks. When we scan
+ // the log, worst case we may get 50 TX for 50 different datanodes,
+ // that will cause the deletion message sent by SCM extremely small.
+ // As a result, the deletion will be slow. An improvement is to scan
+ // log multiple times until we get enough TXs for each datanode, or
+ // the entire log is scanned.
+ private static final int BLOCK_DELETE_TX_PER_REQUEST_LIMIT = 50;
+
+ /**
+ * @param deletedBlockLog the SCM log of block deletion transactions.
+ * @param mapper container mapping used to find each container's pipeline.
+ * @param nodeManager node manager whose command queue receives commands.
+ * @param interval scan interval in milliseconds.
+ * @param serviceTimeout timeout handed to {@link BackgroundService};
+ * BlockManagerImpl supplies it in milliseconds.
+ */
+ public SCMBlockDeletingService(DeletedBlockLog deletedBlockLog,
+ Mapping mapper, NodeManager nodeManager,
+ int interval, long serviceTimeout) {
+ super("SCMBlockDeletingService", interval, TimeUnit.MILLISECONDS,
+ BLOCK_DELETING_SERVICE_CORE_POOL_SIZE, serviceTimeout);
+ this.deletedBlockLog = deletedBlockLog;
+ this.mappingService = mapper;
+ this.nodeManager = nodeManager;
+ }
+
+ @Override
+ public BackgroundTaskQueue getTasks() {
+ BackgroundTaskQueue queue = new BackgroundTaskQueue();
+ queue.add(new DeletedBlockTransactionScanner());
+ return queue;
+ }
+
+ private class DeletedBlockTransactionScanner
+ implements BackgroundTask<EmptyTaskResult> {
+
+ @Override
+ public int getPriority() {
+ return 1;
+ }
+
+ @Override
+ public EmptyTaskResult call() throws Exception {
+ // Scan the SCM deleted blocks DB once per service interval and collect
+ // a throttled list of to-delete blocks. Logged at debug level because
+ // this runs on every interval.
+ LOG.debug("Running DeletedBlockTransactionScanner");
+ DatanodeDeletedBlockTransactions transactions =
+ getToDeleteContainerBlocks();
+ if (!transactions.isEmpty()) {
+ transactions.getDatanodes().forEach(datanodeID -> {
+ List<DeletedBlocksTransaction> dnTXs =
+ transactions.getDatanodeTransactions(datanodeID);
+ // TODO commandQueue needs a cap.
+ // We should stop caching new commands if num of un-processed
+ // command is bigger than a limit, e.g 50. In case datanode goes
+ // offline for sometime, the cached commands could be flooded.
+ nodeManager.addDatanodeCommand(datanodeID,
+ new DeleteBlocksCommand(dnTXs));
+ LOG.info("Added delete block command for datanode {} in the queue,"
+ + " number delete block transactions: {}, TxID list: {}",
+ datanodeID, dnTXs.size(),
+ String.join(",", transactions.getTransactionIDList(datanodeID)));
+ });
+ }
+ return EmptyTaskResult.newResult();
+ }
+
+ // Scan deleteBlocks.db to get a number of to-delete blocks.
+ // this is going to be properly throttled.
+ private DatanodeDeletedBlockTransactions getToDeleteContainerBlocks() {
+ DatanodeDeletedBlockTransactions dnTXs =
+ new DatanodeDeletedBlockTransactions();
+ List<DeletedBlocksTransaction> txs = null;
+ try {
+ // Get a limited number of TXs to send via HB at a time.
+ txs = deletedBlockLog
+ .getTransactions(BLOCK_DELETE_TX_PER_REQUEST_LIMIT);
+ LOG.debug("Scanned deleted blocks log and got {} delTX to process",
+ txs.size());
+ } catch (IOException e) {
+ // We may tolerate a number of failures for sometime
+ // but if it continues to fail, at some point we need to raise
+ // an exception and probably fail the SCM ? At present, it simply
+ // continues to retry the scanning.
+ LOG.error("Failed to get block deletion transactions from delTX log",
+ e);
+ }
+
+ if (txs != null) {
+ for (DeletedBlocksTransaction tx : txs) {
+ try {
+ ContainerInfo info = mappingService
+ .getContainer(tx.getContainerName());
+ // Find out the datanode where this TX is supposed to send to.
+ info.getPipeline().getMachines()
+ .forEach(entry -> dnTXs.addTransaction(entry, tx));
+ } catch (IOException e) {
+ LOG.warn("Container {} not found, continue to process next",
+ tx.getContainerName(), e);
+ }
+ }
+ }
+ return dnTXs;
+ }
+ }
+
+ /**
+ * A wrapper class to hold info about datanode and all deleted block
+ * transactions that will be sent to this datanode.
+ */
+ private static class DatanodeDeletedBlockTransactions {
+
+ // A list of TXs mapped to a certain datanode ID.
+ private final Map<DatanodeID, List<DeletedBlocksTransaction>> transactions;
+
+ DatanodeDeletedBlockTransactions() {
+ this.transactions = Maps.newHashMap();
+ }
+
+ void addTransaction(DatanodeID dnID, DeletedBlocksTransaction tx) {
+ transactions.computeIfAbsent(dnID, k -> Lists.newArrayList()).add(tx);
+ // Per-TX message: debug level to avoid flooding the SCM log.
+ LOG.debug("Transaction added: {} <- TX({})", dnID, tx.getTxID());
+ }
+
+ Set<DatanodeID> getDatanodes() {
+ return transactions.keySet();
+ }
+
+ boolean isEmpty() {
+ return transactions.isEmpty();
+ }
+
+ boolean hasTransactions(DatanodeID dnID) {
+ return transactions.containsKey(dnID) &&
+ !transactions.get(dnID).isEmpty();
+ }
+
+ List<DeletedBlocksTransaction> getDatanodeTransactions(DatanodeID dnID) {
+ return transactions.get(dnID);
+ }
+
+ List<String> getTransactionIDList(DatanodeID dnID) {
+ if (hasTransactions(dnID)) {
+ return transactions.get(dnID).stream()
+ .map(DeletedBlocksTransaction::getTxID)
+ .map(String::valueOf)
+ .collect(Collectors.toList());
+ } else {
+ return Collections.emptyList();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a156eb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodeManager.java
index c21e62c..dec3776 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodeManager.java
@@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
import org.apache.hadoop.ozone.protocol.StorageContainerNodeProtocol;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState;
import org.apache.hadoop.ozone.scm.container.placement.metrics.SCMNodeMetric;
import org.apache.hadoop.ozone.scm.container.placement.metrics.SCMNodeStat;
@@ -134,4 +135,12 @@ public interface NodeManager extends StorageContainerNodeProtocol,
* @return Healthy/Stale/Dead.
*/
NodeState getNodeState(DatanodeID id);
+
+ /**
+ * Adds a {@link SCMCommand} to the command queue of the given datanode;
+ * queued commands are handled asynchronously by the HB thread.
+ * The default implementation is a no-op, i.e. the command is discarded;
+ * implementations that deliver commands (such as SCMNodeManager) override
+ * it.
+ * @param id the datanode the command is targeted at.
+ * @param command the command to queue.
+ */
+ default void addDatanodeCommand(DatanodeID id, SCMCommand command) {}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a156eb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java
index bcd3a02..6e2805a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java
@@ -836,4 +836,9 @@ public class SCMNodeManager
}
return nodeCountMap;
}
+
+ /**
+ * Queues the command for the given datanode in this manager's command
+ * queue.
+ */
+ @Override
+ public void addDatanodeCommand(DatanodeID id, SCMCommand command) {
+ commandQueue.addCommand(id, command);
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a156eb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/BackgroundTaskResult.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/BackgroundTaskResult.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/BackgroundTaskResult.java
index b37a5db..198300f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/BackgroundTaskResult.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/BackgroundTaskResult.java
@@ -23,7 +23,22 @@ package org.apache.hadoop.utils;
public interface BackgroundTaskResult {
/**
- * Returns the size of entries included in this result.
+ * Returns the size of entries included in this result.
*/
int getSize();
+
+ /**
+ * An empty task result implementation.
+ */
+ class EmptyTaskResult implements BackgroundTaskResult {
+
+ public static EmptyTaskResult newResult() {
+ return new EmptyTaskResult();
+ }
+
+ @Override
+ public int getSize() {
+ return 0;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a156eb/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto
index a8cfa57..bb3d137 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto
@@ -122,7 +122,6 @@ message ContainerReportsProto {
required reportType type = 3;
}
-
/**
* This message is send along with the heart beat to report datanode
* storage utilization by SCM.
@@ -210,6 +209,7 @@ enum Type {
registeredCommand = 3;
sendContainerReport = 4;
reregisterCommand = 5;
+ deleteBlocksCommand = 6;
}
/*
@@ -221,6 +221,7 @@ message SCMCommandResponseProto {
optional SCMVersionResponseProto versionProto = 4;
optional SendContainerReportProto sendReport = 5;
optional SCMReregisterCmdResponseProto reregisterProto = 6;
+ optional SCMDeleteBlocksCmdResponseProto deleteBlocksProto = 7;
}
@@ -231,6 +232,24 @@ message SCMHeartbeatResponseProto {
repeated SCMCommandResponseProto commands = 1;
}
+// HB response from SCM, contains a list of block deletion transactions
+// that the datanode should process.
+message SCMDeleteBlocksCmdResponseProto {
+ repeated DeletedBlocksTransaction deletedBlocksTransactions = 1;
+}
+
+// Response returned by SCM to the datanode for a block deletion ACK,
+// currently empty.
+message ContainerBlocksDeletionACKResponseProto {
+}
+
+// ACK message datanode sent to SCM, contains the result of
+// block deletion transactions.
+message ContainerBlocksDeletionACKProto {
+ message DeleteBlockTransactionResult {
+ // ID of the block deletion transaction being acknowledged.
+ required int64 txID = 1;
+ // true if the datanode processed the transaction successfully; SCM
+ // commits successful transactions and keeps failed ones for resending.
+ required bool success = 2;
+ }
+ repeated DeleteBlockTransactionResult results = 1;
+}
/**
* Protocol used from a datanode to StorageContainerManager.
@@ -318,6 +337,10 @@ service StorageContainerDatanodeProtocolService {
send container reports sends the container report to SCM. This will
return a null command as response.
*/
- rpc sendContainerReport(ContainerReportsProto) returns (SCMHeartbeatResponseProto);
+ rpc sendContainerReport(ContainerReportsProto) returns (SCMHeartbeatResponseProto);
+ /**
+ * Sends the block deletion ACK to SCM.
+ */
+ rpc sendContainerBlocksDeletionACK (ContainerBlocksDeletionACKProto) returns (ContainerBlocksDeletionACKResponseProto);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a156eb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
index 5d8308e..d0c363d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
@@ -17,22 +17,35 @@
*/
package org.apache.hadoop.ozone;
-import static org.junit.Assert.*;
-
+import static org.junit.Assert.fail;
import java.io.IOException;
+import org.apache.commons.lang.RandomStringUtils;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
import org.apache.hadoop.ozone.scm.StorageContainerManager;
+import org.apache.hadoop.ozone.scm.block.DeletedBlockLog;
import org.apache.hadoop.scm.XceiverClientManager;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.junit.Rule;
import org.junit.Assert;
import org.junit.Test;
import org.junit.rules.ExpectedException;
+import java.util.List;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.Map;
+import java.util.Collections;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo;
+import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo;
+import org.apache.hadoop.scm.ScmConfigKeys;
import org.apache.hadoop.io.IOUtils;
import org.junit.rules.Timeout;
import org.mockito.Mockito;
+import org.apache.hadoop.test.GenericTestUtils;
/**
* Test class that exercises the StorageContainerManager.
@@ -149,4 +162,103 @@ public class TestStorageContainerManager {
Assert.assertTrue(e instanceof IOException);
Assert.assertEquals(expectedErrorMessage, e.getMessage());
}
+
+  /**
+   * Exercises block deletion between SCM and a datanode: deletion TXs added
+   * to SCM's DeletedBlockLog are picked up on heartbeat intervals, executed
+   * on the datanode and eventually removed from the set of valid TXs.
+   */
+  @Test
+  public void testBlockDeletionTransactions() throws Exception {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    conf.setInt(ScmConfigKeys.OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS, 5);
+    conf.setInt(ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS, 3000);
+    conf.setInt(ScmConfigKeys.OZONE_SCM_BLOCK_DELETION_MAX_RETRY, 5);
+    conf.setInt(OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS, 1000);
+    MiniOzoneCluster cluster =
+        new MiniOzoneCluster.Builder(conf).numDataNodes(1)
+            .setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
+    // Always shut the cluster down so its threads and ports do not leak
+    // into subsequent tests, even when an assertion fails.
+    try {
+      DeletedBlockLog delLog = cluster.getStorageContainerManager()
+          .getScmBlockManager().getDeletedBlockLog();
+      Assert.assertEquals(0, delLog.getNumOfValidTransactions());
+
+      // Create 20 keys with random names.
+      TestStorageContainerManagerHelper helper =
+          new TestStorageContainerManagerHelper(cluster, conf);
+      Map<String, KsmKeyInfo> keyLocations = helper.createKeys(20, 4096);
+
+      // The keys are written into a number of containers; collect their
+      // names so the blocks stored in them can be inspected on the datanode.
+      Set<String> containerNames = new HashSet<>();
+      for (KsmKeyInfo info : keyLocations.values()) {
+        info.getKeyLocationList()
+            .forEach(loc -> containerNames.add(loc.getContainerName()));
+      }
+
+      // The number of blocks stored in these containers must equal the
+      // number of blocks allocated while creating the keys.
+      int totalCreatedBlocks = 0;
+      for (KsmKeyInfo info : keyLocations.values()) {
+        totalCreatedBlocks += info.getKeyLocationList().size();
+      }
+      Assert.assertTrue(totalCreatedBlocks > 0);
+      Assert.assertEquals(totalCreatedBlocks,
+          helper.getAllBlocks(containerNames).size());
+
+      // Group block IDs by container and create one deletion TX per
+      // container.
+      Map<String, List<String>> containerBlocks = Maps.newHashMap();
+      for (KsmKeyInfo info : keyLocations.values()) {
+        for (KsmKeyLocationInfo location : info.getKeyLocationList()) {
+          containerBlocks
+              .computeIfAbsent(location.getContainerName(),
+                  name -> Lists.newArrayList())
+              .add(location.getBlockID());
+        }
+      }
+      for (Map.Entry<String, List<String>> tx : containerBlocks.entrySet()) {
+        delLog.addTransaction(tx.getKey(), tx.getValue());
+      }
+
+      // Verify that some TXs were created in the TX log.
+      Assert.assertTrue(delLog.getNumOfValidTransactions() > 0);
+
+      // Once TXs are written into the log, SCM fetches them and schedules
+      // block deletions on heartbeat intervals. Eventually every TX is
+      // processed and the known containers hold no live blocks anymore.
+      GenericTestUtils.waitFor(() -> {
+        try {
+          return delLog.getNumOfValidTransactions() == 0;
+        } catch (IOException e) {
+          return false;
+        }
+      }, 1000, 10000);
+      Assert.assertTrue(helper.getAllBlocks(containerNames).isEmpty());
+
+      // Continue with TXs that reference known container names but
+      // unknown block IDs.
+      for (String containerName : containerBlocks.keySet()) {
+        // Add 2 TXs per container.
+        delLog.addTransaction(containerName,
+            Collections.singletonList(RandomStringUtils.randomAlphabetic(5)));
+        delLog.addTransaction(containerName,
+            Collections.singletonList(RandomStringUtils.randomAlphabetic(5)));
+      }
+
+      // Verify that some TXs were created in the TX log.
+      Assert.assertTrue(delLog.getNumOfValidTransactions() > 0);
+
+      // These blocks cannot be found in the containers, so deleting them is
+      // skipped and the TXs are expected to succeed. Wait until they have
+      // all been processed: merely polling for "no failed TXs" right after
+      // adding them would succeed on the first check and verify nothing.
+      GenericTestUtils.waitFor(() -> {
+        try {
+          return delLog.getNumOfValidTransactions() == 0;
+        } catch (IOException e) {
+          return false;
+        }
+      }, 1000, 10000);
+      // None of the processed TXs may have ended up as failed.
+      Assert.assertEquals(0, delLog.getFailedTransactions().size());
+    } finally {
+      cluster.shutdown();
+    }
+  }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a156eb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManagerHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManagerHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManagerHelper.java
new file mode 100644
index 0000000..2123d6f
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManagerHelper.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.ObjectStoreHandler;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
+import org.apache.hadoop.ozone.container.common.helpers.KeyUtils;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.ksm.helpers.KsmKeyArgs;
+import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo;
+import org.apache.hadoop.ozone.web.handlers.BucketArgs;
+import org.apache.hadoop.ozone.web.handlers.KeyArgs;
+import org.apache.hadoop.ozone.web.handlers.UserArgs;
+import org.apache.hadoop.ozone.web.handlers.VolumeArgs;
+import org.apache.hadoop.ozone.web.interfaces.StorageHandler;
+import org.apache.hadoop.ozone.web.utils.OzoneUtils;
+import org.apache.hadoop.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.utils.MetadataKeyFilters.KeyPrefixFilter;
+import org.apache.hadoop.utils.MetadataKeyFilters.MetadataKeyFilter;
+import org.apache.hadoop.utils.MetadataStore;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.List;
+import java.util.Map;
+import java.util.LinkedList;
+import java.util.Set;
+
+/**
+ * A helper class used by {@link TestStorageContainerManager} to generate
+ * keys and to inspect the blocks stored in containers on the datanodes of
+ * a {@link MiniOzoneCluster}.
+ */
+public class TestStorageContainerManagerHelper {
+
+  private final MiniOzoneCluster cluster;
+  private final Configuration conf;
+  private final StorageHandler storageHandler;
+
+  public TestStorageContainerManagerHelper(MiniOzoneCluster cluster,
+      Configuration conf) throws IOException {
+    this.cluster = cluster;
+    this.conf = conf;
+    storageHandler = new ObjectStoreHandler(conf).getStorageHandler();
+  }
+
+  /**
+   * Creates a volume and a bucket with random names and writes
+   * {@code numOfKeys} randomly named keys into the bucket.
+   *
+   * @param numOfKeys number of keys to create
+   * @param keySize size declared for each key via {@code KeyArgs#setSize};
+   *                only a few bytes of data are actually written
+   * @return map from key name to the key info returned by the KSM lookup
+   * @throws Exception if creating the volume, bucket or any key fails
+   */
+  public Map<String, KsmKeyInfo> createKeys(int numOfKeys, int keySize)
+      throws Exception {
+    Map<String, KsmKeyInfo> keyLocationMap = Maps.newHashMap();
+    String volume = "volume" + RandomStringUtils.randomNumeric(5);
+    String bucket = "bucket" + RandomStringUtils.randomNumeric(5);
+    String userName = "user" + RandomStringUtils.randomNumeric(5);
+    String adminName = "admin" + RandomStringUtils.randomNumeric(5);
+    UserArgs userArgs = new UserArgs(null, OzoneUtils.getRequestID(),
+        null, null, null, null);
+
+    VolumeArgs createVolumeArgs = new VolumeArgs(volume, userArgs);
+    createVolumeArgs.setUserName(userName);
+    createVolumeArgs.setAdminName(adminName);
+    storageHandler.createVolume(createVolumeArgs);
+
+    BucketArgs bucketArgs = new BucketArgs(bucket, createVolumeArgs);
+    bucketArgs.setAddAcls(new LinkedList<>());
+    bucketArgs.setRemoveAcls(new LinkedList<>());
+    bucketArgs.setStorageType(StorageType.DISK);
+    storageHandler.createBucket(bucketArgs);
+
+    // Write numOfKeys keys into the bucket. The 5-letter random part has a
+    // fixed length, so the numeric suffix keeps every name unique.
+    Set<String> keyNames = Sets.newHashSet();
+    for (int i = 0; i < numOfKeys; i++) {
+      String keyName = RandomStringUtils.randomAlphabetic(5) + i;
+      keyNames.add(keyName);
+      KeyArgs keyArgs = new KeyArgs(keyName, bucketArgs);
+      keyArgs.setSize(keySize);
+      // Callers only inspect key locations and blocks, so a short random
+      // payload is enough. try-with-resources closes the stream even if
+      // the write fails.
+      try (OutputStream stream = storageHandler.newKeyWriter(keyArgs)) {
+        stream.write(DFSUtil.string2Bytes(
+            RandomStringUtils.randomAlphabetic(5)));
+      }
+    }
+
+    for (String key : keyNames) {
+      KsmKeyArgs arg = new KsmKeyArgs.Builder()
+          .setVolumeName(volume)
+          .setBucketName(bucket)
+          .setKeyName(key)
+          .build();
+      KsmKeyInfo location = cluster.getKeySpaceManager().lookupKey(arg);
+      keyLocationMap.put(key, location);
+    }
+    return keyLocationMap;
+  }
+
+  /**
+   * Returns the IDs of the blocks in the given container that are marked
+   * for deletion, i.e. whose metadata keys carry
+   * {@code OzoneConsts.DELETING_KEY_PREFIX}. The prefix is stripped from
+   * the returned IDs.
+   */
+  public List<String> getPendingDeletionBlocks(String containerName)
+      throws IOException {
+    List<String> pendingDeletionBlocks = Lists.newArrayList();
+    MetadataStore meta = getContainerMetadata(containerName);
+    KeyPrefixFilter filter =
+        new KeyPrefixFilter(OzoneConsts.DELETING_KEY_PREFIX);
+    List<Map.Entry<byte[], byte[]>> kvs = meta
+        .getRangeKVs(null, Integer.MAX_VALUE, filter);
+    String prefix = OzoneConsts.DELETING_KEY_PREFIX;
+    for (Map.Entry<byte[], byte[]> entry : kvs) {
+      String key = DFSUtil.bytes2String(entry.getKey());
+      // Strip only the leading prefix; String#replace would also remove
+      // any later occurrence inside the block ID itself.
+      pendingDeletionBlocks.add(key.startsWith(prefix)
+          ? key.substring(prefix.length()) : key);
+    }
+    return pendingDeletionBlocks;
+  }
+
+  /**
+   * Returns the IDs of all blocks in the given containers that are not
+   * marked for deletion.
+   */
+  public List<String> getAllBlocks(Set<String> containerNames)
+      throws IOException {
+    List<String> allBlocks = Lists.newArrayList();
+    for (String containerName : containerNames) {
+      allBlocks.addAll(getAllBlocks(containerName));
+    }
+    return allBlocks;
+  }
+
+  /**
+   * Returns the metadata keys of the given container that do not start
+   * with {@code OzoneConsts.DELETING_KEY_PREFIX}, i.e. the blocks that are
+   * not marked for deletion.
+   */
+  public List<String> getAllBlocks(String containerName) throws IOException {
+    List<String> allBlocks = Lists.newArrayList();
+    MetadataStore meta = getContainerMetadata(containerName);
+    MetadataKeyFilter filter =
+        (preKey, currentKey, nextKey) -> !DFSUtil.bytes2String(currentKey)
+            .startsWith(OzoneConsts.DELETING_KEY_PREFIX);
+    List<Map.Entry<byte[], byte[]>> kvs =
+        meta.getRangeKVs(null, Integer.MAX_VALUE, filter);
+    // The filter already excludes deleting-prefixed keys, so every key is
+    // returned as-is.
+    for (Map.Entry<byte[], byte[]> entry : kvs) {
+      allBlocks.add(DFSUtil.bytes2String(entry.getKey()));
+    }
+    return allBlocks;
+  }
+
+  /**
+   * Opens the metadata store of the given container as held by the leader
+   * datanode of the container's pipeline.
+   */
+  private MetadataStore getContainerMetadata(String containerName)
+      throws IOException {
+    Pipeline pipeline = cluster.getStorageContainerManager()
+        .getContainer(containerName);
+    DatanodeID leadDN = pipeline.getLeader();
+    OzoneContainer containerServer =
+        getContainerServerByDatanodeID(leadDN.getDatanodeUuid());
+    ContainerData containerData = containerServer.getContainerManager()
+        .readContainer(containerName);
+    return KeyUtils.getDB(containerData, conf);
+  }
+
+  /**
+   * Finds the ozone container server of the cluster datanode whose UUID
+   * equals {@code dnUUID}.
+   *
+   * @throws IOException if no datanode in the cluster has that UUID
+   */
+  private OzoneContainer getContainerServerByDatanodeID(String dnUUID)
+      throws IOException {
+    for (DataNode dn : cluster.getDataNodes()) {
+      if (dn.getDatanodeId().getDatanodeUuid().equals(dnUUID)) {
+        return dn.getOzoneContainerManager();
+      }
+    }
+    throw new IOException("Unable to get the ozone container "
+        + "for given datanode ID " + dnUUID);
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a156eb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java
index c0bdd9e..edef2b4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java
@@ -25,6 +25,8 @@ import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolPr
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandResponseProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ReportState;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMNodeReport;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKResponseProto;
import org.apache.hadoop.ozone.scm.VersionInfo;
import java.io.IOException;
@@ -188,6 +190,13 @@ public class ScmTestMock implements StorageContainerDatanodeProtocol {
.build();
}
+  /**
+   * Test stub: accepts any block deletion ACK and replies with an empty
+   * response.
+   */
+  @Override
+  public ContainerBlocksDeletionACKResponseProto sendContainerBlocksDeletionACK(
+      ContainerBlocksDeletionACKProto request) throws IOException {
+    // The response message has no fields, so return the shared default
+    // instance directly instead of creating a throwaway builder.
+    return ContainerBlocksDeletionACKResponseProto.getDefaultInstance();
+  }
+
public ReportState getReportState() {
return this.reportState;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org