You are viewing a plain-text version of this content. The canonical link for it ("here") was not preserved in this plain-text copy.
Posted to commits@ozone.apache.org by sa...@apache.org on 2022/05/04 01:58:01 UTC

[ozone] branch master updated: HDDS-6685. Follower OM crashed when validating S3 auth info. (#3376)

This is an automated email from the ASF dual-hosted git repository.

sammichen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 28170f2672 HDDS-6685. Follower OM crashed when validating S3 auth info. (#3376)
28170f2672 is described below

commit 28170f2672fdcba1e9b96a2c7c15e62981b52392
Author: Sammi Chen <sa...@apache.org>
AuthorDate: Wed May 4 09:57:51 2022 +0800

    HDDS-6685. Follower OM crashed when validating S3 auth info. (#3376)
---
 .../hadoop/ozone/om/TestOMRatisSnapshots.java      | 295 +++++++++++++++++++--
 .../org/apache/hadoop/ozone/om/OzoneManager.java   | 148 +++++++----
 .../ozone/om/ratis/OzoneManagerDoubleBuffer.java   |   7 +-
 .../ozone/om/ratis/OzoneManagerStateMachine.java   |  38 ++-
 ...TestOzoneManagerDoubleBufferWithOMResponse.java |   2 +-
 5 files changed, 409 insertions(+), 81 deletions(-)

diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java
index 42bfd11da4..1c787a426f 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java
@@ -22,18 +22,24 @@ import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.conf.StorageUnit;
 import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.MiniOzoneHAClusterImpl;
 import org.apache.hadoop.ozone.client.ObjectStore;
 import org.apache.hadoop.ozone.client.OzoneBucket;
 import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.OzoneKeyDetails;
 import org.apache.hadoop.ozone.client.OzoneVolume;
 import org.apache.hadoop.ozone.client.VolumeArgs;
 import org.apache.hadoop.hdds.utils.TransactionInfo;
+import org.apache.hadoop.ozone.client.io.OzoneInputStream;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
 import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
 import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
@@ -44,7 +50,9 @@ import org.apache.ratis.server.protocol.TermIndex;
 import static org.apache.hadoop.ozone.om.TestOzoneManagerHAWithData.createKey;
 import static org.junit.Assert.assertTrue;
 
+import org.assertj.core.api.Fail;
 import org.junit.Assert;
+import org.junit.Ignore;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -53,9 +61,9 @@ import org.slf4j.Logger;
 import org.slf4j.event.Level;
 
 /**
- * Tests the Ratis snaphsots feature in OM.
+ * Tests the Ratis snapshots feature in OM.
  */
-@Timeout(500)
+@Timeout(5000)
 public class TestOMRatisSnapshots {
 
   private MiniOzoneHAClusterImpl cluster = null;
@@ -86,6 +94,10 @@ public class TestOMRatisSnapshots {
     scmId = UUID.randomUUID().toString();
     omServiceId = "om-service-test1";
     conf.setInt(OMConfigKeys.OZONE_OM_RATIS_LOG_PURGE_GAP, LOG_PURGE_GAP);
+    conf.setStorageSize(OMConfigKeys.OZONE_OM_RATIS_SEGMENT_SIZE_KEY, 16,
+        StorageUnit.KB);
+    conf.setStorageSize(OMConfigKeys.
+        OZONE_OM_RATIS_SEGMENT_PREALLOCATED_SIZE_KEY, 16, StorageUnit.KB);
     conf.setLong(
         OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_AUTO_TRIGGER_THRESHOLD_KEY,
         SNAPSHOT_THRESHOLD);
@@ -154,11 +166,112 @@ public class TestOMRatisSnapshots {
     long leaderOMSnapshotIndex = leaderOMTermIndex.getIndex();
     long leaderOMSnapshotTermIndex = leaderOMTermIndex.getTerm();
 
-    DBCheckpoint leaderDbCheckpoint =
-        leaderOM.getMetadataManager().getStore().getCheckpoint(false);
+    // Start the inactive OM. Checkpoint installation will happen spontaneously.
+    cluster.startInactiveOM(followerNodeId);
+    GenericTestUtils.LogCapturer logCapture =
+        GenericTestUtils.LogCapturer.captureLogs(OzoneManager.LOG);
+
+    // The recently started OM should be lagging behind the leader OM.
+    // Wait & for follower to update transactions to leader snapshot index.
+    // Timeout error if follower does not load update within 3s
+    GenericTestUtils.waitFor(() -> {
+      return followerOM.getOmRatisServer().getLastAppliedTermIndex().getIndex()
+          >= leaderOMSnapshotIndex - 1;
+    }, 100, 3000);
+
+    long followerOMLastAppliedIndex =
+        followerOM.getOmRatisServer().getLastAppliedTermIndex().getIndex();
+    assertTrue(
+        followerOMLastAppliedIndex >= leaderOMSnapshotIndex - 1);
+
+    // After the new checkpoint is installed, the follower OM
+    // lastAppliedIndex must >= the snapshot index of the checkpoint. It
+    // could be great than snapshot index if there is any conf entry from ratis.
+    followerOMLastAppliedIndex = followerOM.getOmRatisServer()
+        .getLastAppliedTermIndex().getIndex();
+    assertTrue(followerOMLastAppliedIndex >= leaderOMSnapshotIndex);
+    assertTrue(followerOM.getOmRatisServer().getLastAppliedTermIndex()
+        .getTerm() >= leaderOMSnapshotTermIndex);
+
+    // Verify checkpoint installation was happened.
+    String msg = "Reloaded OM state";
+    Assert.assertTrue(logCapture.getOutput().contains(msg));
+
+    // Verify that the follower OM's DB contains the transactions which were
+    // made while it was inactive.
+    OMMetadataManager followerOMMetaMngr = followerOM.getMetadataManager();
+    Assert.assertNotNull(followerOMMetaMngr.getVolumeTable().get(
+        followerOMMetaMngr.getVolumeKey(volumeName)));
+    Assert.assertNotNull(followerOMMetaMngr.getBucketTable().get(
+        followerOMMetaMngr.getBucketKey(volumeName, bucketName)));
+    for (String key : keys) {
+      Assert.assertNotNull(followerOMMetaMngr.getKeyTable(
+          getDefaultBucketLayout())
+          .get(followerOMMetaMngr.getOzoneKey(volumeName, bucketName, key)));
+    }
+
+    // Verify RPC server is running
+    GenericTestUtils.waitFor(() -> {
+      return followerOM.isOmRpcServerRunning();
+    }, 100, 5000);
+
+    Assert.assertTrue(logCapture.getOutput().contains(
+        "Install Checkpoint is finished"));
+
+    // Read & Write after snapshot installed.
+    List<String> newKeys = writeKeys(1);
+    readKeys(newKeys);
+    // TODO: Enable this part after RATIS-1481 used
+    /*
+    Assert.assertNotNull(followerOMMetaMngr.getKeyTable(
+        getDefaultBucketLayout()).get(followerOMMetaMngr.getOzoneKey(
+        volumeName, bucketName, newKeys.get(0))));
+     */
+  }
 
-    // Start the inactive OM
+  @Ignore("Enable this unit test after RATIS-1481 used")
+  public void testInstallSnapshotWithClientWrite() throws Exception {
+    // Get the leader OM
+    String leaderOMNodeId = OmFailoverProxyUtil
+        .getFailoverProxyProvider(objectStore.getClientProxy())
+        .getCurrentProxyOMNodeId();
+
+    OzoneManager leaderOM = cluster.getOzoneManager(leaderOMNodeId);
+    OzoneManagerRatisServer leaderRatisServer = leaderOM.getOmRatisServer();
+
+    // Find the inactive OM
+    String followerNodeId = leaderOM.getPeerNodes().get(0).getNodeId();
+    if (cluster.isOMActive(followerNodeId)) {
+      followerNodeId = leaderOM.getPeerNodes().get(1).getNodeId();
+    }
+    OzoneManager followerOM = cluster.getOzoneManager(followerNodeId);
+
+    // Do some transactions so that the log index increases
+    List<String> keys = writeKeysToIncreaseLogIndex(leaderRatisServer, 200);
+
+    // Get the latest db checkpoint from the leader OM.
+    TransactionInfo transactionInfo =
+        TransactionInfo.readTransactionInfo(leaderOM.getMetadataManager());
+    TermIndex leaderOMTermIndex =
+        TermIndex.valueOf(transactionInfo.getTerm(),
+            transactionInfo.getTransactionIndex());
+    long leaderOMSnapshotIndex = leaderOMTermIndex.getIndex();
+    long leaderOMSnapshotTermIndex = leaderOMTermIndex.getTerm();
+
+    // Start the inactive OM. Checkpoint installation will happen spontaneously.
     cluster.startInactiveOM(followerNodeId);
+    GenericTestUtils.LogCapturer logCapture =
+        GenericTestUtils.LogCapturer.captureLogs(OzoneManager.LOG);
+
+    // Continuously create new keys
+    ExecutorService executor = Executors.newFixedThreadPool(1);
+    Future<List<String>> writeFuture = executor.submit(() -> {
+      return writeKeys(200);
+    });
+    List<String> newKeys = writeFuture.get();
+
+    // Wait checkpoint installation to finish
+    Thread.sleep(5000);
 
     // The recently started OM should be lagging behind the leader OM.
     // Wait & for follower to update transactions to leader snapshot index.
@@ -167,14 +280,116 @@ public class TestOMRatisSnapshots {
       return followerOM.getOmRatisServer().getLastAppliedTermIndex().getIndex()
           >= leaderOMSnapshotIndex - 1;
     }, 100, 3000);
-    
+
+    // Verify checkpoint installation was happened.
+    String msg = "Reloaded OM state";
+    Assert.assertTrue(logCapture.getOutput().contains(msg));
+    Assert.assertTrue(logCapture.getOutput().contains(
+        "Install Checkpoint is finished"));
+
     long followerOMLastAppliedIndex =
         followerOM.getOmRatisServer().getLastAppliedTermIndex().getIndex();
     assertTrue(
-        followerOMLastAppliedIndex >= leaderOMSnapshotIndex - 1);    
+        followerOMLastAppliedIndex >= leaderOMSnapshotIndex - 1);
 
-    // Install leader OM's db checkpoint on the lagging OM.
-    followerOM.installCheckpoint(leaderOMNodeId, leaderDbCheckpoint);
+    // After the new checkpoint is installed, the follower OM
+    // lastAppliedIndex must >= the snapshot index of the checkpoint. It
+    // could be great than snapshot index if there is any conf entry from ratis.
+    followerOMLastAppliedIndex = followerOM.getOmRatisServer()
+        .getLastAppliedTermIndex().getIndex();
+    assertTrue(followerOMLastAppliedIndex >= leaderOMSnapshotIndex);
+    assertTrue(followerOM.getOmRatisServer().getLastAppliedTermIndex()
+        .getTerm() >= leaderOMSnapshotTermIndex);
+
+    // Verify that the follower OM's DB contains the transactions which were
+    // made while it was inactive.
+    OMMetadataManager followerOMMetaMgr = followerOM.getMetadataManager();
+    Assert.assertNotNull(followerOMMetaMgr.getVolumeTable().get(
+        followerOMMetaMgr.getVolumeKey(volumeName)));
+    Assert.assertNotNull(followerOMMetaMgr.getBucketTable().get(
+        followerOMMetaMgr.getBucketKey(volumeName, bucketName)));
+    for (String key : keys) {
+      Assert.assertNotNull(followerOMMetaMgr.getKeyTable(
+          getDefaultBucketLayout())
+          .get(followerOMMetaMgr.getOzoneKey(volumeName, bucketName, key)));
+    }
+    OMMetadataManager leaderOmMetaMgr = leaderOM.getMetadataManager();
+    for (String key : newKeys) {
+      Assert.assertNotNull(leaderOmMetaMgr.getKeyTable(
+          getDefaultBucketLayout())
+          .get(followerOMMetaMgr.getOzoneKey(volumeName, bucketName, key)));
+    }
+    Thread.sleep(5000);
+    followerOMMetaMgr = followerOM.getMetadataManager();
+    for (String key : newKeys) {
+      Assert.assertNotNull(followerOMMetaMgr.getKeyTable(
+          getDefaultBucketLayout())
+          .get(followerOMMetaMgr.getOzoneKey(volumeName, bucketName, key)));
+    }
+    // Read newly created keys
+    readKeys(newKeys);
+    System.out.println("All data are replicated");
+  }
+
+  @Test
+  public void testInstallSnapshotWithClientRead() throws Exception {
+    // Get the leader OM
+    String leaderOMNodeId = OmFailoverProxyUtil
+        .getFailoverProxyProvider(objectStore.getClientProxy())
+        .getCurrentProxyOMNodeId();
+
+    OzoneManager leaderOM = cluster.getOzoneManager(leaderOMNodeId);
+    OzoneManagerRatisServer leaderRatisServer = leaderOM.getOmRatisServer();
+
+    // Find the inactive OM
+    String followerNodeId = leaderOM.getPeerNodes().get(0).getNodeId();
+    if (cluster.isOMActive(followerNodeId)) {
+      followerNodeId = leaderOM.getPeerNodes().get(1).getNodeId();
+    }
+    OzoneManager followerOM = cluster.getOzoneManager(followerNodeId);
+
+    // Do some transactions so that the log index increases
+    List<String> keys = writeKeysToIncreaseLogIndex(leaderRatisServer, 200);
+
+    // Get transaction Index
+    TransactionInfo transactionInfo =
+        TransactionInfo.readTransactionInfo(leaderOM.getMetadataManager());
+    TermIndex leaderOMTermIndex =
+        TermIndex.valueOf(transactionInfo.getTerm(),
+            transactionInfo.getTransactionIndex());
+    long leaderOMSnapshotIndex = leaderOMTermIndex.getIndex();
+    long leaderOMSnapshotTermIndex = leaderOMTermIndex.getTerm();
+
+    // Start the inactive OM. Checkpoint installation will happen spontaneously.
+    cluster.startInactiveOM(followerNodeId);
+    GenericTestUtils.LogCapturer logCapture =
+        GenericTestUtils.LogCapturer.captureLogs(OzoneManager.LOG);
+
+    // Continuously read keys
+    ExecutorService executor = Executors.newFixedThreadPool(1);
+    Future<Void> readFuture = executor.submit(() -> {
+      try {
+        getKeys(keys, 10);
+        readKeys(keys);
+      } catch (IOException e) {
+        Fail.fail("Read Key failed", e);
+      }
+      return null;
+    });
+    readFuture.get();
+
+    // The recently started OM should be lagging behind the leader OM.
+    // Wait & for follower to update transactions to leader snapshot index.
+    // Timeout error if follower does not load update within 3s
+    GenericTestUtils.waitFor(() -> {
+      return followerOM.getOmRatisServer().getLastAppliedTermIndex().getIndex()
+          >= leaderOMSnapshotIndex - 1;
+    }, 100, 3000);
+
+    long followerOMLastAppliedIndex =
+        followerOM.getOmRatisServer().getLastAppliedTermIndex().getIndex();
+    assertTrue(
+        followerOMLastAppliedIndex >= leaderOMSnapshotIndex - 1);
 
     // After the new checkpoint is installed, the follower OM
     // lastAppliedIndex must >= the snapshot index of the checkpoint. It
@@ -197,6 +412,13 @@ public class TestOMRatisSnapshots {
           getDefaultBucketLayout())
           .get(followerOMMetaMngr.getOzoneKey(volumeName, bucketName, key)));
     }
+
+    // Wait installation finish
+    Thread.sleep(5000);
+    // Verify checkpoint installation was happened.
+    Assert.assertTrue(logCapture.getOutput().contains("Reloaded OM state"));
+    Assert.assertTrue(logCapture.getOutput().contains(
+        "Install Checkpoint is finished"));
   }
 
   @Test
@@ -214,6 +436,9 @@ public class TestOMRatisSnapshots {
       followerNodeId = leaderOM.getPeerNodes().get(1).getNodeId();
     }
     cluster.startInactiveOM(followerNodeId);
+    GenericTestUtils.setLogLevel(OzoneManager.LOG, Level.INFO);
+    GenericTestUtils.LogCapturer logCapture =
+        GenericTestUtils.LogCapturer.captureLogs(OzoneManager.LOG);
 
     OzoneManager followerOM = cluster.getOzoneManager(followerNodeId);
     OzoneManagerRatisServer followerRatisServer = followerOM.getOmRatisServer();
@@ -232,10 +457,6 @@ public class TestOMRatisSnapshots {
     writeKeysToIncreaseLogIndex(followerOM.getOmRatisServer(),
         leaderCheckpointTermIndex.getIndex() + 100);
 
-    GenericTestUtils.setLogLevel(OzoneManager.LOG, Level.INFO);
-    GenericTestUtils.LogCapturer logCapture =
-        GenericTestUtils.LogCapturer.captureLogs(OzoneManager.LOG);
-
     // Install the old checkpoint on the follower OM. This should fail as the
     // followerOM is already ahead of that transactionLogIndex and the OM
     // state should be reloaded.
@@ -251,6 +472,10 @@ public class TestOMRatisSnapshots {
         "logIndex is less than it's lastAppliedIndex", newTermIndex);
     Assert.assertEquals(followerTermIndex,
         followerRatisServer.getLastAppliedTermIndex());
+    String msg = "OM DB is not stopped. Started services with Term: " +
+        followerTermIndex.getTerm() + " and Index: " +
+        followerTermIndex.getIndex();
+    Assert.assertTrue(logCapture.getOutput().contains(msg));
   }
 
   @Test
@@ -294,16 +519,22 @@ public class TestOMRatisSnapshots {
       }
     }
 
-    GenericTestUtils.setLogLevel(OzoneManager.LOG, Level.ERROR);
+    GenericTestUtils.setLogLevel(OzoneManager.LOG, Level.INFO);
     GenericTestUtils.LogCapturer logCapture =
         GenericTestUtils.LogCapturer.captureLogs(OzoneManager.LOG);
     followerOM.setExitManagerForTesting(new DummyExitManager());
-
+    // Install corrupted checkpoint
     followerOM.installCheckpoint(leaderOMNodeId, leaderCheckpointLocation,
         leaderCheckpointTrxnInfo);
 
-    Assert.assertTrue(logCapture.getOutput().contains("System Exit: " +
-        "Failed to reload OM state and instantiate services."));
+    // Wait checkpoint installation to be finished.
+    GenericTestUtils.waitFor(() -> {
+      Assert.assertTrue(logCapture.getOutput().contains("System Exit: " +
+          "Failed to reload OM state and instantiate services."));
+      return true;
+    }, 100, 3000);
+    String msg = "RPC server is stopped";
+    Assert.assertTrue(logCapture.getOutput().contains(msg));
   }
 
   private List<String> writeKeysToIncreaseLogIndex(
@@ -319,6 +550,36 @@ public class TestOMRatisSnapshots {
     return keys;
   }
 
+  /** Creates {@code keyCount} keys in the shared test bucket. */
+  private List<String> writeKeys(long keyCount) throws IOException,
+      InterruptedException {
+    List<String> keys = new ArrayList<>();
+    for (long i = 0; i < keyCount; i++) {
+      // createKey() yields the generated key name, collected for callers.
+      keys.add(createKey(ozoneBucket));
+    }
+    return keys;
+  }
+
+  /** Fetches each key's details {@code round} times and checks its name. */
+  private void getKeys(List<String> keys, int round) throws IOException {
+    for (int pass = 0; pass < round; pass++) {
+      for (String keyName : keys) {
+        OzoneKeyDetails key = ozoneBucket.getKey(keyName);
+        Assert.assertEquals(keyName, key.getName());
+      }
+    }
+  }
+
+  /** Reads up to 100 bytes from each key, always closing the stream. */
+  private void readKeys(List<String> keys) throws IOException {
+    for (String keyName : keys) {
+      try (OzoneInputStream inputStream = ozoneBucket.readKey(keyName)) {
+        inputStream.read(new byte[100], 0, 100);
+      }
+    }
+  }
+
   private static BucketLayout getDefaultBucketLayout() {
     return BucketLayout.DEFAULT;
   }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index c0bd8cc1f0..2cde62ba95 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -1024,6 +1024,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
       return omRpcServer;
     }
 
+    LOG.info("Creating RPC Server");
     InetSocketAddress omNodeRpcAddr = OmUtils.getOmAddress(conf);
 
     final int handlerCount = conf.getInt(OZONE_OM_HANDLER_COUNT_KEY,
@@ -1821,34 +1822,36 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
    * @throws IOException
    */
   private void startTrashEmptier(Configuration conf) throws IOException {
-    float hadoopTrashInterval =
-        conf.getFloat(FS_TRASH_INTERVAL_KEY, FS_TRASH_INTERVAL_DEFAULT);
-    // check whether user has configured ozone specific trash-interval
-    // if not fall back to hadoop configuration
-    long trashInterval =
-        (long)(conf.getFloat(
-            OMConfigKeys.OZONE_FS_TRASH_INTERVAL_KEY, hadoopTrashInterval)
-            * MSECS_PER_MINUTE);
-    if (trashInterval == 0) {
-      LOG.info("Trash Interval set to 0. Files deleted will not move to trash");
-      return;
-    } else if (trashInterval < 0) {
-      throw new IOException("Cannot start trash emptier with negative interval."
-              + " Set " + FS_TRASH_INTERVAL_KEY + " to a positive value.");
+    if (emptier == null) {
+      float hadoopTrashInterval =
+          conf.getFloat(FS_TRASH_INTERVAL_KEY, FS_TRASH_INTERVAL_DEFAULT);
+      // check whether user has configured ozone specific trash-interval
+      // if not fall back to hadoop configuration
+      long trashInterval =
+          (long) (conf.getFloat(
+              OMConfigKeys.OZONE_FS_TRASH_INTERVAL_KEY, hadoopTrashInterval)
+              * MSECS_PER_MINUTE);
+      if (trashInterval == 0) {
+        LOG.info("Trash Interval set to 0. Files deleted won't move to trash");
+        return;
+      } else if (trashInterval < 0) {
+        throw new IOException("Cannot start trash emptier with negative " +
+            "interval. Set " + FS_TRASH_INTERVAL_KEY + " to a positive value.");
+      }
+
+      OzoneManager i = this;
+      FileSystem fs = SecurityUtil.doAsLoginUser(
+          new PrivilegedExceptionAction<FileSystem>() {
+            @Override
+            public FileSystem run() throws IOException {
+              return new TrashOzoneFileSystem(i);
+            }
+          });
+      this.emptier = new Thread(new OzoneTrash(fs, conf, this).
+          getEmptier(), "Trash Emptier");
+      this.emptier.setDaemon(true);
+      this.emptier.start();
     }
-
-    OzoneManager i = this;
-    FileSystem fs = SecurityUtil.doAsLoginUser(
-        new PrivilegedExceptionAction<FileSystem>() {
-          @Override
-          public FileSystem run() throws IOException {
-            return new TrashOzoneFileSystem(i);
-          }
-        });
-    this.emptier = new Thread(new OzoneTrash(fs, conf, this).
-      getEmptier(), "Trash Emptier");
-    this.emptier.setDaemon(true);
-    this.emptier.start();
   }
 
   /**
@@ -3253,11 +3256,13 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
 
   TermIndex installCheckpoint(String leaderId, Path checkpointLocation,
       TransactionInfo checkpointTrxnInfo) throws Exception {
-
+    long startTime = Time.monotonicNow();
     File oldDBLocation = metadataManager.getStore().getDbLocation();
     try {
       // Stop Background services
-      stopServices();
+      keyManager.stop();
+      stopSecretManager();
+      stopTrashEmptier();
 
       // Pause the State Machine so that no new transactions can be applied.
       // This action also clears the OM Double Buffer so that if there are any
@@ -3266,9 +3271,9 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
     } catch (Exception e) {
       LOG.error("Failed to stop/ pause the services. Cannot proceed with " +
           "installing the new checkpoint.");
-      // During stopServices, if KeyManager was stopped successfully and
-      // OMMetadataManager stop failed, we should restart the KeyManager.
+      // Stop the checkpoint install process and restart the services.
       keyManager.start(configuration);
+      startSecretManagerIfNecessary();
       startTrashEmptier(configuration);
       throw e;
     }
@@ -3285,14 +3290,39 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
     boolean canProceed = OzoneManagerRatisUtils.verifyTransactionInfo(
         checkpointTrxnInfo, lastAppliedIndex, leaderId, checkpointLocation);
 
+    boolean oldOmMetadataManagerStopped = false;
+    boolean newMetadataManagerStarted = false;
+    boolean omRpcServerStopped = false;
+    long time = Time.monotonicNow();
     if (canProceed) {
+      // Stop RPC server before stop metadataManager
+      omRpcServer.stop();
+      isOmRpcServerRunning = false;
+      omRpcServerStopped = true;
+      LOG.info("RPC server is stopped. Spend " +
+          (Time.monotonicNow() - time) + " ms.");
       try {
+        // Stop old metadataManager before replacing DB Dir
+        time = Time.monotonicNow();
+        metadataManager.stop();
+        oldOmMetadataManagerStopped = true;
+        LOG.info("metadataManager is stopped. Spend " +
+            (Time.monotonicNow() - time) + " ms.");
+      } catch (Exception e) {
+        String errorMsg = "Failed to stop metadataManager. Cannot proceed " +
+            "with installing the new checkpoint.";
+        LOG.error(errorMsg);
+        exitManager.exitSystem(1, errorMsg, e, LOG);
+      }
+      try {
+        time = Time.monotonicNow();
         dbBackup = replaceOMDBWithCheckpoint(lastAppliedIndex, oldDBLocation,
             checkpointLocation);
         term = checkpointTrxnInfo.getTerm();
         lastAppliedIndex = checkpointTrxnInfo.getTransactionIndex();
-        LOG.info("Replaced DB with checkpoint from OM: {}, term: {}, index: {}",
-            leaderId, term, lastAppliedIndex);
+        LOG.info("Replaced DB with checkpoint from OM: {}, term: {}, " +
+            "index: {}, time: {} ms", leaderId, term, lastAppliedIndex,
+            Time.monotonicNow() - time);
       } catch (Exception e) {
         LOG.error("Failed to install Snapshot from {} as OM failed to replace" +
             " DB with downloaded checkpoint. Reloading old OM state.", e);
@@ -3307,15 +3337,43 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
     // Restart (unpause) the state machine and update its last applied index
     // to the installed checkpoint's snapshot index.
     try {
-      reloadOMState(lastAppliedIndex, term);
-      omRatisServer.getOmStateMachine().unpause(lastAppliedIndex, term);
-      LOG.info("Reloaded OM state with Term: {} and Index: {}", term,
-          lastAppliedIndex);
+      if (oldOmMetadataManagerStopped) {
+        time = Time.monotonicNow();
+        reloadOMState(lastAppliedIndex, term);
+        omRatisServer.getOmStateMachine().unpause(lastAppliedIndex, term);
+        newMetadataManagerStarted = true;
+        LOG.info("Reloaded OM state with Term: {} and Index: {}. Spend {} ms",
+            term, lastAppliedIndex, Time.monotonicNow() - time);
+      } else {
+        // OM DB is not stopped. Start the services.
+        keyManager.start(configuration);
+        startSecretManagerIfNecessary();
+        startTrashEmptier(configuration);
+        omRatisServer.getOmStateMachine().unpause(lastAppliedIndex, term);
+        LOG.info("OM DB is not stopped. Started services with Term: {} and " +
+            "Index: {}", term, lastAppliedIndex);
+      }
     } catch (Exception ex) {
+
       String errorMsg = "Failed to reload OM state and instantiate services.";
       exitManager.exitSystem(1, errorMsg, ex, LOG);
     }
 
+    if (omRpcServerStopped && newMetadataManagerStarted) {
+      // Start the RPC server. RPC server start requires metadataManager
+      try {
+        time = Time.monotonicNow();
+        omRpcServer = getRpcServer(configuration);
+        omRpcServer.start();
+        isOmRpcServerRunning = true;
+        LOG.info("RPC server is re-started. Spend " +
+            (Time.monotonicNow() - time) + " ms.");
+      } catch (Exception e) {
+        String errorMsg = "Failed to start RPC Server.";
+        exitManager.exitSystem(1, errorMsg, e, LOG);
+      }
+    }
+
     // Delete the backup DB
     try {
       if (dbBackup != null) {
@@ -3334,6 +3392,9 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
     // TODO: We should only return the snpashotIndex to the leader.
     //  Should be fixed after RATIS-586
     TermIndex newTermIndex = TermIndex.valueOf(term, lastAppliedIndex);
+    LOG.info("Install Checkpoint is finished with Term: {} and Index: {}. " +
+        "Spend {} ms.", newTermIndex.getTerm(), newTermIndex.getIndex(),
+        (Time.monotonicNow() - startTime));
     return newTermIndex;
   }
 
@@ -3356,13 +3417,6 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
     return null;
   }
 
-  void stopServices() throws Exception {
-    keyManager.stop();
-    stopSecretManager();
-    metadataManager.stop();
-    stopTrashEmptier();
-  }
-
   private void stopTrashEmptier() {
     if (this.emptier != null) {
       emptier.interrupt();
@@ -3436,6 +3490,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
     // Restart required services
     metadataManager.start(configuration);
     keyManager.start(configuration);
+    startSecretManagerIfNecessary();
     startTrashEmptier(configuration);
 
     // Set metrics and start metrics back ground thread
@@ -3976,7 +4031,10 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
     this.minMultipartUploadPartSize = partSizeForTest;
   }
 
-
+  /** Returns true while the OM RPC server is flagged as running. */
+  @VisibleForTesting
+  public boolean isOmRpcServerRunning() { return isOmRpcServerRunning; }
+
   /**
    * Write down Layout version of a finalized feature to DB on finalization.
    * @param lvm OMLayoutVersionManager
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java
index b3c826244a..5543a6c2c2 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java
@@ -383,14 +383,13 @@ public final class OzoneManagerDoubleBuffer {
           ExitUtils.terminate(1, message, ex, LOG);
         } else {
           LOG.info("OMDoubleBuffer flush thread {} is interrupted and will "
-              + "exit. {}", Thread.currentThread().getName(),
-                  Thread.currentThread().getName());
+              + "exit.", Thread.currentThread().getName());
         }
       } catch (IOException ex) {
         terminate(ex);
       } catch (Throwable t) {
-        final String s = "OMDoubleBuffer flush thread" +
-            Thread.currentThread().getName() + "encountered Throwable error";
+        final String s = "OMDoubleBuffer flush thread " +
+            Thread.currentThread().getName() + " encountered Throwable error";
         ExitUtils.terminate(2, s, t, LOG);
       }
     }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
index 78cbabfbc7..c8c69d4f6d 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
@@ -31,6 +31,7 @@ import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.hdds.utils.TransactionInfo;
@@ -92,6 +93,7 @@ public class OzoneManagerStateMachine extends BaseStateMachine {
   private final ExecutorService executorService;
   private final ExecutorService installSnapshotExecutor;
   private final boolean isTracingEnabled;
+  private final AtomicInteger statePausedCount = new AtomicInteger(0);
 
   // Map which contains index and term for the ratis transactions which are
   // stateMachine entries which are received through applyTransaction.
@@ -139,12 +141,12 @@ public class OzoneManagerStateMachine extends BaseStateMachine {
   }
 
   @Override
-  public void reinitialize() throws IOException {
-    getLifeCycle().startAndTransition(() -> {
-      loadSnapshotInfoFromDB();
-      this.ozoneManagerDoubleBuffer = buildDoubleBufferForRatis();
-      handler.updateDoubleBuffer(ozoneManagerDoubleBuffer);
-    });
+  public synchronized void reinitialize() throws IOException {
+    loadSnapshotInfoFromDB();
+    if (getLifeCycleState() == LifeCycle.State.PAUSED) {
+      unpause(getLastAppliedTermIndex().getIndex(),
+          getLastAppliedTermIndex().getTerm());
+    }
   }
 
   @Override
@@ -378,7 +380,12 @@ public class OzoneManagerStateMachine extends BaseStateMachine {
   }
 
   @Override
-  public void pause() {
+  public synchronized void pause() {
+    LOG.info("OzoneManagerStateMachine is pausing");
+    statePausedCount.incrementAndGet();
+    if (getLifeCycleState() == LifeCycle.State.PAUSED) {
+      return;
+    }
     getLifeCycle().transition(LifeCycle.State.PAUSING);
     getLifeCycle().transition(LifeCycle.State.PAUSED);
     ozoneManagerDoubleBuffer.stop();
@@ -389,14 +396,17 @@ public class OzoneManagerStateMachine extends BaseStateMachine {
    * lastAppliedIndex. This should be done after uploading new state to the
    * StateMachine.
    */
-  public void unpause(long newLastAppliedSnaphsotIndex,
+  public synchronized void unpause(long newLastAppliedSnaphsotIndex,
       long newLastAppliedSnapShotTermIndex) {
-    getLifeCycle().startAndTransition(() -> {
-      this.ozoneManagerDoubleBuffer = buildDoubleBufferForRatis();
-      handler.updateDoubleBuffer(ozoneManagerDoubleBuffer);
-      this.setLastAppliedTermIndex(TermIndex.valueOf(
-          newLastAppliedSnapShotTermIndex, newLastAppliedSnaphsotIndex));
-    });
+    LOG.info("OzoneManagerStateMachine is un-pausing");
+    if (statePausedCount.decrementAndGet() == 0) {
+      getLifeCycle().startAndTransition(() -> {
+        this.ozoneManagerDoubleBuffer = buildDoubleBufferForRatis();
+        handler.updateDoubleBuffer(ozoneManagerDoubleBuffer);
+        this.setLastAppliedTermIndex(TermIndex.valueOf(
+            newLastAppliedSnapShotTermIndex, newLastAppliedSnaphsotIndex));
+      });
+    }
   }
 
   public OzoneManagerDoubleBuffer buildDoubleBufferForRatis() {
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBufferWithOMResponse.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBufferWithOMResponse.java
index 21e67d0bd9..83d3602cb7 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBufferWithOMResponse.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBufferWithOMResponse.java
@@ -110,7 +110,7 @@ public class TestOzoneManagerDoubleBufferWithOMResponse {
     doubleBuffer = new OzoneManagerDoubleBuffer.Builder()
         .setOmMetadataManager(omMetadataManager)
         .setOzoneManagerRatisSnapShot(ozoneManagerRatisSnapshot)
-        .setmaxUnFlushedTransactionCount(1)
+        .setmaxUnFlushedTransactionCount(100000)
         .enableRatis(true)
         .setIndexToTerm((i) -> term)
         .build();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org