You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this copy.
Posted to commits@ozone.apache.org by sa...@apache.org on 2022/05/04 01:58:01 UTC
[ozone] branch master updated: HDDS-6685. Follower OM crashed when validating S3 auth info. (#3376)
This is an automated email from the ASF dual-hosted git repository.
sammichen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 28170f2672 HDDS-6685. Follower OM crashed when validating S3 auth info. (#3376)
28170f2672 is described below
commit 28170f2672fdcba1e9b96a2c7c15e62981b52392
Author: Sammi Chen <sa...@apache.org>
AuthorDate: Wed May 4 09:57:51 2022 +0800
HDDS-6685. Follower OM crashed when validating S3 auth info. (#3376)
---
.../hadoop/ozone/om/TestOMRatisSnapshots.java | 295 +++++++++++++++++++--
.../org/apache/hadoop/ozone/om/OzoneManager.java | 148 +++++++----
.../ozone/om/ratis/OzoneManagerDoubleBuffer.java | 7 +-
.../ozone/om/ratis/OzoneManagerStateMachine.java | 38 ++-
...TestOzoneManagerDoubleBufferWithOMResponse.java | 2 +-
5 files changed, 409 insertions(+), 81 deletions(-)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java
index 42bfd11da4..1c787a426f 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java
@@ -22,18 +22,24 @@ import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.conf.StorageUnit;
import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.MiniOzoneHAClusterImpl;
import org.apache.hadoop.ozone.client.ObjectStore;
import org.apache.hadoop.ozone.client.OzoneBucket;
import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.OzoneKeyDetails;
import org.apache.hadoop.ozone.client.OzoneVolume;
import org.apache.hadoop.ozone.client.VolumeArgs;
import org.apache.hadoop.hdds.utils.TransactionInfo;
+import org.apache.hadoop.ozone.client.io.OzoneInputStream;
import org.apache.hadoop.ozone.om.helpers.BucketLayout;
import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
@@ -44,7 +50,9 @@ import org.apache.ratis.server.protocol.TermIndex;
import static org.apache.hadoop.ozone.om.TestOzoneManagerHAWithData.createKey;
import static org.junit.Assert.assertTrue;
+import org.assertj.core.api.Fail;
import org.junit.Assert;
+import org.junit.Ignore;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -53,9 +61,9 @@ import org.slf4j.Logger;
import org.slf4j.event.Level;
/**
- * Tests the Ratis snaphsots feature in OM.
+ * Tests the Ratis snapshots feature in OM.
*/
-@Timeout(500)
+@Timeout(5000)
public class TestOMRatisSnapshots {
private MiniOzoneHAClusterImpl cluster = null;
@@ -86,6 +94,10 @@ public class TestOMRatisSnapshots {
scmId = UUID.randomUUID().toString();
omServiceId = "om-service-test1";
conf.setInt(OMConfigKeys.OZONE_OM_RATIS_LOG_PURGE_GAP, LOG_PURGE_GAP);
+ conf.setStorageSize(OMConfigKeys.OZONE_OM_RATIS_SEGMENT_SIZE_KEY, 16,
+ StorageUnit.KB);
+ conf.setStorageSize(OMConfigKeys.
+ OZONE_OM_RATIS_SEGMENT_PREALLOCATED_SIZE_KEY, 16, StorageUnit.KB);
conf.setLong(
OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_AUTO_TRIGGER_THRESHOLD_KEY,
SNAPSHOT_THRESHOLD);
@@ -154,11 +166,112 @@ public class TestOMRatisSnapshots {
long leaderOMSnapshotIndex = leaderOMTermIndex.getIndex();
long leaderOMSnapshotTermIndex = leaderOMTermIndex.getTerm();
- DBCheckpoint leaderDbCheckpoint =
- leaderOM.getMetadataManager().getStore().getCheckpoint(false);
+ // Start the inactive OM. Checkpoint installation will happen spontaneously.
+ cluster.startInactiveOM(followerNodeId);
+ GenericTestUtils.LogCapturer logCapture =
+ GenericTestUtils.LogCapturer.captureLogs(OzoneManager.LOG);
+
+ // The recently started OM should be lagging behind the leader OM.
+ // Wait & for follower to update transactions to leader snapshot index.
+ // Timeout error if follower does not load update within 3s
+ GenericTestUtils.waitFor(() -> {
+ return followerOM.getOmRatisServer().getLastAppliedTermIndex().getIndex()
+ >= leaderOMSnapshotIndex - 1;
+ }, 100, 3000);
+
+ long followerOMLastAppliedIndex =
+ followerOM.getOmRatisServer().getLastAppliedTermIndex().getIndex();
+ assertTrue(
+ followerOMLastAppliedIndex >= leaderOMSnapshotIndex - 1);
+
+ // After the new checkpoint is installed, the follower OM
+ // lastAppliedIndex must >= the snapshot index of the checkpoint. It
+ // could be great than snapshot index if there is any conf entry from ratis.
+ followerOMLastAppliedIndex = followerOM.getOmRatisServer()
+ .getLastAppliedTermIndex().getIndex();
+ assertTrue(followerOMLastAppliedIndex >= leaderOMSnapshotIndex);
+ assertTrue(followerOM.getOmRatisServer().getLastAppliedTermIndex()
+ .getTerm() >= leaderOMSnapshotTermIndex);
+
+ // Verify checkpoint installation was happened.
+ String msg = "Reloaded OM state";
+ Assert.assertTrue(logCapture.getOutput().contains(msg));
+
+ // Verify that the follower OM's DB contains the transactions which were
+ // made while it was inactive.
+ OMMetadataManager followerOMMetaMngr = followerOM.getMetadataManager();
+ Assert.assertNotNull(followerOMMetaMngr.getVolumeTable().get(
+ followerOMMetaMngr.getVolumeKey(volumeName)));
+ Assert.assertNotNull(followerOMMetaMngr.getBucketTable().get(
+ followerOMMetaMngr.getBucketKey(volumeName, bucketName)));
+ for (String key : keys) {
+ Assert.assertNotNull(followerOMMetaMngr.getKeyTable(
+ getDefaultBucketLayout())
+ .get(followerOMMetaMngr.getOzoneKey(volumeName, bucketName, key)));
+ }
+
+ // Verify RPC server is running
+ GenericTestUtils.waitFor(() -> {
+ return followerOM.isOmRpcServerRunning();
+ }, 100, 5000);
+
+ Assert.assertTrue(logCapture.getOutput().contains(
+ "Install Checkpoint is finished"));
+
+ // Read & Write after snapshot installed.
+ List<String> newKeys = writeKeys(1);
+ readKeys(newKeys);
+ // TODO: Enable this part after RATIS-1481 used
+ /*
+ Assert.assertNotNull(followerOMMetaMngr.getKeyTable(
+ getDefaultBucketLayout()).get(followerOMMetaMngr.getOzoneKey(
+ volumeName, bucketName, newKeys.get(0))));
+ */
+ }
- // Start the inactive OM
+ @Ignore("Enable this unit test after RATIS-1481 used")
+ public void testInstallSnapshotWithClientWrite() throws Exception {
+ // Get the leader OM
+ String leaderOMNodeId = OmFailoverProxyUtil
+ .getFailoverProxyProvider(objectStore.getClientProxy())
+ .getCurrentProxyOMNodeId();
+
+ OzoneManager leaderOM = cluster.getOzoneManager(leaderOMNodeId);
+ OzoneManagerRatisServer leaderRatisServer = leaderOM.getOmRatisServer();
+
+ // Find the inactive OM
+ String followerNodeId = leaderOM.getPeerNodes().get(0).getNodeId();
+ if (cluster.isOMActive(followerNodeId)) {
+ followerNodeId = leaderOM.getPeerNodes().get(1).getNodeId();
+ }
+ OzoneManager followerOM = cluster.getOzoneManager(followerNodeId);
+
+ // Do some transactions so that the log index increases
+ List<String> keys = writeKeysToIncreaseLogIndex(leaderRatisServer, 200);
+
+ // Get the latest db checkpoint from the leader OM.
+ TransactionInfo transactionInfo =
+ TransactionInfo.readTransactionInfo(leaderOM.getMetadataManager());
+ TermIndex leaderOMTermIndex =
+ TermIndex.valueOf(transactionInfo.getTerm(),
+ transactionInfo.getTransactionIndex());
+ long leaderOMSnapshotIndex = leaderOMTermIndex.getIndex();
+ long leaderOMSnapshotTermIndex = leaderOMTermIndex.getTerm();
+
+ // Start the inactive OM. Checkpoint installation will happen spontaneously.
cluster.startInactiveOM(followerNodeId);
+ GenericTestUtils.LogCapturer logCapture =
+ GenericTestUtils.LogCapturer.captureLogs(OzoneManager.LOG);
+
+ // Continuously create new keys
+ ExecutorService executor = Executors.newFixedThreadPool(1);
+ Future<List<String>> writeFuture = executor.submit(() -> {
+ return writeKeys(200);
+ });
+ List<String> newKeys = writeFuture.get();
+
+ // Wait checkpoint installation to finish
+ Thread.sleep(5000);
// The recently started OM should be lagging behind the leader OM.
// Wait & for follower to update transactions to leader snapshot index.
@@ -167,14 +280,116 @@ public class TestOMRatisSnapshots {
return followerOM.getOmRatisServer().getLastAppliedTermIndex().getIndex()
>= leaderOMSnapshotIndex - 1;
}, 100, 3000);
-
+
+ // Verify checkpoint installation was happened.
+ String msg = "Reloaded OM state";
+ Assert.assertTrue(logCapture.getOutput().contains(msg));
+ Assert.assertTrue(logCapture.getOutput().contains(
+ "Install Checkpoint is finished"));
+
long followerOMLastAppliedIndex =
followerOM.getOmRatisServer().getLastAppliedTermIndex().getIndex();
assertTrue(
- followerOMLastAppliedIndex >= leaderOMSnapshotIndex - 1);
+ followerOMLastAppliedIndex >= leaderOMSnapshotIndex - 1);
- // Install leader OM's db checkpoint on the lagging OM.
- followerOM.installCheckpoint(leaderOMNodeId, leaderDbCheckpoint);
+ // After the new checkpoint is installed, the follower OM
+ // lastAppliedIndex must >= the snapshot index of the checkpoint. It
+ // could be great than snapshot index if there is any conf entry from ratis.
+ followerOMLastAppliedIndex = followerOM.getOmRatisServer()
+ .getLastAppliedTermIndex().getIndex();
+ assertTrue(followerOMLastAppliedIndex >= leaderOMSnapshotIndex);
+ assertTrue(followerOM.getOmRatisServer().getLastAppliedTermIndex()
+ .getTerm() >= leaderOMSnapshotTermIndex);
+
+ // Verify that the follower OM's DB contains the transactions which were
+ // made while it was inactive.
+ OMMetadataManager followerOMMetaMgr = followerOM.getMetadataManager();
+ Assert.assertNotNull(followerOMMetaMgr.getVolumeTable().get(
+ followerOMMetaMgr.getVolumeKey(volumeName)));
+ Assert.assertNotNull(followerOMMetaMgr.getBucketTable().get(
+ followerOMMetaMgr.getBucketKey(volumeName, bucketName)));
+ for (String key : keys) {
+ Assert.assertNotNull(followerOMMetaMgr.getKeyTable(
+ getDefaultBucketLayout())
+ .get(followerOMMetaMgr.getOzoneKey(volumeName, bucketName, key)));
+ }
+ OMMetadataManager leaderOmMetaMgr = leaderOM.getMetadataManager();
+ for (String key : newKeys) {
+ Assert.assertNotNull(leaderOmMetaMgr.getKeyTable(
+ getDefaultBucketLayout())
+ .get(followerOMMetaMgr.getOzoneKey(volumeName, bucketName, key)));
+ }
+ Thread.sleep(5000);
+ followerOMMetaMgr = followerOM.getMetadataManager();
+ for (String key : newKeys) {
+ Assert.assertNotNull(followerOMMetaMgr.getKeyTable(
+ getDefaultBucketLayout())
+ .get(followerOMMetaMgr.getOzoneKey(volumeName, bucketName, key)));
+ }
+ // Read newly created keys
+ readKeys(newKeys);
+ System.out.println("All data are replicated");
+ }
+
+ @Test
+ public void testInstallSnapshotWithClientRead() throws Exception {
+ // Get the leader OM
+ String leaderOMNodeId = OmFailoverProxyUtil
+ .getFailoverProxyProvider(objectStore.getClientProxy())
+ .getCurrentProxyOMNodeId();
+
+ OzoneManager leaderOM = cluster.getOzoneManager(leaderOMNodeId);
+ OzoneManagerRatisServer leaderRatisServer = leaderOM.getOmRatisServer();
+
+ // Find the inactive OM
+ String followerNodeId = leaderOM.getPeerNodes().get(0).getNodeId();
+ if (cluster.isOMActive(followerNodeId)) {
+ followerNodeId = leaderOM.getPeerNodes().get(1).getNodeId();
+ }
+ OzoneManager followerOM = cluster.getOzoneManager(followerNodeId);
+
+ // Do some transactions so that the log index increases
+ List<String> keys = writeKeysToIncreaseLogIndex(leaderRatisServer, 200);
+
+ // Get transaction Index
+ TransactionInfo transactionInfo =
+ TransactionInfo.readTransactionInfo(leaderOM.getMetadataManager());
+ TermIndex leaderOMTermIndex =
+ TermIndex.valueOf(transactionInfo.getTerm(),
+ transactionInfo.getTransactionIndex());
+ long leaderOMSnapshotIndex = leaderOMTermIndex.getIndex();
+ long leaderOMSnapshotTermIndex = leaderOMTermIndex.getTerm();
+
+ // Start the inactive OM. Checkpoint installation will happen spontaneously.
+ cluster.startInactiveOM(followerNodeId);
+ GenericTestUtils.LogCapturer logCapture =
+ GenericTestUtils.LogCapturer.captureLogs(OzoneManager.LOG);
+
+ // Continuously read keys
+ ExecutorService executor = Executors.newFixedThreadPool(1);
+ Future<Void> readFuture = executor.submit(() -> {
+ try {
+ getKeys(keys, 10);
+ readKeys(keys);
+ } catch (IOException e) {
+ Fail.fail("Read Key failed", e);
+ }
+ return null;
+ });
+ readFuture.get();
+
+ // The recently started OM should be lagging behind the leader OM.
+ // Wait & for follower to update transactions to leader snapshot index.
+ // Timeout error if follower does not load update within 3s
+ GenericTestUtils.waitFor(() -> {
+ return followerOM.getOmRatisServer().getLastAppliedTermIndex().getIndex()
+ >= leaderOMSnapshotIndex - 1;
+ }, 100, 3000);
+
+ long followerOMLastAppliedIndex =
+ followerOM.getOmRatisServer().getLastAppliedTermIndex().getIndex();
+ assertTrue(
+ followerOMLastAppliedIndex >= leaderOMSnapshotIndex - 1);
// After the new checkpoint is installed, the follower OM
// lastAppliedIndex must >= the snapshot index of the checkpoint. It
@@ -197,6 +412,13 @@ public class TestOMRatisSnapshots {
getDefaultBucketLayout())
.get(followerOMMetaMngr.getOzoneKey(volumeName, bucketName, key)));
}
+
+ // Wait installation finish
+ Thread.sleep(5000);
+ // Verify checkpoint installation was happened.
+ Assert.assertTrue(logCapture.getOutput().contains("Reloaded OM state"));
+ Assert.assertTrue(logCapture.getOutput().contains(
+ "Install Checkpoint is finished"));
}
@Test
@@ -214,6 +436,9 @@ public class TestOMRatisSnapshots {
followerNodeId = leaderOM.getPeerNodes().get(1).getNodeId();
}
cluster.startInactiveOM(followerNodeId);
+ GenericTestUtils.setLogLevel(OzoneManager.LOG, Level.INFO);
+ GenericTestUtils.LogCapturer logCapture =
+ GenericTestUtils.LogCapturer.captureLogs(OzoneManager.LOG);
OzoneManager followerOM = cluster.getOzoneManager(followerNodeId);
OzoneManagerRatisServer followerRatisServer = followerOM.getOmRatisServer();
@@ -232,10 +457,6 @@ public class TestOMRatisSnapshots {
writeKeysToIncreaseLogIndex(followerOM.getOmRatisServer(),
leaderCheckpointTermIndex.getIndex() + 100);
- GenericTestUtils.setLogLevel(OzoneManager.LOG, Level.INFO);
- GenericTestUtils.LogCapturer logCapture =
- GenericTestUtils.LogCapturer.captureLogs(OzoneManager.LOG);
-
// Install the old checkpoint on the follower OM. This should fail as the
// followerOM is already ahead of that transactionLogIndex and the OM
// state should be reloaded.
@@ -251,6 +472,10 @@ public class TestOMRatisSnapshots {
"logIndex is less than it's lastAppliedIndex", newTermIndex);
Assert.assertEquals(followerTermIndex,
followerRatisServer.getLastAppliedTermIndex());
+ String msg = "OM DB is not stopped. Started services with Term: " +
+ followerTermIndex.getTerm() + " and Index: " +
+ followerTermIndex.getIndex();
+ Assert.assertTrue(logCapture.getOutput().contains(msg));
}
@Test
@@ -294,16 +519,22 @@ public class TestOMRatisSnapshots {
}
}
- GenericTestUtils.setLogLevel(OzoneManager.LOG, Level.ERROR);
+ GenericTestUtils.setLogLevel(OzoneManager.LOG, Level.INFO);
GenericTestUtils.LogCapturer logCapture =
GenericTestUtils.LogCapturer.captureLogs(OzoneManager.LOG);
followerOM.setExitManagerForTesting(new DummyExitManager());
-
+ // Install corrupted checkpoint
followerOM.installCheckpoint(leaderOMNodeId, leaderCheckpointLocation,
leaderCheckpointTrxnInfo);
- Assert.assertTrue(logCapture.getOutput().contains("System Exit: " +
- "Failed to reload OM state and instantiate services."));
+ // Wait checkpoint installation to be finished.
+ GenericTestUtils.waitFor(() -> {
+ Assert.assertTrue(logCapture.getOutput().contains("System Exit: " +
+ "Failed to reload OM state and instantiate services."));
+ return true;
+ }, 100, 3000);
+ String msg = "RPC server is stopped";
+ Assert.assertTrue(logCapture.getOutput().contains(msg));
}
private List<String> writeKeysToIncreaseLogIndex(
@@ -319,6 +550,36 @@ public class TestOMRatisSnapshots {
return keys;
}
+ private List<String> writeKeys(long keyCount) throws IOException,
+ InterruptedException {
+ List<String> keys = new ArrayList<>();
+ long index = 0;
+ while (index < keyCount) {
+ keys.add(createKey(ozoneBucket));
+ index++;
+ }
+ return keys;
+ }
+
+ private void getKeys(List<String> keys, int round) throws IOException {
+ while (round > 0) {
+ for (String keyName : keys) {
+ OzoneKeyDetails key = ozoneBucket.getKey(keyName);
+ Assert.assertEquals(keyName, key.getName());
+ }
+ round--;
+ }
+ }
+
+ private void readKeys(List<String> keys) throws IOException {
+ for (String keyName : keys) {
+ OzoneInputStream inputStream = ozoneBucket.readKey(keyName);
+ byte[] data = new byte[100];
+ inputStream.read(data, 0, 100);
+ inputStream.close();
+ }
+ }
+
private static BucketLayout getDefaultBucketLayout() {
return BucketLayout.DEFAULT;
}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index c0bd8cc1f0..2cde62ba95 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -1024,6 +1024,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
return omRpcServer;
}
+ LOG.info("Creating RPC Server");
InetSocketAddress omNodeRpcAddr = OmUtils.getOmAddress(conf);
final int handlerCount = conf.getInt(OZONE_OM_HANDLER_COUNT_KEY,
@@ -1821,34 +1822,36 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
* @throws IOException
*/
private void startTrashEmptier(Configuration conf) throws IOException {
- float hadoopTrashInterval =
- conf.getFloat(FS_TRASH_INTERVAL_KEY, FS_TRASH_INTERVAL_DEFAULT);
- // check whether user has configured ozone specific trash-interval
- // if not fall back to hadoop configuration
- long trashInterval =
- (long)(conf.getFloat(
- OMConfigKeys.OZONE_FS_TRASH_INTERVAL_KEY, hadoopTrashInterval)
- * MSECS_PER_MINUTE);
- if (trashInterval == 0) {
- LOG.info("Trash Interval set to 0. Files deleted will not move to trash");
- return;
- } else if (trashInterval < 0) {
- throw new IOException("Cannot start trash emptier with negative interval."
- + " Set " + FS_TRASH_INTERVAL_KEY + " to a positive value.");
+ if (emptier == null) {
+ float hadoopTrashInterval =
+ conf.getFloat(FS_TRASH_INTERVAL_KEY, FS_TRASH_INTERVAL_DEFAULT);
+ // check whether user has configured ozone specific trash-interval
+ // if not fall back to hadoop configuration
+ long trashInterval =
+ (long) (conf.getFloat(
+ OMConfigKeys.OZONE_FS_TRASH_INTERVAL_KEY, hadoopTrashInterval)
+ * MSECS_PER_MINUTE);
+ if (trashInterval == 0) {
+ LOG.info("Trash Interval set to 0. Files deleted won't move to trash");
+ return;
+ } else if (trashInterval < 0) {
+ throw new IOException("Cannot start trash emptier with negative " +
+ "interval. Set " + FS_TRASH_INTERVAL_KEY + " to a positive value.");
+ }
+
+ OzoneManager i = this;
+ FileSystem fs = SecurityUtil.doAsLoginUser(
+ new PrivilegedExceptionAction<FileSystem>() {
+ @Override
+ public FileSystem run() throws IOException {
+ return new TrashOzoneFileSystem(i);
+ }
+ });
+ this.emptier = new Thread(new OzoneTrash(fs, conf, this).
+ getEmptier(), "Trash Emptier");
+ this.emptier.setDaemon(true);
+ this.emptier.start();
}
-
- OzoneManager i = this;
- FileSystem fs = SecurityUtil.doAsLoginUser(
- new PrivilegedExceptionAction<FileSystem>() {
- @Override
- public FileSystem run() throws IOException {
- return new TrashOzoneFileSystem(i);
- }
- });
- this.emptier = new Thread(new OzoneTrash(fs, conf, this).
- getEmptier(), "Trash Emptier");
- this.emptier.setDaemon(true);
- this.emptier.start();
}
/**
@@ -3253,11 +3256,13 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
TermIndex installCheckpoint(String leaderId, Path checkpointLocation,
TransactionInfo checkpointTrxnInfo) throws Exception {
-
+ long startTime = Time.monotonicNow();
File oldDBLocation = metadataManager.getStore().getDbLocation();
try {
// Stop Background services
- stopServices();
+ keyManager.stop();
+ stopSecretManager();
+ stopTrashEmptier();
// Pause the State Machine so that no new transactions can be applied.
// This action also clears the OM Double Buffer so that if there are any
@@ -3266,9 +3271,9 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
} catch (Exception e) {
LOG.error("Failed to stop/ pause the services. Cannot proceed with " +
"installing the new checkpoint.");
- // During stopServices, if KeyManager was stopped successfully and
- // OMMetadataManager stop failed, we should restart the KeyManager.
+ // Stop the checkpoint install process and restart the services.
keyManager.start(configuration);
+ startSecretManagerIfNecessary();
startTrashEmptier(configuration);
throw e;
}
@@ -3285,14 +3290,39 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
boolean canProceed = OzoneManagerRatisUtils.verifyTransactionInfo(
checkpointTrxnInfo, lastAppliedIndex, leaderId, checkpointLocation);
+ boolean oldOmMetadataManagerStopped = false;
+ boolean newMetadataManagerStarted = false;
+ boolean omRpcServerStopped = false;
+ long time = Time.monotonicNow();
if (canProceed) {
+ // Stop RPC server before stop metadataManager
+ omRpcServer.stop();
+ isOmRpcServerRunning = false;
+ omRpcServerStopped = true;
+ LOG.info("RPC server is stopped. Spend " +
+ (Time.monotonicNow() - time) + " ms.");
try {
+ // Stop old metadataManager before replacing DB Dir
+ time = Time.monotonicNow();
+ metadataManager.stop();
+ oldOmMetadataManagerStopped = true;
+ LOG.info("metadataManager is stopped. Spend " +
+ (Time.monotonicNow() - time) + " ms.");
+ } catch (Exception e) {
+ String errorMsg = "Failed to stop metadataManager. Cannot proceed " +
+ "with installing the new checkpoint.";
+ LOG.error(errorMsg);
+ exitManager.exitSystem(1, errorMsg, e, LOG);
+ }
+ try {
+ time = Time.monotonicNow();
dbBackup = replaceOMDBWithCheckpoint(lastAppliedIndex, oldDBLocation,
checkpointLocation);
term = checkpointTrxnInfo.getTerm();
lastAppliedIndex = checkpointTrxnInfo.getTransactionIndex();
- LOG.info("Replaced DB with checkpoint from OM: {}, term: {}, index: {}",
- leaderId, term, lastAppliedIndex);
+ LOG.info("Replaced DB with checkpoint from OM: {}, term: {}, " +
+ "index: {}, time: {} ms", leaderId, term, lastAppliedIndex,
+ Time.monotonicNow() - time);
} catch (Exception e) {
LOG.error("Failed to install Snapshot from {} as OM failed to replace" +
" DB with downloaded checkpoint. Reloading old OM state.", e);
@@ -3307,15 +3337,43 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
// Restart (unpause) the state machine and update its last applied index
// to the installed checkpoint's snapshot index.
try {
- reloadOMState(lastAppliedIndex, term);
- omRatisServer.getOmStateMachine().unpause(lastAppliedIndex, term);
- LOG.info("Reloaded OM state with Term: {} and Index: {}", term,
- lastAppliedIndex);
+ if (oldOmMetadataManagerStopped) {
+ time = Time.monotonicNow();
+ reloadOMState(lastAppliedIndex, term);
+ omRatisServer.getOmStateMachine().unpause(lastAppliedIndex, term);
+ newMetadataManagerStarted = true;
+ LOG.info("Reloaded OM state with Term: {} and Index: {}. Spend {} ms",
+ term, lastAppliedIndex, Time.monotonicNow() - time);
+ } else {
+ // OM DB is not stopped. Start the services.
+ keyManager.start(configuration);
+ startSecretManagerIfNecessary();
+ startTrashEmptier(configuration);
+ omRatisServer.getOmStateMachine().unpause(lastAppliedIndex, term);
+ LOG.info("OM DB is not stopped. Started services with Term: {} and " +
+ "Index: {}", term, lastAppliedIndex);
+ }
} catch (Exception ex) {
+
String errorMsg = "Failed to reload OM state and instantiate services.";
exitManager.exitSystem(1, errorMsg, ex, LOG);
}
+ if (omRpcServerStopped && newMetadataManagerStarted) {
+ // Start the RPC server. RPC server start requires metadataManager
+ try {
+ time = Time.monotonicNow();
+ omRpcServer = getRpcServer(configuration);
+ omRpcServer.start();
+ isOmRpcServerRunning = true;
+ LOG.info("RPC server is re-started. Spend " +
+ (Time.monotonicNow() - time) + " ms.");
+ } catch (Exception e) {
+ String errorMsg = "Failed to start RPC Server.";
+ exitManager.exitSystem(1, errorMsg, e, LOG);
+ }
+ }
+
// Delete the backup DB
try {
if (dbBackup != null) {
@@ -3334,6 +3392,9 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
// TODO: We should only return the snpashotIndex to the leader.
// Should be fixed after RATIS-586
TermIndex newTermIndex = TermIndex.valueOf(term, lastAppliedIndex);
+ LOG.info("Install Checkpoint is finished with Term: {} and Index: {}. " +
+ "Spend {} ms.", newTermIndex.getTerm(), newTermIndex.getIndex(),
+ (Time.monotonicNow() - startTime));
return newTermIndex;
}
@@ -3356,13 +3417,6 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
return null;
}
- void stopServices() throws Exception {
- keyManager.stop();
- stopSecretManager();
- metadataManager.stop();
- stopTrashEmptier();
- }
-
private void stopTrashEmptier() {
if (this.emptier != null) {
emptier.interrupt();
@@ -3436,6 +3490,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
// Restart required services
metadataManager.start(configuration);
keyManager.start(configuration);
+ startSecretManagerIfNecessary();
startTrashEmptier(configuration);
// Set metrics and start metrics back ground thread
@@ -3976,7 +4031,10 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
this.minMultipartUploadPartSize = partSizeForTest;
}
-
+ @VisibleForTesting
+ public boolean isOmRpcServerRunning() {
+ return isOmRpcServerRunning;
+ }
/**
* Write down Layout version of a finalized feature to DB on finalization.
* @param lvm OMLayoutVersionManager
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java
index b3c826244a..5543a6c2c2 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java
@@ -383,14 +383,13 @@ public final class OzoneManagerDoubleBuffer {
ExitUtils.terminate(1, message, ex, LOG);
} else {
LOG.info("OMDoubleBuffer flush thread {} is interrupted and will "
- + "exit. {}", Thread.currentThread().getName(),
- Thread.currentThread().getName());
+ + "exit.", Thread.currentThread().getName());
}
} catch (IOException ex) {
terminate(ex);
} catch (Throwable t) {
- final String s = "OMDoubleBuffer flush thread" +
- Thread.currentThread().getName() + "encountered Throwable error";
+ final String s = "OMDoubleBuffer flush thread " +
+ Thread.currentThread().getName() + " encountered Throwable error";
ExitUtils.terminate(2, s, t, LOG);
}
}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
index 78cbabfbc7..c8c69d4f6d 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
@@ -31,6 +31,7 @@ import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.hdds.utils.TransactionInfo;
@@ -92,6 +93,7 @@ public class OzoneManagerStateMachine extends BaseStateMachine {
private final ExecutorService executorService;
private final ExecutorService installSnapshotExecutor;
private final boolean isTracingEnabled;
+ private final AtomicInteger statePausedCount = new AtomicInteger(0);
// Map which contains index and term for the ratis transactions which are
// stateMachine entries which are received through applyTransaction.
@@ -139,12 +141,12 @@ public class OzoneManagerStateMachine extends BaseStateMachine {
}
@Override
- public void reinitialize() throws IOException {
- getLifeCycle().startAndTransition(() -> {
- loadSnapshotInfoFromDB();
- this.ozoneManagerDoubleBuffer = buildDoubleBufferForRatis();
- handler.updateDoubleBuffer(ozoneManagerDoubleBuffer);
- });
+ public synchronized void reinitialize() throws IOException {
+ loadSnapshotInfoFromDB();
+ if (getLifeCycleState() == LifeCycle.State.PAUSED) {
+ unpause(getLastAppliedTermIndex().getIndex(),
+ getLastAppliedTermIndex().getTerm());
+ }
}
@Override
@@ -378,7 +380,12 @@ public class OzoneManagerStateMachine extends BaseStateMachine {
}
@Override
- public void pause() {
+ public synchronized void pause() {
+ LOG.info("OzoneManagerStateMachine is pausing");
+ statePausedCount.incrementAndGet();
+ if (getLifeCycleState() == LifeCycle.State.PAUSED) {
+ return;
+ }
getLifeCycle().transition(LifeCycle.State.PAUSING);
getLifeCycle().transition(LifeCycle.State.PAUSED);
ozoneManagerDoubleBuffer.stop();
@@ -389,14 +396,17 @@ public class OzoneManagerStateMachine extends BaseStateMachine {
* lastAppliedIndex. This should be done after uploading new state to the
* StateMachine.
*/
- public void unpause(long newLastAppliedSnaphsotIndex,
+ public synchronized void unpause(long newLastAppliedSnaphsotIndex,
long newLastAppliedSnapShotTermIndex) {
- getLifeCycle().startAndTransition(() -> {
- this.ozoneManagerDoubleBuffer = buildDoubleBufferForRatis();
- handler.updateDoubleBuffer(ozoneManagerDoubleBuffer);
- this.setLastAppliedTermIndex(TermIndex.valueOf(
- newLastAppliedSnapShotTermIndex, newLastAppliedSnaphsotIndex));
- });
+ LOG.info("OzoneManagerStateMachine is un-pausing");
+ if (statePausedCount.decrementAndGet() == 0) {
+ getLifeCycle().startAndTransition(() -> {
+ this.ozoneManagerDoubleBuffer = buildDoubleBufferForRatis();
+ handler.updateDoubleBuffer(ozoneManagerDoubleBuffer);
+ this.setLastAppliedTermIndex(TermIndex.valueOf(
+ newLastAppliedSnapShotTermIndex, newLastAppliedSnaphsotIndex));
+ });
+ }
}
public OzoneManagerDoubleBuffer buildDoubleBufferForRatis() {
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBufferWithOMResponse.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBufferWithOMResponse.java
index 21e67d0bd9..83d3602cb7 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBufferWithOMResponse.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBufferWithOMResponse.java
@@ -110,7 +110,7 @@ public class TestOzoneManagerDoubleBufferWithOMResponse {
doubleBuffer = new OzoneManagerDoubleBuffer.Builder()
.setOmMetadataManager(omMetadataManager)
.setOzoneManagerRatisSnapShot(ozoneManagerRatisSnapshot)
- .setmaxUnFlushedTransactionCount(1)
+ .setmaxUnFlushedTransactionCount(100000)
.enableRatis(true)
.setIndexToTerm((i) -> term)
.build();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org