You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by av...@apache.org on 2020/12/06 05:39:40 UTC

[ozone] branch HDDS-3698-upgrade updated: HDDS-4480. Implement OM Prepare Request/Response. (#1613)

This is an automated email from the ASF dual-hosted git repository.

avijayan pushed a commit to branch HDDS-3698-upgrade
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-3698-upgrade by this push:
     new a7797aa  HDDS-4480. Implement OM Prepare Request/Response. (#1613)
a7797aa is described below

commit a7797aacea8925b4e489f935b4b8344561b10439
Author: Ethan Rose <33...@users.noreply.github.com>
AuthorDate: Sun Dec 6 00:39:24 2020 -0500

    HDDS-4480. Implement OM Prepare Request/Response. (#1613)
---
 .../hadoop/hdds/ratis/RatisUpgradeUtils.java       |  96 ------
 .../hadoop/hdds/ratis/TestRatisUpgradeUtils.java   |  97 ------
 .../main/java/org/apache/hadoop/ozone/OmUtils.java |   1 +
 .../hadoop/ozone/om/exceptions/OMException.java    |   3 +-
 .../hadoop/ozone/om/TestOzoneManagerPrepare.java   | 326 +++++++++++++++++++++
 .../src/main/proto/OmClientProtocol.proto          |  12 +
 .../apache/hadoop/ozone/om/OMStarterInterface.java |   2 -
 .../org/apache/hadoop/ozone/om/OzoneManager.java   |  55 +---
 .../hadoop/ozone/om/OzoneManagerStarter.java       |  65 ++--
 .../hadoop/ozone/om/codec/OMDBDefinition.java      |  11 +-
 .../ozone/om/request/upgrade/OMPrepareRequest.java | 218 ++++++++++++++
 .../om/response/upgrade/OMPrepareResponse.java     |  44 +++
 .../hadoop/ozone/om/TestOmMetadataManager.java     |   9 +-
 .../om/ratis/TestOzoneManagerStateMachine.java     |   4 +-
 .../ozone/om/response/TestCleanupTableInfo.java    |   5 +-
 15 files changed, 650 insertions(+), 298 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisUpgradeUtils.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisUpgradeUtils.java
deleted file mode 100644
index 796668d..0000000
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisUpgradeUtils.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.hadoop.hdds.ratis;
-
-import java.io.IOException;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.ratis.protocol.RaftGroup;
-import org.apache.ratis.server.impl.RaftServerImpl;
-import org.apache.ratis.server.impl.RaftServerProxy;
-import org.apache.ratis.statemachine.StateMachine;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Ratis utility functions.
- */
-public final class RatisUpgradeUtils {
-
-  private RatisUpgradeUtils() {
-  }
-
-  private static final Logger LOG =
-      LoggerFactory.getLogger(RatisUpgradeUtils.class);
-
-  /**
-   * Flush all committed transactions in a given Raft Server for a given group.
-   * @param stateMachine state machine to use
-   * @param raftGroup raft group
-   * @param server Raft server proxy instance.
-   * @param maxTimeToWaitSeconds Max time to wait before declaring failure.
-   * @throws InterruptedException when interrupted
-   * @throws IOException on error while waiting
-   */
-  public static void waitForAllTxnsApplied(
-      StateMachine stateMachine,
-      RaftGroup raftGroup,
-      RaftServerProxy server,
-      long maxTimeToWaitSeconds,
-      long timeBetweenRetryInSeconds)
-      throws InterruptedException, IOException {
-
-    long intervalTime = TimeUnit.SECONDS.toMillis(timeBetweenRetryInSeconds);
-    long endTime = System.currentTimeMillis() +
-        TimeUnit.SECONDS.toMillis(maxTimeToWaitSeconds);
-    boolean success = false;
-    while (System.currentTimeMillis() < endTime) {
-      success = checkIfAllTransactionsApplied(stateMachine, server, raftGroup);
-      if (success) {
-        break;
-      }
-      Thread.sleep(intervalTime);
-    }
-
-    if (!success) {
-      throw new IOException(String.format("After waiting for %d seconds, " +
-          "State Machine has not applied  all the transactions.",
-          maxTimeToWaitSeconds));
-    }
-
-    long snapshotIndex = stateMachine.takeSnapshot();
-    if (snapshotIndex != stateMachine.getLastAppliedTermIndex().getIndex()) {
-      throw new IOException("Index from Snapshot does not match last applied " +
-          "Index");
-    }
-  }
-
-  private static boolean checkIfAllTransactionsApplied(
-      StateMachine stateMachine,
-      RaftServerProxy serverProxy,
-      RaftGroup raftGroup) throws IOException {
-    LOG.info("Checking for pending transactions to be applied.");
-    RaftServerImpl impl = serverProxy.getImpl(raftGroup.getGroupId());
-    long lastCommittedIndex = impl.getState().getLog().getLastCommittedIndex();
-    long appliedIndex = stateMachine.getLastAppliedTermIndex().getIndex();
-    LOG.info("lastCommittedIndex = {}, appliedIndex = {}",
-        lastCommittedIndex, appliedIndex);
-    return (lastCommittedIndex == appliedIndex);
-  }
-
-}
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/ratis/TestRatisUpgradeUtils.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/ratis/TestRatisUpgradeUtils.java
deleted file mode 100644
index 078bbb5..0000000
--- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/ratis/TestRatisUpgradeUtils.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.hadoop.hdds.ratis;
-
-import static org.apache.hadoop.hdds.ratis.RatisUpgradeUtils.waitForAllTxnsApplied;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import java.io.IOException;
-
-import org.apache.hadoop.test.LambdaTestUtils;
-import org.apache.ratis.protocol.RaftGroup;
-import org.apache.ratis.server.impl.RaftServerImpl;
-import org.apache.ratis.server.impl.RaftServerProxy;
-import org.apache.ratis.server.impl.ServerState;
-import org.apache.ratis.server.protocol.TermIndex;
-import org.apache.ratis.server.raftlog.RaftLog;
-import org.apache.ratis.statemachine.StateMachine;
-import org.junit.Test;
-
-/**
- * Testing util methods in TestRatisUpgradeUtils.
- */
-public class TestRatisUpgradeUtils {
-
-  @Test
-  public void testWaitForAllTxnsApplied() throws IOException,
-      InterruptedException {
-
-    StateMachine stateMachine = mock(StateMachine.class);
-    RaftGroup raftGroup = RaftGroup.emptyGroup();
-    RaftServerProxy raftServerProxy = mock(RaftServerProxy.class);
-    RaftServerImpl raftServer = mock(RaftServerImpl.class);
-    ServerState serverState = mock(ServerState.class);
-    RaftLog raftLog = mock(RaftLog.class);
-
-    when(raftServerProxy.getImpl(
-        raftGroup.getGroupId())).thenReturn(raftServer);
-    when(raftServer.getState()).thenReturn(serverState);
-    when(serverState.getLog()).thenReturn(raftLog);
-    when(raftLog.getLastCommittedIndex()).thenReturn(1L);
-
-    TermIndex termIndex = mock(TermIndex.class);
-    when(termIndex.getIndex()).thenReturn(0L).thenReturn(0L).thenReturn(1L);
-    when(stateMachine.getLastAppliedTermIndex()).thenReturn(termIndex);
-    when(stateMachine.takeSnapshot()).thenReturn(1L);
-
-    waitForAllTxnsApplied(stateMachine, raftGroup, raftServerProxy, 10, 2);
-    verify(stateMachine.getLastAppliedTermIndex(),
-        times(4)); // 3 checks + 1 after snapshot
-  }
-
-  @Test
-  public void testWaitForAllTxnsAppliedTimeOut() throws Exception {
-
-    StateMachine stateMachine = mock(StateMachine.class);
-    RaftGroup raftGroup = RaftGroup.emptyGroup();
-    RaftServerProxy raftServerProxy = mock(RaftServerProxy.class);
-    RaftServerImpl raftServer = mock(RaftServerImpl.class);
-    ServerState serverState = mock(ServerState.class);
-    RaftLog raftLog = mock(RaftLog.class);
-
-    when(raftServerProxy.getImpl(
-        raftGroup.getGroupId())).thenReturn(raftServer);
-    when(raftServer.getState()).thenReturn(serverState);
-    when(serverState.getLog()).thenReturn(raftLog);
-    when(raftLog.getLastCommittedIndex()).thenReturn(1L);
-
-    TermIndex termIndex = mock(TermIndex.class);
-    when(termIndex.getIndex()).thenReturn(0L);
-    when(stateMachine.getLastAppliedTermIndex()).thenReturn(termIndex);
-    when(stateMachine.takeSnapshot()).thenReturn(1L);
-
-    LambdaTestUtils.intercept(IOException.class, "State Machine has not " +
-        "applied  all the transactions", () ->
-        waitForAllTxnsApplied(stateMachine, raftGroup, raftServerProxy,
-            10, 2));
-  }
-
-}
\ No newline at end of file
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
index 8015fed..fc3e61b 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
@@ -290,6 +290,7 @@ public final class OmUtils {
     case PurgeKeys:
     case RecoverTrash:
     case FinalizeUpgrade:
+    case Prepare:
     case DeleteOpenKeys:
       return false;
     default:
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java
index 3c28ae6..c1e8148 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java
@@ -236,6 +236,7 @@ public class OMException extends IOException {
     PERSIST_UPGRADE_TO_LAYOUT_VERSION_FAILED,
     REMOVE_UPGRADE_TO_LAYOUT_VERSION_FAILED,
     UPDATE_LAYOUT_VERSION_FAILED,
-    LAYOUT_FEATURE_FINALIZATION_FAILED;
+    LAYOUT_FEATURE_FINALIZATION_FAILED,
+    PREPARE_FAILED
   }
 }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerPrepare.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerPrepare.java
new file mode 100644
index 0000000..711ec2a
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerPrepare.java
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.om;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.hadoop.ozone.OzoneConsts.TRANSACTION_INFO_KEY;
+
+import java.io.File;
+import java.nio.file.Paths;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.hadoop.hdds.client.ReplicationFactor;
+import org.apache.hadoop.hdds.client.ReplicationType;
+import org.apache.hadoop.ozone.MiniOzoneHAClusterImpl;
+import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.OzoneVolume;
+import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.container.ContainerTestHelper;
+import org.apache.hadoop.ozone.container.TestHelper;
+import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.ratis.OMTransactionInfo;
+import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PrepareRequest;
+import org.apache.hadoop.test.LambdaTestUtils;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test OM prepare against actual mini cluster.
+ */
+public class TestOzoneManagerPrepare extends TestOzoneManagerHA {
+
+  private final String keyPrefix = "key";
+  private final int timeoutMillis = 30000;
+
+  /**
+   * Calls prepare on all OMs when they have no transaction information.
+   * Checks that they are brought into prepare mode successfully.
+   */
+  @Test
+  public void testPrepareWithoutTransactions() throws Exception {
+    MiniOzoneHAClusterImpl cluster = getCluster();
+    OzoneManager leader = cluster.getOMLeader();
+    OMResponse omResponse = submitPrepareRequest(leader.getOmRatisServer());
+    // Get the log index of the prepare request.
+    long prepareRequestLogIndex =
+        omResponse.getPrepareResponse().getTxnID();
+
+    // Prepare response processing is included in the snapshot,
+    // giving an index of 1.
+    Assert.assertEquals(1, prepareRequestLogIndex);
+    for (OzoneManager om: cluster.getOzoneManagersList()) {
+      // Leader should be prepared as soon as it returns response.
+      if (om == leader) {
+        checkPrepared(om, prepareRequestLogIndex);
+      } else {
+        waitAndCheckPrepared(om, prepareRequestLogIndex);
+      }
+    }
+  }
+
+  /**
+   * Writes data to the cluster via the leader OM, and then prepares it.
+   * Checks that every OM is prepared successfully.
+   */
+  @Test
+  public void testPrepareWithTransactions() throws Exception {
+    MiniOzoneHAClusterImpl cluster = getCluster();
+    OzoneClient ozClient = OzoneClientFactory.getRpcClient(getConf());
+
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = UUID.randomUUID().toString();
+    ObjectStore store = ozClient.getObjectStore();
+
+    store.createVolume(volumeName);
+    OzoneVolume volume = store.getVolume(volumeName);
+    volume.createBucket(bucketName);
+
+    Set<String> writtenKeys = new HashSet<>();
+    for (int i = 1; i <= 10; i++) {
+      String keyName = keyPrefix + i;
+      writeTestData(store, volumeName, bucketName, keyName);
+      writtenKeys.add(keyName);
+    }
+
+    // Make sure all OMs have logs from writing data, so we can check that
+    // they are purged after prepare.
+    for (OzoneManager om: cluster.getOzoneManagersList()) {
+      LambdaTestUtils.await(timeoutMillis, 1000,
+          () -> logFilesPresentInRatisPeer(om));
+    }
+
+    OzoneManager leader = cluster.getOMLeader();
+    OMResponse omResponse = submitPrepareRequest(leader.getOmRatisServer());
+    // Get the log index of the prepare request.
+    long prepareRequestLogIndex =
+        omResponse.getPrepareResponse().getTxnID();
+
+    // Make sure all OMs are prepared and all OMs still have their data.
+    for (OzoneManager om: cluster.getOzoneManagersList()) {
+      // Leader should be prepared as soon as it returns response.
+      if (om == leader) {
+        checkPrepared(om, prepareRequestLogIndex);
+      } else {
+        waitAndCheckPrepared(om, prepareRequestLogIndex);
+      }
+
+      List<OmKeyInfo> keys = om.getMetadataManager().listKeys(volumeName,
+          bucketName, null, keyPrefix, 100);
+
+      Assert.assertEquals(writtenKeys.size(), keys.size());
+      for (OmKeyInfo keyInfo: keys) {
+        Assert.assertTrue(writtenKeys.contains(keyInfo.getKeyName()));
+      }
+    }
+  }
+
+  /**
+   * Writes data to the cluster.
+   * Shuts down one OM.
+   * Writes more data to the cluster.
+   * Submits prepare as ratis request.
+   * Checks that two live OMs are prepared.
+   * Revives the third OM.
+   * Checks that third OM received all transactions and is prepared.
+   * @throws Exception
+   */
+  // TODO: Fix this test so it passes.
+  //   @Test
+  public void testPrepareDownedOM() throws Exception {
+    // Index of the OM that will be shut down during this test.
+    final int shutdownOMIndex = 2;
+
+    MiniOzoneHAClusterImpl cluster = getCluster();
+    OzoneClient ozClient = OzoneClientFactory.getRpcClient(getConf());
+
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = UUID.randomUUID().toString();
+    ObjectStore store = ozClient.getObjectStore();
+
+    // Create keys with all 3 OMs up.
+    store.createVolume(volumeName);
+    OzoneVolume volume = store.getVolume(volumeName);
+    volume.createBucket(bucketName);
+
+    Set<String> writtenKeys = new HashSet<>();
+    for (int i = 1; i <= 50; i++) {
+      String keyName = keyPrefix + i;
+      writeTestData(store, volumeName, bucketName, keyName);
+      writtenKeys.add(keyName);
+    }
+
+    // Make sure all OMs have logs from writing data, so we can check that
+    // they are purged after prepare.
+    for (OzoneManager om: cluster.getOzoneManagersList()) {
+      LambdaTestUtils.await(timeoutMillis, 1000,
+          () -> logFilesPresentInRatisPeer(om));
+    }
+
+    // Shut down one OM.
+    cluster.stopOzoneManager(shutdownOMIndex);
+    OzoneManager downedOM = cluster.getOzoneManager(shutdownOMIndex);
+    Assert.assertFalse(downedOM.isRunning());
+
+    // Write keys with the remaining OMs up.
+    for (int i = 51; i <= 100; i++) {
+      String keyName = keyPrefix + i;
+      writeTestData(store, volumeName, bucketName, keyName);
+      writtenKeys.add(keyName);
+    }
+
+    // Submit prepare request via Ratis.
+    OzoneManager leaderOM = cluster.getOMLeader();
+    long prepareIndex = submitPrepareRequest(leaderOM.getOmRatisServer())
+            .getPrepareResponse()
+            .getTxnID();
+
+    // Check that the two live OMs are prepared.
+    for (OzoneManager om: cluster.getOzoneManagersList()) {
+      if (om == leaderOM) {
+        // Leader should have been prepared after we got the response.
+        checkPrepared(om, prepareIndex);
+      } else if (om != downedOM) {
+        // Follower may still be applying transactions.
+        waitAndCheckPrepared(om, prepareIndex);
+      }
+    }
+
+    // Restart the downed OM and wait for it to catch up.
+    // Since prepare was the last Ratis transaction, it should have all data
+    // it missed once it receives the prepare transaction.
+    cluster.restartOzoneManager(downedOM, true);
+    // Wait for other OMs to catch this one up on transactions.
+    LambdaTestUtils.await(timeoutMillis, 1000,
+        () -> downedOM.getRatisSnapshotIndex() == prepareIndex);
+    checkPrepared(downedOM, prepareIndex);
+
+    // Make sure all OMs are prepared and still have data.
+    for (OzoneManager om: cluster.getOzoneManagersList()) {
+      waitAndCheckPrepared(om, prepareIndex);
+
+      List<OmKeyInfo> readKeys = om.getMetadataManager().listKeys(volumeName,
+          bucketName, null, keyPrefix, 100);
+
+      Assert.assertEquals(writtenKeys.size(), readKeys.size());
+      for (OmKeyInfo keyInfo: readKeys) {
+        Assert.assertTrue(writtenKeys.contains(keyInfo.getKeyName()));
+      }
+    }
+  }
+
+  private boolean logFilesPresentInRatisPeer(OzoneManager om) {
+    String ratisDir = om.getOmRatisServer().getServer().getProperties()
+        .get("raft.server.storage.dir");
+    String groupIdDirName =
+        om.getOmRatisServer().getServer().getGroupIds().iterator()
+            .next().getUuid().toString();
+    File logDir = Paths.get(ratisDir, groupIdDirName, "current")
+        .toFile();
+
+    for (File file : logDir.listFiles()) {
+      if (file.getName().startsWith("log")) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private void writeTestData(ObjectStore store, String volumeName,
+                             String bucketName, String keyName)
+      throws Exception {
+    String keyString = UUID.randomUUID().toString();
+    byte[] data = ContainerTestHelper.getFixedLengthString(
+        keyString, 100).getBytes(UTF_8);
+    OzoneOutputStream keyStream = TestHelper.createKey(
+        keyName, ReplicationType.STAND_ALONE, ReplicationFactor.ONE,
+        100, store, volumeName, bucketName);
+    keyStream.write(data);
+    keyStream.close();
+  }
+
+  private OMRequest buildPrepareRequest() {
+    PrepareRequest requestProto = PrepareRequest.newBuilder().build();
+
+    return OMRequest.newBuilder()
+        .setPrepareRequest(requestProto)
+        .setCmdType(Type.Prepare)
+        .setClientId(UUID.randomUUID().toString())
+        .build();
+  }
+
+  private void waitAndCheckPrepared(OzoneManager om,
+      long prepareRequestLogIndex) throws Exception {
+    // Log files are deleted after the snapshot is taken,
+    // so once log files have been deleted, OM should be prepared.
+    LambdaTestUtils.await(timeoutMillis, 1000,
+        () -> !logFilesPresentInRatisPeer(om));
+    checkPrepared(om, prepareRequestLogIndex);
+  }
+
+  private void checkPrepared(OzoneManager om, long prepareRequestLogIndex)
+      throws Exception {
+    Assert.assertFalse(logFilesPresentInRatisPeer(om));
+
+    // If no transactions have been persisted to the DB, transaction info
+    // will be null, not zero.
+    // This will cause a null pointer exception if we use
+    // OzoneManager#getRatisSnapshotIndex to get the index from the DB.
+    OMTransactionInfo txnInfo = om.getMetadataManager()
+        .getTransactionInfoTable().get(TRANSACTION_INFO_KEY);
+    if (prepareRequestLogIndex == 0) {
+      Assert.assertNull(txnInfo);
+    } else {
+      Assert.assertEquals(txnInfo.getTransactionIndex(),
+          prepareRequestLogIndex);
+    }
+  }
+
+  private OMResponse submitPrepareRequest(OzoneManagerRatisServer server)
+      throws Exception {
+    PrepareRequest requestProto = PrepareRequest.newBuilder().build();
+
+    OMRequest omRequest = OMRequest.newBuilder()
+        .setPrepareRequest(requestProto)
+        .setCmdType(Type.Prepare)
+        .setClientId(UUID.randomUUID().toString())
+        .build();
+
+    RaftClientRequest raftClientRequest = new RaftClientRequest(
+        ClientId.randomId(),
+        server.getRaftPeerId(),
+        server.getRaftGroupId(),
+        0,
+        Message.valueOf(OMRatisHelper.convertRequestToByteString(omRequest)),
+        RaftClientRequest.writeRequestType(), null);
+
+    return server.submitRequest(omRequest, raftClientRequest);
+  }
+}
\ No newline at end of file
diff --git a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
index 72cea50..5a90a75 100644
--- a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
+++ b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
@@ -74,6 +74,7 @@ enum Type {
   DBUpdates = 53;
   FinalizeUpgrade = 54;
   FinalizeUpgradeProgress = 55;
+  Prepare = 56;
 
   GetDelegationToken = 61;
   RenewDelegationToken = 62;
@@ -145,6 +146,7 @@ message OMRequest {
   optional DBUpdatesRequest                  dbUpdatesRequest              = 53;
   optional FinalizeUpgradeRequest           finalizeUpgradeRequest         = 54;
   optional FinalizeUpgradeProgressRequest   finalizeUpgradeProgressRequest = 55;
+  optional PrepareRequest                   prepareRequest                 = 56;
 
   optional hadoop.common.GetDelegationTokenRequestProto getDelegationTokenRequest = 61;
   optional hadoop.common.RenewDelegationTokenRequestProto renewDelegationTokenRequest= 62;
@@ -220,6 +222,7 @@ message OMResponse {
   optional DBUpdatesResponse                 dbUpdatesResponse             = 52;
   optional FinalizeUpgradeResponse           finalizeUpgradeResponse       = 54;
   optional FinalizeUpgradeProgressResponse finalizeUpgradeProgressResponse = 55;
+  optional PrepareResponse                 prepareResponse                 = 56;
 
   optional GetDelegationTokenResponseProto getDelegationTokenResponse = 61;
   optional RenewDelegationTokenResponseProto renewDelegationTokenResponse = 62;
@@ -329,6 +332,7 @@ enum Status {
     REMOVE_UPGRADE_TO_LAYOUT_VERSION_FAILED = 68;
     UPDATE_LAYOUT_VERSION_FAILED = 69;
     LAYOUT_FEATURE_FINALIZATION_FAILED = 70;
+    PREPARE_FAILED = 71;
 }
 
 /**
@@ -1074,6 +1078,14 @@ message FinalizeUpgradeProgressResponse {
     required UpgradeFinalizationStatus status = 1;
 }
 
+message PrepareRequest {
+
+}
+
+message PrepareResponse {
+    required uint64 txnID = 1;
+}
+
 message ServicePort {
     enum Type {
         RPC = 1;
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMStarterInterface.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMStarterInterface.java
index 14252a7..f632ad1 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMStarterInterface.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMStarterInterface.java
@@ -30,6 +30,4 @@ public interface OMStarterInterface {
       AuthenticationException;
   boolean init(OzoneConfiguration conf) throws IOException,
       AuthenticationException;
-  boolean prepareForUpgrade(OzoneConfiguration conf) throws IOException,
-      AuthenticationException;
 }
\ No newline at end of file
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index bee4861..e5cee22 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -200,7 +200,6 @@ import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED;
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED_DEFAULT;
 import static org.apache.hadoop.hdds.HddsUtils.getScmAddressForBlockClients;
 import static org.apache.hadoop.hdds.HddsUtils.getScmAddressForClients;
-import static org.apache.hadoop.hdds.ratis.RatisUpgradeUtils.waitForAllTxnsApplied;
 import static org.apache.hadoop.hdds.security.x509.certificates.utils.CertificateSignRequest.getEncodedString;
 import static org.apache.hadoop.hdds.server.ServerUtils.getRemoteUserName;
 import static org.apache.hadoop.hdds.server.ServerUtils.updateRPCListenAddress;
@@ -224,12 +223,10 @@ import static org.apache.hadoop.ozone.OzoneConsts.RPC_PORT;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ENABLE_FILESYSTEM_PATHS;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ENABLE_FILESYSTEM_PATHS_DEFAULT;
-import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_FLUSH_TXNS_RETRY_INTERVAL_SECONDS;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_HANDLER_COUNT_DEFAULT;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_HANDLER_COUNT_KEY;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_KERBEROS_KEYTAB_FILE_KEY;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_KERBEROS_PRINCIPAL_KEY;
-import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_MAX_TIME_TO_WAIT_FLUSH_TXNS;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_METRICS_SAVE_INTERVAL;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_METRICS_SAVE_INTERVAL_DEFAULT;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_USER_MAX_VOLUME;
@@ -246,7 +243,6 @@ import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.
 
 import org.apache.hadoop.util.Time;
 import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
-import org.apache.ratis.server.impl.RaftServerProxy;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.util.ExitUtils;
 import org.apache.ratis.util.FileUtils;
@@ -341,7 +337,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
   private final boolean useRatisForReplication;
 
   private boolean isNativeAuthorizerEnabled;
-  private boolean prepareForUpgrade;
+  private boolean isPrepared;
 
   private ExitManager exitManager;
 
@@ -356,6 +352,9 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
   private State omState;
   private Thread emptier;
 
+  // TODO: Utilize the forUpgrade parameter to remove the marker file and
+  //  take the OM out of prepare mode on startup.
+  @SuppressWarnings("methodlength")
   private OzoneManager(OzoneConfiguration conf, boolean forUpgrade)
       throws IOException, AuthenticationException {
     super(OzoneVersionInfo.OZONE_VERSION_INFO);
@@ -504,8 +503,11 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
     };
     ShutdownHookManager.get().addShutdownHook(shutdownHook,
         SHUTDOWN_HOOK_PRIORITY);
-    this.prepareForUpgrade = forUpgrade;
     omState = State.INITIALIZED;
+
+    // TODO: When marker file is added, check for that on startup to
+    //  determine prepare mode.
+    this.isPrepared = false;
   }
 
   private void logVersionMismatch(OzoneConfiguration conf, ScmInfo scmInfo) {
@@ -1020,39 +1022,6 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
     }
   }
 
-  public boolean applyAllPendingTransactions()
-      throws InterruptedException, IOException {
-
-    if (!isRatisEnabled) {
-      LOG.info("Ratis not enabled. Nothing to do.");
-      return true;
-    }
-
-    waitForAllTxnsApplied(omRatisServer.getOmStateMachine(),
-        omRatisServer.getRaftGroup(),
-        (RaftServerProxy) omRatisServer.getServer(),
-        OZONE_OM_MAX_TIME_TO_WAIT_FLUSH_TXNS,
-        OZONE_OM_FLUSH_TXNS_RETRY_INTERVAL_SECONDS);
-
-    long appliedIndexFromRatis =
-        omRatisServer.getOmStateMachine().getLastAppliedTermIndex().getIndex();
-    OMTransactionInfo omTransactionInfo =
-        OMTransactionInfo.readTransactionInfo(metadataManager);
-    long index = omTransactionInfo.getTermIndex().getIndex();
-    if (index != appliedIndexFromRatis) {
-      throw new IllegalStateException(
-          String.format("Cannot prepare OM for Upgrade " +
-          "since transaction info table index %d does not match ratis %s",
-              index, appliedIndexFromRatis));
-    }
-
-    LOG.info("OM has been prepared for upgrade. All transactions " +
-        "upto {} have been flushed to the state machine, " +
-        "and a snapshot has been taken.",
-        omRatisServer.getOmStateMachine().getLastAppliedTermIndex().getIndex());
-    return true;
-  }
-
   /**
    * Initializes secure OzoneManager.
    */
@@ -1239,10 +1208,8 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
       LOG.error("OM HttpServer failed to start.", ex);
     }
 
-    if (!prepareForUpgrade) {
-      omRpcServer.start();
-      isOmRpcServerRunning = true;
-    }
+    omRpcServer.start();
+    isOmRpcServerRunning = true;
 
     // TODO: Start this thread only on the leader node.
     //  Should be fixed after HDDS-4451.
@@ -1253,7 +1220,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
     startJVMPauseMonitor();
     setStartTime();
 
-    if (!prepareForUpgrade) {
+    if (!isPrepared) {
       omState = State.RUNNING;
     } else {
       omState = State.PREPARING_FOR_UPGRADE;
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManagerStarter.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManagerStarter.java
index aac97fa..76e2ced 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManagerStarter.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManagerStarter.java
@@ -27,7 +27,6 @@ import org.apache.hadoop.security.authentication.client.AuthenticationException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import picocli.CommandLine;
 import picocli.CommandLine.Command;
 
@@ -100,28 +99,29 @@ public class OzoneManagerStarter extends GenericCli {
     }
   }
 
-
-  /**
-   * This function implements a sub-command to allow the OM to be
-   * "prepared for upgrade".
-   */
-  @CommandLine.Command(name = "--prepareForUpgrade",
-      aliases = {"--prepareForDowngrade", "--flushTransactions"},
-      customSynopsis = "ozone om [global options] --prepareForUpgrade",
-      hidden = false,
-      description = "Prepare the OM for upgrade/downgrade. (Flush Raft log " +
-          "transactions.)",
-      mixinStandardHelpOptions = true,
-      versionProvider = HddsVersionProvider.class)
-  @SuppressFBWarnings("DM_EXIT")
-  public void prepareOmForUpgrade() throws Exception {
-    commonInit();
-    boolean result = receiver.prepareForUpgrade(conf);
-    if (!result) {
-      throw new Exception("Prepare OM For Upgrade failed.");
-    }
-    System.exit(0);
-  }
+  // TODO: Convert this flag to bring the OM out of prepare mode with the new
+  //  bits when prepare marker files have been implemented.
+//  /**
+//   * This function implements a sub-command to allow the OM to be
+//   * "prepared for upgrade".
+//   */
+//  @CommandLine.Command(name = "--prepareForUpgrade",
+//      aliases = {"--prepareForDowngrade", "--flushTransactions"},
+//      customSynopsis = "ozone om [global options] --prepareForUpgrade",
+//      hidden = false,
+//      description = "Prepare the OM for upgrade/downgrade. (Flush Raft log " +
+//          "transactions.)",
+//      mixinStandardHelpOptions = true,
+//      versionProvider = HddsVersionProvider.class)
+//  @SuppressFBWarnings("DM_EXIT")
+//  public void prepareOmForUpgrade() throws Exception {
+//    commonInit();
+//    boolean result = receiver.prepareForUpgrade(conf);
+//    if (!result) {
+//      throw new Exception("Prepare OM For Upgrade failed.");
+//    }
+//    System.exit(0);
+//  }
 
   /**
    * This function should be called by each command to ensure the configuration
@@ -143,7 +143,7 @@ public class OzoneManagerStarter extends GenericCli {
    * testing.
    */
   static class OMStarterHelper implements OMStarterInterface{
-
+    @Override
     public void start(OzoneConfiguration conf) throws IOException,
         AuthenticationException {
       OzoneManager om = OzoneManager.createOm(conf);
@@ -151,26 +151,11 @@ public class OzoneManagerStarter extends GenericCli {
       om.join();
     }
 
+    @Override
     public boolean init(OzoneConfiguration conf) throws IOException,
         AuthenticationException {
       return OzoneManager.omInit(conf);
     }
-
-    public boolean prepareForUpgrade(OzoneConfiguration conf)
-        throws IOException, AuthenticationException {
-      try (OzoneManager om = OzoneManager.createOmUpgradeMode(conf)) {
-        om.start();
-        boolean success = false;
-        try {
-          LOG.info("Preparing OM for upgrade.");
-          success = om.applyAllPendingTransactions();
-        } catch (InterruptedException e) {
-          LOG.error("Error preparing OM for upgrade.", e);
-          Thread.currentThread().interrupt();
-        }
-        return success;
-      }
-    }
   }
 
 }
\ No newline at end of file
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/codec/OMDBDefinition.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/codec/OMDBDefinition.java
index 1161fd0..1302922 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/codec/OMDBDefinition.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/codec/OMDBDefinition.java
@@ -69,15 +69,6 @@ public class OMDBDefinition implements DBDefinition {
                     OmVolumeArgs.class,
                     new OmVolumeArgsCodec());
 
-  public static final DBColumnFamilyDefinition<String, String>
-            S3_TABLE=
-            new DBColumnFamilyDefinition<>(
-                    "s3Table",
-                    String.class,
-                    new StringCodec(),
-                    String.class,
-                    new StringCodec());
-
   public static final DBColumnFamilyDefinition<String, OmKeyInfo>
             OPEN_KEY_TABLE =
             new DBColumnFamilyDefinition<>(
@@ -164,7 +155,7 @@ public class OMDBDefinition implements DBDefinition {
   @Override
   public DBColumnFamilyDefinition[] getColumnFamilies() {
     return new DBColumnFamilyDefinition[] {DELETED_TABLE, USER_TABLE,
-        VOLUME_TABLE, S3_TABLE, OPEN_KEY_TABLE, KEY_TABLE,
+        VOLUME_TABLE, OPEN_KEY_TABLE, KEY_TABLE,
         BUCKET_TABLE, MULTIPART_INFO_TABLE, PREFIX_TABLE, DTOKEN_TABLE,
         S3_SECRET_TABLE, TRANSACTION_INFO_TABLE};
   }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/upgrade/OMPrepareRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/upgrade/OMPrepareRequest.java
new file mode 100644
index 0000000..021d7df
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/upgrade/OMPrepareRequest.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.om.request.upgrade;
+
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.ratis.OMTransactionInfo;
+import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
+import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
+import org.apache.hadoop.ozone.om.request.OMClientRequest;
+import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.om.response.upgrade.OMPrepareResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PrepareResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+
+import static org.apache.hadoop.ozone.OzoneConsts.TRANSACTION_INFO_KEY;
+import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
+
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.impl.RaftServerProxy;
+import org.apache.ratis.server.raftlog.RaftLog;
+import org.apache.ratis.statemachine.StateMachine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * OM Request used to flush all transactions to disk, take a DB snapshot, and
+ * purge the logs, leaving Ratis in a clean state without unapplied log
+ * entries. This prepares the OM for upgrades/downgrades so that no request
+ * in the log is applied to the database in the old version of the code in one
+ * OM, and the new version of the code on another OM.
+ */
+public class OMPrepareRequest extends OMClientRequest {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(OMPrepareRequest.class);
+
+  // Allow the double buffer this much time to flush all transactions before
+  // returning an error to the caller.
+  private static final Duration DOUBLE_BUFFER_FLUSH_TIMEOUT =
+      Duration.of(5, ChronoUnit.MINUTES);
+  // Time between checks to see if double buffer finished flushing.
+  private static final Duration DOUBLE_BUFFER_FLUSH_CHECK_INTERVAL =
+      Duration.of(1, ChronoUnit.SECONDS);
+
+  public OMPrepareRequest(OMRequest omRequest) {
+    super(omRequest);
+  }
+
+  @Override
+  public OMClientResponse validateAndUpdateCache(
+      OzoneManager ozoneManager, long transactionLogIndex,
+      OzoneManagerDoubleBufferHelper ozoneManagerDoubleBufferHelper) {
+
+    LOG.info("Received prepare request with log index {}", transactionLogIndex);
+
+    OMResponse.Builder responseBuilder =
+        OmResponseUtil.getOMResponseBuilder(getOmRequest());
+    responseBuilder.setCmdType(Type.Prepare);
+    OMClientResponse response = null;
+
+    try {
+      // Create response.
+      PrepareResponse omResponse = PrepareResponse.newBuilder()
+              .setTxnID(transactionLogIndex)
+              .build();
+      responseBuilder.setPrepareResponse(omResponse);
+      response = new OMPrepareResponse(responseBuilder.build());
+
+      // Add response to double buffer before clearing logs.
+      // This guarantees the log index of this request will be the same as
+      // the snapshot index in the prepared state.
+      ozoneManagerDoubleBufferHelper.add(response, transactionLogIndex);
+
+      OzoneManagerRatisServer omRatisServer = ozoneManager.getOmRatisServer();
+      RaftServerProxy server = (RaftServerProxy) omRatisServer.getServer();
+      RaftServerImpl serverImpl =
+          server.getImpl(omRatisServer.getRaftGroup().getGroupId());
+
+      // Wait for outstanding double buffer entries to flush to disk,
+      // so they will not be purged from the log before being persisted to
+      // the DB.
+      // Since the response for this request was added to the double buffer
+      // already, once this index reaches the state machine, we know all
+      // transactions have been flushed.
+      waitForLogIndex(transactionLogIndex,
+          ozoneManager.getMetadataManager(), serverImpl);
+      takeSnapshotAndPurgeLogs(serverImpl);
+
+      // TODO: Create marker file with txn index.
+
+      LOG.info("OM prepared at log index {}. Returning response {}",
+          ozoneManager.getRatisSnapshotIndex(), omResponse);
+    } catch (OMException e) {
+      response = new OMPrepareResponse(
+          createErrorOMResponse(responseBuilder, e));
+    } catch (InterruptedException | IOException e) {
+      // Set error code so that prepare failure does not cause the OM to
+      // terminate.
+      response = new OMPrepareResponse(
+          createErrorOMResponse(responseBuilder, new OMException(e,
+              OMException.ResultCodes.PREPARE_FAILED)));
+    }
+
+    return response;
+  }
+
+  /**
+   * Waits for the specified index to be flushed to the state machine on
+   * disk, and to be updated in memory in Ratis.
+   * @throws IOException if the index is not reached within the flush timeout.
+   */
+  private static void waitForLogIndex(long indexToWaitFor,
+      OMMetadataManager metadataManager, RaftServerImpl server)
+      throws InterruptedException, IOException {
+
+    long endTime = System.currentTimeMillis() +
+        DOUBLE_BUFFER_FLUSH_TIMEOUT.toMillis();
+    boolean success = false;
+
+    while (!success && System.currentTimeMillis() < endTime) {
+      // If no transactions have been persisted to the DB, transaction info
+      // will be null, not zero, causing a null pointer exception within
+      // ozoneManager#getRatisSnapshotIndex. Read the transaction info
+      // directly instead so the null case can be handled here.
+      OMTransactionInfo dbTxnInfo = metadataManager
+          .getTransactionInfoTable().get(TRANSACTION_INFO_KEY);
+      long ratisTxnIndex =
+          server.getStateMachine().getLastAppliedTermIndex().getIndex();
+
+      // Ratis may apply meta transactions after the prepare request, causing
+      // its in memory index to always be greater than the DB index.
+      if (dbTxnInfo == null) {
+        // If there are no transactions in the DB, we are prepared to log
+        // index 0 only.
+        success = (indexToWaitFor == 0)
+            && (ratisTxnIndex >= indexToWaitFor);
+      } else {
+        success = (dbTxnInfo.getTransactionIndex() == indexToWaitFor)
+            && (ratisTxnIndex >= indexToWaitFor);
+      }
+
+      if (!success) {
+        Thread.sleep(DOUBLE_BUFFER_FLUSH_CHECK_INTERVAL.toMillis());
+      }
+    }
+
+    // If the timeout waiting for all transactions to reach the state machine
+    // is exceeded, the exception is propagated, resulting in an error response
+    // to the client. They can retry the prepare request.
+    if (!success) {
+      throw new IOException(String.format("After waiting for %d seconds, " +
+              "State Machine has not applied  all the transactions.",
+          DOUBLE_BUFFER_FLUSH_TIMEOUT.getSeconds()));
+    }
+  }
+
+  /**
+   * Take a snapshot of the state machine at the last index, and purge ALL logs.
+   * @param impl RaftServerImpl instance
+   * @throws IOException on Error.
+   */
+  public static long takeSnapshotAndPurgeLogs(RaftServerImpl impl)
+      throws IOException {
+
+    StateMachine stateMachine = impl.getStateMachine();
+    long snapshotIndex = stateMachine.takeSnapshot();
+    RaftLog raftLog = impl.getState().getLog();
+    long raftLogIndex = raftLog.getLastEntryTermIndex().getIndex();
+
+    // Ensure that Ratis's in memory snapshot index is the same as the index
+    // of its last log entry.
+    if (snapshotIndex != raftLogIndex) {
+      throw new IOException("Snapshot index " + snapshotIndex + " does not " +
+          "match last log index " + raftLogIndex);
+    }
+
+    CompletableFuture<Long> purgeFuture =
+        raftLog.syncWithSnapshot(snapshotIndex);
+    try {
+      Long purgeIndex = purgeFuture.get();
+      if (purgeIndex != snapshotIndex) {
+        throw new IOException("Purge index " + purgeIndex +
+            " does not match last index " + snapshotIndex);
+      }
+    } catch (Exception e) {
+      throw new IOException("Unable to purge logs.", e);
+    }
+
+    return snapshotIndex;
+  }
+
+  public static String getRequestType() {
+    return Type.Prepare.name();
+  }
+}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/upgrade/OMPrepareResponse.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/upgrade/OMPrepareResponse.java
new file mode 100644
index 0000000..cac9049
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/upgrade/OMPrepareResponse.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.om.response.upgrade;
+
+import org.apache.hadoop.hdds.utils.db.BatchOperation;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.response.CleanupTableInfo;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+
+import java.io.IOException;
+
+/**
+ * Response for prepare request.
+ */
+@CleanupTableInfo()
+public class OMPrepareResponse extends OMClientResponse {
+  public OMPrepareResponse(
+      OzoneManagerProtocolProtos.OMResponse omResponse) {
+    super(omResponse);
+  }
+
+  @Override
+  protected void addToDBBatch(OMMetadataManager omMetadataManager,
+      BatchOperation batchOperation) throws IOException {
+    // The Prepare request will create a marker file to indicate the OM is
+    // prepared, so there is no DB update for the prepare operation.
+  }
+}
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmMetadataManager.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmMetadataManager.java
index 71193c9..2364005 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmMetadataManager.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmMetadataManager.java
@@ -33,6 +33,7 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
+import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -44,8 +45,6 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OPEN_KEY_EXPIRE_THRE
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS_DEFAULT;
 import static org.apache.hadoop.ozone.OzoneConsts.TRANSACTION_INFO_KEY;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_DB_DIRS;
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.junit.Assert.assertThat;
 
 /**
  * Tests OzoneManager MetadataManager.
@@ -616,10 +615,10 @@ public class TestOmMetadataManager {
 
   @Test
   public void testAllTablesAreProperInOMMetadataManagerImpl() {
-    String[] tablesByDefinition = OmMetadataManagerImpl.ALL_TABLES;
-
+    Set<String> tablesByDefinition =
+        new HashSet<>(Arrays.asList(OmMetadataManagerImpl.ALL_TABLES));
     Set<String> tablesInManager = omMetadataManager.listTableNames();
 
-    assertThat(tablesInManager, containsInAnyOrder(tablesByDefinition));
+    Assert.assertEquals(tablesByDefinition, tablesInManager);
   }
 }
\ No newline at end of file
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerStateMachine.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerStateMachine.java
index 285c992..a0a21b9 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerStateMachine.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerStateMachine.java
@@ -123,8 +123,8 @@ public class TestOzoneManagerStateMachine {
   public void testApplyTransactionsUpdateLastAppliedIndexCalledLate() {
     // Now try a scenario where 1,2,3 transactions are in applyTransactionMap
     // and updateLastAppliedIndex is not called for them, and before that
-    // notifyIndexUpdate is called with transaction 4. And see now at the end
-    // when updateLastAppliedIndex is called with epochs we have
+    // notifyTermIndexUpdated is called with transaction 4. And see now at the
+    // end when updateLastAppliedIndex is called with epochs we have
     // lastAppliedIndex as 4 or not.
 
     // Conf/metadata transaction.
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/TestCleanupTableInfo.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/TestCleanupTableInfo.java
index 90c08cb..99944ce 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/TestCleanupTableInfo.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/TestCleanupTableInfo.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.ozone.om.request.file.OMFileCreateRequest;
 import org.apache.hadoop.ozone.om.request.key.OMKeyCreateRequest;
 import org.apache.hadoop.ozone.om.response.file.OMFileCreateResponse;
 import org.apache.hadoop.ozone.om.response.key.OMKeyCreateResponse;
+import org.apache.hadoop.ozone.om.response.upgrade.OMPrepareResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateFileRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateKeyRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyArgs;
@@ -155,7 +156,9 @@ public class TestCleanupTableInfo {
         Assert.assertTrue(
             Arrays.stream(cleanupTables).allMatch(tables::contains)
         );
-      } else {
+      } else if (aClass != OMPrepareResponse.class) {
+        // Prepare response is allowed to have no cleanup tables, since it
+        // does not modify the DB.
         Assert.assertTrue(cleanupAll);
       }
     });


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org