You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ww...@apache.org on 2017/08/24 05:46:53 UTC
hadoop git commit: HDFS-12283. Ozone: DeleteKey-5: Implement SCM
DeletedBlockLog. Contributed by Yuanbo Liu.
Repository: hadoop
Updated Branches:
refs/heads/HDFS-7240 9c789e883 -> 91ffbaa8b
HDFS-12283. Ozone: DeleteKey-5: Implement SCM DeletedBlockLog. Contributed by Yuanbo Liu.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/91ffbaa8
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/91ffbaa8
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/91ffbaa8
Branch: refs/heads/HDFS-7240
Commit: 91ffbaa8b9a3042a225d65c61a81bf41c53d5e33
Parents: 9c789e8
Author: Weiwei Yang <ww...@apache.org>
Authored: Thu Aug 24 13:46:03 2017 +0800
Committer: Weiwei Yang <ww...@apache.org>
Committed: Thu Aug 24 13:46:03 2017 +0800
----------------------------------------------------------------------
.../org/apache/hadoop/ozone/OzoneConsts.java | 1 +
.../org/apache/hadoop/scm/ScmConfigKeys.java | 3 +
.../ozone/scm/block/BlockManagerImpl.java | 6 +-
.../hadoop/ozone/scm/block/DeletedBlockLog.java | 78 ++++++
.../ozone/scm/block/DeletedBlockLogImpl.java | 246 +++++++++++++++++++
.../StorageContainerDatanodeProtocol.proto | 9 +
.../src/main/resources/ozone-default.xml | 13 +
.../ozone/scm/block/TestDeletedBlockLog.java | 240 ++++++++++++++++++
8 files changed, 595 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/91ffbaa8/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
index 68f1e09..de8061a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
@@ -80,6 +80,7 @@ public final class OzoneConsts {
public static final String BLOCK_DB = "block.db";
public static final String NODEPOOL_DB = "nodepool.db";
public static final String OPEN_CONTAINERS_DB = "openContainers.db";
+ public static final String DELETED_BLOCK_DB = "deletedBlock.db";
public static final String KSM_DB_NAME = "ksm.db";
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/91ffbaa8/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java
index 0b081a1..44cc380 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java
@@ -212,6 +212,9 @@ public final class ScmConfigKeys {
public static final int OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT_DEFAULT =
300; // Default 5 minute wait.
+ public static final String OZONE_SCM_BLOCK_DELETION_MAX_RETRY =
+ "ozone.scm.block.deletion.max.retry";
+ public static final int OZONE_SCM_BLOCK_DELETION_MAX_RETRY_DEFAULT = 4096;
/**
* Never constructed.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/91ffbaa8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java
index 5730589..43ca21c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java
@@ -88,6 +88,7 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
// Track all containers owned by block service.
private final MetadataStore containerStore;
+ private final DeletedBlockLog deletedBlockLog;
private Map<OzoneProtos.LifeCycleState,
Map<String, BlockContainerInfo>> containers;
@@ -142,6 +143,7 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
this.lock = new ReentrantLock();
mxBean = MBeans.register("BlockManager", "BlockManagerImpl", this);
+ deletedBlockLog = new DeletedBlockLogImpl(conf);
}
// TODO: close full (or almost full) containers with a separate thread.
@@ -490,7 +492,9 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
if (containerStore != null) {
containerStore.close();
}
-
+ if (deletedBlockLog != null) {
+ deletedBlockLog.close();
+ }
MBeans.unregister(mxBean);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/91ffbaa8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DeletedBlockLog.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DeletedBlockLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DeletedBlockLog.java
new file mode 100644
index 0000000..60d53af
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DeletedBlockLog.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.scm.block;
+
+
+import org.apache.hadoop.ozone.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * The DeletedBlockLog is a persisted log in SCM that keeps track of
+ * container blocks which are under deletion. It maintains info
+ * about the under-deletion container blocks that were notified by KSM,
+ * and the state of how each of them has been processed.
+ */
+public interface DeletedBlockLog extends Closeable {
+
+ /**
+ * Returns a size-limited list of transactions. Note count is the max
+ * number of TXs to return, we might not be able to always return this
+ * number. The retry count of every returned transaction
+ * is within [0, MAX_RETRY].
+ *
+ * @param count - max number of transactions to return.
+ * @return a list of DeletedBlocksTransaction.
+ * @throws IOException
+ */
+ List<DeletedBlocksTransaction> getTransactions(int count)
+ throws IOException;
+
+ /**
+ * Increments the retry count of each of the given transactions by 1.
+ * The log maintains a valid range of counts for each transaction,
+ * [0, MAX_RETRY]. If a count exceeds this range, it is reset to -1 to
+ * indicate that the transaction is no longer valid.
+ *
+ * @param txIDs - transaction IDs.
+ * @throws IOException
+ */
+ void incrementCount(List<Long> txIDs)
+ throws IOException;
+
+ /**
+ * Committing a transaction means deleting all footprints of that
+ * transaction from the log. This method doesn't guarantee that all
+ * transactions are successfully deleted; it tolerates failures and
+ * makes a best effort.
+ *
+ * @param txIDs - transaction IDs.
+ * @throws IOException
+ */
+ void commitTransactions(List<Long> txIDs) throws IOException;
+
+ /**
+ * Creates a block deletion transaction and adds it to the log.
+ *
+ * @param containerName - container name.
+ * @param blocks - blocks that belong to the same container.
+ *
+ * @throws IOException
+ */
+ void addTransaction(String containerName, List<String> blocks)
+ throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/91ffbaa8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DeletedBlockLogImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DeletedBlockLogImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DeletedBlockLogImpl.java
new file mode 100644
index 0000000..ef1a515
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DeletedBlockLogImpl.java
@@ -0,0 +1,246 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.scm.block;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.primitives.Longs;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
+import org.apache.hadoop.ozone.web.utils.OzoneUtils;
+import org.apache.hadoop.utils.BatchOperation;
+import org.apache.hadoop.utils.MetadataKeyFilters.MetadataKeyFilter;
+import org.apache.hadoop.utils.MetadataStore;
+import org.apache.hadoop.utils.MetadataStoreBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static org.apache.hadoop.ozone.OzoneConsts.DELETED_BLOCK_DB;
+import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_BLOCK_DELETION_MAX_RETRY;
+import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_BLOCK_DELETION_MAX_RETRY_DEFAULT;
+import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_DEFAULT;
+import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_MB;
+
+/**
+ * An implementation of {@link DeletedBlockLog} that uses a
+ * K/V db to maintain block deletion transactions between scm and datanode.
+ * This is a very basic implementation: it simply scans the log and
+ * remembers the position reached by the last scan, and uses this to
+ * determine where the next scan starts. It has no notion of the weight
+ * of each transaction, so as long as transactions are still valid, they get
+ * an equal chance of being retrieved, which only depends on the natural
+ * order of the transaction IDs.
+ */
+public class DeletedBlockLogImpl implements DeletedBlockLog {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(DeletedBlockLogImpl.class);
+
+ private static final byte[] LATEST_TXID =
+ DFSUtil.string2Bytes("#LATEST_TXID#"); // store key holding the latest txID
+
+ private final int maxRetry;
+ private final MetadataStore deletedStore;
+ private final Lock lock;
+ // The ID of the latest transaction that has been added to the db.
+ private long lastTxID;
+ private long lastReadTxID; // txID where the last scan stopped; 0 = scan from the head
+
+ public DeletedBlockLogImpl(Configuration conf) throws IOException {
+ maxRetry = conf.getInt(OZONE_SCM_BLOCK_DELETION_MAX_RETRY,
+ OZONE_SCM_BLOCK_DELETION_MAX_RETRY_DEFAULT);
+
+ File metaDir = OzoneUtils.getScmMetadirPath(conf);
+ String scmMetaDataDir = metaDir.getPath();
+ File deletedLogDbPath = new File(scmMetaDataDir, DELETED_BLOCK_DB);
+ int cacheSize = conf.getInt(OZONE_SCM_DB_CACHE_SIZE_MB,
+ OZONE_SCM_DB_CACHE_SIZE_DEFAULT);
+ // Load store of all transactions.
+ deletedStore = MetadataStoreBuilder.newBuilder()
+ .setCreateIfMissing(true)
+ .setConf(conf)
+ .setDbFile(deletedLogDbPath)
+ .setCacheSize(cacheSize * OzoneConsts.MB)
+ .build();
+
+ this.lock = new ReentrantLock();
+ // start from the head of deleted store.
+ lastReadTxID = 0;
+ lastTxID = findLatestTxIDInStore();
+ }
+
+ /**
+  * Exposes the underlying K/V store so that tests can inspect it directly.
+  *
+  * @return the store holding the deletion transactions.
+  */
+ @VisibleForTesting
+ MetadataStore getDeletedStore() {
+   return this.deletedStore;
+ }
+
+ /**
+ * There is no need to lock before reading because
+ * it's only used in construct method.
+ *
+ * @return latest txid.
+ * @throws IOException
+ */
+ private long findLatestTxIDInStore() throws IOException {
+ long txid = 0;
+ byte[] value = deletedStore.get(LATEST_TXID);
+ if (value != null) {
+ txid = Longs.fromByteArray(value);
+ }
+ return txid;
+ }
+
+ @Override
+ public List<DeletedBlocksTransaction> getTransactions(
+ int count) throws IOException {
+ List<DeletedBlocksTransaction> result = new ArrayList<>();
+ MetadataKeyFilter getNextTxID = (preKey, currentKey, nextKey)
+ -> Longs.fromByteArray(currentKey) > lastReadTxID;
+ MetadataKeyFilter avoidInvalidTxid = (preKey, currentKey, nextKey)
+ -> !Arrays.equals(LATEST_TXID, currentKey);
+ lock.lock();
+ try {
+ deletedStore.iterate(null, (key, value) -> {
+ if (getNextTxID.filterKey(null, key, null) &&
+ avoidInvalidTxid.filterKey(null, key, null)) {
+ DeletedBlocksTransaction block = DeletedBlocksTransaction
+ .parseFrom(value);
+ if (block.getCount() > -1 && block.getCount() <= maxRetry) {
+ result.add(block);
+ }
+ }
+ return result.size() < count;
+ });
+ // Scan the metadata from the beginning.
+ if (result.size() < count || result.size() < 1) {
+ lastReadTxID = 0;
+ } else {
+ lastReadTxID = result.get(result.size() - 1).getTxID();
+ }
+ } finally {
+ lock.unlock();
+ }
+ return result;
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @param txIDs - transaction ID.
+ * @throws IOException
+ */
+ @Override
+ public void incrementCount(List<Long> txIDs) throws IOException {
+ BatchOperation batch = new BatchOperation();
+ lock.lock();
+ try {
+ for(Long txID : txIDs) {
+ try {
+ DeletedBlocksTransaction block = DeletedBlocksTransaction
+ .parseFrom(deletedStore.get(Longs.toByteArray(txID)));
+ DeletedBlocksTransaction.Builder builder = block.toBuilder();
+ if (block.getCount() > -1) {
+ builder.setCount(block.getCount() + 1);
+ }
+ // if the retry time exceeds the maxRetry value
+ // then set the retry value to -1, stop retrying, admins can
+ // analyze those blocks and purge them manually by SCMCli.
+ if (block.getCount() > maxRetry) {
+ builder.setCount(-1);
+ }
+ deletedStore.put(Longs.toByteArray(txID),
+ builder.build().toByteArray());
+ } catch (IOException ex) {
+ LOG.warn("Cannot increase count for txID " + txID, ex);
+ }
+ }
+ deletedStore.writeBatch(batch);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+  * {@inheritDoc}
+  *
+  * @param txIDs - transaction IDs.
+  * @throws IOException
+  */
+ @Override
+ public void commitTransactions(List<Long> txIDs) throws IOException {
+   lock.lock();
+   try {
+     txIDs.forEach(this::deleteTransactionQuietly);
+   } finally {
+     lock.unlock();
+   }
+ }
+
+ /**
+  * Deletes one transaction from the store. A failure is logged and
+  * swallowed so that the caller can carry on with the next txID.
+  *
+  * @param txID - ID of the transaction to delete.
+  */
+ private void deleteTransactionQuietly(Long txID) {
+   try {
+     deletedStore.delete(Longs.toByteArray(txID));
+   } catch (IOException ex) {
+     LOG.warn("Cannot commit txID " + txID, ex);
+   }
+ }
+
+ /**
+  * {@inheritDoc}
+  *
+  * <p>The new transaction and the updated LATEST_TXID entry are written
+  * in a single batch; the in-memory latest txID is only advanced after
+  * that write has returned.
+  *
+  * @param containerName - container name.
+  * @param blocks - blocks that belong to the same container.
+  * @throws IOException
+  */
+ @Override
+ public void addTransaction(String containerName, List<String> blocks)
+     throws IOException {
+   final BatchOperation writes = new BatchOperation();
+   lock.lock();
+   try {
+     final long nextTxID = lastTxID + 1;
+     final DeletedBlocksTransaction.Builder txBuilder =
+         DeletedBlocksTransaction.newBuilder();
+     txBuilder.setTxID(nextTxID);
+     txBuilder.setContainerName(containerName);
+     txBuilder.addAllBlockID(blocks);
+     txBuilder.setCount(0);
+
+     writes.put(Longs.toByteArray(nextTxID), txBuilder.build().toByteArray());
+     writes.put(LATEST_TXID, Longs.toByteArray(nextTxID));
+     deletedStore.writeBatch(writes);
+
+     lastTxID = nextTxID;
+   } finally {
+     lock.unlock();
+   }
+ }
+
+ /**
+  * Closes the underlying K/V store, if there is one.
+  *
+  * @throws IOException
+  */
+ @Override
+ public void close() throws IOException {
+   if (deletedStore == null) {
+     return;
+   }
+   deletedStore.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/91ffbaa8/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto
index 8400ee0..a8cfa57 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto
@@ -99,6 +99,15 @@ message ContainerInfo {
optional int64 keycount = 4;
}
+// A block deletion transaction, as stored in deletedBlock.db of scm.
+message DeletedBlocksTransaction {
+ required int64 txID = 1;
+ required string containerName = 2;
+ repeated string blockID = 3;
+ // the number of times the deletion command was retried; -1 means no longer valid.
+ required int32 count = 4;
+}
+
/**
A set of container reports, max count is generally set to
8192 since that keeps the size of the reports under 1 MB.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/91ffbaa8/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml
index e39a5ec..b10a536 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml
@@ -265,6 +265,19 @@
</property>
<property>
+ <name>ozone.scm.block.deletion.max.retry</name>
+ <value>4096</value>
+ <description>
+ SCM wraps up a number of blocks in a deletion transaction and periodically
+ sends it to the datanode for physical deletion. This property
+ determines the maximum number of times SCM retries sending a deletion
+ transaction to the datanode. The default value 4096 is relatively big so
+ that SCM can try enough times before giving up, as the actual deletion
+ is async so the time required is unpredictable.
+ </description>
+ </property>
+
+ <property>
<name>ozone.scm.heartbeat.log.warn.interval.count</name>
<value>10</value>
<description>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/91ffbaa8/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/block/TestDeletedBlockLog.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/block/TestDeletedBlockLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/block/TestDeletedBlockLog.java
new file mode 100644
index 0000000..c1c87ab
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/block/TestDeletedBlockLog.java
@@ -0,0 +1,240 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.scm.block;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.ozone.OzoneConfiguration;
+import org.apache.hadoop.ozone.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.utils.MetadataKeyFilters;
+import org.apache.hadoop.utils.MetadataStore;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_CONTAINER_METADATA_DIRS;
+import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_BLOCK_DELETION_MAX_RETRY;
+
+/**
+ * Tests for DeletedBlockLog.
+ */
+public class TestDeletedBlockLog {
+
+ private static DeletedBlockLogImpl deletedBlockLog;
+ private OzoneConfiguration conf;
+ private File testDir;
+
+ /**
+  * Creates a fresh DeletedBlockLogImpl backed by the test directory,
+  * with the max retry count set to 20.
+  */
+ @Before
+ public void setup() throws Exception {
+   final String testName = TestDeletedBlockLog.class.getSimpleName();
+   testDir = GenericTestUtils.getTestDir(testName);
+
+   conf = new OzoneConfiguration();
+   conf.set(OZONE_CONTAINER_METADATA_DIRS, testDir.getAbsolutePath());
+   conf.setInt(OZONE_SCM_BLOCK_DELETION_MAX_RETRY, 20);
+
+   deletedBlockLog = new DeletedBlockLogImpl(conf);
+ }
+
+ /**
+  * Closes the log and removes the test directory. The directory is
+  * removed even when closing the log fails, so that a failed close does
+  * not leave a stale db behind for the next test.
+  */
+ @After
+ public void tearDown() throws Exception {
+   try {
+     deletedBlockLog.close();
+   } finally {
+     FileUtils.deleteDirectory(testDir);
+   }
+ }
+
+ private Map<String, List<String>> generateData(int dataSize) {
+ Map<String, List<String>> blockMap = new HashMap<>();
+ Random random = new Random(1);
+ for (int i = 0; i < dataSize; i++) {
+ String containerName = "container-" + UUID.randomUUID().toString();
+ List<String> blocks = new ArrayList<>();
+ int blockSize = random.nextInt(30) + 1;
+ for (int j = 0; j < blockSize; j++) {
+ blocks.add("block-" + UUID.randomUUID().toString());
+ }
+ blockMap.put(containerName, blocks);
+ }
+ return blockMap;
+ }
+
+ @Test
+ public void testGetTransactions() throws Exception {
+ List<DeletedBlocksTransaction> blocks =
+ deletedBlockLog.getTransactions(30);
+ Assert.assertEquals(0, blocks.size());
+
+ // Creates 40 TX in the log.
+ for (Map.Entry<String, List<String>> entry : generateData(40).entrySet()){
+ deletedBlockLog.addTransaction(entry.getKey(), entry.getValue());
+ }
+
+ // Get first 30 TXs.
+ blocks = deletedBlockLog.getTransactions(30);
+ Assert.assertEquals(30, blocks.size());
+ for (int i = 0; i < 30; i++) {
+ Assert.assertEquals(i + 1, blocks.get(i).getTxID());
+ }
+
+ // Get another 30 TXs.
+ // The log only 10 left, so this time it will only return 10 TXs.
+ blocks = deletedBlockLog.getTransactions(30);
+ Assert.assertEquals(10, blocks.size());
+ for (int i = 30; i < 40; i++) {
+ Assert.assertEquals(i + 1, blocks.get(i - 30).getTxID());
+ }
+
+ // Get another 50 TXs.
+ // By now the position should have moved to the beginning,
+ // this call will return all 40 TXs.
+ blocks = deletedBlockLog.getTransactions(50);
+ Assert.assertEquals(40, blocks.size());
+ for (int i = 0; i < 40; i++) {
+ Assert.assertEquals(i + 1, blocks.get(i).getTxID());
+ }
+ List<Long> txIDs = new ArrayList<>();
+ for (DeletedBlocksTransaction block : blocks) {
+ txIDs.add(block.getTxID());
+ }
+ deletedBlockLog.commitTransactions(txIDs);
+ }
+
+ /**
+  * Verifies that transactions whose retry count has exceeded the
+  * configured maximum are no longer handed out by getTransactions.
+  */
+ @Test
+ public void testIncrementCount() throws Exception {
+   int maxRetry = conf.getInt(OZONE_SCM_BLOCK_DELETION_MAX_RETRY, 20);
+
+   // Create 30 TXs in the log.
+   for (Map.Entry<String, List<String>> entry : generateData(30).entrySet()){
+     deletedBlockLog.addTransaction(entry.getKey(), entry.getValue());
+   }
+
+   // This will return all TXs, total num 30.
+   List<DeletedBlocksTransaction> blocks =
+       deletedBlockLog.getTransactions(40);
+   Assert.assertEquals(30, blocks.size());
+   List<Long> txIDs = blocks.stream().map(DeletedBlocksTransaction::getTxID)
+       .collect(Collectors.toList());
+
+   for (int i = 0; i < maxRetry; i++) {
+     deletedBlockLog.incrementCount(txIDs);
+   }
+
+   // Increment another time so the count exceeds the maxRetry.
+   deletedBlockLog.incrementCount(txIDs);
+
+   // getTransactions only returns TXs whose count is within
+   // [0, maxRetry], so none of the failed TXs may show up any more.
+   // (Looping over the returned list to check the counts would never
+   // execute a single assertion.)
+   blocks = deletedBlockLog.getTransactions(40);
+   Assert.assertEquals(0, blocks.size());
+
+   // If all TXs are failed, getTransactions call will always return nothing.
+   blocks = deletedBlockLog.getTransactions(40);
+   Assert.assertEquals(0, blocks.size());
+ }
+
+ /**
+  * Verifies that committed transactions are removed from the log and
+  * that an unknown txID in the commit list is tolerated.
+  */
+ @Test
+ public void testCommitTransactions() throws Exception {
+   for (Map.Entry<String, List<String>> data : generateData(50).entrySet()) {
+     deletedBlockLog.addTransaction(data.getKey(), data.getValue());
+   }
+
+   // Collect into an ArrayList explicitly, the list is appended to below.
+   List<Long> txIDs = deletedBlockLog.getTransactions(20).stream()
+       .map(DeletedBlocksTransaction::getTxID)
+       .collect(Collectors.toCollection(ArrayList::new));
+   // Add an invalid txID.
+   txIDs.add(70L);
+   deletedBlockLog.commitTransactions(txIDs);
+
+   List<DeletedBlocksTransaction> remaining =
+       deletedBlockLog.getTransactions(50);
+   Assert.assertEquals(30, remaining.size());
+ }
+
+ @Test
+ public void testRandomOperateTransactions() throws Exception {
+ Random random = new Random();
+ int added = 0, committed = 0;
+ List<DeletedBlocksTransaction> blocks = new ArrayList<>();
+ List<Long> txIDs = new ArrayList<>();
+ byte[] latestTxid = DFSUtil.string2Bytes("#LATEST_TXID#");
+ MetadataKeyFilters.MetadataKeyFilter avoidLatestTxid =
+ (preKey, currentKey, nextKey) ->
+ !Arrays.equals(latestTxid, currentKey);
+ MetadataStore store = deletedBlockLog.getDeletedStore();
+ // Randomly add/get/commit/increase transactions.
+ for (int i = 0; i < 100; i++) {
+ int state = random.nextInt(4);
+ if (state == 0) {
+ for (Map.Entry<String, List<String>> entry :
+ generateData(10).entrySet()){
+ deletedBlockLog.addTransaction(entry.getKey(), entry.getValue());
+ }
+ added += 10;
+ } else if (state == 1) {
+ blocks = deletedBlockLog.getTransactions(20);
+ txIDs = new ArrayList<>();
+ for (DeletedBlocksTransaction block : blocks) {
+ txIDs.add(block.getTxID());
+ }
+ deletedBlockLog.incrementCount(txIDs);
+ } else if (state == 2) {
+ txIDs = new ArrayList<>();
+ for (DeletedBlocksTransaction block : blocks) {
+ txIDs.add(block.getTxID());
+ }
+ blocks = new ArrayList<>();
+ committed += txIDs.size();
+ deletedBlockLog.commitTransactions(txIDs);
+ } else {
+ // verify the number of added and committed.
+ List<Map.Entry<byte[], byte[]>> result =
+ store.getRangeKVs(null, added, avoidLatestTxid);
+ Assert.assertEquals(added, result.size() + committed);
+ }
+ }
+ }
+
+ /**
+  * Verifies that transactions survive closing and reopening the log.
+  */
+ @Test
+ public void testPersistence() throws Exception {
+   for (Map.Entry<String, List<String>> data : generateData(50).entrySet()) {
+     deletedBlockLog.addTransaction(data.getKey(), data.getValue());
+   }
+
+   // close db and reopen it again to make sure
+   // transactions are stored persistently.
+   deletedBlockLog.close();
+   deletedBlockLog = new DeletedBlockLogImpl(conf);
+
+   // Commit the first 10 TXs read back from the reopened log.
+   List<Long> firstBatchIDs = deletedBlockLog.getTransactions(10).stream()
+       .map(DeletedBlocksTransaction::getTxID)
+       .collect(Collectors.toList());
+   deletedBlockLog.commitTransactions(firstBatchIDs);
+
+   // Another 10 TXs must still be available afterwards.
+   List<DeletedBlocksTransaction> nextBatch =
+       deletedBlockLog.getTransactions(10);
+   Assert.assertEquals(10, nextBatch.size());
+ }
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org