You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by lj...@apache.org on 2021/01/06 14:47:21 UTC
[ozone] 02/02: HDDS-4369. Datanode should store the delete
transaction as is in rocksDB (#1702)
This is an automated email from the ASF dual-hosted git repository.
ljain pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
commit 431e9097b4d0e7f3d5de04ee86cc8d4ba0c05e30
Author: Aryan Gupta <44...@users.noreply.github.com>
AuthorDate: Wed Jan 6 20:15:44 2021 +0530
HDDS-4369. Datanode should store the delete transaction as is in rocksDB (#1702)
---
.../java/org/apache/hadoop/ozone/OzoneConsts.java | 6 +-
.../commandhandler/DeleteBlocksCommandHandler.java | 191 ++++++++------
.../background/BlockDeletingService.java | 171 +++++++++++--
.../metadata/AbstractDatanodeDBDefinition.java | 2 +-
.../metadata/DatanodeSchemaOneDBDefinition.java | 5 +
.../metadata/DatanodeSchemaTwoDBDefinition.java | 32 ++-
.../metadata/DatanodeStoreSchemaTwoImpl.java | 14 +-
...mpl.java => DeletedBlocksTransactionCodec.java} | 35 +--
.../container/common/TestBlockDeletingService.java | 279 +++++++++++++++++----
.../hadoop/ozone/TestStorageContainerManager.java | 13 +-
.../ozone/TestStorageContainerManagerHelper.java | 30 +++
11 files changed, 604 insertions(+), 174 deletions(-)
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
index d2d6e35..9836452 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
@@ -254,11 +254,15 @@ public final class OzoneConsts {
// versions, requiring this property to be tracked on a per container basis.
// V1: All data in default column family.
public static final String SCHEMA_V1 = "1";
- // V2: Metadata, block data, and deleted blocks in their own column families.
+ // V2: Metadata, block data, and delete transactions in their own
+ // column families.
public static final String SCHEMA_V2 = "2";
// Most recent schema version that all new containers should be created with.
public static final String SCHEMA_LATEST = SCHEMA_V2;
+ public static final String[] SCHEMA_VERSIONS =
+ new String[] {SCHEMA_V1, SCHEMA_V2};
+
// Supported store types.
public static final String OZONE = "ozone";
public static final String S3 = "s3";
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
index 91ab4c9..10e6797 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
@@ -42,6 +42,8 @@ import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.common.statemachine
.SCMConnectionManager;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.container.metadata.DatanodeStore;
+import org.apache.hadoop.ozone.container.metadata.DatanodeStoreSchemaTwoImpl;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.ozone.protocol.commands.CommandStatus;
import org.apache.hadoop.ozone.protocol.commands.DeleteBlockCommandStatus;
@@ -59,6 +61,8 @@ import java.util.function.Consumer;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.Result.CONTAINER_NOT_FOUND;
+import static org.apache.hadoop.ozone.OzoneConsts.SCHEMA_V1;
+import static org.apache.hadoop.ozone.OzoneConsts.SCHEMA_V2;
/**
* Handle block deletion commands.
@@ -116,6 +120,7 @@ public class DeleteBlocksCommandHandler implements CommandHandler {
DeleteBlockTransactionResult.newBuilder();
txResultBuilder.setTxID(entry.getTxID());
long containerId = entry.getContainerID();
+ int newDeletionBlocks = 0;
try {
Container cont = containerSet.getContainer(containerId);
if (cont == null) {
@@ -129,7 +134,16 @@ public class DeleteBlocksCommandHandler implements CommandHandler {
cont.getContainerData();
cont.writeLock();
try {
- deleteKeyValueContainerBlocks(containerData, entry);
+ if (containerData.getSchemaVersion().equals(SCHEMA_V1)) {
+ markBlocksForDeletionSchemaV1(containerData, entry);
+ } else if (containerData.getSchemaVersion().equals(SCHEMA_V2)) {
+ markBlocksForDeletionSchemaV2(containerData, entry,
+ newDeletionBlocks, entry.getTxID());
+ } else {
+ throw new UnsupportedOperationException(
+ "Only schema version 1 and schema version 2 are "
+ + "supported.");
+ }
} finally {
cont.writeUnlock();
}
@@ -187,107 +201,140 @@ public class DeleteBlocksCommandHandler implements CommandHandler {
* @param delTX a block deletion transaction.
* @throws IOException if I/O error occurs.
*/
- private void deleteKeyValueContainerBlocks(
- KeyValueContainerData containerData, DeletedBlocksTransaction delTX)
- throws IOException {
+
+ private void markBlocksForDeletionSchemaV2(
+ KeyValueContainerData containerData, DeletedBlocksTransaction delTX,
+ int newDeletionBlocks, long txnID) throws IOException {
long containerId = delTX.getContainerID();
- if (LOG.isDebugEnabled()) {
- LOG.debug("Processing Container : {}, DB path : {}", containerId,
- containerData.getMetadataPath());
+ if (!isTxnIdValid(containerId, containerData, delTX)) {
+ return;
}
-
- if (delTX.getTxID() < containerData.getDeleteTransactionId()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug(String.format("Ignoring delete blocks for containerId: %d."
- + " Outdated delete transactionId %d < %d", containerId,
- delTX.getTxID(), containerData.getDeleteTransactionId()));
+ try (ReferenceCountedDB containerDB = BlockUtils
+ .getDB(containerData, conf)) {
+ DatanodeStore ds = containerDB.getStore();
+ DatanodeStoreSchemaTwoImpl dnStoreTwoImpl =
+ (DatanodeStoreSchemaTwoImpl) ds;
+ Table<Long, DeletedBlocksTransaction> delTxTable =
+ dnStoreTwoImpl.getDeleteTransactionTable();
+ try (BatchOperation batch = containerDB.getStore().getBatchHandler()
+ .initBatchOperation()) {
+ delTxTable.putWithBatch(batch, txnID, delTX);
+ newDeletionBlocks += delTX.getLocalIDList().size();
+ updateMetaData(containerData, delTX, newDeletionBlocks, containerDB,
+ batch);
+ containerDB.getStore().getBatchHandler().commitBatchOperation(batch);
}
- return;
}
+ }
+ private void markBlocksForDeletionSchemaV1(
+ KeyValueContainerData containerData, DeletedBlocksTransaction delTX)
+ throws IOException {
+ long containerId = delTX.getContainerID();
+ if (!isTxnIdValid(containerId, containerData, delTX)) {
+ return;
+ }
int newDeletionBlocks = 0;
- try(ReferenceCountedDB containerDB =
- BlockUtils.getDB(containerData, conf)) {
+ try (ReferenceCountedDB containerDB = BlockUtils
+ .getDB(containerData, conf)) {
Table<String, BlockData> blockDataTable =
- containerDB.getStore().getBlockDataTable();
+ containerDB.getStore().getBlockDataTable();
Table<String, ChunkInfoList> deletedBlocksTable =
- containerDB.getStore().getDeletedBlocksTable();
+ containerDB.getStore().getDeletedBlocksTable();
- for (Long blkLong : delTX.getLocalIDList()) {
- String blk = blkLong.toString();
- BlockData blkInfo = blockDataTable.get(blk);
- if (blkInfo != null) {
- String deletingKey = OzoneConsts.DELETING_KEY_PREFIX + blk;
-
- if (blockDataTable.get(deletingKey) != null
- || deletedBlocksTable.get(blk) != null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug(String.format(
- "Ignoring delete for block %s in container %d."
- + " Entry already added.", blk, containerId));
+ try (BatchOperation batch = containerDB.getStore().getBatchHandler()
+ .initBatchOperation()) {
+ for (Long blkLong : delTX.getLocalIDList()) {
+ String blk = blkLong.toString();
+ BlockData blkInfo = blockDataTable.get(blk);
+ if (blkInfo != null) {
+ String deletingKey = OzoneConsts.DELETING_KEY_PREFIX + blk;
+ if (blockDataTable.get(deletingKey) != null
+ || deletedBlocksTable.get(blk) != null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(String.format(
+ "Ignoring delete for block %s in container %d."
+ + " Entry already added.", blk, containerId));
+ }
+ continue;
}
- continue;
- }
-
- try(BatchOperation batch = containerDB.getStore()
- .getBatchHandler().initBatchOperation()) {
// Found the block in container db,
// use an atomic update to change its state to deleting.
blockDataTable.putWithBatch(batch, deletingKey, blkInfo);
blockDataTable.deleteWithBatch(batch, blk);
- containerDB.getStore().getBatchHandler()
- .commitBatchOperation(batch);
newDeletionBlocks++;
if (LOG.isDebugEnabled()) {
LOG.debug("Transited Block {} to DELETING state in container {}",
blk, containerId);
}
- } catch (IOException e) {
- // if some blocks failed to delete, we fail this TX,
- // without sending this ACK to SCM, SCM will resend the TX
- // with a certain number of retries.
- throw new IOException(
- "Failed to delete blocks for TXID = " + delTX.getTxID(), e);
- }
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Block {} not found or already under deletion in"
- + " container {}, skip deleting it.", blk, containerId);
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Block {} not found or already under deletion in"
+ + " container {}, skip deleting it.", blk, containerId);
+ }
}
}
+ updateMetaData(containerData, delTX, newDeletionBlocks, containerDB,
+ batch);
+ containerDB.getStore().getBatchHandler().commitBatchOperation(batch);
+ } catch (IOException e) {
+ // if some blocks failed to delete, we fail this TX,
+ // without sending this ACK to SCM, SCM will resend the TX
+ // with a certain number of retries.
+ throw new IOException(
+ "Failed to delete blocks for TXID = " + delTX.getTxID(), e);
}
+ }
+ }
- if (newDeletionBlocks > 0) {
- // Finally commit the DB counters.
- try(BatchOperation batchOperation =
- containerDB.getStore().getBatchHandler().initBatchOperation()) {
- Table< String, Long > metadataTable = containerDB.getStore()
- .getMetadataTable();
+ private void updateMetaData(KeyValueContainerData containerData,
+ DeletedBlocksTransaction delTX, int newDeletionBlocks,
+ ReferenceCountedDB containerDB, BatchOperation batchOperation)
+ throws IOException {
+ if (newDeletionBlocks > 0) {
+ // Finally commit the DB counters.
+ Table<String, Long> metadataTable =
+ containerDB.getStore().getMetadataTable();
- // In memory is updated only when existing delete transactionID is
- // greater.
- if (delTX.getTxID() > containerData.getDeleteTransactionId()) {
- // Update in DB pending delete key count and delete transaction ID.
- metadataTable.putWithBatch(batchOperation,
- OzoneConsts.DELETE_TRANSACTION_KEY, delTX.getTxID());
- }
+ // In memory is updated only when existing delete transactionID is
+ // greater.
+ if (delTX.getTxID() > containerData.getDeleteTransactionId()) {
+ // Update in DB pending delete key count and delete transaction ID.
+ metadataTable
+ .putWithBatch(batchOperation, OzoneConsts.DELETE_TRANSACTION_KEY,
+ delTX.getTxID());
+ }
- long pendingDeleteBlocks =
- containerData.getNumPendingDeletionBlocks() + newDeletionBlocks;
- metadataTable.putWithBatch(batchOperation,
- OzoneConsts.PENDING_DELETE_BLOCK_COUNT, pendingDeleteBlocks);
+ long pendingDeleteBlocks =
+ containerData.getNumPendingDeletionBlocks() + newDeletionBlocks;
+ metadataTable
+ .putWithBatch(batchOperation, OzoneConsts.PENDING_DELETE_BLOCK_COUNT,
+ pendingDeleteBlocks);
- containerDB.getStore().getBatchHandler()
- .commitBatchOperation(batchOperation);
+ // update pending deletion blocks count and delete transaction ID in
+ // in-memory container status
+ containerData.updateDeleteTransactionId(delTX.getTxID());
+ containerData.incrPendingDeletionBlocks(newDeletionBlocks);
+ }
+ }
- // update pending deletion blocks count and delete transaction ID in
- // in-memory container status
- containerData.updateDeleteTransactionId(delTX.getTxID());
+ private boolean isTxnIdValid(long containerId,
+ KeyValueContainerData containerData, DeletedBlocksTransaction delTX) {
+ boolean b = true;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Processing Container : {}, DB path : {}", containerId,
+ containerData.getMetadataPath());
+ }
- containerData.incrPendingDeletionBlocks(newDeletionBlocks);
- }
+ if (delTX.getTxID() < containerData.getDeleteTransactionId()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(String.format("Ignoring delete blocks for containerId: %d."
+ + " Outdated delete transactionId %d < %d", containerId,
+ delTX.getTxID(), containerData.getDeleteTransactionId()));
}
+ b = false;
}
+ return b;
}
@Override
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/BlockDeletingService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/BlockDeletingService.java
index b03b7d7..3dab1fa 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/BlockDeletingService.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/BlockDeletingService.java
@@ -20,11 +20,12 @@ package org.apache.hadoop.ozone.container.keyvalue.statemachine.background;
import java.io.File;
import java.io.IOException;
+import java.util.UUID;
import java.util.LinkedList;
+import java.util.Objects;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
-import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -32,14 +33,15 @@ import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
-import org.apache.hadoop.hdds.utils.BackgroundService;
-import org.apache.hadoop.hdds.utils.BackgroundTask;
-import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
import org.apache.hadoop.hdds.utils.db.BatchOperation;
import org.apache.hadoop.hdds.utils.MetadataKeyFilters;
+import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
+import org.apache.hadoop.hdds.utils.BackgroundService;
+import org.apache.hadoop.hdds.utils.BackgroundTask;
import org.apache.hadoop.hdds.utils.MetadataKeyFilters.KeyPrefixFilter;
import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.common.impl.ContainerData;
import org.apache.hadoop.ozone.container.common.impl.TopNOrderedContainerDeletionChoosingPolicy;
@@ -50,14 +52,22 @@ import org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverSe
import org.apache.hadoop.ozone.container.common.utils.ReferenceCountedDB;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
+import org.apache.hadoop.ozone.container.metadata.DatanodeStore;
+import org.apache.hadoop.ozone.container.metadata.DatanodeStoreSchemaTwoImpl;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.util.Time;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
import com.google.common.collect.Lists;
+
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER_DEFAULT;
+import static org.apache.hadoop.ozone.OzoneConsts.SCHEMA_V1;
+import static org.apache.hadoop.ozone.OzoneConsts.SCHEMA_V2;
+
import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -66,7 +76,7 @@ import org.slf4j.LoggerFactory;
* A per-datanode container block deleting service takes in charge
* of deleting staled ozone blocks.
*/
-// TODO: Fix BlockDeletingService to work with new StorageLayer
+
public class BlockDeletingService extends BackgroundService {
private static final Logger LOG =
@@ -244,21 +254,54 @@ public class BlockDeletingService extends BackgroundService {
@Override
public BackgroundTaskResult call() throws Exception {
- ContainerBackgroundTaskResult crr = new ContainerBackgroundTaskResult();
+ ContainerBackgroundTaskResult crr;
final Container container = ozoneContainer.getContainerSet()
.getContainer(containerData.getContainerID());
container.writeLock();
+ File dataDir = new File(containerData.getChunksPath());
long startTime = Time.monotonicNow();
// Scan container's db and get list of under deletion blocks
try (ReferenceCountedDB meta = BlockUtils.getDB(containerData, conf)) {
+ if (containerData.getSchemaVersion().equals(SCHEMA_V1)) {
+ crr = deleteViaSchema1(meta, container, dataDir, startTime);
+ } else if (containerData.getSchemaVersion().equals(SCHEMA_V2)) {
+ crr = deleteViaSchema2(meta, container, dataDir, startTime);
+ } else {
+ throw new UnsupportedOperationException(
+ "Only schema version 1 and schema version 2 are supported.");
+ }
+ return crr;
+ } finally {
+ container.writeUnlock();
+ }
+ }
+
+ public boolean checkDataDir(File dataDir) {
+ boolean b = true;
+ if (!dataDir.exists() || !dataDir.isDirectory()) {
+ LOG.error("Invalid container data dir {} : "
+ + "does not exist or not a directory", dataDir.getAbsolutePath());
+ b = false;
+ }
+ return b;
+ }
+
+ public ContainerBackgroundTaskResult deleteViaSchema1(
+ ReferenceCountedDB meta, Container container, File dataDir,
+ long startTime) throws IOException {
+ ContainerBackgroundTaskResult crr = new ContainerBackgroundTaskResult();
+ if (!checkDataDir(dataDir)) {
+ return crr;
+ }
+ try {
Table<String, BlockData> blockDataTable =
- meta.getStore().getBlockDataTable();
+ meta.getStore().getBlockDataTable();
// # of blocks to delete is throttled
KeyPrefixFilter filter = MetadataKeyFilters.getDeletingKeyFilter();
List<? extends Table.KeyValue<String, BlockData>> toDeleteBlocks =
blockDataTable.getSequentialRangeKVs(null, blockLimitPerTask,
- filter);
+ filter);
if (toDeleteBlocks.isEmpty()) {
LOG.debug("No under deletion block found in container : {}",
containerData.getContainerID());
@@ -267,12 +310,6 @@ public class BlockDeletingService extends BackgroundService {
List<String> succeedBlocks = new LinkedList<>();
LOG.debug("Container : {}, To-Delete blocks : {}",
containerData.getContainerID(), toDeleteBlocks.size());
- File dataDir = new File(containerData.getChunksPath());
- if (!dataDir.exists() || !dataDir.isDirectory()) {
- LOG.error("Invalid container data dir {} : "
- + "does not exist or not a directory", dataDir.getAbsolutePath());
- return crr;
- }
Handler handler = Objects.requireNonNull(ozoneContainer.getDispatcher()
.getHandler(container.getContainerType()));
@@ -292,7 +329,7 @@ public class BlockDeletingService extends BackgroundService {
// Once blocks are deleted... remove the blockID from blockDataTable.
try(BatchOperation batch = meta.getStore().getBatchHandler()
- .initBatchOperation()) {
+ .initBatchOperation()) {
for (String entry : succeedBlocks) {
blockDataTable.deleteWithBatch(batch, entry);
}
@@ -312,8 +349,106 @@ public class BlockDeletingService extends BackgroundService {
}
crr.addAll(succeedBlocks);
return crr;
- } finally {
- container.writeUnlock();
+ } catch (IOException exception) {
+ LOG.warn(
+ "Deletion operation was not successful for container: " + container
+ .getContainerData().getContainerID(), exception);
+ throw exception;
+ }
+ }
+
+ public ContainerBackgroundTaskResult deleteViaSchema2(
+ ReferenceCountedDB meta, Container container, File dataDir,
+ long startTime) throws IOException {
+ ContainerBackgroundTaskResult crr = new ContainerBackgroundTaskResult();
+ if (!checkDataDir(dataDir)) {
+ return crr;
+ }
+ try {
+ Table<String, BlockData> blockDataTable =
+ meta.getStore().getBlockDataTable();
+ DatanodeStore ds = meta.getStore();
+ DatanodeStoreSchemaTwoImpl dnStoreTwoImpl =
+ (DatanodeStoreSchemaTwoImpl) ds;
+ Table<Long, DeletedBlocksTransaction>
+ deleteTxns = dnStoreTwoImpl.getDeleteTransactionTable();
+ List<DeletedBlocksTransaction> delBlocks = new ArrayList<>();
+ int totalBlocks = 0;
+ try (TableIterator<Long,
+ ? extends Table.KeyValue<Long, DeletedBlocksTransaction>> iter =
+ dnStoreTwoImpl.getDeleteTransactionTable().iterator()) {
+ while (iter.hasNext() && (totalBlocks < blockLimitPerTask)) {
+ DeletedBlocksTransaction delTx = iter.next().getValue();
+ totalBlocks += delTx.getLocalIDList().size();
+ delBlocks.add(delTx);
+ }
+ }
+
+ if (delBlocks.isEmpty()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("No transaction found in container : {}",
+ containerData.getContainerID());
+ }
+ return crr;
+ }
+
+ LOG.debug("Container : {}, To-Delete blocks : {}",
+ containerData.getContainerID(), delBlocks.size());
+
+ Handler handler = Objects.requireNonNull(ozoneContainer.getDispatcher()
+ .getHandler(container.getContainerType()));
+
+ deleteTransactions(delBlocks, handler, blockDataTable, container);
+
+ // Once blocks are deleted... remove the blockID from blockDataTable
+ // and also remove the transactions from txnTable.
+ try(BatchOperation batch = meta.getStore().getBatchHandler()
+ .initBatchOperation()) {
+ for (DeletedBlocksTransaction delTx : delBlocks) {
+ deleteTxns.deleteWithBatch(batch, delTx.getTxID());
+ for (Long blk : delTx.getLocalIDList()) {
+ String bID = blk.toString();
+ meta.getStore().getBlockDataTable().deleteWithBatch(batch, bID);
+ }
+ }
+ meta.getStore().getBatchHandler().commitBatchOperation(batch);
+ containerData.updateAndCommitDBCounters(meta, batch,
+ totalBlocks);
+ // update count of pending deletion blocks and block count in
+ // in-memory container status.
+ containerData.decrPendingDeletionBlocks(totalBlocks);
+ containerData.decrKeyCount(totalBlocks);
+ }
+
+ LOG.info("Container: {}, deleted blocks: {}, task elapsed time: {}ms",
+ containerData.getContainerID(), totalBlocks,
+ Time.monotonicNow() - startTime);
+
+ return crr;
+ } catch (IOException exception) {
+ LOG.warn(
+ "Deletion operation was not successful for container: " + container
+ .getContainerData().getContainerID(), exception);
+ throw exception;
+ }
+ }
+
+ private void deleteTransactions(List<DeletedBlocksTransaction> delBlocks,
+ Handler handler, Table<String, BlockData> blockDataTable,
+ Container container) throws IOException {
+ for (DeletedBlocksTransaction entry : delBlocks) {
+ for (Long blkLong : entry.getLocalIDList()) {
+ String blk = blkLong.toString();
+ BlockData blkInfo = blockDataTable.get(blk);
+ LOG.debug("Deleting block {}", blk);
+ try {
+ handler.deleteBlock(container, blkInfo);
+ } catch (InvalidProtocolBufferException e) {
+ LOG.error("Failed to parse block info for block {}", blk, e);
+ } catch (IOException e) {
+ LOG.error("Failed to delete files for block {}", blk, e);
+ }
+ }
}
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeDBDefinition.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeDBDefinition.java
index 8895475..2fb1174 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeDBDefinition.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeDBDefinition.java
@@ -60,7 +60,7 @@ public abstract class AbstractDatanodeDBDefinition implements DBDefinition {
@Override
public DBColumnFamilyDefinition[] getColumnFamilies() {
return new DBColumnFamilyDefinition[] {getBlockDataColumnFamily(),
- getMetadataColumnFamily(), getDeletedBlocksColumnFamily()};
+ getMetadataColumnFamily(), getDeletedBlocksColumnFamily()};
}
public abstract DBColumnFamilyDefinition<String, BlockData>
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaOneDBDefinition.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaOneDBDefinition.java
index faf399d..7d5e053 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaOneDBDefinition.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaOneDBDefinition.java
@@ -88,4 +88,9 @@ public class DatanodeSchemaOneDBDefinition
getDeletedBlocksColumnFamily() {
return DELETED_BLOCKS;
}
+
+ public DBColumnFamilyDefinition[] getColumnFamilies() {
+ return new DBColumnFamilyDefinition[] {getBlockDataColumnFamily(),
+ getMetadataColumnFamily(), getDeletedBlocksColumnFamily() };
+ }
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaTwoDBDefinition.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaTwoDBDefinition.java
index 2ac56f2..1fabd13 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaTwoDBDefinition.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaTwoDBDefinition.java
@@ -17,16 +17,19 @@
*/
package org.apache.hadoop.ozone.container.metadata;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.utils.db.DBColumnFamilyDefinition;
import org.apache.hadoop.hdds.utils.db.LongCodec;
import org.apache.hadoop.hdds.utils.db.StringCodec;
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfoList;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
/**
* This class defines the RocksDB structure for datanodes following schema
- * version 2, where the block data, metadata, and deleted block ids are put in
- * their own separate column families.
+ * version 2, where the block data, metadata, and transactions which are to be
+ * deleted are put in their own separate column families.
*/
public class DatanodeSchemaTwoDBDefinition extends
AbstractDatanodeDBDefinition {
@@ -34,7 +37,7 @@ public class DatanodeSchemaTwoDBDefinition extends
public static final DBColumnFamilyDefinition<String, BlockData>
BLOCK_DATA =
new DBColumnFamilyDefinition<>(
- "block_data",
+ "blockData",
String.class,
new StringCodec(),
BlockData.class,
@@ -52,17 +55,33 @@ public class DatanodeSchemaTwoDBDefinition extends
public static final DBColumnFamilyDefinition<String, ChunkInfoList>
DELETED_BLOCKS =
new DBColumnFamilyDefinition<>(
- "deleted_blocks",
+ "deletedBlocks",
String.class,
new StringCodec(),
ChunkInfoList.class,
new ChunkInfoListCodec());
+ public static final DBColumnFamilyDefinition<Long, DeletedBlocksTransaction>
+ DELETE_TRANSACTION =
+ new DBColumnFamilyDefinition<>(
+ "deleteTxns",
+ Long.class,
+ new LongCodec(),
+ StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction.class,
+ new DeletedBlocksTransactionCodec());
+
protected DatanodeSchemaTwoDBDefinition(String dbPath) {
super(dbPath);
}
@Override
+ public DBColumnFamilyDefinition[] getColumnFamilies() {
+ return new DBColumnFamilyDefinition[] {getBlockDataColumnFamily(),
+ getMetadataColumnFamily(), getDeletedBlocksColumnFamily(),
+ getDeleteTransactionsColumnFamily()};
+ }
+
+ @Override
public DBColumnFamilyDefinition<String, BlockData>
getBlockDataColumnFamily() {
return BLOCK_DATA;
@@ -78,4 +97,9 @@ public class DatanodeSchemaTwoDBDefinition extends
getDeletedBlocksColumnFamily() {
return DELETED_BLOCKS;
}
+
+ public DBColumnFamilyDefinition<Long, DeletedBlocksTransaction>
+ getDeleteTransactionsColumnFamily() {
+ return DELETE_TRANSACTION;
+ }
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreSchemaTwoImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreSchemaTwoImpl.java
index df9b8c0..db8fe6b 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreSchemaTwoImpl.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreSchemaTwoImpl.java
@@ -18,6 +18,9 @@
package org.apache.hadoop.ozone.container.metadata;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.protocol.proto.
+ StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
+import org.apache.hadoop.hdds.utils.db.Table;
import java.io.IOException;
@@ -26,10 +29,13 @@ import java.io.IOException;
* three column families/tables:
* 1. A block data table.
* 2. A metadata table.
- * 3. A deleted blocks table.
+ * 3. A Delete Transaction Table.
*/
public class DatanodeStoreSchemaTwoImpl extends AbstractDatanodeStore {
+ private final Table<Long, DeletedBlocksTransaction>
+ deleteTransactionTable;
+
/**
* Constructs the datanode store and starts the DB Services.
*
@@ -41,5 +47,11 @@ public class DatanodeStoreSchemaTwoImpl extends AbstractDatanodeStore {
throws IOException {
super(config, containerID, new DatanodeSchemaTwoDBDefinition(dbPath),
openReadOnly);
+ this.deleteTransactionTable = new DatanodeSchemaTwoDBDefinition(dbPath)
+ .getDeleteTransactionsColumnFamily().getTable(getStore());
+ }
+
+ public Table<Long, DeletedBlocksTransaction> getDeleteTransactionTable() {
+ return deleteTransactionTable;
}
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreSchemaTwoImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DeletedBlocksTransactionCodec.java
similarity index 54%
copy from hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreSchemaTwoImpl.java
copy to hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DeletedBlocksTransactionCodec.java
index df9b8c0..90c26fe 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreSchemaTwoImpl.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DeletedBlocksTransactionCodec.java
@@ -17,29 +17,30 @@
*/
package org.apache.hadoop.ozone.container.metadata;
-import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.utils.db.Codec;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
import java.io.IOException;
/**
- * Constructs a datanode store in accordance with schema version 2, which uses
- * three column families/tables:
- * 1. A block data table.
- * 2. A metadata table.
- * 3. A deleted blocks table.
+ * Supports encoding and decoding {@link DeletedBlocksTransaction} objects.
*/
-public class DatanodeStoreSchemaTwoImpl extends AbstractDatanodeStore {
+public class DeletedBlocksTransactionCodec
+ implements Codec<DeletedBlocksTransaction> {
- /**
- * Constructs the datanode store and starts the DB Services.
- *
- * @param config - Ozone Configuration.
- * @throws IOException - on Failure.
- */
- public DatanodeStoreSchemaTwoImpl(ConfigurationSource config,
- long containerID, String dbPath, boolean openReadOnly)
+ @Override public byte[] toPersistedFormat(
+ DeletedBlocksTransaction deletedBlocksTransaction) {
+ return deletedBlocksTransaction.toByteArray();
+ }
+
+ @Override public DeletedBlocksTransaction fromPersistedFormat(byte[] rawData)
throws IOException {
- super(config, containerID, new DatanodeSchemaTwoDBDefinition(dbPath),
- openReadOnly);
+ return DeletedBlocksTransaction.parseFrom(rawData);
+ }
+
+ @Override public DeletedBlocksTransaction copyObject(
+ DeletedBlocksTransaction deletedBlocksTransaction) {
+ throw new UnsupportedOperationException();
}
}
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java
index 2eb6a39..96d4228 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java
@@ -21,9 +21,10 @@ package org.apache.hadoop.ozone.container.common;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.UUID;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -34,9 +35,13 @@ import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.MutableConfigurationSource;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.utils.BackgroundService;
import org.apache.hadoop.hdds.utils.MetadataKeyFilters;
+import org.apache.hadoop.hdds.utils.db.BatchOperation;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.common.Checksum;
import org.apache.hadoop.ozone.common.ChunkBuffer;
@@ -55,7 +60,6 @@ import org.apache.hadoop.ozone.container.common.utils.ReferenceCountedDB;
import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy;
import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
-import org.apache.hadoop.ozone.container.keyvalue.ChunkLayoutTestInfo;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
@@ -64,11 +68,14 @@ import org.apache.hadoop.ozone.container.keyvalue.impl.FilePerBlockStrategy;
import org.apache.hadoop.ozone.container.keyvalue.impl.FilePerChunkStrategy;
import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager;
import org.apache.hadoop.ozone.container.keyvalue.statemachine.background.BlockDeletingService;
+import org.apache.hadoop.ozone.container.metadata.DatanodeStore;
+import org.apache.hadoop.ozone.container.metadata.DatanodeStoreSchemaTwoImpl;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.ozone.container.testutils.BlockDeletingServiceTestImpl;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
+import static java.util.stream.Collectors.toList;
import static org.apache.commons.lang3.RandomStringUtils.randomAlphanumeric;
import org.junit.AfterClass;
@@ -82,7 +89,11 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_CONTA
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL;
+import static org.apache.hadoop.ozone.OzoneConsts.SCHEMA_VERSIONS;
+import static org.apache.hadoop.ozone.OzoneConsts.SCHEMA_V1;
+import static org.apache.hadoop.ozone.OzoneConsts.SCHEMA_V2;
import static org.apache.hadoop.ozone.container.common.impl.ChunkLayOutVersion.FILE_PER_BLOCK;
+import static org.apache.hadoop.ozone.container.common.states.endpoint.VersionEndpointTask.LOG;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -101,16 +112,38 @@ public class TestBlockDeletingService {
private static MutableConfigurationSource conf;
private final ChunkLayOutVersion layout;
+ private final String schemaVersion;
private int blockLimitPerTask;
private static VolumeSet volumeSet;
- public TestBlockDeletingService(ChunkLayOutVersion layout) {
- this.layout = layout;
+ public TestBlockDeletingService(LayoutInfo layoutInfo) {
+ this.layout = layoutInfo.layout;
+ this.schemaVersion = layoutInfo.schemaVersion;
}
@Parameterized.Parameters
public static Iterable<Object[]> parameters() {
- return ChunkLayoutTestInfo.chunkLayoutParameters();
+ return LayoutInfo.layoutList.stream().map(each -> new Object[] {each})
+ .collect(toList());
+ }
+
+ public static class LayoutInfo {
+ private final String schemaVersion;
+ private final ChunkLayOutVersion layout;
+
+ public LayoutInfo(String schemaVersion, ChunkLayOutVersion layout) {
+ this.schemaVersion = schemaVersion;
+ this.layout = layout;
+ }
+
+ private static List<LayoutInfo> layoutList = new ArrayList<>();
+ static {
+ for (ChunkLayOutVersion ch : ChunkLayOutVersion.getAllVersions()) {
+ for (String sch : SCHEMA_VERSIONS) {
+ layoutList.add(new LayoutInfo(sch, ch));
+ }
+ }
+ }
}
@BeforeClass
@@ -158,6 +191,7 @@ public class TestBlockDeletingService {
}
byte[] arr = randomAlphanumeric(1048576).getBytes(UTF_8);
ChunkBuffer buffer = ChunkBuffer.wrap(ByteBuffer.wrap(arr));
+ int txnID = 0;
for (int x = 0; x < numOfContainers; x++) {
long containerID = ContainerTestHelper.getTestContainerID();
KeyValueContainerData data =
@@ -165,55 +199,164 @@ public class TestBlockDeletingService {
ContainerTestHelper.CONTAINER_MAX_SIZE,
UUID.randomUUID().toString(), datanodeUuid);
data.closeContainer();
+ data.setSchemaVersion(schemaVersion);
KeyValueContainer container = new KeyValueContainer(data, conf);
container.create(volumeSet,
new RoundRobinVolumeChoosingPolicy(), scmId);
containerSet.addContainer(container);
data = (KeyValueContainerData) containerSet.getContainer(
containerID).getContainerData();
- long chunkLength = 100;
- try(ReferenceCountedDB metadata = BlockUtils.getDB(data, conf)) {
- for (int j = 0; j < numOfBlocksPerContainer; j++) {
- BlockID blockID =
- ContainerTestHelper.getTestBlockID(containerID);
- String deleteStateName = OzoneConsts.DELETING_KEY_PREFIX +
- blockID.getLocalID();
- BlockData kd = new BlockData(blockID);
- List<ContainerProtos.ChunkInfo> chunks = Lists.newArrayList();
- for (int k = 0; k < numOfChunksPerBlock; k++) {
- final String chunkName = String.format("block.%d.chunk.%d", j, k);
- final long offset = k * chunkLength;
- ContainerProtos.ChunkInfo info =
- ContainerProtos.ChunkInfo.newBuilder()
- .setChunkName(chunkName)
- .setLen(chunkLength)
- .setOffset(offset)
- .setChecksumData(Checksum.getNoChecksumDataProto())
- .build();
- chunks.add(info);
- ChunkInfo chunkInfo = new ChunkInfo(chunkName, offset, chunkLength);
- ChunkBuffer chunkData = buffer.duplicate(0, (int) chunkLength);
- chunkManager.writeChunk(container, blockID, chunkInfo, chunkData,
- WRITE_STAGE);
- chunkManager.writeChunk(container, blockID, chunkInfo, chunkData,
- COMMIT_STAGE);
- }
- kd.setChunks(chunks);
- metadata.getStore().getBlockDataTable().put(
- deleteStateName, kd);
- container.getContainerData().incrPendingDeletionBlocks(1);
- }
- container.getContainerData().setKeyCount(numOfBlocksPerContainer);
- // Set block count, bytes used and pending delete block count.
- metadata.getStore().getMetadataTable().put(
- OzoneConsts.BLOCK_COUNT, (long)numOfBlocksPerContainer);
- metadata.getStore().getMetadataTable().put(
- OzoneConsts.CONTAINER_BYTES_USED,
- chunkLength * numOfChunksPerBlock * numOfBlocksPerContainer);
- metadata.getStore().getMetadataTable().put(
- OzoneConsts.PENDING_DELETE_BLOCK_COUNT,
- (long)numOfBlocksPerContainer);
+ if (data.getSchemaVersion().equals(SCHEMA_V1)) {
+ createPendingDeleteBlocksSchema1(numOfBlocksPerContainer, data,
+ containerID, numOfChunksPerBlock, buffer, chunkManager, container);
+ } else if (data.getSchemaVersion().equals(SCHEMA_V2)) {
+ createPendingDeleteBlocksSchema2(numOfBlocksPerContainer, txnID,
+ containerID, numOfChunksPerBlock, buffer, chunkManager, container,
+ data);
+ } else {
+ throw new UnsupportedOperationException(
+ "Only schema version 1 and schema version 2 are "
+ + "supported.");
+ }
+ }
+ }
+
+ @SuppressWarnings("checkstyle:parameternumber")
+ private void createPendingDeleteBlocksSchema1(int numOfBlocksPerContainer,
+ KeyValueContainerData data, long containerID, int numOfChunksPerBlock,
+ ChunkBuffer buffer, ChunkManager chunkManager,
+ KeyValueContainer container) {
+ BlockID blockID = null;
+ try (ReferenceCountedDB metadata = BlockUtils.getDB(data, conf)) {
+ for (int j = 0; j < numOfBlocksPerContainer; j++) {
+ blockID = ContainerTestHelper.getTestBlockID(containerID);
+ String deleteStateName =
+ OzoneConsts.DELETING_KEY_PREFIX + blockID.getLocalID();
+ BlockData kd = new BlockData(blockID);
+ List<ContainerProtos.ChunkInfo> chunks = Lists.newArrayList();
+ putChunksInBlock(numOfChunksPerBlock, j, chunks, buffer, chunkManager,
+ container, blockID);
+ kd.setChunks(chunks);
+ metadata.getStore().getBlockDataTable().put(deleteStateName, kd);
+ container.getContainerData().incrPendingDeletionBlocks(1);
+ }
+ updateMetaData(data, container, numOfBlocksPerContainer,
+ numOfChunksPerBlock);
+ } catch (IOException exception) {
+ LOG.info("Exception " + exception);
+ LOG.warn("Failed to put block: " + blockID + " in BlockDataTable.");
+ }
+ }
+
+ @SuppressWarnings("checkstyle:parameternumber")
+ private void createPendingDeleteBlocksSchema2(int numOfBlocksPerContainer,
+ int txnID, long containerID, int numOfChunksPerBlock, ChunkBuffer buffer,
+ ChunkManager chunkManager, KeyValueContainer container,
+ KeyValueContainerData data) {
+ List<Long> containerBlocks = new ArrayList<>();
+ int blockCount = 0;
+ for (int i = 0; i < numOfBlocksPerContainer; i++) {
+ txnID = txnID + 1;
+ BlockID blockID = ContainerTestHelper.getTestBlockID(containerID);
+ BlockData kd = new BlockData(blockID);
+ List<ContainerProtos.ChunkInfo> chunks = Lists.newArrayList();
+ putChunksInBlock(numOfChunksPerBlock, i, chunks, buffer, chunkManager,
+ container, blockID);
+ kd.setChunks(chunks);
+ String bID = null;
+ try (ReferenceCountedDB metadata = BlockUtils.getDB(data, conf)) {
+ bID = blockID.getLocalID() + "";
+ metadata.getStore().getBlockDataTable().put(bID, kd);
+ } catch (IOException exception) {
+ LOG.info("Exception = " + exception);
+ LOG.warn("Failed to put block: " + bID + " in BlockDataTable.");
+ }
+ container.getContainerData().incrPendingDeletionBlocks(1);
+
+ // In below if statements we are checking if a single container
+ // consists of more blocks than 'blockLimitPerTask' then we create
+ // (totalBlocksInContainer / blockLimitPerTask) transactions which
+ // consists of blocks equal to blockLimitPerTask and last transaction
+ // consists of blocks equal to
+ // (totalBlocksInContainer % blockLimitPerTask).
+ containerBlocks.add(blockID.getLocalID());
+ blockCount++;
+ if (blockCount == blockLimitPerTask || i == (numOfBlocksPerContainer
+ - 1)) {
+ createTxn(data, containerBlocks, txnID, containerID);
+ containerBlocks.clear();
+ blockCount = 0;
+ }
+ }
+ updateMetaData(data, container, numOfBlocksPerContainer,
+ numOfChunksPerBlock);
+ }
+
+ private void createTxn(KeyValueContainerData data, List<Long> containerBlocks,
+ int txnID, long containerID) {
+ try (ReferenceCountedDB metadata = BlockUtils.getDB(data, conf)) {
+ StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction dtx =
+ StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction
+ .newBuilder().setTxID(txnID).setContainerID(containerID)
+ .addAllLocalID(containerBlocks).setCount(0).build();
+ try (BatchOperation batch = metadata.getStore().getBatchHandler()
+ .initBatchOperation()) {
+ DatanodeStore ds = metadata.getStore();
+ DatanodeStoreSchemaTwoImpl dnStoreTwoImpl =
+ (DatanodeStoreSchemaTwoImpl) ds;
+ dnStoreTwoImpl.getDeleteTransactionTable()
+ .putWithBatch(batch, (long) txnID, dtx);
+ metadata.getStore().getBatchHandler().commitBatchOperation(batch);
}
+ } catch (IOException exception) {
+ LOG.warn("Transaction creation was not successful for txnID: " + txnID
+ + " consisting of " + containerBlocks.size() + " blocks.");
+ }
+ }
+
+ private void putChunksInBlock(int numOfChunksPerBlock, int i,
+ List<ContainerProtos.ChunkInfo> chunks, ChunkBuffer buffer,
+ ChunkManager chunkManager, KeyValueContainer container, BlockID blockID) {
+ long chunkLength = 100;
+ try {
+ for (int k = 0; k < numOfChunksPerBlock; k++) {
+ final String chunkName = String.format("block.%d.chunk.%d", i, k);
+ final long offset = k * chunkLength;
+ ContainerProtos.ChunkInfo info =
+ ContainerProtos.ChunkInfo.newBuilder().setChunkName(chunkName)
+ .setLen(chunkLength).setOffset(offset)
+ .setChecksumData(Checksum.getNoChecksumDataProto()).build();
+ chunks.add(info);
+ ChunkInfo chunkInfo = new ChunkInfo(chunkName, offset, chunkLength);
+ ChunkBuffer chunkData = buffer.duplicate(0, (int) chunkLength);
+ chunkManager
+ .writeChunk(container, blockID, chunkInfo, chunkData, WRITE_STAGE);
+ chunkManager
+ .writeChunk(container, blockID, chunkInfo, chunkData, COMMIT_STAGE);
+ }
+ } catch (IOException ex) {
+ LOG.warn("Putting chunks in blocks was not successful for BlockID: "
+ + blockID);
+ }
+ }
+
+ private void updateMetaData(KeyValueContainerData data,
+ KeyValueContainer container, int numOfBlocksPerContainer,
+ int numOfChunksPerBlock) {
+ long chunkLength = 100;
+ try (ReferenceCountedDB metadata = BlockUtils.getDB(data, conf)) {
+ container.getContainerData().setKeyCount(numOfBlocksPerContainer);
+ // Set block count, bytes used and pending delete block count.
+ metadata.getStore().getMetadataTable()
+ .put(OzoneConsts.BLOCK_COUNT, (long) numOfBlocksPerContainer);
+ metadata.getStore().getMetadataTable()
+ .put(OzoneConsts.CONTAINER_BYTES_USED,
+ chunkLength * numOfChunksPerBlock * numOfBlocksPerContainer);
+ metadata.getStore().getMetadataTable()
+ .put(OzoneConsts.PENDING_DELETE_BLOCK_COUNT,
+ (long) numOfBlocksPerContainer);
+ } catch (IOException exception) {
+ LOG.warn("Meta Data update was not successful for container: "+container);
}
}
@@ -231,11 +374,32 @@ public class TestBlockDeletingService {
* Get under deletion blocks count from DB,
* note this info is parsed from container.db.
*/
- private int getUnderDeletionBlocksCount(ReferenceCountedDB meta)
- throws IOException {
- return meta.getStore().getBlockDataTable()
- .getRangeKVs(null, 100,
- MetadataKeyFilters.getDeletingKeyFilter()).size();
+ private int getUnderDeletionBlocksCount(ReferenceCountedDB meta,
+ KeyValueContainerData data) throws IOException {
+ if (data.getSchemaVersion().equals(SCHEMA_V1)) {
+ return meta.getStore().getBlockDataTable()
+ .getRangeKVs(null, 100, MetadataKeyFilters.getDeletingKeyFilter())
+ .size();
+ } else if (data.getSchemaVersion().equals(SCHEMA_V2)) {
+ int pendingBlocks = 0;
+ DatanodeStore ds = meta.getStore();
+ DatanodeStoreSchemaTwoImpl dnStoreTwoImpl =
+ (DatanodeStoreSchemaTwoImpl) ds;
+ try (
+ TableIterator<Long, ? extends Table.KeyValue<Long,
+ StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction>>
+ iter = dnStoreTwoImpl.getDeleteTransactionTable().iterator()) {
+ while (iter.hasNext()) {
+ StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction
+ delTx = iter.next().getValue();
+ pendingBlocks += delTx.getLocalIDList().size();
+ }
+ }
+ return pendingBlocks;
+ } else {
+ throw new UnsupportedOperationException(
+ "Only schema version 1 and schema version 2 are supported.");
+ }
}
@@ -261,6 +425,7 @@ public class TestBlockDeletingService {
// Ensure 1 container was created
List<ContainerData> containerData = Lists.newArrayList();
containerSet.listContainer(0L, 1, containerData);
+ KeyValueContainerData data = (KeyValueContainerData) containerData.get(0);
Assert.assertEquals(1, containerData.size());
try(ReferenceCountedDB meta = BlockUtils.getDB(
@@ -280,7 +445,7 @@ public class TestBlockDeletingService {
Assert.assertEquals(0, transactionId);
// Ensure there are 3 blocks under deletion and 0 deleted blocks
- Assert.assertEquals(3, getUnderDeletionBlocksCount(meta));
+ Assert.assertEquals(3, getUnderDeletionBlocksCount(meta, data));
Assert.assertEquals(3, meta.getStore().getMetadataTable()
.get(OzoneConsts.PENDING_DELETE_BLOCK_COUNT).longValue());
@@ -348,6 +513,9 @@ public class TestBlockDeletingService {
public void testBlockDeletionTimeout() throws Exception {
conf.setInt(OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL, 10);
conf.setInt(OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER, 2);
+ this.blockLimitPerTask =
+ conf.getInt(OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER,
+ OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER_DEFAULT);
ContainerSet containerSet = new ContainerSet();
createToDeleteBlocks(containerSet, 1, 3, 1);
ContainerMetrics metrics = ContainerMetrics.create(conf);
@@ -394,7 +562,7 @@ public class TestBlockDeletingService {
LogCapturer newLog = LogCapturer.captureLogs(BackgroundService.LOG);
GenericTestUtils.waitFor(() -> {
try {
- return getUnderDeletionBlocksCount(meta) == 0;
+ return getUnderDeletionBlocksCount(meta, data) == 0;
} catch (IOException ignored) {
}
return false;
@@ -445,6 +613,9 @@ public class TestBlockDeletingService {
TopNOrderedContainerDeletionChoosingPolicy.class.getName());
conf.setInt(OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL, 1);
conf.setInt(OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER, 1);
+ this.blockLimitPerTask =
+ conf.getInt(OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER,
+ OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER_DEFAULT);
ContainerSet containerSet = new ContainerSet();
int containerCount = 2;
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
index 548f073..7bccd8e 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
@@ -92,11 +92,15 @@ import java.util.UUID;
import java.util.concurrent.TimeUnit;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY;
-import static org.apache.hadoop.hdds.HddsConfigKeys.*;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.argThat;
import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
/**
* Test class that exercises the StorageContainerManager.
@@ -272,8 +276,6 @@ public class TestStorageContainerManager {
Map<Long, List<Long>> containerBlocks = createDeleteTXLog(delLog,
keyLocations, helper);
- Set<Long> containerIDs = containerBlocks.keySet();
-
// Verify a few TX gets created in the TX log.
Assert.assertTrue(delLog.getNumOfValidTransactions() > 0);
@@ -289,8 +291,7 @@ public class TestStorageContainerManager {
return false;
}
}, 1000, 10000);
- Assert.assertTrue(helper.getAllBlocks(containerIDs).isEmpty());
-
+ Assert.assertTrue(helper.verifyBlocksWithTxnTable(containerBlocks));
// Continue the work, add some TXs that with known container names,
// but unknown block IDs.
for (Long containerID : containerBlocks.keySet()) {
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManagerHelper.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManagerHelper.java
index c67fe30..fe0e075 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManagerHelper.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManagerHelper.java
@@ -17,6 +17,7 @@
package org.apache.hadoop.ozone;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -32,9 +33,14 @@ import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.common.utils.ReferenceCountedDB;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
+import org.apache.hadoop.ozone.container.metadata.DatanodeStore;
+import org.apache.hadoop.ozone.container.metadata.DatanodeStoreSchemaTwoImpl;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
+
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -137,6 +143,30 @@ public class TestStorageContainerManagerHelper {
return allBlocks;
}
+ public boolean verifyBlocksWithTxnTable(Map<Long, List<Long>> containerBlocks)
+ throws IOException {
+ Set<Long> containerIDs = containerBlocks.keySet();
+ for (Long entry : containerIDs) {
+ ReferenceCountedDB meta = getContainerMetadata(entry);
+ DatanodeStore ds = meta.getStore();
+ DatanodeStoreSchemaTwoImpl dnStoreTwoImpl =
+ (DatanodeStoreSchemaTwoImpl) ds;
+ List<? extends Table.KeyValue<Long, DeletedBlocksTransaction>>
+ txnsInTxnTable = dnStoreTwoImpl.getDeleteTransactionTable()
+ .getRangeKVs(null, Integer.MAX_VALUE, null);
+ List<Long> conID = new ArrayList<>();
+ for (Table.KeyValue<Long, DeletedBlocksTransaction> txn :
+ txnsInTxnTable) {
+ conID.addAll(txn.getValue().getLocalIDList());
+ }
+ if (!conID.equals(containerBlocks.get(entry))) {
+ return false;
+ }
+ meta.close();
+ }
+ return true;
+ }
+
private ReferenceCountedDB getContainerMetadata(Long containerID)
throws IOException {
ContainerWithPipeline containerWithPipeline = cluster
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org