Posted to commits@ozone.apache.org by sh...@apache.org on 2021/01/20 17:54:41 UTC

[ozone] branch HDDS-2823 updated: HDDS-3208. Implement Ratis snapshot on SCM (#1725)

This is an automated email from the ASF dual-hosted git repository.

shashikant pushed a commit to branch HDDS-2823
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-2823 by this push:
     new 3ff677d  HDDS-3208. Implement Ratis snapshot on SCM (#1725)
3ff677d is described below

commit 3ff677d9400fe3c033e011a8a3bbc91cfb13684f
Author: Rui Wang <am...@163.com>
AuthorDate: Wed Jan 20 09:54:24 2021 -0800

    HDDS-3208. Implement Ratis snapshot on SCM (#1725)
---
 .../hdds/scm/container/ContainerManagerImpl.java   |   9 +-
 .../scm/container/ContainerStateManagerImpl.java   |  40 ++++--
 ...{SCMHAManager.java => DBTransactionBuffer.java} |  38 +++---
 .../hdds/scm/ha/MockDBTransactionBuffer.java       |  82 ++++++++++++
 .../hadoop/hdds/scm/ha/MockSCMHAManager.java       |  17 ++-
 .../hadoop/hdds/scm/ha/SCMDBTransactionBuffer.java | 112 ++++++++++++++++
 .../apache/hadoop/hdds/scm/ha/SCMHAManager.java    |   5 +
 .../hadoop/hdds/scm/ha/SCMHAManagerImpl.java       |  13 +-
 .../hadoop/hdds/scm/ha/SCMRatisServerImpl.java     |   7 +-
 .../hadoop/hdds/scm/ha/SCMRatisSnapshotInfo.java   |  71 ++++++++++
 .../apache/hadoop/hdds/scm/ha/SCMStateMachine.java |  64 ++++++++-
 .../hadoop/hdds/scm/ha/SCMTransactionInfo.java     | 145 +++++++++++++++++++++
 .../hadoop/hdds/scm/metadata/SCMDBDefinition.java  |  13 +-
 .../hadoop/hdds/scm/metadata/SCMMetadataStore.java |   6 +
 .../hdds/scm/metadata/SCMMetadataStoreImpl.java    |  13 ++
 .../hdds/scm/metadata/SCMTransactionInfoCodec.java |  48 +++++++
 .../hdds/scm/pipeline/PipelineManagerV2Impl.java   |   1 +
 .../scm/pipeline/PipelineStateManagerV2Impl.java   |  25 +++-
 .../hdds/scm/server/StorageContainerManager.java   |  14 +-
 .../scm/metadata/TestSCMTransactionInfoCodec.java  |  78 +++++++++++
 .../hdds/scm/pipeline/TestPipelineManagerImpl.java |  35 ++++-
 ...TestSCMStoreImplWithOldPipelineIDKeyFormat.java |   6 +
 .../apache/hadoop/hdds/scm/TestSCMSnapshot.java    | 109 ++++++++++++++++
 .../apache/hadoop/ozone/MiniOzoneClusterImpl.java  |   1 +
 .../ozone/recon/scm/ReconContainerManager.java     |   7 +-
 .../scm/ReconStorageContainerManagerFacade.java    |   1 +
 .../scm/AbstractReconContainerManagerTest.java     |   1 +
 .../ozone/recon/scm/TestReconContainerManager.java |   5 +
 28 files changed, 902 insertions(+), 64 deletions(-)

diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java
index 91684ce..c0ace46 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java
@@ -32,6 +32,7 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -110,6 +111,7 @@ public class ContainerManagerImpl implements ContainerManagerV2 {
         .setPipelineManager(pipelineManager)
         .setRatisServer(scmHaManager.getRatisServer())
         .setContainerStore(containerStore)
+        .setSCMDBTransactionBuffer(scmHaManager.getDBTransactionBuffer())
         .build();
 
     this.numContainerPerVolume = conf
@@ -117,8 +119,6 @@ public class ContainerManagerImpl implements ContainerManagerV2 {
             ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT_DEFAULT);
 
     this.scmContainerManagerMetrics = SCMContainerManagerMetrics.create();
-
-
   }
 
   @Override
@@ -397,4 +397,9 @@ public class ContainerManagerImpl implements ContainerManagerV2 {
   protected ContainerStateManagerV2 getContainerStateManager() {
     return containerStateManager;
   }
+
+  @VisibleForTesting
+  public SCMHAManager getSCMHAManager() {
+    return haManager;
+  }
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerImpl.java
index 68e4d35..c26afef 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerImpl.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.container.states.ContainerState;
 import org.apache.hadoop.hdds.scm.container.states.ContainerStateMap;
 import org.apache.hadoop.hdds.scm.ha.CheckedConsumer;
+import org.apache.hadoop.hdds.scm.ha.DBTransactionBuffer;
 import org.apache.hadoop.hdds.scm.ha.ExecutionUtil;
 import org.apache.hadoop.hdds.scm.ha.SCMHAInvocationHandler;
 import org.apache.hadoop.hdds.scm.ha.SCMRatisServer;
@@ -99,7 +100,9 @@ public final class ContainerStateManagerImpl
   /**
    * Persistent store for Container States.
    */
-  private final Table<ContainerID, ContainerInfo> containerStore;
+  private Table<ContainerID, ContainerInfo> containerStore;
+
+  private final DBTransactionBuffer transactionBuffer;
 
   /**
    * PipelineManager instance.
@@ -135,7 +138,8 @@ public final class ContainerStateManagerImpl
    */
   private ContainerStateManagerImpl(final Configuration conf,
       final PipelineManager pipelineManager,
-      final Table<ContainerID, ContainerInfo> containerStore)
+      final Table<ContainerID, ContainerInfo> containerStore,
+      final DBTransactionBuffer buffer)
       throws IOException {
     this.pipelineManager = pipelineManager;
     this.containerStore = containerStore;
@@ -144,6 +148,7 @@ public final class ContainerStateManagerImpl
     this.containers = new ContainerStateMap();
     this.lastUsedMap = new ConcurrentHashMap<>();
     this.containerStateChangeActions = getContainerStateChangeActions();
+    this.transactionBuffer = buffer;
 
     initialize();
   }
@@ -298,12 +303,16 @@ public final class ContainerStateManagerImpl
     try {
       if (!containers.contains(containerID)) {
         ExecutionUtil.create(() -> {
-          containerStore.put(containerID, container);
+          containerStore.putWithBatch(
+              transactionBuffer.getCurrentBatchOperation(),
+              containerID, container);
           containers.addContainer(container);
           pipelineManager.addContainerToPipeline(pipelineID, containerID);
         }).onException(() -> {
           containers.removeContainer(containerID);
-          containerStore.delete(containerID);
+          containerStore.deleteWithBatch(
+              transactionBuffer.getCurrentBatchOperation(),
+              containerID);
         }).execute();
       }
     } finally {
@@ -339,13 +348,17 @@ public final class ContainerStateManagerImpl
         if (newState.getNumber() > oldState.getNumber()) {
           ExecutionUtil.create(() -> {
             containers.updateState(id, oldState, newState);
-            containerStore.put(id, containers.getContainerInfo(id));
+            containerStore.putWithBatch(
+                transactionBuffer.getCurrentBatchOperation(),
+                id, containers.getContainerInfo(id));
           }).onException(() -> {
-            containerStore.put(id, oldInfo);
+            containerStore.putWithBatch(
+                transactionBuffer.getCurrentBatchOperation(),
+                id, oldInfo);
             containers.updateState(id, newState, oldState);
           }).execute();
-          containerStateChangeActions.getOrDefault(event, info -> {})
-              .execute(oldInfo);
+          containerStateChangeActions.getOrDefault(event, info -> {
+          }).execute(oldInfo);
         }
       }
     } finally {
@@ -475,7 +488,9 @@ public final class ContainerStateManagerImpl
       final ContainerID cid = ContainerID.getFromProtobuf(id);
       final ContainerInfo containerInfo = containers.getContainerInfo(cid);
       ExecutionUtil.create(() -> {
-        containerStore.delete(cid);
+        containerStore.deleteWithBatch(
+            transactionBuffer.getCurrentBatchOperation(),
+            cid);
         containers.removeContainer(cid);
       }).onException(() -> containerStore.put(cid, containerInfo)).execute();
     } finally {
@@ -504,7 +519,12 @@ public final class ContainerStateManagerImpl
     private PipelineManager pipelineMgr;
     private SCMRatisServer scmRatisServer;
     private Table<ContainerID, ContainerInfo> table;
+    private DBTransactionBuffer transactionBuffer;
 
+    public Builder setSCMDBTransactionBuffer(DBTransactionBuffer buffer) {
+      this.transactionBuffer = buffer;
+      return this;
+    }
     public Builder setConfiguration(final Configuration config) {
       conf = config;
       return this;
@@ -533,7 +553,7 @@ public final class ContainerStateManagerImpl
       Preconditions.checkNotNull(table);
 
       final ContainerStateManagerV2 csm = new ContainerStateManagerImpl(
-          conf, pipelineMgr, table);
+          conf, pipelineMgr, table, transactionBuffer);
 
       final SCMHAInvocationHandler invocationHandler =
           new SCMHAInvocationHandler(RequestType.CONTAINER, csm,
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/DBTransactionBuffer.java
similarity index 56%
copy from hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java
copy to hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/DBTransactionBuffer.java
index 8fe6d7f..ae74939 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/DBTransactionBuffer.java
@@ -14,28 +14,30 @@
  * License for the specific language governing permissions and limitations under
  * the License.
  */
-
 package org.apache.hadoop.hdds.scm.ha;
 
+import org.apache.hadoop.hdds.utils.db.BatchOperation;
+import org.apache.ratis.statemachine.SnapshotInfo;
+
+import java.io.Closeable;
 import java.io.IOException;
 
 /**
- * SCMHAManager provides HA service for SCM.
+ * A buffer for SCM DB transactions. Call the flush method to commit the
+ * buffered batch to the SCM DB. The buffer also maintains the transaction
+ * info (term and index) of the latest transaction it holds.
  */
-public interface SCMHAManager {
-
-  /**
-   * Starts HA service.
-   */
-  void start() throws IOException;
-
-  /**
-   * Returns RatisServer instance associated with the SCM instance.
-   */
-  SCMRatisServer getRatisServer();
-
-  /**
-   * Stops the HA service.
-   */
-  void shutdown() throws IOException;
+public interface DBTransactionBuffer extends Closeable {
+
+  BatchOperation getCurrentBatchOperation();
+
+  void updateLatestTrxInfo(SCMTransactionInfo info);
+
+  SCMTransactionInfo getLatestTrxInfo();
+
+  SnapshotInfo getLatestSnapshot();
+
+  void setLatestSnapshot(SnapshotInfo latestSnapshot);
+
+  void flush() throws IOException;
 }
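
For context, the intended call pattern for this interface, as wired up in the
rest of this patch, is roughly the following (a minimal sketch with the
hypothetical local names "table", "buffer", "term" and "index"; not part of
the patch itself):

    // State managers stage writes into the buffer's batch instead of
    // writing to the table directly:
    table.putWithBatch(buffer.getCurrentBatchOperation(), key, value);

    // The Ratis state machine records the term/index of each applied entry:
    buffer.updateLatestTrxInfo(SCMTransactionInfo.builder()
        .setCurrentTerm(term)
        .setTransactionIndex(index)
        .build());

    // On snapshot, the batch plus the latest transaction info are committed
    // to the SCM DB in one atomic write:
    buffer.flush();
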
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/MockDBTransactionBuffer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/MockDBTransactionBuffer.java
new file mode 100644
index 0000000..2a1d615
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/MockDBTransactionBuffer.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.ha;
+
+import org.apache.hadoop.hdds.utils.db.BatchOperation;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.RDBBatchOperation;
+import org.apache.ratis.statemachine.SnapshotInfo;
+
+import java.io.IOException;
+
+public class MockDBTransactionBuffer implements DBTransactionBuffer {
+  private DBStore dbStore;
+  private BatchOperation currentBatchOperation;
+
+  public MockDBTransactionBuffer() {
+  }
+
+  public MockDBTransactionBuffer(DBStore store) {
+    this.dbStore = store;
+  }
+
+  @Override
+  public BatchOperation getCurrentBatchOperation() {
+    if (currentBatchOperation == null) {
+      if (dbStore != null) {
+        currentBatchOperation = dbStore.initBatchOperation();
+      } else {
+        currentBatchOperation = new RDBBatchOperation();
+      }
+    }
+    return currentBatchOperation;
+  }
+
+  @Override
+  public void updateLatestTrxInfo(SCMTransactionInfo info) {
+
+  }
+
+  @Override
+  public SCMTransactionInfo getLatestTrxInfo() {
+    return null;
+  }
+
+  @Override
+  public SnapshotInfo getLatestSnapshot() {
+    return null;
+  }
+
+  @Override
+  public void setLatestSnapshot(SnapshotInfo latestSnapshot) {
+
+  }
+
+  @Override
+  public void flush() throws IOException {
+    if (dbStore != null) {
+      dbStore.commitBatchOperation(currentBatchOperation);
+      currentBatchOperation.close();
+      currentBatchOperation = null;
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    flush();
+  }
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/MockSCMHAManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/MockSCMHAManager.java
index bf25ad5..116994e 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/MockSCMHAManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/MockSCMHAManager.java
@@ -16,7 +16,6 @@
  */
 
 package org.apache.hadoop.hdds.scm.ha;
-
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.util.ArrayList;
@@ -45,17 +44,28 @@ public final class MockSCMHAManager implements SCMHAManager {
 
   private final SCMRatisServer ratisServer;
   private boolean isLeader;
+  private DBTransactionBuffer transactionBuffer;
 
   public static SCMHAManager getInstance(boolean isLeader) {
     return new MockSCMHAManager(isLeader);
   }
 
+  public static SCMHAManager getInstance(boolean isLeader,
+      DBTransactionBuffer buffer) {
+    return new MockSCMHAManager(isLeader, buffer);
+  }
+
   /**
    * Creates MockSCMHAManager instance.
    */
   private MockSCMHAManager(boolean isLeader) {
+    this(isLeader, new MockDBTransactionBuffer());
+  }
+
+  private MockSCMHAManager(boolean isLeader, DBTransactionBuffer buffer) {
     this.ratisServer = new MockRatisServer();
     this.isLeader = isLeader;
+    this.transactionBuffer = buffer;
   }
 
   @Override
@@ -82,6 +92,11 @@ public final class MockSCMHAManager implements SCMHAManager {
     return ratisServer;
   }
 
+  @Override
+  public DBTransactionBuffer getDBTransactionBuffer() {
+    return transactionBuffer;
+  }
+
   /**
    * {@inheritDoc}
    */
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMDBTransactionBuffer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMDBTransactionBuffer.java
new file mode 100644
index 0000000..e5af076
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMDBTransactionBuffer.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.ha;
+
+import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore;
+import org.apache.hadoop.hdds.utils.db.BatchOperation;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.ratis.statemachine.SnapshotInfo;
+
+import java.io.IOException;
+
+import static org.apache.hadoop.ozone.OzoneConsts.TRANSACTION_INFO_KEY;
+
+/**
+ * A transaction buffer for SCM DB operations on Pipelines and Containers.
+ * When this buffer is flushed to the DB, a transaction info record is also
+ * written to the DB to indicate the term and transaction index of the latest
+ * operation persisted in the DB.
+ */
+public class SCMDBTransactionBuffer implements DBTransactionBuffer {
+  private final SCMMetadataStore metadataStore;
+  private BatchOperation currentBatchOperation;
+  private SCMTransactionInfo latestTrxInfo;
+  private SnapshotInfo latestSnapshot;
+
+  public SCMDBTransactionBuffer(SCMMetadataStore store) throws IOException {
+    this.metadataStore = store;
+
+    // initialize a batch operation during construction time
+    currentBatchOperation = this.metadataStore.getStore().initBatchOperation();
+    latestTrxInfo = store.getTransactionInfoTable().get(TRANSACTION_INFO_KEY);
+    if (latestTrxInfo == null) {
+      // transaction table is empty
+      latestTrxInfo =
+          SCMTransactionInfo
+              .builder()
+              .setTransactionIndex(-1)
+              .setCurrentTerm(0)
+              .build();
+    }
+    latestSnapshot = latestTrxInfo.toSnapshotInfo();
+  }
+
+  @Override
+  public BatchOperation getCurrentBatchOperation() {
+    return currentBatchOperation;
+  }
+
+  @Override
+  public void updateLatestTrxInfo(SCMTransactionInfo info) {
+    if (info.compareTo(this.latestTrxInfo) <= 0) {
+      throw new IllegalArgumentException(
+          "Updating DB buffer transaction info by an older transaction info, "
+          + "current: " + this.latestTrxInfo + ", updating to: " + info);
+    }
+    this.latestTrxInfo = info;
+  }
+
+  @Override
+  public SCMTransactionInfo getLatestTrxInfo() {
+    return this.latestTrxInfo;
+  }
+
+  @Override
+  public SnapshotInfo getLatestSnapshot() {
+    return latestSnapshot;
+  }
+
+  @Override
+  public void setLatestSnapshot(SnapshotInfo latestSnapshot) {
+    this.latestSnapshot = latestSnapshot;
+  }
+
+  @Override
+  public void flush() throws IOException {
+    // write latest trx info into trx table in the same batch
+    Table<String, SCMTransactionInfo> transactionInfoTable
+        = metadataStore.getTransactionInfoTable();
+    transactionInfoTable.putWithBatch(currentBatchOperation,
+        TRANSACTION_INFO_KEY, latestTrxInfo);
+
+    metadataStore.getStore().commitBatchOperation(currentBatchOperation);
+    currentBatchOperation.close();
+    this.latestSnapshot = latestTrxInfo.toSnapshotInfo();
+    // reset batch operation
+    currentBatchOperation = metadataStore.getStore().initBatchOperation();
+  }
+
+  @Override
+  public String toString() {
+    return latestTrxInfo.toString();
+  }
+
+  @Override
+  public void close() throws IOException {
+
+  }
+}
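
A minimal sketch of the recovery path this enables, mirroring the constructor
above ("metadataStore" is assumed to be an already-initialized
SCMMetadataStore):

    SCMTransactionInfo info =
        metadataStore.getTransactionInfoTable().get(TRANSACTION_INFO_KEY);
    if (info == null) {
      // fresh DB: no transaction has ever been flushed
      info = SCMTransactionInfo.builder()
          .setCurrentTerm(0)
          .setTransactionIndex(-1)
          .build();
    }
    // the term/index Ratis resumes from after a restart
    SnapshotInfo snapshot = info.toSnapshotInfo();
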
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java
index 8fe6d7f..c8b3ff7 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java
@@ -35,6 +35,11 @@ public interface SCMHAManager {
   SCMRatisServer getRatisServer();
 
   /**
+   * Returns DB transaction buffer.
+   */
+  DBTransactionBuffer getDBTransactionBuffer();
+
+  /**
    * Stops the HA service.
    */
   void shutdown() throws IOException;
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
index db5e937..b795380 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
@@ -39,16 +39,18 @@ public class SCMHAManagerImpl implements SCMHAManager {
 
   private final SCMRatisServer ratisServer;
   private final ConfigurationSource conf;
+  private final SCMDBTransactionBuffer transactionBuffer;
 
   /**
    * Creates SCMHAManager instance.
    */
   public SCMHAManagerImpl(final ConfigurationSource conf,
-                          final StorageContainerManager scm)
-      throws IOException {
+      final StorageContainerManager scm) throws IOException {
     this.conf = conf;
+    this.transactionBuffer =
+        new SCMDBTransactionBuffer(scm.getScmMetadataStore());
     this.ratisServer = new SCMRatisServerImpl(
-        conf.getObject(SCMHAConfiguration.class), conf, scm);
+        conf.getObject(SCMHAConfiguration.class), conf, scm, transactionBuffer);
   }
 
   /**
@@ -63,6 +65,11 @@ public class SCMHAManagerImpl implements SCMHAManager {
     return ratisServer;
   }
 
+  @Override
+  public DBTransactionBuffer getDBTransactionBuffer() {
+    return transactionBuffer;
+  }
+
   /**
    * {@inheritDoc}
    */
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java
index 3a453e3..15b2fe1 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java
@@ -64,9 +64,8 @@ public class SCMRatisServerImpl implements SCMRatisServer {
   // TODO: Refactor and remove ConfigurationSource and use only
   //  SCMHAConfiguration.
   SCMRatisServerImpl(final SCMHAConfiguration haConf,
-                     final ConfigurationSource conf,
-                     final StorageContainerManager scm)
-      throws IOException {
+      final ConfigurationSource conf, final StorageContainerManager scm,
+      final DBTransactionBuffer buffer) throws IOException {
     this.scm = scm;
     this.address = haConf.getRatisBindAddress();
 
@@ -79,7 +78,7 @@ public class SCMRatisServerImpl implements SCMRatisServer {
         .setServerId(haGrpBuilder.getPeerId())
         .setGroup(haGrpBuilder.getRaftGroup())
         .setProperties(serverProperties)
-        .setStateMachine(new SCMStateMachine(scm, this))
+        .setStateMachine(new SCMStateMachine(scm, this, buffer))
         .build();
 
     this.division = server.getDivision(haGrpBuilder.getRaftGroupId());
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisSnapshotInfo.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisSnapshotInfo.java
new file mode 100644
index 0000000..b2f2ed4
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisSnapshotInfo.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * <p>Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.ha;
+
+import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.storage.FileInfo;
+import org.apache.ratis.statemachine.SnapshotInfo;
+
+import java.util.List;
+
+import static org.apache.hadoop.ozone.OzoneConsts.TRANSACTION_INFO_SPLIT_KEY;
+
+/**
+ * This class captures the snapshotIndex and term of the latest snapshot in
+ * the SCM. The Ratis server loads the snapshotInfo during startup and updates
+ * its lastApplied index to this snapshotIndex. SCM SnapshotInfo does not
+ * contain any files; it is used only to store/update the last applied index
+ * and term.
+ */
+public class SCMRatisSnapshotInfo implements SnapshotInfo {
+  private final long term;
+  private final long snapshotIndex;
+
+  public SCMRatisSnapshotInfo(long term, long index) {
+    this.term = term;
+    this.snapshotIndex = index;
+  }
+
+  @Override
+  public TermIndex getTermIndex() {
+    return TermIndex.valueOf(term, snapshotIndex);
+  }
+
+  @Override
+  public long getTerm() {
+    return term;
+  }
+
+  @Override
+  public long getIndex() {
+    return snapshotIndex;
+  }
+
+  @Override
+  public List<FileInfo> getFiles() {
+    return null;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder stringBuilder = new StringBuilder();
+    stringBuilder.append(term);
+    stringBuilder.append(TRANSACTION_INFO_SPLIT_KEY);
+    stringBuilder.append(snapshotIndex);
+    return stringBuilder.toString();
+  }
+}
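
To illustrate the contract (values are made up; TRANSACTION_INFO_SPLIT_KEY is
defined in OzoneConsts and not shown in this patch):

    SnapshotInfo si = new SCMRatisSnapshotInfo(7, 42);
    si.getTermIndex(); // TermIndex with term=7, index=42
    si.getFiles();     // null: SCM snapshots carry no files, only term/index
    si.toString();     // "7" + TRANSACTION_INFO_SPLIT_KEY + "42"
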
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
index 052bf4b..aa366e1 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
@@ -31,6 +31,10 @@ import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.ratis.protocol.Message;
 import org.apache.ratis.protocol.RaftGroupMemberId;
 import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.util.Time;
+import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.statemachine.SnapshotInfo;
 import org.apache.ratis.statemachine.TransactionContext;
 import org.apache.ratis.statemachine.impl.BaseStateMachine;
 
@@ -38,6 +42,8 @@ import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes.SCM_NOT_INITIALIZED;
+
 /**
  * TODO.
  */
@@ -48,13 +54,27 @@ public class SCMStateMachine extends BaseStateMachine {
   private final StorageContainerManager scm;
   private final SCMRatisServer ratisServer;
   private final Map<RequestType, Object> handlers;
-
+  private final DBTransactionBuffer transactionBuffer;
 
   public SCMStateMachine(final StorageContainerManager scm,
-                         final SCMRatisServer ratisServer) {
+      final SCMRatisServer ratisServer, DBTransactionBuffer buffer)
+      throws SCMException {
     this.scm = scm;
     this.ratisServer = ratisServer;
     this.handlers = new EnumMap<>(RequestType.class);
+    this.transactionBuffer = buffer;
+    SCMTransactionInfo latestTrxInfo =
+        this.transactionBuffer.getLatestTrxInfo();
+    if (!latestTrxInfo.isInitialized()) {
+      if (!updateLastAppliedTermIndex(latestTrxInfo.getTerm(),
+          latestTrxInfo.getTransactionIndex())) {
+        throw new SCMException(
+            String.format("Failed to update LastAppliedTermIndex " +
+                    "in StateMachine to term:{} index:{}",
+                latestTrxInfo.getTerm(), latestTrxInfo.getTransactionIndex()
+            ), SCM_NOT_INITIALIZED);
+      }
+    }
   }
 
   public void registerHandler(RequestType type, Object handler) {
@@ -62,6 +82,11 @@ public class SCMStateMachine extends BaseStateMachine {
   }
 
   @Override
+  public SnapshotInfo getLatestSnapshot() {
+    return transactionBuffer.getLatestSnapshot();
+  }
+
+  @Override
   public CompletableFuture<Message> applyTransaction(
       final TransactionContext trx) {
     final CompletableFuture<Message> applyTransactionFuture =
@@ -70,14 +95,17 @@ public class SCMStateMachine extends BaseStateMachine {
       final SCMRatisRequest request = SCMRatisRequest.decode(
           Message.valueOf(trx.getStateMachineLogEntry().getLogData()));
       applyTransactionFuture.complete(process(request));
+      transactionBuffer.updateLatestTrxInfo(SCMTransactionInfo.builder()
+          .setCurrentTerm(trx.getLogEntry().getTerm())
+          .setTransactionIndex(trx.getLogEntry().getIndex())
+          .build());
     } catch (Exception ex) {
       applyTransactionFuture.completeExceptionally(ex);
     }
     return applyTransactionFuture;
   }
 
-  private Message process(final SCMRatisRequest request)
-      throws Exception {
+  private Message process(final SCMRatisRequest request) throws Exception {
     try {
       final Object handler = handlers.get(request.getType());
 
@@ -93,7 +121,6 @@ public class SCMStateMachine extends BaseStateMachine {
       final Object result = handler.getClass().getMethod(
           request.getOperation(), argumentTypes.toArray(new Class<?>[0]))
           .invoke(handler, request.getArguments());
-
       return SCMRatisResponse.encode(result);
     } catch (NoSuchMethodException | SecurityException ex) {
       throw new InvalidProtocolBufferException(ex.getMessage());
@@ -126,4 +153,31 @@ public class SCMStateMachine extends BaseStateMachine {
     LOG.info("current SCM becomes leader of term {}.", term);
     scm.getScmContext().updateIsLeaderAndTerm(true, term);
   }
+
+  @Override
+  public long takeSnapshot() throws IOException {
+    long startTime = Time.monotonicNow();
+    TermIndex lastTermIndex = getLastAppliedTermIndex();
+    long lastAppliedIndex = lastTermIndex.getIndex();
+    SCMTransactionInfo lastAppliedTrxInfo =
+        SCMTransactionInfo.fromTermIndex(lastTermIndex);
+    if (transactionBuffer.getLatestTrxInfo()
+        .compareTo(lastAppliedTrxInfo) < 0) {
+      transactionBuffer.updateLatestTrxInfo(
+          SCMTransactionInfo.builder()
+              .setCurrentTerm(lastTermIndex.getTerm())
+              .setTransactionIndex(lastTermIndex.getIndex())
+              .build());
+      transactionBuffer.setLatestSnapshot(
+          transactionBuffer.getLatestTrxInfo().toSnapshotInfo());
+    } else {
+      lastAppliedIndex =
+          transactionBuffer.getLatestTrxInfo().getTransactionIndex();
+    }
+
+    transactionBuffer.flush();
+    LOG.info("Current Snapshot Index {}, takeSnapshot took {} ms",
+        lastAppliedIndex, Time.monotonicNow() - startTime);
+    return lastAppliedIndex;
+  }
 }
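
To make the takeSnapshot() branch above concrete (illustrative values only):

    // lastApplied = getLastAppliedTermIndex()             e.g. (term=3, index=120)
    // buffered    = transactionBuffer.getLatestTrxInfo()  e.g. (3, 100)
    //
    // buffered < lastApplied  -> bump the buffer to (3, 120); snapshot index 120
    // buffered >= lastApplied -> snapshot index is the buffer's own index
    //
    // Either way, flush() then persists the batched writes together with
    // TRANSACTION_INFO_KEY in a single commit.
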
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMTransactionInfo.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMTransactionInfo.java
new file mode 100644
index 0000000..0140839
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMTransactionInfo.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * <p>Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.ha;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.StringUtils;
+import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.statemachine.SnapshotInfo;
+
+import java.util.Objects;
+
+import static org.apache.hadoop.ozone.OzoneConsts.TRANSACTION_INFO_SPLIT_KEY;
+
+/**
+ * SCMTransactionInfo saves two fields for a transaction:
+ *  1. term, the Raft term to which the transaction belongs
+ *  2. transactionIndex, a monotonically increasing index
+ *     (e.g. the Raft log index)
+ */
+public final class SCMTransactionInfo {
+  private long term;
+  private long transactionIndex;
+
+  private SCMTransactionInfo(String transactionInfo) {
+    String[] tInfo =
+        transactionInfo.split(TRANSACTION_INFO_SPLIT_KEY);
+    Preconditions.checkState(tInfo.length == 2,
+        "Incorrect TransactionInfo value");
+
+    term = Long.parseLong(tInfo[0]);
+    transactionIndex = Long.parseLong(tInfo[1]);
+  }
+
+  private SCMTransactionInfo(long currentTerm, long transactionIndex) {
+    this.term = currentTerm;
+    this.transactionIndex = transactionIndex;
+  }
+
+  public boolean isInitialized() {
+    return transactionIndex == -1 && term == 0;
+  }
+
+  public int compareTo(SCMTransactionInfo info) {
+    if (info.getTerm() == this.getTerm()) {
+      return this.getTransactionIndex() <= info.getTransactionIndex() ? -1 : 1;
+    } else {
+      return this.getTerm() < info.getTerm() ? -1 : 1;
+    }
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    SCMTransactionInfo that = (SCMTransactionInfo) o;
+    return term == that.term &&
+        transactionIndex == that.transactionIndex;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(term, transactionIndex);
+  }
+
+  @Override
+  public String toString() {
+    return "[" + term + ":" + transactionIndex + "]";
+  }
+
+  public long getTerm() {
+    return term;
+  }
+
+  public long getTransactionIndex() {
+    return transactionIndex;
+  }
+
+  public static SCMTransactionInfo fromTermIndex(TermIndex termIndex) {
+    return builder().setCurrentTerm(termIndex.getTerm())
+        .setTransactionIndex(termIndex.getIndex()).build();
+  }
+
+  public SnapshotInfo toSnapshotInfo() {
+    return new SCMRatisSnapshotInfo(term, transactionIndex);
+  }
+
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  public static class Builder {
+    private long currentTerm = 0;
+    private long transactionIndex = -1;
+
+    public Builder setCurrentTerm(long term) {
+      this.currentTerm = term;
+      return this;
+    }
+
+    public Builder setTransactionIndex(long tIndex) {
+      this.transactionIndex = tIndex;
+      return this;
+    }
+
+    public SCMTransactionInfo build() {
+      return new SCMTransactionInfo(currentTerm, transactionIndex);
+    }
+  }
+
+  public static SCMTransactionInfo getFromByteArray(byte[] bytes) {
+    String tInfo = StringUtils.bytes2String(bytes);
+    return new SCMTransactionInfo(tInfo);
+  }
+
+  public byte[] convertToByteArray() {
+    return StringUtils.string2Bytes(generateTransactionInfo());
+  }
+
+  private String generateTransactionInfo() {
+    StringBuilder stringBuilder = new StringBuilder();
+    stringBuilder.append(term);
+    stringBuilder.append(TRANSACTION_INFO_SPLIT_KEY);
+    stringBuilder.append(transactionIndex);
+
+    return stringBuilder.toString();
+  }
+}
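
A round-trip through the byte encoding, the same path exercised by
TestSCMTransactionInfoCodec later in this patch (values illustrative):

    SCMTransactionInfo original = SCMTransactionInfo.builder()
        .setCurrentTerm(11)
        .setTransactionIndex(100)
        .build();
    byte[] persisted = original.convertToByteArray();
    SCMTransactionInfo restored =
        SCMTransactionInfo.getFromByteArray(persisted);
    // restored.equals(original) holds: term and index survive the round trip
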
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMDBDefinition.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMDBDefinition.java
index fcddcdd..790b6db 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMDBDefinition.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMDBDefinition.java
@@ -24,11 +24,13 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolPro
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.ha.SCMTransactionInfo;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.utils.db.DBColumnFamilyDefinition;
 import org.apache.hadoop.hdds.utils.db.DBDefinition;
 import org.apache.hadoop.hdds.utils.db.LongCodec;
+import org.apache.hadoop.hdds.utils.db.StringCodec;
 
 /**
  * Class defines the structure and types of the scm.db.
@@ -80,6 +82,15 @@ public class SCMDBDefinition implements DBDefinition {
           ContainerInfo.class,
           new ContainerInfoCodec());
 
+  public static final DBColumnFamilyDefinition<String, SCMTransactionInfo>
+      TRANSACTIONINFO =
+      new DBColumnFamilyDefinition<>(
+          "scmTransactionInfos",
+          String.class,
+          new StringCodec(),
+          SCMTransactionInfo.class,
+          new SCMTransactionInfoCodec());
+
   @Override
   public String getName() {
     return "scm.db";
@@ -93,6 +104,6 @@ public class SCMDBDefinition implements DBDefinition {
   @Override
   public DBColumnFamilyDefinition[] getColumnFamilies() {
     return new DBColumnFamilyDefinition[] {DELETED_BLOCKS, VALID_CERTS,
-        REVOKED_CERTS, PIPELINES, CONTAINERS};
+        REVOKED_CERTS, PIPELINES, CONTAINERS, TRANSACTIONINFO};
   }
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStore.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStore.java
index 0452c05..7ddd11d 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStore.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStore.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.ha.SCMTransactionInfo;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.security.x509.certificate.authority.CertificateStore;
@@ -111,6 +112,11 @@ public interface SCMMetadataStore {
   Table<PipelineID, Pipeline> getPipelineTable();
 
   /**
+   * A Table that keeps the latest transaction index of the DB state.
+   */
+   Table<String, SCMTransactionInfo> getTransactionInfoTable();
+
+  /**
    * Helper to create and write batch transactions.
    */
   BatchOperationHandler getBatchHandler();
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStoreImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStoreImpl.java
index 0a609c7..0ac3a60 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStoreImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStoreImpl.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.ha.SCMTransactionInfo;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.security.x509.certificate.authority.CertificateStore;
@@ -40,6 +41,7 @@ import static org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition.CONTAINERS;
 import static org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition.DELETED_BLOCKS;
 import static org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition.PIPELINES;
 import static org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition.REVOKED_CERTS;
+import static org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition.TRANSACTIONINFO;
 import static org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition.VALID_CERTS;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -60,6 +62,8 @@ public class SCMMetadataStoreImpl implements SCMMetadataStore {
 
   private Table<PipelineID, Pipeline> pipelineTable;
 
+  private Table<String, SCMTransactionInfo> transactionInfoTable;
+
   private static final Logger LOG =
       LoggerFactory.getLogger(SCMMetadataStoreImpl.class);
   private DBStore store;
@@ -107,6 +111,10 @@ public class SCMMetadataStoreImpl implements SCMMetadataStore {
       containerTable = CONTAINERS.getTable(store);
 
       checkTableStatus(containerTable, CONTAINERS.getName());
+
+      transactionInfoTable = TRANSACTIONINFO.getTable(store);
+
+      checkTableStatus(transactionInfoTable, TRANSACTIONINFO.getName());
     }
   }
 
@@ -162,6 +170,11 @@ public class SCMMetadataStoreImpl implements SCMMetadataStore {
   }
 
   @Override
+  public Table<String, SCMTransactionInfo> getTransactionInfoTable() {
+    return transactionInfoTable;
+  }
+
+  @Override
   public BatchOperationHandler getBatchHandler() {
     return this.store;
   }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMTransactionInfoCodec.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMTransactionInfoCodec.java
new file mode 100644
index 0000000..6c0e6e1
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMTransactionInfoCodec.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.metadata;
+
+import org.apache.hadoop.hdds.scm.ha.SCMTransactionInfo;
+import org.apache.hadoop.hdds.utils.db.Codec;
+
+import java.io.IOException;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+public class SCMTransactionInfoCodec implements Codec<SCMTransactionInfo> {
+
+  @Override
+  public byte[] toPersistedFormat(SCMTransactionInfo object)
+      throws IOException {
+    checkNotNull(object, "Null object can't be converted to byte array.");
+    return object.convertToByteArray();
+  }
+
+  @Override
+  public SCMTransactionInfo fromPersistedFormat(byte[] rawData)
+      throws IOException {
+    checkNotNull(rawData, "Null byte array can't be converted to " +
+        "real object.");
+    return SCMTransactionInfo.getFromByteArray(rawData);
+  }
+
+  @Override
+  public SCMTransactionInfo copyObject(SCMTransactionInfo object) {
+    return object;
+  }
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java
index 286bdc9..3c88174 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java
@@ -124,6 +124,7 @@ public final class PipelineManagerV2Impl implements PipelineManager {
         .newBuilder().setPipelineStore(pipelineStore)
         .setRatisServer(scmhaManager.getRatisServer())
         .setNodeManager(nodeManager)
+        .setSCMDBTransactionBuffer(scmhaManager.getDBTransactionBuffer())
         .build();
 
     // Create PipelineFactory
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManagerV2Impl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManagerV2Impl.java
index 06bae4c..7f88a42 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManagerV2Impl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManagerV2Impl.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.ha.DBTransactionBuffer;
 import org.apache.hadoop.hdds.scm.ha.SCMHAInvocationHandler;
 import org.apache.hadoop.hdds.scm.ha.SCMRatisServer;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
@@ -52,17 +53,19 @@ public class PipelineStateManagerV2Impl implements StateManager {
   private final PipelineStateMap pipelineStateMap;
   private final NodeManager nodeManager;
   private Table<PipelineID, Pipeline> pipelineStore;
+  private final DBTransactionBuffer transactionBuffer;
 
   // Protect potential contentions between RaftServer and PipelineManager.
   // See https://issues.apache.org/jira/browse/HDDS-4560
   private final ReadWriteLock lock = new ReentrantReadWriteLock();
 
   public PipelineStateManagerV2Impl(
-      Table<PipelineID, Pipeline> pipelineStore, NodeManager nodeManager)
-      throws IOException {
+      Table<PipelineID, Pipeline> pipelineStore, NodeManager nodeManager,
+      DBTransactionBuffer buffer) throws IOException {
     this.pipelineStateMap = new PipelineStateMap();
     this.nodeManager = nodeManager;
     this.pipelineStore = pipelineStore;
+    this.transactionBuffer = buffer;
     initialize();
   }
 
@@ -90,7 +93,8 @@ public class PipelineStateManagerV2Impl implements StateManager {
     try {
       Pipeline pipeline = Pipeline.getFromProtobuf(pipelineProto);
       if (pipelineStore != null) {
-        pipelineStore.put(pipeline.getId(), pipeline);
+        pipelineStore.putWithBatch(transactionBuffer.getCurrentBatchOperation(),
+            pipeline.getId(), pipeline);
         pipelineStateMap.addPipeline(pipeline);
         nodeManager.addPipeline(pipeline);
         LOG.info("Created pipeline {}.", pipeline);
@@ -219,7 +223,8 @@ public class PipelineStateManagerV2Impl implements StateManager {
     try {
       PipelineID pipelineID = PipelineID.getFromProtobuf(pipelineIDProto);
       if (pipelineStore != null) {
-        pipelineStore.delete(pipelineID);
+        pipelineStore.deleteWithBatch(
+            transactionBuffer.getCurrentBatchOperation(), pipelineID);
       }
       Pipeline pipeline = pipelineStateMap.removePipeline(pipelineID);
       nodeManager.removePipeline(pipeline);
@@ -256,7 +261,8 @@ public class PipelineStateManagerV2Impl implements StateManager {
       if (pipelineStore != null) {
         pipelineStateMap.updatePipelineState(pipelineID,
             Pipeline.PipelineState.fromProtobuf(newState));
-        pipelineStore.put(pipelineID, getPipeline(pipelineID));
+        pipelineStore.putWithBatch(transactionBuffer.getCurrentBatchOperation(),
+            pipelineID, getPipeline(pipelineID));
       }
     } catch (IOException ex) {
       LOG.warn("Pipeline {} state update failed", pipelineID);
@@ -333,6 +339,12 @@ public class PipelineStateManagerV2Impl implements StateManager {
     private Table<PipelineID, Pipeline> pipelineStore;
     private NodeManager nodeManager;
     private SCMRatisServer scmRatisServer;
+    private DBTransactionBuffer transactionBuffer;
+
+    public Builder setSCMDBTransactionBuffer(DBTransactionBuffer buffer) {
+      this.transactionBuffer = buffer;
+      return this;
+    }
 
     public Builder setRatisServer(final SCMRatisServer ratisServer) {
       scmRatisServer = ratisServer;
@@ -354,7 +366,8 @@ public class PipelineStateManagerV2Impl implements StateManager {
       Preconditions.checkNotNull(pipelineStore);
 
       final StateManager pipelineStateManager =
-          new PipelineStateManagerV2Impl(pipelineStore, nodeManager);
+          new PipelineStateManagerV2Impl(
+              pipelineStore, nodeManager, transactionBuffer);
 
       final SCMHAInvocationHandler invocationHandler =
           new SCMHAInvocationHandler(SCMRatisProtocol.RequestType.PIPELINE,
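
One behavioral consequence, visible in the TestPipelineManagerImpl changes
below: pipeline writes now land in the buffer's batch, so they become visible
in the pipeline store only after a flush. A sketch, with "stateManager" and
"buffer" as hypothetical names:

    stateManager.addPipeline(pipelineProto); // staged in the batch only
    // pipelineStore.get(id) still returns null here
    buffer.flush();                          // batch committed to the DB
    // pipelineStore.get(id) now returns the pipeline
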
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index b0145bb..6782f52 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -271,7 +271,6 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
     // Creates the SCM DBs or opens them if it exists.
     // A valid pointer to the store is required by all the other services below.
     initalizeMetadataStore(conf, configurator);
-
     // Authenticate SCM if security is enabled, this initialization can only
     // be done after the metadata store is initialized.
     if (OzoneSecurityUtil.isSecurityEnabled(conf)) {
@@ -868,7 +867,6 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
    * Stop service.
    */
   public void stop() {
-
     try {
       LOG.info("Stopping Replication Manager Service.");
       replicationManager.stop();
@@ -962,6 +960,12 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
     IOUtils.cleanupWithLogger(LOG, pipelineManager);
 
     try {
+      scmHAManager.shutdown();
+    } catch (Exception ex) {
+      LOG.error("SCM HA Manager stop failed", ex);
+    }
+
+    try {
       scmMetadataStore.stop();
     } catch (Exception ex) {
       LOG.error("SCM Metadata store stop failed", ex);
@@ -971,12 +975,6 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
       ms.stop();
     }
 
-    try {
-      scmHAManager.shutdown();
-    } catch (Exception ex) {
-      LOG.error("SCM HA Manager stop failed", ex);
-    }
-
     scmSafeModeManager.stop();
   }
 
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/metadata/TestSCMTransactionInfoCodec.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/metadata/TestSCMTransactionInfoCodec.java
new file mode 100644
index 0000000..f0939ab
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/metadata/TestSCMTransactionInfoCodec.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.metadata;
+
+
+import org.apache.hadoop.hdds.scm.ha.SCMTransactionInfo;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+import static org.junit.Assert.fail;
+
+public class TestSCMTransactionInfoCodec {
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  private SCMTransactionInfoCodec codec;
+
+  @Before
+  public void setUp() {
+    codec = new SCMTransactionInfoCodec();
+  }
+
+  @Test
+  public void toAndFromPersistedFormat() throws IOException {
+    SCMTransactionInfo scmTransactionInfo =
+        new SCMTransactionInfo.Builder().setTransactionIndex(100)
+            .setCurrentTerm(11).build();
+
+    SCMTransactionInfo convertedTransactionInfo =
+        codec.fromPersistedFormat(codec.toPersistedFormat(scmTransactionInfo));
+
+    Assert.assertEquals(scmTransactionInfo, convertedTransactionInfo);
+  }
+
+  @Test
+  public void testCodecWithNullDataFromTable() throws Exception {
+    thrown.expect(NullPointerException.class);
+    codec.fromPersistedFormat(null);
+  }
+
+  @Test
+  public void testCodecWithNullDataFromUser() throws Exception {
+    thrown.expect(NullPointerException.class);
+    codec.toPersistedFormat(null);
+  }
+
+  @Test
+  public void testCodecWithIncorrectValues() throws Exception {
+    try {
+      codec.fromPersistedFormat("random".getBytes(StandardCharsets.UTF_8));
+      fail("testCodecWithIncorrectValues failed");
+    } catch (IllegalStateException ex) {
+      GenericTestUtils.assertExceptionContains("Incorrect TransactionInfo " +
+          "value", ex);
+    }
+  }
+}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
index c2dc995..654fd9c 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
@@ -27,6 +27,8 @@ import org.apache.hadoop.hdds.scm.TestUtils;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.MockNodeManager;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.ha.DBTransactionBuffer;
+import org.apache.hadoop.hdds.scm.ha.MockDBTransactionBuffer;
 import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager;
 import org.apache.hadoop.hdds.scm.ha.SCMContext;
 import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition;
@@ -104,9 +106,21 @@ public class TestPipelineManagerImpl {
         SCMContext.emptyContext());
   }
 
+  private PipelineManagerV2Impl createPipelineManager(
+      boolean isLeader, DBTransactionBuffer buffer) throws IOException {
+    return PipelineManagerV2Impl.newPipelineManager(conf,
+        MockSCMHAManager.getInstance(isLeader, buffer),
+        new MockNodeManager(true, 20),
+        SCMDBDefinition.PIPELINES.getTable(dbStore),
+        new EventQueue(),
+        SCMContext.emptyContext());
+  }
+
   @Test
   public void testCreatePipeline() throws Exception {
-    PipelineManagerV2Impl pipelineManager = createPipelineManager(true);
+    DBTransactionBuffer buffer1 = new MockDBTransactionBuffer(dbStore);
+    PipelineManagerV2Impl pipelineManager =
+        createPipelineManager(true, buffer1);
     Assert.assertTrue(pipelineManager.getPipelines().isEmpty());
     pipelineManager.allowPipelineCreation();
     Pipeline pipeline1 = pipelineManager.createPipeline(
@@ -118,15 +132,19 @@ public class TestPipelineManagerImpl {
         HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.ONE);
     Assert.assertEquals(2, pipelineManager.getPipelines().size());
     Assert.assertTrue(pipelineManager.containsPipeline(pipeline2.getId()));
+    buffer1.close();
     pipelineManager.close();
 
-    PipelineManagerV2Impl pipelineManager2 = createPipelineManager(true);
+    DBTransactionBuffer buffer2 = new MockDBTransactionBuffer(dbStore);
+    PipelineManagerV2Impl pipelineManager2 =
+        createPipelineManager(true, buffer2);
     // Should be able to load previous pipelines.
     Assert.assertFalse(pipelineManager2.getPipelines().isEmpty());
     Assert.assertEquals(2, pipelineManager.getPipelines().size());
     pipelineManager2.allowPipelineCreation();
     Pipeline pipeline3 = pipelineManager2.createPipeline(
         HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE);
+    buffer2.close();
     Assert.assertEquals(3, pipelineManager2.getPipelines().size());
     Assert.assertTrue(pipelineManager2.containsPipeline(pipeline3.getId()));
 
@@ -151,7 +169,9 @@ public class TestPipelineManagerImpl {
 
   @Test
   public void testUpdatePipelineStates() throws Exception {
-    PipelineManagerV2Impl pipelineManager = createPipelineManager(true);
+    DBTransactionBuffer buffer = new MockDBTransactionBuffer(dbStore);
+    PipelineManagerV2Impl pipelineManager =
+        createPipelineManager(true, buffer);
     Table<PipelineID, Pipeline> pipelineStore =
         SCMDBDefinition.PIPELINES.getTable(dbStore);
     pipelineManager.allowPipelineCreation();
@@ -160,6 +180,7 @@ public class TestPipelineManagerImpl {
     Assert.assertEquals(1, pipelineManager.getPipelines().size());
     Assert.assertTrue(pipelineManager.containsPipeline(pipeline.getId()));
     Assert.assertEquals(ALLOCATED, pipeline.getPipelineState());
+    buffer.flush();
     Assert.assertEquals(ALLOCATED,
         pipelineStore.get(pipeline.getId()).getPipelineState());
     PipelineID pipelineID = pipeline.getId();
@@ -170,11 +191,13 @@ public class TestPipelineManagerImpl {
         .getPipelines(HddsProtos.ReplicationType.RATIS,
             HddsProtos.ReplicationFactor.THREE,
             Pipeline.PipelineState.OPEN).contains(pipeline));
+    buffer.flush();
     Assert.assertTrue(pipelineStore.get(pipeline.getId()).isOpen());
 
     pipelineManager.deactivatePipeline(pipeline.getId());
     Assert.assertEquals(Pipeline.PipelineState.DORMANT,
         pipelineManager.getPipeline(pipelineID).getPipelineState());
+    buffer.flush();
     Assert.assertEquals(Pipeline.PipelineState.DORMANT,
         pipelineStore.get(pipeline.getId()).getPipelineState());
     Assert.assertFalse(pipelineManager
@@ -187,6 +210,7 @@ public class TestPipelineManagerImpl {
         .getPipelines(HddsProtos.ReplicationType.RATIS,
             HddsProtos.ReplicationFactor.THREE,
             Pipeline.PipelineState.OPEN).contains(pipeline));
+    buffer.flush();
     Assert.assertTrue(pipelineStore.get(pipeline.getId()).isOpen());
     pipelineManager.close();
   }
@@ -430,7 +454,9 @@ public class TestPipelineManagerImpl {
 
   @Test
   public void testPipelineOpenOnlyWhenLeaderReported() throws Exception {
-    PipelineManagerV2Impl pipelineManager = createPipelineManager(true);
+    DBTransactionBuffer buffer1 = new MockDBTransactionBuffer(dbStore);
+    PipelineManagerV2Impl pipelineManager =
+        createPipelineManager(true, buffer1);
     pipelineManager.allowPipelineCreation();
 
     pipelineManager.onMessage(
@@ -439,6 +465,7 @@ public class TestPipelineManagerImpl {
         .createPipeline(HddsProtos.ReplicationType.RATIS,
             HddsProtos.ReplicationFactor.THREE);
     // close manager
+    buffer1.close();
     pipelineManager.close();
     // new pipeline manager loads the pipelines from the db in ALLOCATED state
     pipelineManager = createPipelineManager(true);
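
The buffer.flush() calls interleaved with the pipelineStore reads above are the point of the change: pipeline-state mutations now accumulate in a DBTransactionBuffer and only become visible in RocksDB after a flush (or close). A hypothetical sketch of what such a flush can look like on top of the DBStore batch API; bufferedPipelines and pipelineTable are illustrative names, not part of this patch:

    // Drain buffered mutations into a single atomic RocksDB batch.
    try (BatchOperation batch = dbStore.initBatchOperation()) {
      for (Map.Entry<PipelineID, Pipeline> e : bufferedPipelines.entrySet()) {
        pipelineTable.putWithBatch(batch, e.getKey(), e.getValue());
      }
      dbStore.commitBatchOperation(batch);  // all-or-nothing persist
      bufferedPipelines.clear();            // buffer is empty after a flush
    }
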
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMStoreImplWithOldPipelineIDKeyFormat.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMStoreImplWithOldPipelineIDKeyFormat.java
index a04ecea..ef579d1 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMStoreImplWithOldPipelineIDKeyFormat.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMStoreImplWithOldPipelineIDKeyFormat.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolPro
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.ha.SCMTransactionInfo;
 import org.apache.hadoop.hdds.scm.metadata.PipelineCodec;
 import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore;
 import org.apache.hadoop.hdds.security.x509.certificate.authority.CertificateStore;
@@ -117,6 +118,11 @@ public class TestSCMStoreImplWithOldPipelineIDKeyFormat
   }
 
   @Override
+  public Table<String, SCMTransactionInfo> getTransactionInfoTable() {
+    return null;
+  }
+
+  @Override
   public BatchOperationHandler getBatchHandler() {
     return null;
   }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMSnapshot.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMSnapshot.java
new file mode 100644
index 0000000..e1d43a5
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMSnapshot.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.scm.container.ContainerManagerV2;
+import org.apache.hadoop.hdds.scm.ha.SCMHAConfiguration;
+import org.apache.hadoop.hdds.scm.ha.SCMTransactionInfo;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.ONE;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.THREE;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType.RATIS;
+import static org.apache.hadoop.ozone.OzoneConsts.TRANSACTION_INFO_KEY;
+
+public class TestSCMSnapshot {
+  private static MiniOzoneCluster cluster;
+  private static OzoneConfiguration conf;
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    conf = new OzoneConfiguration();
+    SCMHAConfiguration scmhaConfiguration = conf.getObject(
+        SCMHAConfiguration.class);
+    scmhaConfiguration.setRatisSnapshotThreshold(1L);
+    conf.setFromObject(scmhaConfiguration);
+    cluster = MiniOzoneCluster
+        .newBuilder(conf)
+        .setNumDatanodes(3)
+        .build();
+    cluster.waitForClusterToBeReady();
+  }
+
+  @Test
+  public void testSnapshot() throws Exception {
+    StorageContainerManager scm = cluster.getStorageContainerManager();
+    long snapshotInfo1 = scm.getScmHAManager().getDBTransactionBuffer()
+        .getLatestTrxInfo().getTransactionIndex();
+    ContainerManagerV2 containerManager = scm.getContainerManager();
+    PipelineManager pipelineManager = scm.getPipelineManager();
+    Pipeline ratisPipeline1 = pipelineManager.getPipeline(
+        containerManager.allocateContainer(
+            RATIS, THREE, "Owner1").getPipelineID());
+    pipelineManager.openPipeline(ratisPipeline1.getId());
+    Pipeline ratisPipeline2 = pipelineManager.getPipeline(
+        containerManager.allocateContainer(
+            RATIS, ONE, "Owner2").getPipelineID());
+    pipelineManager.openPipeline(ratisPipeline2.getId());
+    long snapshotInfo2 = scm.getScmHAManager().getDBTransactionBuffer()
+        .getLatestTrxInfo().getTransactionIndex();
+
+    Assert.assertTrue(
+        String.format("Snapshot index 2 {} should greater than Snapshot " +
+            "index 1 {}", snapshotInfo2, snapshotInfo1),
+        snapshotInfo2 > snapshotInfo1);
+
+    Table<String, SCMTransactionInfo> trxInfo =
+        scm.getScmMetadataStore().getTransactionInfoTable();
+    SCMTransactionInfo scmTransactionInfo = trxInfo.get(TRANSACTION_INFO_KEY);
+
+    Assert.assertTrue(
+        "DB trx info:" + scmTransactionInfo.getTransactionIndex()
+        + ", latestSnapshotInfo:" + snapshotInfo2,
+        scmTransactionInfo.getTransactionIndex() >= snapshotInfo2);
+
+    cluster.restartStorageContainerManager(false);
+    SCMTransactionInfo trxInfoAfterRestart =
+        scm.getScmHAManager().getDBTransactionBuffer().getLatestTrxInfo();
+    Assert.assertTrue(
+        trxInfoAfterRestart.getTransactionIndex() >= snapshotInfo2);
+    try {
+      pipelineManager.getPipeline(ratisPipeline1.getId());
+      pipelineManager.getPipeline(ratisPipeline2.getId());
+    } catch (PipelineNotFoundException e) {
+      Assert.fail("Should not see a PipelineNotFoundException");
+    }
+  }
+
+  @AfterClass
+  public static void shutdown() throws Exception {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+}
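
Because setup() lowers ratisSnapshotThreshold to 1, every applied transaction is eligible to trigger a snapshot, so the SCMTransactionInfo persisted under TRANSACTION_INFO_KEY should keep pace with the in-memory index; that is what the assertions above verify across an SCM restart. A minimal sketch of reading the persisted marker back, using only names that appear in this patch:

    // Last transaction index covered by a snapshot, as recorded in RocksDB.
    SCMTransactionInfo persisted = scm.getScmMetadataStore()
        .getTransactionInfoTable()
        .get(TRANSACTION_INFO_KEY);
    long lastSnapshottedIndex = persisted.getTransactionIndex();
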
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
index 58e4597..d0cd642 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
@@ -300,6 +300,7 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster {
   public void restartStorageContainerManager(boolean waitForDatanode)
       throws TimeoutException, InterruptedException, IOException,
       AuthenticationException {
+    LOG.info("Restarting SCM in cluster " + this.getClass());
     scm.stop();
     scm.join();
     scm = TestUtils.getScmSimple(conf);
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconContainerManager.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconContainerManager.java
index 4b71f44..73e260f 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconContainerManager.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconContainerManager.java
@@ -32,9 +32,11 @@ import org.apache.hadoop.hdds.scm.container.ContainerManagerImpl;
 import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
 import org.apache.hadoop.hdds.scm.container.ContainerReplica;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
+import org.apache.hadoop.hdds.scm.ha.MockDBTransactionBuffer;
 import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
+import org.apache.hadoop.hdds.utils.db.DBStore;
 import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
 import org.apache.hadoop.ozone.recon.persistence.ContainerSchemaManager;
@@ -66,12 +68,13 @@ public class ReconContainerManager extends ContainerManagerImpl {
    */
   public ReconContainerManager(
       Configuration conf,
+      DBStore store,
       Table<ContainerID, ContainerInfo> containerStore,
       PipelineManager pipelineManager,
       StorageContainerServiceProvider scm,
       ContainerSchemaManager containerSchemaManager) throws IOException {
-    super(conf, MockSCMHAManager.getInstance(true),
-        pipelineManager, containerStore);
+    super(conf, MockSCMHAManager.getInstance(true,
+        new MockDBTransactionBuffer(store)), pipelineManager, containerStore);
     this.scmClient = scm;
     this.pipelineManager = pipelineManager;
     this.containerSchemaManager = containerSchemaManager;
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
index a406779..80648cf 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
@@ -123,6 +123,7 @@ public class ReconStorageContainerManagerFacade
             ReconSCMDBDefinition.PIPELINES.getTable(dbStore),
             eventQueue);
     this.containerManager = new ReconContainerManager(conf,
+        dbStore,
         ReconSCMDBDefinition.CONTAINERS.getTable(dbStore),
         pipelineManager,
         scmServiceProvider,
diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/AbstractReconContainerManagerTest.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/AbstractReconContainerManagerTest.java
index f3592bc..93cf256 100644
--- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/AbstractReconContainerManagerTest.java
+++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/AbstractReconContainerManagerTest.java
@@ -83,6 +83,7 @@ public class AbstractReconContainerManagerTest {
         ReconSCMDBDefinition.PIPELINES.getTable(store), eventQueue);
     containerManager = new ReconContainerManager(
         conf,
+        store,
         ReconSCMDBDefinition.CONTAINERS.getTable(store),
         pipelineManager,
         getScmServiceProvider(),
diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconContainerManager.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconContainerManager.java
index c493e2c..fec673a 100644
--- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconContainerManager.java
+++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconContainerManager.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolPro
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
+import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
 import org.junit.Test;
 
 /**
@@ -72,6 +73,8 @@ public class TestReconContainerManager
     assertEquals(containerID, containersInPipeline.first());
 
     // Verify container DB.
+    SCMHAManager scmhaManager = containerManager.getSCMHAManager();
+    scmhaManager.getDBTransactionBuffer().close();
     assertTrue(getContainerTable().isExist(containerID));
   }
 
@@ -95,6 +98,8 @@ public class TestReconContainerManager
     assertEquals(1, containers.size());
     assertEquals(containerInfo, containers.get(0));
     // Verify container DB.
+    SCMHAManager scmhaManager = containerManager.getSCMHAManager();
+    scmhaManager.getDBTransactionBuffer().close();
     assertTrue(getContainerTable().isExist(containerID));
   }
 

