You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ww...@apache.org on 2017/08/24 05:46:53 UTC

hadoop git commit: HDFS-12283. Ozone: DeleteKey-5: Implement SCM DeletedBlockLog. Contributed by Yuanbo Liu.

Repository: hadoop
Updated Branches:
  refs/heads/HDFS-7240 9c789e883 -> 91ffbaa8b


HDFS-12283. Ozone: DeleteKey-5: Implement SCM DeletedBlockLog. Contributed by Yuanbo Liu.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/91ffbaa8
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/91ffbaa8
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/91ffbaa8

Branch: refs/heads/HDFS-7240
Commit: 91ffbaa8b9a3042a225d65c61a81bf41c53d5e33
Parents: 9c789e8
Author: Weiwei Yang <ww...@apache.org>
Authored: Thu Aug 24 13:46:03 2017 +0800
Committer: Weiwei Yang <ww...@apache.org>
Committed: Thu Aug 24 13:46:03 2017 +0800

----------------------------------------------------------------------
 .../org/apache/hadoop/ozone/OzoneConsts.java    |   1 +
 .../org/apache/hadoop/scm/ScmConfigKeys.java    |   3 +
 .../ozone/scm/block/BlockManagerImpl.java       |   6 +-
 .../hadoop/ozone/scm/block/DeletedBlockLog.java |  78 ++++++
 .../ozone/scm/block/DeletedBlockLogImpl.java    | 246 +++++++++++++++++++
 .../StorageContainerDatanodeProtocol.proto      |   9 +
 .../src/main/resources/ozone-default.xml        |  13 +
 .../ozone/scm/block/TestDeletedBlockLog.java    | 240 ++++++++++++++++++
 8 files changed, 595 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/91ffbaa8/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
index 68f1e09..de8061a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
@@ -80,6 +80,7 @@ public final class OzoneConsts {
   public static final String BLOCK_DB = "block.db";
   public static final String NODEPOOL_DB = "nodepool.db";
   public static final String OPEN_CONTAINERS_DB = "openContainers.db";
+  public static final String DELETED_BLOCK_DB = "deletedBlock.db";
   public static final String KSM_DB_NAME = "ksm.db";
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/91ffbaa8/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java
index 0b081a1..44cc380 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java
@@ -212,6 +212,9 @@ public final class ScmConfigKeys {
   public static final int OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT_DEFAULT =
       300; // Default 5 minute wait.
 
+  public static final String OZONE_SCM_BLOCK_DELETION_MAX_RETRY =
+      "ozone.scm.block.deletion.max.retry";
+  public static final int OZONE_SCM_BLOCK_DELETION_MAX_RETRY_DEFAULT = 4096;
 
   /**
    * Never constructed.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/91ffbaa8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java
index 5730589..43ca21c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java
@@ -88,6 +88,7 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
 
   // Track all containers owned by block service.
   private final MetadataStore containerStore;
+  private final DeletedBlockLog deletedBlockLog;
 
   private Map<OzoneProtos.LifeCycleState,
       Map<String, BlockContainerInfo>> containers;
@@ -142,6 +143,7 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
     this.lock = new ReentrantLock();
 
     mxBean = MBeans.register("BlockManager", "BlockManagerImpl", this);
+    deletedBlockLog = new DeletedBlockLogImpl(conf);
   }
 
   // TODO: close full (or almost full) containers with a separate thread.
@@ -490,7 +492,9 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
     if (containerStore != null) {
       containerStore.close();
     }
-
+    if (deletedBlockLog != null) {
+      deletedBlockLog.close();
+    }
     MBeans.unregister(mxBean);
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/91ffbaa8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DeletedBlockLog.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DeletedBlockLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DeletedBlockLog.java
new file mode 100644
index 0000000..60d53af
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DeletedBlockLog.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.scm.block;
+
+
+import org.apache.hadoop.ozone.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * The DeletedBlockLog is a persisted log in SCM that keeps track of
+ * container blocks which are under deletion. It maintains info
+ * about under-deletion container blocks that were notified by KSM,
+ * and the state of how each of them is being processed.
+ */
+public interface DeletedBlockLog extends Closeable {
+
+  /**
+   *  Returns a size-limited list of transactions. Note count is the max
+   *  number of TXs to return; fewer may be returned when not enough
+   *  are available. The processCount of the returned transactions
+   *  is in [0, MAX_RETRY].
+   *
+   * @param count - max number of transactions to return.
+   * @return a list of DeletedBlocksTransaction.
+   */
+  List<DeletedBlocksTransaction> getTransactions(int count)
+      throws IOException;
+
+  /**
+   * Increments the count of each of the given transactions by 1.
+   * The log maintains a valid range of counts for each transaction,
+   * [0, MAX_RETRY]. If a count exceeds this range, it is reset to -1
+   * to indicate that the transaction is no longer valid.
+   *
+   * @param txIDs - transaction IDs.
+   */
+  void incrementCount(List<Long> txIDs)
+      throws IOException;
+
+  /**
+   * Committing a transaction means deleting all footprints of it
+   * from the log. This method doesn't guarantee that all transactions are
+   * successfully deleted; it tolerates failures and works on best effort.
+   *
+   * @param txIDs - transaction IDs.
+   */
+  void commitTransactions(List<Long> txIDs) throws IOException;
+
+  /**
+   * Creates a block deletion transaction and adds it to the log.
+   *
+   * @param containerName - container name.
+   * @param blocks - blocks that belong to the same container.
+   *
+   * @throws IOException
+   */
+  void addTransaction(String containerName, List<String> blocks)
+      throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/91ffbaa8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DeletedBlockLogImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DeletedBlockLogImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DeletedBlockLogImpl.java
new file mode 100644
index 0000000..ef1a515
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DeletedBlockLogImpl.java
@@ -0,0 +1,246 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.scm.block;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.primitives.Longs;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
+import org.apache.hadoop.ozone.web.utils.OzoneUtils;
+import org.apache.hadoop.utils.BatchOperation;
+import org.apache.hadoop.utils.MetadataKeyFilters.MetadataKeyFilter;
+import org.apache.hadoop.utils.MetadataStore;
+import org.apache.hadoop.utils.MetadataStoreBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static org.apache.hadoop.ozone.OzoneConsts.DELETED_BLOCK_DB;
+import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_BLOCK_DELETION_MAX_RETRY;
+import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_BLOCK_DELETION_MAX_RETRY_DEFAULT;
+import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_DEFAULT;
+import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_MB;
+
+/**
+ * An implementation of {@link DeletedBlockLog} that uses a
+ * K/V db to maintain block deletion transactions between scm and datanode.
+ * This is a very basic implementation: it simply scans the log and
+ * remembers the position reached by the last scan, and uses this to
+ * determine where the next scan starts. It has no notion of weight
+ * for transactions, so as long as transactions are still valid, they get
+ * an equal chance to be retrieved, which only depends on the natural
+ * order of the transaction ID.
+ */
+public class DeletedBlockLogImpl implements DeletedBlockLog {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(DeletedBlockLogImpl.class);
+
+  private static final byte[] LATEST_TXID =
+      DFSUtil.string2Bytes("#LATEST_TXID#");
+
+  private final int maxRetry;
+  private final MetadataStore deletedStore;
+  private final Lock lock;
+  // The latest id of deleted blocks in the db.
+  private long lastTxID;
+  private long lastReadTxID;
+
+  public DeletedBlockLogImpl(Configuration conf) throws IOException {
+    this.maxRetry = conf.getInt(OZONE_SCM_BLOCK_DELETION_MAX_RETRY,
+        OZONE_SCM_BLOCK_DELETION_MAX_RETRY_DEFAULT);
+
+    // The log is kept in its own db under the SCM metadata directory.
+    File dbFile = new File(
+        OzoneUtils.getScmMetadirPath(conf).getPath(), DELETED_BLOCK_DB);
+    int cacheSizeMb = conf.getInt(OZONE_SCM_DB_CACHE_SIZE_MB,
+        OZONE_SCM_DB_CACHE_SIZE_DEFAULT);
+    // Open the store of all transactions, creating it if missing.
+    this.deletedStore = MetadataStoreBuilder.newBuilder()
+        .setCreateIfMissing(true)
+        .setConf(conf)
+        .setDbFile(dbFile)
+        .setCacheSize(cacheSizeMb * OzoneConsts.MB)
+        .build();
+    this.lock = new ReentrantLock();
+    // Pick up the latest txID recorded in the store, but always start
+    // scanning from the head of the deleted store.
+    this.lastTxID = findLatestTxIDInStore();
+    this.lastReadTxID = 0;
+  }
+
+  @VisibleForTesting // exposes the backing store so tests can inspect it
+  MetadataStore getDeletedStore() {
+    return deletedStore;
+  }
+
+  /**
+   * Loads the latest transaction ID recorded in the store. No lock is
+   * taken because this is only invoked from the constructor.
+   *
+   * @return latest txid, or 0 if the store holds none.
+   * @throws IOException
+   */
+  private long findLatestTxIDInStore() throws IOException {
+    byte[] persisted = deletedStore.get(LATEST_TXID);
+    if (persisted == null) {
+      // Nothing has been recorded under the marker key yet.
+      return 0;
+    }
+    return Longs.fromByteArray(persisted);
+  }
+
+  @Override
+  public List<DeletedBlocksTransaction> getTransactions(
+      int count) throws IOException {
+    List<DeletedBlocksTransaction> result = new ArrayList<>();
+    if (count <= 0) { // nothing requested; keep the scan position as is
+      return result;
+    }
+    MetadataKeyFilter getNextTxID = (preKey, currentKey, nextKey)
+        -> Longs.fromByteArray(currentKey) > lastReadTxID;
+    MetadataKeyFilter avoidInvalidTxid = (preKey, currentKey, nextKey)
+        -> !Arrays.equals(LATEST_TXID, currentKey);
+    lock.lock();
+    try {
+      deletedStore.iterate(null, (key, value) -> {
+        if (avoidInvalidTxid.filterKey(null, key, null) && // marker first
+            getNextTxID.filterKey(null, key, null)) {
+          DeletedBlocksTransaction block = DeletedBlocksTransaction
+              .parseFrom(value);
+          if (block.getCount() > -1 && block.getCount() <= maxRetry) {
+            result.add(block);
+          }
+        }
+        return result.size() < count;
+      });
+      // Fewer than requested means the scan hit the end: restart from head.
+      lastReadTxID = result.size() < count
+          ? 0 : result.get(result.size() - 1).getTxID();
+    } finally {
+      lock.unlock();
+    }
+    return result;
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * @param txIDs - transaction ID.
+   * @throws IOException
+   */
+  @Override
+  public void incrementCount(List<Long> txIDs) throws IOException {
+    BatchOperation batch = new BatchOperation();
+    lock.lock();
+    try {
+      for(Long txID : txIDs) {
+        try {
+          DeletedBlocksTransaction block = DeletedBlocksTransaction
+              .parseFrom(deletedStore.get(Longs.toByteArray(txID)));
+          DeletedBlocksTransaction.Builder builder = block.toBuilder();
+          if (block.getCount() > -1) {
+            builder.setCount(block.getCount() + 1);
+          }
+          // if the retry time exceeds the maxRetry value
+          // then set the retry value to -1, stop retrying, admins can
+          // analyze those blocks and purge them manually by SCMCli.
+          if (block.getCount() > maxRetry) {
+            builder.setCount(-1);
+          }
+          deletedStore.put(Longs.toByteArray(txID),
+              builder.build().toByteArray());
+        } catch (IOException ex) {
+          LOG.warn("Cannot increase count for txID " + txID, ex);
+        }
+      }
+      deletedStore.writeBatch(batch);
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * {@inheritDoc} Failed deletes are logged and skipped.
+   *
+   * @param txIDs - transaction IDs.
+   * @throws IOException
+   */
+  @Override
+  public void commitTransactions(List<Long> txIDs) throws IOException {
+    lock.lock();
+    try {
+      txIDs.forEach(id -> {
+        try {
+          deletedStore.delete(Longs.toByteArray(id));
+        } catch (IOException e) {
+          LOG.warn("Cannot commit txID " + id, e);
+        }
+      });
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * @param containerName - container name.
+   * @param blocks - blocks that belong to the same container.
+   * @throws IOException
+   */
+  @Override
+  public void addTransaction(String containerName, List<String> blocks)
+      throws IOException {
+    BatchOperation batch = new BatchOperation();
+    lock.lock();
+    try {
+      long nextTxID = lastTxID + 1;
+      DeletedBlocksTransaction.Builder txBuilder =
+          DeletedBlocksTransaction.newBuilder();
+      txBuilder.setTxID(nextTxID);
+      txBuilder.setContainerName(containerName);
+      txBuilder.addAllBlockID(blocks);
+      txBuilder.setCount(0);
+      // The TX and the LATEST_TXID marker are written in one batch.
+      batch.put(Longs.toByteArray(nextTxID), txBuilder.build().toByteArray());
+      batch.put(LATEST_TXID, Longs.toByteArray(nextTxID));
+      deletedStore.writeBatch(batch);
+      // Only advance the in-memory counter once the write went through.
+      lastTxID = nextTxID;
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  @Override // closes the backing metadata store, if there is one
+  public void close() throws IOException {
+    if (deletedStore != null) {
+      deletedStore.close();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/91ffbaa8/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto
index 8400ee0..a8cfa57 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto
@@ -99,6 +99,15 @@ message ContainerInfo {
   optional int64 keycount = 4;
 }
 
+// A block deletion transaction, as stored in deletedBlock.db of scm.
+message DeletedBlocksTransaction {
+  required int64 txID = 1;
+  required string containerName = 2;
+  repeated string blockID = 3;
+  // retry count of sending the delete command to datanode; -1 if given up.
+  required int32 count = 4;
+}
+
 /**
 A set of container reports, max count is generally set to
 8192 since that keeps the size of the reports under 1 MB.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/91ffbaa8/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml
index e39a5ec..b10a536 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml
@@ -265,6 +265,19 @@
   </property>
 
   <property>
+    <name>ozone.scm.block.deletion.max.retry</name>
+    <value>4096</value>
+    <description>
+      SCM wraps up a number of blocks in a deletion transaction and
+      periodically sends that to the datanode for physical deletion. This
+      property determines the maximum number of times SCM retries sending a
+      deletion transaction to the datanode. The default value 4096 is
+      relatively big so that SCM can try enough times before giving up: the
+      actual deletion is async, so the time required is unpredictable.
+    </description>
+  </property>
+
+  <property>
     <name>ozone.scm.heartbeat.log.warn.interval.count</name>
     <value>10</value>
     <description>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/91ffbaa8/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/block/TestDeletedBlockLog.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/block/TestDeletedBlockLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/block/TestDeletedBlockLog.java
new file mode 100644
index 0000000..c1c87ab
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/block/TestDeletedBlockLog.java
@@ -0,0 +1,240 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.scm.block;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.ozone.OzoneConfiguration;
+import org.apache.hadoop.ozone.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.utils.MetadataKeyFilters;
+import org.apache.hadoop.utils.MetadataStore;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_CONTAINER_METADATA_DIRS;
+import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_BLOCK_DELETION_MAX_RETRY;
+
+/**
+ * Tests for DeletedBlockLog.
+ */
+public class TestDeletedBlockLog {
+
+  private static DeletedBlockLogImpl deletedBlockLog;
+  private OzoneConfiguration conf;
+  private File testDir;
+
+  @Before
+  public void setup() throws Exception {
+    testDir = GenericTestUtils.getTestDir(
+        TestDeletedBlockLog.class.getSimpleName());
+    conf = new OzoneConfiguration();
+    conf.setInt(OZONE_SCM_BLOCK_DELETION_MAX_RETRY, 20);
+    conf.set(OZONE_CONTAINER_METADATA_DIRS, testDir.getAbsolutePath());
+    deletedBlockLog = new DeletedBlockLogImpl(conf);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    deletedBlockLog.close(); // close the log before deleting its files
+    FileUtils.deleteDirectory(testDir);
+  }
+
+  private Map<String, List<String>> generateData(int dataSize) {
+    // Each container gets between 1 and 30 randomly named blocks.
+    Random rng = new Random(1); // fixed seed: repeatable block counts
+    Map<String, List<String>> containerToBlocks = new HashMap<>();
+    for (int c = 0; c < dataSize; c++) {
+      int numBlocks = 1 + rng.nextInt(30);
+      List<String> blockIds = new ArrayList<>(numBlocks);
+      for (int b = 0; b < numBlocks; b++) {
+        blockIds.add("block-" + UUID.randomUUID());
+      }
+      containerToBlocks.put("container-" + UUID.randomUUID(), blockIds);
+    }
+    return containerToBlocks;
+  }
+
+  @Test
+  public void testGetTransactions() throws Exception {
+    List<DeletedBlocksTransaction> blocks =
+        deletedBlockLog.getTransactions(30);
+    Assert.assertEquals(0, blocks.size());
+
+    // Creates 40 TX in the log.
+    for (Map.Entry<String, List<String>> entry : generateData(40).entrySet()){
+      deletedBlockLog.addTransaction(entry.getKey(), entry.getValue());
+    }
+
+    // Get first 30 TXs.
+    blocks = deletedBlockLog.getTransactions(30);
+    Assert.assertEquals(30, blocks.size());
+    for (int i = 0; i < 30; i++) {
+      Assert.assertEquals(i + 1, blocks.get(i).getTxID());
+    }
+
+    // Get another 30 TXs.
+    // The log only 10 left, so this time it will only return 10 TXs.
+    blocks = deletedBlockLog.getTransactions(30);
+    Assert.assertEquals(10, blocks.size());
+    for (int i = 30; i < 40; i++) {
+      Assert.assertEquals(i + 1, blocks.get(i - 30).getTxID());
+    }
+
+    // Get another 50 TXs.
+    // By now the position should have moved to the beginning,
+    // this call will return all 40 TXs.
+    blocks = deletedBlockLog.getTransactions(50);
+    Assert.assertEquals(40, blocks.size());
+    for (int i = 0; i < 40; i++) {
+      Assert.assertEquals(i + 1, blocks.get(i).getTxID());
+    }
+    List<Long> txIDs = new ArrayList<>();
+    for (DeletedBlocksTransaction block : blocks) {
+      txIDs.add(block.getTxID());
+    }
+    deletedBlockLog.commitTransactions(txIDs);
+  }
+
+  @Test
+  public void testIncrementCount() throws Exception {
+    int maxRetry = conf.getInt(OZONE_SCM_BLOCK_DELETION_MAX_RETRY, 20);
+
+    // Create 30 TXs in the log.
+    for (Map.Entry<String, List<String>> entry : generateData(30).entrySet()){
+      deletedBlockLog.addTransaction(entry.getKey(), entry.getValue());
+    }
+
+    // This will return all TXs, total num 30.
+    List<DeletedBlocksTransaction> blocks =
+        deletedBlockLog.getTransactions(40);
+    List<Long> txIDs = blocks.stream().map(DeletedBlocksTransaction::getTxID)
+        .collect(Collectors.toList());
+
+    for (int i = 0; i < maxRetry; i++) {
+      deletedBlockLog.incrementCount(txIDs);
+    }
+
+    // Increment once more: every TX now exceeds maxRetry and is given up on,
+    // so it is never returned again (a returned one would have to be -1).
+    deletedBlockLog.incrementCount(txIDs);
+    blocks = deletedBlockLog.getTransactions(40);
+    for (DeletedBlocksTransaction block : blocks) {
+      Assert.assertEquals(-1, block.getCount());
+    }
+
+    // If all TXs are failed, getTransactions call will always return nothing.
+    blocks = deletedBlockLog.getTransactions(40);
+    Assert.assertEquals(0, blocks.size());
+  }
+
+  @Test
+  public void testCommitTransactions() throws Exception {
+    for (Map.Entry<String, List<String>> entry : generateData(50).entrySet()){
+      deletedBlockLog.addTransaction(entry.getKey(), entry.getValue());
+    }
+    List<DeletedBlocksTransaction> blocks =
+        deletedBlockLog.getTransactions(20);
+    List<Long> txIDs = new ArrayList<>();
+    for (DeletedBlocksTransaction block : blocks) {
+      txIDs.add(block.getTxID());
+    }
+    // Add an invalid txID.
+    txIDs.add(70L);
+    deletedBlockLog.commitTransactions(txIDs);
+    blocks = deletedBlockLog.getTransactions(50);
+    Assert.assertEquals(30, blocks.size());
+  }
+
+  @Test
+  public void testRandomOperateTransactions() throws Exception {
+    Random random = new Random();
+    int added = 0, committed = 0; // running totals checked in state 3
+    List<DeletedBlocksTransaction> blocks = new ArrayList<>();
+    List<Long> txIDs = new ArrayList<>();
+    byte[] latestTxid = DFSUtil.string2Bytes("#LATEST_TXID#");
+    MetadataKeyFilters.MetadataKeyFilter avoidLatestTxid =
+        (preKey, currentKey, nextKey) ->
+            !Arrays.equals(latestTxid, currentKey);
+    MetadataStore store = deletedBlockLog.getDeletedStore();
+    // Randomly add/get/commit/increase transactions, 100 rounds.
+    for (int i = 0; i < 100; i++) {
+      int state = random.nextInt(4);
+      if (state == 0) { // add 10 new TXs
+        for (Map.Entry<String, List<String>> entry :
+            generateData(10).entrySet()){
+          deletedBlockLog.addTransaction(entry.getKey(), entry.getValue());
+        }
+        added += 10;
+      } else if (state == 1) { // fetch up to 20 TXs and bump their counts
+        blocks = deletedBlockLog.getTransactions(20);
+        txIDs = new ArrayList<>();
+        for (DeletedBlocksTransaction block : blocks) {
+          txIDs.add(block.getTxID());
+        }
+        deletedBlockLog.incrementCount(txIDs);
+      } else if (state == 2) { // commit the TXs fetched by the last state 1
+        txIDs = new ArrayList<>();
+        for (DeletedBlocksTransaction block : blocks) {
+          txIDs.add(block.getTxID());
+        }
+        blocks = new ArrayList<>(); // so the same TXs are not counted twice
+        committed += txIDs.size();
+        deletedBlockLog.commitTransactions(txIDs);
+      } else {
+        // verify that TXs left in the store + committed equals added.
+        List<Map.Entry<byte[], byte[]>> result =
+            store.getRangeKVs(null, added, avoidLatestTxid);
+        Assert.assertEquals(added, result.size() + committed);
+      }
+    }
+  }
+
+  @Test
+  public void testPersistence() throws Exception {
+    for (Map.Entry<String, List<String>> entry : generateData(50).entrySet()){
+      deletedBlockLog.addTransaction(entry.getKey(), entry.getValue());
+    }
+    // close db and reopen it again to make sure
+    // transactions are stored persistently.
+    deletedBlockLog.close();
+    deletedBlockLog = new DeletedBlockLogImpl(conf);
+    List<DeletedBlocksTransaction> blocks =
+        deletedBlockLog.getTransactions(10);
+    List<Long> txIDs = new ArrayList<>();
+    for (DeletedBlocksTransaction block : blocks) {
+      txIDs.add(block.getTxID());
+    }
+    deletedBlockLog.commitTransactions(txIDs);
+    blocks = deletedBlockLog.getTransactions(10);
+    Assert.assertEquals(10, blocks.size());
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org