You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by sh...@apache.org on 2021/01/25 13:23:48 UTC
[ozone] branch master updated: HDDS-4477. Delete txnId in SCMMetadataStoreImpl may drop to 0 after SCM restart. (#1828)
This is an automated email from the ASF dual-hosted git repository.
shashikant pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new b9d0f55 HDDS-4477. Delete txnId in SCMMetadataStoreImpl may drop to 0 after SCM restart. (#1828)
b9d0f55 is described below
commit b9d0f55f63de88b1d60e73ca8caac46abcf0e58e
Author: Lokesh Jain <lj...@apache.org>
AuthorDate: Mon Jan 25 18:53:27 2021 +0530
HDDS-4477. Delete txnId in SCMMetadataStoreImpl may drop to 0 after SCM restart. (#1828)
---
.../hadoop/hdds/scm/block/BlockManagerImpl.java | 3 +-
.../hadoop/hdds/scm/block/DeletedBlockLogImpl.java | 102 ++++++++++++++++-----
.../hadoop/hdds/scm/metadata/SCMMetadataStore.java | 14 ---
.../hdds/scm/metadata/SCMMetadataStoreImpl.java | 30 ------
.../hadoop/hdds/scm/block/TestDeletedBlockLog.java | 11 ++-
...TestSCMStoreImplWithOldPipelineIDKeyFormat.java | 10 --
6 files changed, 89 insertions(+), 81 deletions(-)
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
index fae21b4..657a1f7 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
@@ -90,7 +90,8 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
* @throws IOException
*/
public BlockManagerImpl(final ConfigurationSource conf,
- final StorageContainerManager scm) {
+ final StorageContainerManager scm)
+ throws IOException {
Objects.requireNonNull(scm, "SCM cannot be null");
this.pipelineManager = scm.getPipelineManager();
this.containerManager = scm.getContainerManager();
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
index ac53f2c..69af37e 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdds.scm.block;
import java.io.IOException;
+import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.Set;
@@ -26,6 +27,7 @@ import java.util.LinkedHashSet;
import java.util.ArrayList;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
@@ -70,6 +72,8 @@ public class DeletedBlockLogImpl
public static final Logger LOG =
LoggerFactory.getLogger(DeletedBlockLogImpl.class);
+ private static final DeletedBlocksTransaction.Builder DUMMY_TXN_BUILDER =
+ DeletedBlocksTransaction.newBuilder().setContainerID(1).setCount(1);
private final int maxRetry;
private final ContainerManager containerManager;
@@ -78,9 +82,15 @@ public class DeletedBlockLogImpl
// Maps txId to set of DNs which are successful in committing the transaction
private Map<Long, Set<UUID>> transactionToDNsCommitMap;
+ private final AtomicLong largestTxnId;
+ // largest transactionId is stored at largestTxnIdHolderKey
+ private final long largestTxnIdHolderKey = 0L;
+
+
public DeletedBlockLogImpl(ConfigurationSource conf,
ContainerManager containerManager,
- SCMMetadataStore scmMetadataStore) {
+ SCMMetadataStore scmMetadataStore)
+ throws IOException {
maxRetry = conf.getInt(OZONE_SCM_BLOCK_DELETION_MAX_RETRY,
OZONE_SCM_BLOCK_DELETION_MAX_RETRY_DEFAULT);
this.containerManager = containerManager;
@@ -92,8 +102,46 @@ public class DeletedBlockLogImpl
// maps transaction to dns which have committed it.
transactionToDNsCommitMap = new ConcurrentHashMap<>();
+
+ this.largestTxnId = new AtomicLong(this.getLargestRecordedTXID());
+ }
+
+ public Long getNextDeleteBlockTXID() {
+ return this.largestTxnId.incrementAndGet();
+ }
+
+ public Long getCurrentTXID() {
+ return this.largestTxnId.get();
}
+ /**
+ * Returns the largest recorded TXID from the DB.
+ *
+ * @return Long
+ * @throws IOException
+ */
+ private long getLargestRecordedTXID() throws IOException {
+ DeletedBlocksTransaction txn =
+ scmMetadataStore.getDeletedBlocksTXTable().get(largestTxnIdHolderKey);
+ long txnId = txn != null ? txn.getTxID() : 0L;
+ if (txn == null) {
+ // HDDS-4477 adds largestTxnIdHolderKey to table for storing largest
+ // transactionId. In case the key does not exist, fetch largest
+ // transactionId from existing transactions and update
+ // largestTxnIdHolderKey with same.
+ try (TableIterator<Long,
+ ? extends Table.KeyValue<Long, DeletedBlocksTransaction>> txIter =
+ getIterator()) {
+ txIter.seekToLast();
+ txnId = txIter.key() != null ? txIter.key() : 0L;
+ if (txnId > 0) {
+ scmMetadataStore.getDeletedBlocksTXTable().put(largestTxnIdHolderKey,
+ DUMMY_TXN_BUILDER.setTxID(txnId).build());
+ }
+ }
+ }
+ return txnId;
+ }
@Override
public List<DeletedBlocksTransaction> getFailedTransactions()
@@ -103,7 +151,7 @@ public class DeletedBlockLogImpl
final List<DeletedBlocksTransaction> failedTXs = Lists.newArrayList();
try (TableIterator<Long,
? extends Table.KeyValue<Long, DeletedBlocksTransaction>> iter =
- scmMetadataStore.getDeletedBlocksTXTable().iterator()) {
+ getIterator()) {
while (iter.hasNext()) {
DeletedBlocksTransaction delTX = iter.next().getValue();
if (delTX.getCount() == -1) {
@@ -270,15 +318,8 @@ public class DeletedBlockLogImpl
@Override
public void addTransaction(long containerID, List<Long> blocks)
throws IOException {
- lock.lock();
- try {
- Long nextTXID = scmMetadataStore.getNextDeleteBlockTXID();
- DeletedBlocksTransaction tx =
- constructNewTransaction(nextTXID, containerID, blocks);
- scmMetadataStore.getDeletedBlocksTXTable().put(nextTXID, tx);
- } finally {
- lock.unlock();
- }
+ Map<Long, List<Long>> map = Collections.singletonMap(containerID, blocks);
+ addTransactions(map);
}
@Override
@@ -288,7 +329,7 @@ public class DeletedBlockLogImpl
final AtomicInteger num = new AtomicInteger(0);
try (TableIterator<Long,
? extends Table.KeyValue<Long, DeletedBlocksTransaction>> iter =
- scmMetadataStore.getDeletedBlocksTXTable().iterator()) {
+ getIterator()) {
while (iter.hasNext()) {
DeletedBlocksTransaction delTX = iter.next().getValue();
if (delTX.getCount() > -1) {
@@ -312,19 +353,20 @@ public class DeletedBlockLogImpl
public void addTransactions(Map<Long, List<Long>> containerBlocksMap)
throws IOException {
lock.lock();
- try {
- try(BatchOperation batch =
- scmMetadataStore.getStore().initBatchOperation()) {
- for (Map.Entry< Long, List< Long > > entry :
- containerBlocksMap.entrySet()) {
- long nextTXID = scmMetadataStore.getNextDeleteBlockTXID();
- DeletedBlocksTransaction tx = constructNewTransaction(nextTXID,
- entry.getKey(), entry.getValue());
- scmMetadataStore.getDeletedBlocksTXTable().putWithBatch(batch,
- nextTXID, tx);
- }
- scmMetadataStore.getStore().commitBatchOperation(batch);
+ try (BatchOperation batch = scmMetadataStore.getStore()
+ .initBatchOperation()) {
+ for (Map.Entry<Long, List<Long>> entry : containerBlocksMap.entrySet()) {
+ long nextTXID = getNextDeleteBlockTXID();
+ scmMetadataStore.getDeletedBlocksTXTable().putWithBatch(batch, nextTXID,
+ constructNewTransaction(nextTXID, entry.getKey(),
+ entry.getValue()));
}
+ // Add a dummy transaction to store the largestTransactionId at
+ // largestTxnIdHolderKey
+ scmMetadataStore.getDeletedBlocksTXTable()
+ .putWithBatch(batch, largestTxnIdHolderKey,
+ DUMMY_TXN_BUILDER.setTxID(getCurrentTXID()).build());
+ scmMetadataStore.getStore().commitBatchOperation(batch);
} finally {
lock.unlock();
}
@@ -364,7 +406,7 @@ public class DeletedBlockLogImpl
new DatanodeDeletedBlockTransactions();
try (TableIterator<Long,
? extends Table.KeyValue<Long, DeletedBlocksTransaction>> iter =
- scmMetadataStore.getDeletedBlocksTXTable().iterator()) {
+ getIterator()) {
int numBlocksAdded = 0;
List<DeletedBlocksTransaction> txnsToBePurged =
new ArrayList<>();
@@ -406,6 +448,16 @@ public class DeletedBlockLogImpl
}
}
+ TableIterator<Long,
+ ? extends Table.KeyValue<Long, DeletedBlocksTransaction>> getIterator()
+ throws IOException {
+ TableIterator<Long,
+ ? extends Table.KeyValue<Long, DeletedBlocksTransaction>>
+ iter = scmMetadataStore.getDeletedBlocksTXTable().iterator();
+ iter.seek(largestTxnIdHolderKey + 1);
+ return iter;
+ }
+
@Override
public void onMessage(DeleteBlockStatus deleteBlockStatus,
EventPublisher publisher) {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStore.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStore.java
index 0452c05..e11a7ac 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStore.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStore.java
@@ -70,20 +70,6 @@ public interface SCMMetadataStore {
Table<Long, DeletedBlocksTransaction> getDeletedBlocksTXTable();
/**
- * Returns the current TXID for the deleted blocks.
- *
- * @return Long
- */
- Long getCurrentTXID();
-
- /**
- * Returns the next TXID for the Deleted Blocks.
- *
- * @return Long.
- */
- Long getNextDeleteBlockTXID();
-
- /**
* A table that maintains all the valid certificates issued by the SCM CA.
*
* @return Table
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStoreImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStoreImpl.java
index 4ab5457..51c6d9c 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStoreImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStoreImpl.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hdds.scm.metadata;
import java.io.IOException;
import java.math.BigInteger;
import java.security.cert.X509Certificate;
-import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
@@ -33,7 +32,6 @@ import org.apache.hadoop.hdds.utils.db.BatchOperationHandler;
import org.apache.hadoop.hdds.utils.db.DBStore;
import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
import org.apache.hadoop.hdds.utils.db.Table;
-import org.apache.hadoop.hdds.utils.db.Table.KeyValue;
import org.apache.hadoop.hdds.utils.db.TableIterator;
import static org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition.CONTAINERS;
@@ -64,7 +62,6 @@ public class SCMMetadataStoreImpl implements SCMMetadataStore {
LoggerFactory.getLogger(SCMMetadataStoreImpl.class);
private DBStore store;
private final OzoneConfiguration configuration;
- private final AtomicLong txID;
/**
* Constructs the metadata store and starts the DB Services.
@@ -76,7 +73,6 @@ public class SCMMetadataStoreImpl implements SCMMetadataStore {
throws IOException {
this.configuration = config;
start(this.configuration);
- this.txID = new AtomicLong(this.getLargestRecordedTXID());
}
@Override
@@ -124,10 +120,6 @@ public class SCMMetadataStoreImpl implements SCMMetadataStore {
return deletedBlocksTable;
}
- @Override
- public Long getNextDeleteBlockTXID() {
- return this.txID.incrementAndGet();
- }
@Override
public Table<BigInteger, X509Certificate> getValidCertsTable() {
@@ -167,28 +159,6 @@ public class SCMMetadataStoreImpl implements SCMMetadataStore {
return containerTable;
}
- @Override
- public Long getCurrentTXID() {
- return this.txID.get();
- }
-
- /**
- * Returns the largest recorded TXID from the DB.
- *
- * @return Long
- * @throws IOException
- */
- private Long getLargestRecordedTXID() throws IOException {
- try (TableIterator<Long, ? extends KeyValue<Long, DeletedBlocksTransaction>>
- txIter = deletedBlocksTable.iterator()) {
- txIter.seekToLast();
- Long txid = txIter.key();
- if (txid != null) {
- return txid;
- }
- }
- return 0L;
- }
private void checkTableStatus(Table table, String name) throws IOException {
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java
index 8f2b3f5..61069ca 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java
@@ -300,7 +300,7 @@ public class TestDeletedBlockLog {
// verify the number of added and committed.
try (TableIterator<Long,
? extends Table.KeyValue<Long, DeletedBlocksTransaction>> iter =
- scm.getScmMetadataStore().getDeletedBlocksTXTable().iterator()) {
+ deletedBlockLog.getIterator()) {
AtomicInteger count = new AtomicInteger();
iter.forEachRemaining((keyValue) -> count.incrementAndGet());
Assert.assertEquals(added, count.get() + committed);
@@ -328,6 +328,15 @@ public class TestDeletedBlockLog {
blocks = getTransactions(BLOCKS_PER_TXN * 40);
Assert.assertEquals(40, blocks.size());
commitTransactions(blocks);
+
+ // close db and reopen it again to make sure
+ // currentTxnID = 50
+ deletedBlockLog.close();
+ deletedBlockLog = new DeletedBlockLogImpl(conf, containerManager,
+ scm.getScmMetadataStore());
+ blocks = getTransactions(BLOCKS_PER_TXN * 40);
+ Assert.assertEquals(0, blocks.size());
+ Assert.assertEquals((long)deletedBlockLog.getCurrentTXID(), 50L);
}
@Test
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMStoreImplWithOldPipelineIDKeyFormat.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMStoreImplWithOldPipelineIDKeyFormat.java
index a04ecea..400d872 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMStoreImplWithOldPipelineIDKeyFormat.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMStoreImplWithOldPipelineIDKeyFormat.java
@@ -87,16 +87,6 @@ public class TestSCMStoreImplWithOldPipelineIDKeyFormat
}
@Override
- public Long getCurrentTXID() {
- return null;
- }
-
- @Override
- public Long getNextDeleteBlockTXID() {
- return null;
- }
-
- @Override
public Table<BigInteger, X509Certificate> getValidCertsTable() {
return null;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org