You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ha...@apache.org on 2019/04/04 05:50:39 UTC

[hadoop] branch trunk updated: HDDS-1339. Implement ratis snapshots on OM (#651)

This is an automated email from the ASF dual-hosted git repository.

hanishakoneru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f09a78f  HDDS-1339. Implement ratis snapshots on OM (#651)
f09a78f is described below

commit f09a78f73fbb5bed5bac87ab86354b63bb605e5b
Author: Hanisha Koneru <ko...@gmail.com>
AuthorDate: Wed Apr 3 22:50:28 2019 -0700

    HDDS-1339. Implement ratis snapshots on OM (#651)
---
 .../java/org/apache/hadoop/ozone/OzoneConsts.java  |   3 +
 .../org/apache/hadoop/ozone/common/Storage.java    |   2 +-
 .../java/org/apache/hadoop/utils/db/DBStore.java   |   6 ++
 .../java/org/apache/hadoop/utils/db/RDBStore.java  |  11 ++
 .../common/src/main/resources/ozone-default.xml    |  13 ++-
 .../org/apache/hadoop/ozone/om/OMConfigKeys.java   |   6 ++
 .../ozone/om/protocol/OzoneManagerHAProtocol.java  |  10 +-
 .../hadoop/ozone/MiniOzoneHAClusterImpl.java       |   4 +
 .../apache/hadoop/ozone/om/TestOzoneManagerHA.java |  90 +++++++++++++++-
 .../org/apache/hadoop/ozone/om/OzoneManager.java   |  45 +++++++-
 .../ozone/om/ratis/OzoneManagerRatisServer.java    |  28 +++--
 .../ozone/om/ratis/OzoneManagerStateMachine.java   | 115 +++++++++++++--------
 12 files changed, 265 insertions(+), 68 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
index 8e3b02a..3e15241 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
@@ -279,4 +279,7 @@ public final class OzoneConsts {
 
   // Dummy OMNodeID for OM Clients to use for a non-HA OM setup
   public static final String OM_NODE_ID_DUMMY = "omNodeIdDummy";
+
+  // OM Ratis snapshot file to store the last applied index
+  public static final String OM_RATIS_SNAPSHOT_INDEX = "ratisSnapshotIndex";
 }
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Storage.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Storage.java
index 9ad87ae..f393ed9 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Storage.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Storage.java
@@ -145,7 +145,7 @@ public abstract class Storage {
    *
    * @return the directory path
    */
-  private File getCurrentDir() {
+  public File getCurrentDir() {
     return new File(storageDir, STORAGE_DIR_CURRENT);
   }
 
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/DBStore.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/DBStore.java
index 0bc30d0..56166ab 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/DBStore.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/DBStore.java
@@ -66,6 +66,12 @@ public interface DBStore extends AutoCloseable {
   ArrayList<Table> listTables() throws IOException;
 
   /**
+   * Flush the DB buffer onto persistent storage.
+   * @throws IOException on failure to flush
+   */
+  void flush() throws IOException;
+
+  /**
    * Compact the entire database.
    *
    * @throws IOException on Failure
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RDBStore.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RDBStore.java
index 9a7119e..5bb0fa4 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RDBStore.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RDBStore.java
@@ -273,6 +273,17 @@ public class RDBStore implements DBStore {
   }
 
   @Override
+  public void flush() throws IOException {
+    // FlushOptions holds a native handle; close it to avoid a leak.
+    try (FlushOptions options = new FlushOptions().setWaitForFlush(true)) {
+      db.flush(options);
+    } catch (RocksDBException e) {
+      LOG.error("Unable to Flush RocksDB data", e);
+      throw toIOException("Unable to Flush RocksDB data", e);
+    }
+  }
+
+  @Override
   public DBCheckpoint getCheckpoint(boolean flush) {
     final FlushOptions flushOptions = new FlushOptions().setWaitForFlush(flush);
     try {
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index cbd249a..5580548 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -1603,19 +1603,28 @@
   <property>
     <name>ozone.om.ratis.log.appender.queue.num-elements</name>
     <value>1024</value>
-    <tag>OZONE, DEBUG, CONTAINER, RATIS</tag>
+    <tag>OZONE, DEBUG, OM, RATIS</tag>
     <description>Number of operation pending with Raft's Log Worker.
     </description>
   </property>
   <property>
     <name>ozone.om.ratis.log.appender.queue.byte-limit</name>
     <value>32MB</value>
-    <tag>OZONE, DEBUG, CONTAINER, RATIS</tag>
+    <tag>OZONE, DEBUG, OM, RATIS</tag>
     <description>Byte limit for Raft's Log Worker queue.
     </description>
   </property>
 
   <property>
+    <name>ozone.om.ratis.snapshot.auto.trigger.threshold</name>
+    <value>400000</value>
+    <tag>OZONE, DEBUG, OM, RATIS</tag>
+    <description>The log index threshold after which Ratis will
+      automatically trigger a snapshot on the OM state machine.
+    </description>
+  </property>
+
+  <property>
     <name>ozone.om.ratis.server.request.timeout</name>
     <value>3s</value>
     <tag>OZONE, OM, RATIS, MANAGEMENT</tag>
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
index 7b13471..60dde44 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
@@ -136,6 +136,12 @@ public final class OMConfigKeys {
   public static final String
       OZONE_OM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT_DEFAULT = "32MB";
 
+  // OM Snapshot configurations
+  public static final String OZONE_OM_RATIS_SNAPSHOT_AUTO_TRIGGER_THRESHOLD_KEY
+      = "ozone.om.ratis.snapshot.auto.trigger.threshold";
+  public static final long
+      OZONE_OM_RATIS_SNAPSHOT_AUTO_TRIGGER_THRESHOLD_DEFAULT
+      = 400000;
 
   // OM Ratis server configurations
   public static final String OZONE_OM_RATIS_SERVER_REQUEST_TIMEOUT_KEY
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerHAProtocol.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerHAProtocol.java
index 8357df2..f598997 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerHAProtocol.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerHAProtocol.java
@@ -37,6 +37,14 @@ import java.io.IOException;
 public interface OzoneManagerHAProtocol {
 
   /**
+   * Store the snapshot index, i.e. the Raft log index of the last transaction
+   * applied to the OM RocksDB, in the OM metadata directory on disk.
+   * @return the snapshot index
+   * @throws IOException if the snapshot index cannot be stored
+   */
+  long saveRatisSnapshot() throws IOException;
+
+  /**
    * Add a allocate block, it is assumed that the client is having an open
    * key session going on. This block will be appended to this open key session.
    * This will be called only during HA enabled OM, as during HA we get an
@@ -56,7 +64,6 @@ public interface OzoneManagerHAProtocol {
   OmKeyLocationInfo addAllocatedBlock(OmKeyArgs args, long clientID,
       KeyLocation keyLocation) throws IOException;
 
-
   /**
    * Add the openKey entry with given keyInfo and clientID in to openKeyTable.
    * This will be called only from applyTransaction, once after calling
@@ -81,5 +88,4 @@ public interface OzoneManagerHAProtocol {
    */
   OmMultipartInfo applyInitiateMultipartUpload(OmKeyArgs omKeyArgs,
       String multipartUploadID) throws IOException;
-
 }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java
index f84f95e..03c2a2c 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java
@@ -87,6 +87,10 @@ public final class MiniOzoneHAClusterImpl extends MiniOzoneClusterImpl {
     return this.ozoneManagers.get(index);
   }
 
+  public OzoneManager getOzoneManager(String omNodeId) {
+    return ozoneManagerMap.get(omNodeId);
+  }
+
   @Override
   public void restartOzoneManager() throws IOException {
     for (OzoneManager ozoneManager : ozoneManagers) {
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java
index 93c120a..06009e2 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java
@@ -76,6 +76,7 @@ public class TestOzoneManagerHA {
   private String clusterId;
   private String scmId;
   private int numOfOMs = 3;
+  private static final long SNAPSHOT_THRESHOLD = 50;
 
   @Rule
   public ExpectedException exception = ExpectedException.none();
@@ -99,7 +100,9 @@ public class TestOzoneManagerHA {
     conf.setInt(OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS, 2);
     conf.setInt(OZONE_CLIENT_RETRY_MAX_ATTEMPTS_KEY, 10);
     conf.setInt(OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY, 10);
-
+    conf.setLong(
+        OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_AUTO_TRIGGER_THRESHOLD_KEY,
+        SNAPSHOT_THRESHOLD);
     cluster = (MiniOzoneHAClusterImpl) MiniOzoneCluster.newHABuilder(conf)
         .setClusterId(clusterId)
         .setScmId(scmId)
@@ -326,9 +329,8 @@ public class TestOzoneManagerHA {
         throw e;
       }
     }
-
-
   }
+
   /**
    * Create a volume and test its attribute.
    */
@@ -370,8 +372,6 @@ public class TestOzoneManagerHA {
     }
   }
 
-
-
   /**
    * Test that OMFailoverProxyProvider creates an OM proxy for each OM in the
    * cluster.
@@ -533,4 +533,84 @@ public class TestOzoneManagerHA {
           proxyProvider.getCurrentProxyOMNodeId());
     }
   }
+
+  @Test
+  public void testOMRatisSnapshot() throws Exception {
+    String userName = "user" + RandomStringUtils.randomNumeric(5);
+    String adminName = "admin" + RandomStringUtils.randomNumeric(5);
+    String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
+    String bucketName = "bucket" + RandomStringUtils.randomNumeric(5);
+
+    VolumeArgs createVolumeArgs = VolumeArgs.newBuilder()
+        .setOwner(userName)
+        .setAdmin(adminName)
+        .build();
+
+    objectStore.createVolume(volumeName, createVolumeArgs);
+    OzoneVolume retVolumeinfo = objectStore.getVolume(volumeName);
+
+    retVolumeinfo.createBucket(bucketName);
+    OzoneBucket ozoneBucket = retVolumeinfo.getBucket(bucketName);
+
+    String leaderOMNodeId = objectStore.getClientProxy().getOMProxyProvider()
+        .getCurrentProxyOMNodeId();
+    OzoneManager ozoneManager = cluster.getOzoneManager(leaderOMNodeId);
+
+    // Send commands to ratis to increase the log index so that ratis
+    // triggers a snapshot on the state machine.
+
+    long appliedLogIndex = 0;
+    while (appliedLogIndex <= SNAPSHOT_THRESHOLD) {
+      createKey(ozoneBucket);
+      appliedLogIndex = ozoneManager.getOmRatisServer()
+          .getStateMachineLastAppliedIndex();
+    }
+
+    GenericTestUtils.waitFor(() -> {
+      if (ozoneManager.loadRatisSnapshotIndex() > 0) {
+        return true;
+      }
+      return false;
+    }, 1000, 100000);
+
+    // The current lastAppliedLogIndex on the state machine should be greater
+    // than or equal to the saved snapshot index.
+    long smLastAppliedIndex =
+        ozoneManager.getOmRatisServer().getStateMachineLastAppliedIndex();
+    long ratisSnapshotIndex = ozoneManager.loadRatisSnapshotIndex();
+    Assert.assertTrue("LastAppliedIndex on OM State Machine ("
+            + smLastAppliedIndex + ") is less than the saved snapshot index("
+            + ratisSnapshotIndex + ").",
+        smLastAppliedIndex >= ratisSnapshotIndex);
+
+    // Add more transactions to Ratis to trigger another snapshot
+    while (appliedLogIndex <= (smLastAppliedIndex + SNAPSHOT_THRESHOLD)) {
+      createKey(ozoneBucket);
+      appliedLogIndex = ozoneManager.getOmRatisServer()
+          .getStateMachineLastAppliedIndex();
+    }
+    // Wait until a snapshot newer than the first one has been saved.
+    GenericTestUtils.waitFor(() -> {
+      if (ozoneManager.loadRatisSnapshotIndex() > ratisSnapshotIndex) {
+        return true;
+      }
+      return false;
+    }, 1000, 100000);
+
+    // The new snapshot index must be greater than the previous snapshot index
+    long ratisSnapshotIndexNew = ozoneManager.loadRatisSnapshotIndex();
+    Assert.assertTrue("Latest snapshot index must be greater than previous " +
+        "snapshot indices", ratisSnapshotIndexNew > ratisSnapshotIndex);
+
+  }
+
+  private void createKey(OzoneBucket ozoneBucket) throws IOException {
+    String keyName = "key" + RandomStringUtils.randomNumeric(5);
+    String data = "data" + RandomStringUtils.randomNumeric(5);
+    try (OzoneOutputStream out = ozoneBucket.createKey(keyName, data.length(),
+        ReplicationType.STAND_ALONE, ReplicationFactor.ONE, new HashMap<>())) {
+      // data is ASCII-only, so its char count equals its byte count.
+      out.write(data.getBytes(), 0, data.length());
+    }
+  }
 }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index b8da717..7a87b53 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -61,6 +61,7 @@ import org.apache.hadoop.hdds.security.x509.certificates.utils.CertificateSignRe
 import org.apache.hadoop.hdds.server.ServiceRuntimeInfoImpl;
 import org.apache.hadoop.hdds.tracing.TracingUtil;
 import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.util.PersistentLongFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.ipc.Client;
@@ -179,6 +180,7 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED_DEFAULT;
 import static org.apache.hadoop.ozone.OzoneConsts.OM_METRICS_FILE;
 import static org.apache.hadoop.ozone.OzoneConsts.OM_METRICS_TEMP_FILE;
 
+import static org.apache.hadoop.ozone.OzoneConsts.OM_RATIS_SNAPSHOT_INDEX;
 import static org.apache.hadoop.ozone.OzoneConsts.RPC_PORT;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
 import static org.apache.hadoop.ozone.om.OMConfigKeys
@@ -233,11 +235,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
   private RPC.Server omRpcServer;
   private InetSocketAddress omRpcAddress;
   private String omId;
-  private OMNodeDetails omNodeDetails;
   private List<OMNodeDetails> peerNodes;
-  private boolean isRatisEnabled;
-  private OzoneManagerRatisServer omRatisServer;
-  private OzoneManagerRatisClient omRatisClient;
   private final OMMetadataManager metadataManager;
   private final VolumeManager volumeManager;
   private final BucketManager bucketManager;
@@ -266,6 +264,13 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
   private volatile boolean isOmRpcServerRunning = false;
   private String omComponent;
 
+  private boolean isRatisEnabled;
+  private OzoneManagerRatisServer omRatisServer;
+  private OzoneManagerRatisClient omRatisClient;
+  private OMNodeDetails omNodeDetails;
+  private final File ratisSnapshotFile;
+  private long snapshotIndex;
+
   private KeyProviderCryptoExtension kmsProvider = null;
   private static String keyProviderUriKeyName =
       CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH;
@@ -306,6 +311,10 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
     startRatisServer();
     startRatisClient();
 
+    this.ratisSnapshotFile = new File(omStorage.getCurrentDir(),
+        OM_RATIS_SNAPSHOT_INDEX);
+    this.snapshotIndex = loadRatisSnapshotIndex();
+
     InetSocketAddress omNodeRpcAddr = omNodeDetails.getRpcAddress();
     omRpcAddressTxt = new Text(omNodeDetails.getRpcAddressString());
 
@@ -1307,6 +1316,33 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
     }
   }
 
+  /** Returns the persisted Ratis snapshot index, or 0 if unavailable. */
+  @VisibleForTesting
+  public long loadRatisSnapshotIndex() {
+    if (ratisSnapshotFile != null && ratisSnapshotFile.exists()) {
+      try {
+        return PersistentLongFile.readFile(ratisSnapshotFile, 0);
+      } catch (IOException e) {
+        LOG.error("Unable to read the ratis snapshot index (last applied " +
+            "transaction log index)", e);
+      }
+    }
+    return 0;
+  }
+
+  @Override
+  public long saveRatisSnapshot() throws IOException {
+    if (ratisSnapshotFile == null) { // set after startRatisServer()
+      throw new IOException("OM Ratis snapshot file is not initialized yet");
+    }
+    snapshotIndex = omRatisServer.getStateMachineLastAppliedIndex();
+    getMetadataManager().getStore().flush(); // Flush OM state to disk
+    PersistentLongFile.writeFile(ratisSnapshotFile, snapshotIndex);
+    LOG.info("Saved Ratis Snapshot on the OM with snapshotIndex {}",
+        snapshotIndex);
+    return snapshotIndex;
+  }
+
   /**
    * Stop service.
    */
@@ -2103,7 +2139,6 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
     }
   }
 
-
   @Override
   public OmKeyLocationInfo addAllocatedBlock(OmKeyArgs args, long clientID,
       KeyLocation keyLocation) throws IOException {
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
index 01979e4..b16f9f2 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
@@ -40,7 +40,6 @@ import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.hdds.scm.HddsServerUtil;
 import org.apache.hadoop.ozone.om.OMConfigKeys;
 import org.apache.hadoop.ozone.om.OMNodeDetails;
-import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.protocol.OzoneManagerServerProtocol;
 import org.apache.ratis.RaftConfigKeys;
 import org.apache.ratis.client.RaftClientConfigKeys;
@@ -60,7 +59,6 @@ import org.apache.ratis.rpc.RpcType;
 import org.apache.ratis.rpc.SupportedRpcType;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerConfigKeys;
-import org.apache.ratis.statemachine.impl.BaseStateMachine;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.apache.ratis.util.LifeCycle;
 import org.apache.ratis.util.SizeInBytes;
@@ -83,6 +81,7 @@ public final class OzoneManagerRatisServer {
   private final RaftPeerId raftPeerId;
 
   private final OzoneManagerServerProtocol ozoneManager;
+  private final OzoneManagerStateMachine omStateMachine;
   private final ClientId clientId = ClientId.randomId();
 
   private final ScheduledExecutorService scheduledRoleChecker;
@@ -130,11 +129,13 @@ public final class OzoneManagerRatisServer {
     LOG.info("Instantiating OM Ratis server with GroupID: {} and " +
         "Raft Peers: {}", raftGroupIdStr, raftPeersStr.toString().substring(2));
 
+    this.omStateMachine = getStateMachine();
+
     this.server = RaftServer.newBuilder()
         .setServerId(this.raftPeerId)
         .setGroup(this.raftGroup)
         .setProperties(serverProperties)
-        .setStateMachine(getStateMachine(this.raftGroupId))
+        .setStateMachine(omStateMachine)
         .build();
 
     // Run a scheduler to check and update the server role on the leader
@@ -156,7 +157,7 @@ public final class OzoneManagerRatisServer {
    * Creates an instance of OzoneManagerRatisServer.
    */
   public static OzoneManagerRatisServer newOMRatisServer(
-      Configuration ozoneConf, OzoneManager om,
+      Configuration ozoneConf, OzoneManagerServerProtocol omProtocol,
       OMNodeDetails omNodeDetails, List<OMNodeDetails> peerNodes)
       throws IOException {
 
@@ -186,7 +187,7 @@ public final class OzoneManagerRatisServer {
       raftPeers.add(raftPeer);
     }
 
-    return new OzoneManagerRatisServer(ozoneConf, om, omServiceId,
+    return new OzoneManagerRatisServer(ozoneConf, omProtocol, omServiceId,
         localRaftPeerId, ratisAddr, raftPeers);
   }
 
@@ -197,7 +198,7 @@ public final class OzoneManagerRatisServer {
   /**
    * Returns OzoneManager StateMachine.
    */
-  private BaseStateMachine getStateMachine(RaftGroupId gid) {
+  private OzoneManagerStateMachine getStateMachine() {
     return  new OzoneManagerStateMachine(this);
   }
 
@@ -382,10 +383,13 @@ public final class OzoneManagerRatisServer {
     this.roleCheckInitialDelayMs = leaderElectionMinTimeout
         .toLong(TimeUnit.MILLISECONDS);
 
-    /**
-     * TODO: when ratis snapshots are implemented, set snapshot threshold and
-     * queue size.
-     */
+    long snapshotAutoTriggerThreshold = conf.getLong(
+        OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_AUTO_TRIGGER_THRESHOLD_KEY,
+        OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_AUTO_TRIGGER_THRESHOLD_DEFAULT);
+    RaftServerConfigKeys.Snapshot.setAutoTriggerEnabled(
+        properties, true);
+    RaftServerConfigKeys.Snapshot.setAutoTriggerThreshold(
+        properties, snapshotAutoTriggerThreshold);
 
     return properties;
   }
@@ -517,4 +521,8 @@ public final class OzoneManagerRatisServer {
   private UUID getRaftGroupIdFromOmServiceId(String omServiceId) {
     return UUID.nameUUIDFromBytes(omServiceId.getBytes(StandardCharsets.UTF_8));
   }
+
+  public long getStateMachineLastAppliedIndex() {
+    return this.omStateMachine.getLastAppliedIndex();
+  }
 }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
index 420ffb5..2f3445a 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
@@ -69,6 +69,7 @@ public class OzoneManagerStateMachine extends BaseStateMachine {
   private final OzoneManagerServerProtocol ozoneManager;
   private RequestHandler handler;
   private RaftGroupId raftGroupId;
+  private volatile long lastAppliedIndex = 0; // read from other threads
 
   public OzoneManagerStateMachine(OzoneManagerRatisServer ratisServer) {
     this.omRatisServer = ratisServer;
@@ -95,6 +96,7 @@ public class OzoneManagerStateMachine extends BaseStateMachine {
    * should be rejected.
    * @throws IOException thrown by the state machine while validating
    */
+  @Override
   public TransactionContext startTransaction(
       RaftClientRequest raftClientRequest) throws IOException {
     ByteString messageContent = raftClientRequest.getMessage().getContent();
@@ -115,7 +117,63 @@ public class OzoneManagerStateMachine extends BaseStateMachine {
       return ctxt;
     }
     return handleStartTransactionRequests(raftClientRequest, omRequest);
+  }
 
+  /**
+   * Apply a committed log entry to the state machine, in log order.
+   */
+  @Override
+  public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
+    try {
+      OMRequest request = OMRatisHelper.convertByteStringToOMRequest(
+          trx.getStateMachineLogEntry().getLogData());
+      long trxLogIndex = trx.getLogEntry().getIndex();
+      // Run inline on the caller's thread so entries apply in log order.
+      return CompletableFuture.completedFuture(
+          runCommand(request, trxLogIndex));
+    } catch (IOException | RuntimeException e) {
+      return completeExceptionally(e);
+    }
+  }
+
+  /**
+   * Query the state machine. The request must be read-only.
+   */
+  @Override
+  public CompletableFuture<Message> query(Message request) {
+    try {
+      OMRequest omRequest = OMRatisHelper.convertByteStringToOMRequest(
+          request.getContent());
+      return CompletableFuture.completedFuture(queryCommand(omRequest));
+    } catch (IOException e) {
+      return completeExceptionally(e);
+    }
+  }
+
+  /**
+   * Take OM Ratis snapshot. Write the snapshot index to file. Snapshot index
+   * is the log index corresponding to the last applied transaction on the OM
+   * State Machine.
+   *
+   * @return the last applied index on the state machine which has been
+   * stored in the snapshot file.
+   */
+  @Override
+  public long takeSnapshot() throws IOException {
+    LOG.info("Saving Ratis snapshot on the OM.");
+    if (ozoneManager != null) {
+      return ozoneManager.saveRatisSnapshot();
+    }
+    return 0;
+  }
+
+  /**
+   * Notifies the state machine that the raft peer is no longer leader.
+   */
+  @Override
+  public void notifyNotLeader(Collection<TransactionContext> pendingEntries)
+      throws IOException {
+    omRatisServer.updateServerRole();
   }
 
   /**
@@ -142,10 +200,8 @@ public class OzoneManagerStateMachine extends BaseStateMachine {
           .setLogData(raftClientRequest.getMessage().getContent())
           .build();
     }
-
   }
 
-
   private TransactionContext handleInitiateMultipartUpload(
       RaftClientRequest raftClientRequest, OMRequest omRequest) {
 
@@ -237,7 +293,6 @@ public class OzoneManagerStateMachine extends BaseStateMachine {
         .build();
   }
 
-
   /**
    * Handle AllocateBlock Request, which needs a special handling. This
    * request needs to be executed on the leader, where it connects to SCM and
@@ -250,7 +305,6 @@ public class OzoneManagerStateMachine extends BaseStateMachine {
       RaftClientRequest raftClientRequest, OMRequest omRequest) {
     OMResponse omResponse = handler.handle(omRequest);
 
-
     // If request is failed, no need to proceed further.
     // Setting the exception with omResponse message and code.
 
@@ -270,7 +324,6 @@ public class OzoneManagerStateMachine extends BaseStateMachine {
       return transactionContext;
     }
 
-
     // Get original request
     OzoneManagerProtocolProtos.AllocateBlockRequest allocateBlockRequest =
         omRequest.getAllocateBlockRequest();
@@ -294,7 +347,6 @@ public class OzoneManagerStateMachine extends BaseStateMachine {
         .setServerRole(RaftProtos.RaftPeerRole.LEADER)
         .setLogData(messageContent)
         .build();
-
   }
 
   /**
@@ -308,56 +360,33 @@ public class OzoneManagerStateMachine extends BaseStateMachine {
         STATUS_CODE + omResponse.getStatus());
   }
 
-  /*
-   * Apply a committed log entry to the state machine.
-   */
-  @Override
-  public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
-    try {
-      OMRequest request = OMRatisHelper.convertByteStringToOMRequest(
-          trx.getStateMachineLogEntry().getLogData());
-      CompletableFuture<Message> future = CompletableFuture
-          .supplyAsync(() -> runCommand(request));
-      return future;
-    } catch (IOException e) {
-      return completeExceptionally(e);
-    }
-  }
-
-  /**
-   * Query the state machine. The request must be read-only.
-   */
-  @Override
-  public CompletableFuture<Message> query(Message request) {
-    try {
-      OMRequest omRequest = OMRatisHelper.convertByteStringToOMRequest(
-          request.getContent());
-      return CompletableFuture.completedFuture(runCommand(omRequest));
-    } catch (IOException e) {
-      return completeExceptionally(e);
-    }
-  }
-
   /**
-   * Notifies the state machine that the raft peer is no longer leader.
+   * Applies a write request via the OM handler and records its log index.
+   * @param request OMRequest
+   * @param trxLogIndex raft log index of the transaction being applied
+   * @return response from OM
    */
-  @Override
-  public void notifyNotLeader(Collection<TransactionContext> pendingEntries)
-      throws IOException {
-    omRatisServer.updateServerRole();
+  private Message runCommand(OMRequest request, long trxLogIndex) {
+    OMResponse response = handler.handle(request);
+    lastAppliedIndex = trxLogIndex;
+    return OMRatisHelper.convertResponseToMessage(response);
   }
 
   /**
-   * Submits request to OM and returns the response Message.
+   * Submits read request to OM and returns the response Message.
    * @param request OMRequest
    * @return response from OM
    * @throws ServiceException
    */
-  private Message runCommand(OMRequest request) {
+  private Message queryCommand(OMRequest request) {
     OMResponse response = handler.handle(request);
     return OMRatisHelper.convertResponseToMessage(response);
   }
 
+  public long getLastAppliedIndex() {
+    return this.lastAppliedIndex;
+  }
+
   private static <T> CompletableFuture<T> completeExceptionally(Exception e) {
     final CompletableFuture<T> future = new CompletableFuture<>();
     future.completeExceptionally(e);


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org