Posted to common-commits@hadoop.apache.org by yq...@apache.org on 2019/03/21 03:03:08 UTC

[hadoop] branch trunk updated: HDDS-1233. Create an Ozone Manager Service provider for Recon. Contributed by Aravindan Vijayan.

This is an automated email from the ASF dual-hosted git repository.

yqlin pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 60cdd4c  HDDS-1233. Create an Ozone Manager Service provider for Recon. Contributed by Aravindan Vijayan.
60cdd4c is described below

commit 60cdd4cac17e547edbd9cd58c19ef27a8409b9c3
Author: Yiqun Lin <yq...@apache.org>
AuthorDate: Thu Mar 21 11:02:29 2019 +0800

    HDDS-1233. Create an Ozone Manager Service provider for Recon. Contributed by Aravindan Vijayan.
---
 .../org/apache/hadoop/utils/db/DBStoreBuilder.java |   8 +-
 .../hadoop/utils/db/RDBCheckpointManager.java      |  46 ----
 .../java/org/apache/hadoop/utils/db/RDBStore.java  |  25 +-
 .../apache/hadoop/utils/db/RocksDBCheckpoint.java  |  81 ++++++
 .../common/src/main/resources/ozone-default.xml    |  85 ++++++-
 .../org/apache/hadoop/utils/db/TestRDBStore.java   |  43 ++++
 .../org/apache/hadoop/hdds/server/ServerUtils.java |  19 +-
 .../hadoop/ozone/om/OmMetadataManagerImpl.java     | 145 ++++++-----
 hadoop-ozone/ozone-recon/pom.xml                   |  26 +-
 .../apache/hadoop/ozone/recon/ReconConstants.java  |   3 +
 .../hadoop/ozone/recon/ReconControllerModule.java  |  14 +-
 .../hadoop/ozone/recon/ReconServerConfigKeys.java  |  37 ++-
 .../org/apache/hadoop/ozone/recon/ReconUtils.java  | 178 +++++++++++++
 .../ReconOMMetadataManager.java}                   |  24 +-
 .../recon/recovery/ReconOmMetadataManagerImpl.java |  99 ++++++++
 .../recon/spi/OzoneManagerServiceProvider.java     |  15 ++
 .../ozone/recon/spi/ReconContainerDBProvider.java  |  35 +--
 .../spi/impl/OzoneManagerServiceProviderImpl.java  | 211 ++++++++++++++++
 .../apache/hadoop/ozone/recon/TestReconUtils.java  | 135 ++++++++++
 .../recovery/TestReconOmMetadataManagerImpl.java   | 148 +++++++++++
 .../hadoop/ozone/recon/recovery/package-info.java} |   8 +-
 .../impl/TestContainerDBServiceProviderImpl.java   |   3 -
 .../impl/TestOzoneManagerServiceProviderImpl.java  | 275 +++++++++++++++++++++
 23 files changed, 1505 insertions(+), 158 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/DBStoreBuilder.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/DBStoreBuilder.java
index 3459b20..34bdc5d 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/DBStoreBuilder.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/DBStoreBuilder.java
@@ -57,6 +57,7 @@ public final class DBStoreBuilder {
   private List<String> tableNames;
   private Configuration configuration;
   private CodecRegistry registry;
+  private boolean readOnly = false;
 
   private DBStoreBuilder(Configuration configuration) {
     tables = new HashSet<>();
@@ -113,6 +114,11 @@ public final class DBStoreBuilder {
     return this;
   }
 
+  public DBStoreBuilder setReadOnly(boolean rdOnly) {
+    readOnly = rdOnly;
+    return this;
+  }
+
   /**
    * Builds a DBStore instance and returns that.
    *
@@ -131,7 +137,7 @@ public final class DBStoreBuilder {
     if (!dbFile.getParentFile().exists()) {
       throw new IOException("The DB destination directory should exist.");
     }
-    return new RDBStore(dbFile, options, tables, registry);
+    return new RDBStore(dbFile, options, tables, registry, readOnly);
   }
 
   /**
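(For context, a minimal sketch of how the new flag is meant to be used.
This snippet is illustrative, not part of the commit; the DB name, path
and table name are made up.)

    import java.io.IOException;
    import java.nio.file.Paths;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.utils.db.DBStore;
    import org.apache.hadoop.utils.db.DBStoreBuilder;

    static DBStore openReadOnly(Configuration conf) throws IOException {
      return DBStoreBuilder.newBuilder(conf)
          .setName("om.snapshot.db")          // hypothetical DB name
          .setPath(Paths.get("/tmp/recon"))   // hypothetical parent dir
          .addTable("keyTable")               // hypothetical table name
          .setReadOnly(true)                  // flag added in this change
          .build();
    }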
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RDBCheckpointManager.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RDBCheckpointManager.java
index ce716c3..68d196f 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RDBCheckpointManager.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RDBCheckpointManager.java
@@ -19,13 +19,11 @@
 
 package org.apache.hadoop.utils.db;
 
-import java.io.IOException;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.time.Duration;
 import java.time.Instant;
 
-import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.rocksdb.Checkpoint;
 import org.rocksdb.RocksDB;
@@ -99,48 +97,4 @@ public class RDBCheckpointManager {
     }
     return null;
   }
-
-  static class RocksDBCheckpoint implements DBCheckpoint {
-
-    private Path checkpointLocation;
-    private long checkpointTimestamp;
-    private long latestSequenceNumber;
-    private long checkpointCreationTimeTaken;
-
-    RocksDBCheckpoint(Path checkpointLocation,
-                              long snapshotTimestamp,
-                              long latestSequenceNumber,
-                              long checkpointCreationTimeTaken) {
-      this.checkpointLocation = checkpointLocation;
-      this.checkpointTimestamp = snapshotTimestamp;
-      this.latestSequenceNumber = latestSequenceNumber;
-      this.checkpointCreationTimeTaken = checkpointCreationTimeTaken;
-    }
-
-    @Override
-    public Path getCheckpointLocation() {
-      return this.checkpointLocation;
-    }
-
-    @Override
-    public long getCheckpointTimestamp() {
-      return this.checkpointTimestamp;
-    }
-
-    @Override
-    public long getLatestSequenceNumber() {
-      return this.latestSequenceNumber;
-    }
-
-    @Override
-    public long checkpointCreationTimeTaken() {
-      return checkpointCreationTimeTaken;
-    }
-
-    @Override
-    public void cleanupCheckpoint() throws IOException {
-      LOG.debug("Cleaning up checkpoint at " + checkpointLocation.toString());
-      FileUtils.deleteDirectory(checkpointLocation.toFile());
-    }
-  }
 }
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RDBStore.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RDBStore.java
index a5f029a..f35df95 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RDBStore.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RDBStore.java
@@ -55,24 +55,24 @@ import org.slf4j.LoggerFactory;
 public class RDBStore implements DBStore {
   private static final Logger LOG =
       LoggerFactory.getLogger(RDBStore.class);
-  private final RocksDB db;
-  private final File dbLocation;
+  private RocksDB db;
+  private File dbLocation;
   private final WriteOptions writeOptions;
   private final DBOptions dbOptions;
   private final CodecRegistry codecRegistry;
   private final Hashtable<String, ColumnFamilyHandle> handleTable;
   private ObjectName statMBeanName;
   private RDBCheckpointManager checkPointManager;
-  private final String checkpointsParentDir;
+  private String checkpointsParentDir;
 
   @VisibleForTesting
   public RDBStore(File dbFile, DBOptions options,
                   Set<TableConfig> families) throws IOException {
-    this(dbFile, options, families, new CodecRegistry());
+    this(dbFile, options, families, new CodecRegistry(), false);
   }
 
   public RDBStore(File dbFile, DBOptions options, Set<TableConfig> families,
-                  CodecRegistry registry)
+                  CodecRegistry registry, boolean readOnly)
       throws IOException {
     Preconditions.checkNotNull(dbFile, "DB file location cannot be null");
     Preconditions.checkNotNull(families);
@@ -93,8 +93,13 @@ public class RDBStore implements DBStore {
     writeOptions = new WriteOptions();
 
     try {
-      db = RocksDB.open(dbOptions, dbLocation.getAbsolutePath(),
-          columnFamilyDescriptors, columnFamilyHandles);
+      if (readOnly) {
+        db = RocksDB.openReadOnly(dbOptions, dbLocation.getAbsolutePath(),
+            columnFamilyDescriptors, columnFamilyHandles);
+      } else {
+        db = RocksDB.open(dbOptions, dbLocation.getAbsolutePath(),
+            columnFamilyDescriptors, columnFamilyHandles);
+      }
 
       for (int x = 0; x < columnFamilyHandles.size(); x++) {
         handleTable.put(
@@ -278,4 +283,10 @@ public class RDBStore implements DBStore {
     return checkPointManager.createCheckpoint(checkpointsParentDir);
   }
 
+  /**
+   * Get current DB Location.
+   */
+  public File getDbLocation() {
+    return dbLocation;
+  }
 }
\ No newline at end of file
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RocksDBCheckpoint.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RocksDBCheckpoint.java
new file mode 100644
index 0000000..88b3f75
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RocksDBCheckpoint.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.utils.db;
+
+import java.io.IOException;
+import java.nio.file.Path;
+
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class to hold information and location of a RocksDB Checkpoint.
+ */
+public class RocksDBCheckpoint implements DBCheckpoint {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(RocksDBCheckpoint.class);
+
+  private Path checkpointLocation;
+  private long checkpointTimestamp = System.currentTimeMillis();
+  private long latestSequenceNumber = -1;
+  private long checkpointCreationTimeTaken = 0L;
+
+  public RocksDBCheckpoint(Path checkpointLocation) {
+    this.checkpointLocation = checkpointLocation;
+  }
+
+  public RocksDBCheckpoint(Path checkpointLocation,
+                    long snapshotTimestamp,
+                    long latestSequenceNumber,
+                    long checkpointCreationTimeTaken) {
+    this.checkpointLocation = checkpointLocation;
+    this.checkpointTimestamp = snapshotTimestamp;
+    this.latestSequenceNumber = latestSequenceNumber;
+    this.checkpointCreationTimeTaken = checkpointCreationTimeTaken;
+  }
+
+  @Override
+  public Path getCheckpointLocation() {
+    return this.checkpointLocation;
+  }
+
+  @Override
+  public long getCheckpointTimestamp() {
+    return this.checkpointTimestamp;
+  }
+
+  @Override
+  public long getLatestSequenceNumber() {
+    return this.latestSequenceNumber;
+  }
+
+  @Override
+  public long checkpointCreationTimeTaken() {
+    return checkpointCreationTimeTaken;
+  }
+
+  @Override
+  public void cleanupCheckpoint() throws IOException {
+    LOG.debug("Cleaning up checkpoint at " + checkpointLocation.toString());
+    FileUtils.deleteDirectory(checkpointLocation.toFile());
+  }
+}
\ No newline at end of file
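(Illustrative usage of the extracted class, not part of the commit; the
checkpoint directory path is hypothetical.)

    import java.io.IOException;
    import java.nio.file.Paths;
    import org.apache.hadoop.utils.db.DBCheckpoint;
    import org.apache.hadoop.utils.db.RocksDBCheckpoint;

    static void useCheckpoint() throws IOException {
      DBCheckpoint checkpoint = new RocksDBCheckpoint(
          Paths.get("/tmp/om.snapshot.db_1553137349"));
      try {
        // Open a read-only RDBStore on checkpoint.getCheckpointLocation().
      } finally {
        checkpoint.cleanupCheckpoint();  // recursively deletes the dir
      }
    }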
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 99977f8..462a07b 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -2258,7 +2258,7 @@
     </description>
   </property>
   <property>
-    <name>ozone.recon.db.dirs</name>
+    <name>ozone.recon.db.dir</name>
     <value/>
     <tag>OZONE, RECON, STORAGE, PERFORMANCE</tag>
     <description>
@@ -2273,7 +2273,88 @@
     <name>ozone.scm.network.topology.schema.file</name>
    <value>network-topology-default.xml</value>
     <tag>OZONE, MANAGEMENT</tag>
-    <description>The schema file defines the ozone network topology.
+    <description>
+      The schema file defines the ozone network topology.
+    </description>
+  </property>
+  <property>
+    <name>ozone.recon.container.db.impl</name>
+    <value>RocksDB</value>
+    <tag>OZONE, RECON, STORAGE</tag>
+    <description>
+      Ozone Recon container DB store implementation. Supported values are
+      LevelDB and RocksDB.
+    </description>
+  </property>
+  <property>
+    <name>ozone.recon.om.db.dir</name>
+    <value/>
+    <tag>OZONE, RECON, STORAGE</tag>
+    <description>
+      Directory where the Recon Server stores its OM snapshot DB. This
+      should be specified as a single directory. If the directory does not
+      exist, Recon will attempt to create it. If undefined, Recon will log
+      a warning and fall back to ozone.metadata.dirs.
+    </description>
+  </property>
+  <property>
+    <name>recon.om.connection.request.timeout</name>
+    <value>5s</value>
+    <tag>OZONE, RECON, OM</tag>
+    <description>
+      Connection request timeout for the HTTP call made by Recon to
+      request the OM DB snapshot.
+    </description>
+  </property>
+  <property>
+    <name>recon.om.connection.timeout</name>
+    <value>5s</value>
+    <tag>OZONE, RECON, OM</tag>
+    <description>
+      Connection timeout for the HTTP call made by Recon to request the
+      OM snapshot.
+    </description>
+  </property>
+  <property>
+    <name>recon.om.socket.timeout</name>
+    <value>5s</value>
+    <tag>OZONE, RECON, OM</tag>
+    <description>
+      Socket timeout for the HTTP call made by Recon to request the OM
+      snapshot.
+    </description>
+  </property>
+  <property>
+    <name>recon.om.snapshot.task.initial.delay</name>
+    <value>1m</value>
+    <tag>OZONE, RECON, OM</tag>
+    <description>
+      Initial delay before Recon makes its first request for the OM DB
+      snapshot.
+    </description>
+  </property>
+  <property>
+    <name>recon.om.snapshot.task.interval.delay</name>
+    <value>10m</value>
+    <tag>OZONE, RECON, OM</tag>
+    <description>
+      Interval at which Recon requests a fresh OM DB snapshot.
+    </description>
+  </property>
+  <property>
+    <name>recon.om.snapshot.task.flush.param</name>
+    <value>false</value>
+    <tag>OZONE, RECON, OM</tag>
+    <description>
+      Whether Recon asks OM to flush its DB before taking the checkpoint
+      snapshot.
     </description>
   </property>
 </configuration>
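(The duration-typed values above, such as "5s" and "10m", are parsed by
Configuration#getTimeDuration, as the Recon code later in this patch
does. A minimal sketch:)

    import java.util.concurrent.TimeUnit;
    import org.apache.hadoop.hdds.conf.OzoneConfiguration;

    OzoneConfiguration conf = new OzoneConfiguration();
    long intervalMs = conf.getTimeDuration(
        "recon.om.snapshot.task.interval.delay", "10m",
        TimeUnit.MILLISECONDS);  // "10m" parses to 600000 ms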
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/utils/db/TestRDBStore.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/utils/db/TestRDBStore.java
index 6d51034..24a9ee5 100644
--- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/utils/db/TestRDBStore.java
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/utils/db/TestRDBStore.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.utils.db;
 
 import javax.management.MBeanServer;
 
+import java.io.File;
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.nio.charset.StandardCharsets;
@@ -289,4 +290,46 @@ public class TestRDBStore {
           checkpoint.getCheckpointLocation()));
     }
   }
+
+  @Test
+  public void testReadOnlyRocksDB() throws Exception {
+    File dbFile = folder.newFolder();
+    byte[] key = "Key1".getBytes();
+    byte[] value = "Value1".getBytes();
+
+    //Create Rdb and write some data into it.
+    RDBStore newStore = new RDBStore(dbFile, options, configSet);
+    Assert.assertNotNull("DB Store cannot be null", newStore);
+    Table firstTable = newStore.getTable(families.get(0));
+    Assert.assertNotNull("Table cannot be null", firstTable);
+    firstTable.put(key, value);
+
+    RocksDBCheckpoint checkpoint = (RocksDBCheckpoint) newStore.getCheckpoint(
+        true);
+
+    //Create Read Only DB from snapshot of first DB.
+    RDBStore snapshotStore = new RDBStore(checkpoint.getCheckpointLocation()
+        .toFile(), options, configSet, new CodecRegistry(), true);
+
+    Assert.assertNotNull("DB Store cannot be null", newStore);
+
+    //Verify read is allowed.
+    firstTable = snapshotStore.getTable(families.get(0));
+    Assert.assertNotNull("Table cannot be null", firstTable);
+    Assert.assertTrue(Arrays.equals(((byte[])firstTable.get(key)), value));
+
+    //Verify write is not allowed.
+    byte[] key2 =
+        RandomStringUtils.random(10).getBytes(StandardCharsets.UTF_8);
+    byte[] value2 =
+        RandomStringUtils.random(10).getBytes(StandardCharsets.UTF_8);
+    try {
+      firstTable.put(key2, value2);
+      Assert.fail();
+    } catch (IOException e) {
+      Assert.assertTrue(e.getMessage()
+          .contains("Not supported operation in read only mode"));
+    }
+    checkpoint.cleanupCheckpoint();
+  }
 }
\ No newline at end of file
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/ServerUtils.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/ServerUtils.java
index ab5d2ec..22b50c0 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/ServerUtils.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/ServerUtils.java
@@ -125,8 +125,7 @@ public final class ServerUtils {
    * @return
    */
   public static File getScmDbDir(Configuration conf) {
-
-    File metadataDir = getDirWithFallBackToOzoneMetadata(conf, ScmConfigKeys
+    File metadataDir = getDirectoryFromConfig(conf, ScmConfigKeys
         .OZONE_SCM_DB_DIRS, "SCM");
     if (metadataDir != null) {
       return metadataDir;
@@ -138,9 +137,17 @@ public final class ServerUtils {
     return getOzoneMetaDirPath(conf);
   }
 
-  public static File getDirWithFallBackToOzoneMetadata(Configuration conf,
-                                                       String key,
-                                                       String componentName) {
+  /**
+   * Utility method to get the value of a given key that corresponds to a
+   * DB directory.
+   * @param conf configuration bag
+   * @param key Key to test
+   * @param componentName Which component the key belongs to
+   * @return File created from the value of the key in conf.
+   */
+  public static File getDirectoryFromConfig(Configuration conf,
+                                             String key,
+                                             String componentName) {
     final Collection<String> metadirs = conf.getTrimmedStringCollection(key);
 
     if (metadirs.size() > 1) {
@@ -155,7 +162,7 @@ public final class ServerUtils {
       if (!dbDirPath.exists() && !dbDirPath.mkdirs()) {
         throw new IllegalArgumentException("Unable to create directory " +
             dbDirPath + " specified in configuration setting " +
-            ScmConfigKeys.OZONE_SCM_DB_DIRS);
+            key);
       }
       return dbDirPath;
     }
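(A sketch of how a caller uses the renamed helper, mirroring getScmDbDir
above; "ozone.recon.db.dir" is the Recon key introduced later in this
patch.)

    File reconDir = ServerUtils.getDirectoryFromConfig(
        conf, "ozone.recon.db.dir", "Recon");
    if (reconDir == null) {
      // Key not set; fall back to ozone.metadata.dirs.
      reconDir = ServerUtils.getOzoneMetaDirPath(conf);
    }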
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
index 51d51b8..cddddc7 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
@@ -132,6 +132,15 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
     start(conf);
   }
 
+  /**
+   * For subclass overriding.
+   */
+  protected OmMetadataManagerImpl() {
+    this.lock = new OzoneManagerLock(new OzoneConfiguration());
+    this.openKeyExpireThresholdMS =
+        OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS_DEFAULT;
+  }
+
   @Override
   public Table<String, VolumeList> getUserTable() {
     return userTable;
@@ -198,67 +207,79 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
     if (store == null) {
       File metaDir = OmUtils.getOmDbDir(configuration);
 
-      this.store = DBStoreBuilder.newBuilder(configuration)
+      DBStoreBuilder dbStoreBuilder = DBStoreBuilder.newBuilder(configuration)
           .setName(OM_DB_NAME)
-          .setPath(Paths.get(metaDir.getPath()))
-          .addTable(USER_TABLE)
-          .addTable(VOLUME_TABLE)
-          .addTable(BUCKET_TABLE)
-          .addTable(KEY_TABLE)
-          .addTable(DELETED_TABLE)
-          .addTable(OPEN_KEY_TABLE)
-          .addTable(S3_TABLE)
-          .addTable(MULTIPARTINFO_TABLE)
-          .addTable(S3_SECRET_TABLE)
-          .addTable(DELEGATION_TOKEN_TABLE)
-          .addCodec(OzoneTokenIdentifier.class, new TokenIdentifierCodec())
-          .addCodec(OmKeyInfo.class, new OmKeyInfoCodec())
-          .addCodec(OmBucketInfo.class, new OmBucketInfoCodec())
-          .addCodec(OmVolumeArgs.class, new OmVolumeArgsCodec())
-          .addCodec(VolumeList.class, new VolumeListCodec())
-          .addCodec(OmMultipartKeyInfo.class, new OmMultipartKeyInfoCodec())
-          .build();
-
-      userTable =
-          this.store.getTable(USER_TABLE, String.class, VolumeList.class);
-      checkTableStatus(userTable, USER_TABLE);
-      this.store.getTable(VOLUME_TABLE, String.class,
-          String.class);
-      volumeTable =
-          this.store.getTable(VOLUME_TABLE, String.class, OmVolumeArgs.class);
-      checkTableStatus(volumeTable, VOLUME_TABLE);
-
-      bucketTable =
-          this.store.getTable(BUCKET_TABLE, String.class, OmBucketInfo.class);
-      checkTableStatus(bucketTable, BUCKET_TABLE);
-
-      keyTable = this.store.getTable(KEY_TABLE, String.class, OmKeyInfo.class);
-      checkTableStatus(keyTable, KEY_TABLE);
-
-      deletedTable =
-          this.store.getTable(DELETED_TABLE, String.class, OmKeyInfo.class);
-      checkTableStatus(deletedTable, DELETED_TABLE);
-
-      openKeyTable =
-          this.store.getTable(OPEN_KEY_TABLE, String.class, OmKeyInfo.class);
-      checkTableStatus(openKeyTable, OPEN_KEY_TABLE);
-
-      s3Table = this.store.getTable(S3_TABLE);
-      checkTableStatus(s3Table, S3_TABLE);
-
-      dTokenTable = this.store.getTable(DELEGATION_TOKEN_TABLE,
-          OzoneTokenIdentifier.class, Long.class);
-      checkTableStatus(dTokenTable, DELEGATION_TOKEN_TABLE);
-
-      multipartInfoTable = this.store.getTable(MULTIPARTINFO_TABLE,
-          String.class, OmMultipartKeyInfo.class);
-      checkTableStatus(multipartInfoTable, MULTIPARTINFO_TABLE);
-
-      s3SecretTable = this.store.getTable(S3_SECRET_TABLE);
-      checkTableStatus(s3SecretTable, S3_SECRET_TABLE);
+          .setPath(Paths.get(metaDir.getPath()));
+      this.store = addOMTablesAndCodecs(dbStoreBuilder).build();
+      initializeOmTables();
     }
   }
 
+  protected DBStoreBuilder addOMTablesAndCodecs(DBStoreBuilder builder) {
+
+    return builder.addTable(USER_TABLE)
+        .addTable(VOLUME_TABLE)
+        .addTable(BUCKET_TABLE)
+        .addTable(KEY_TABLE)
+        .addTable(DELETED_TABLE)
+        .addTable(OPEN_KEY_TABLE)
+        .addTable(S3_TABLE)
+        .addTable(MULTIPARTINFO_TABLE)
+        .addTable(DELEGATION_TOKEN_TABLE)
+        .addTable(S3_SECRET_TABLE)
+        .addCodec(OzoneTokenIdentifier.class, new TokenIdentifierCodec())
+        .addCodec(OmKeyInfo.class, new OmKeyInfoCodec())
+        .addCodec(OmBucketInfo.class, new OmBucketInfoCodec())
+        .addCodec(OmVolumeArgs.class, new OmVolumeArgsCodec())
+        .addCodec(VolumeList.class, new VolumeListCodec())
+        .addCodec(OmMultipartKeyInfo.class, new OmMultipartKeyInfoCodec());
+  }
+
+  /**
+   * Initialize OM Tables.
+   *
+   * @throws IOException
+   */
+  protected void initializeOmTables() throws IOException {
+    userTable =
+        this.store.getTable(USER_TABLE, String.class, VolumeList.class);
+    checkTableStatus(userTable, USER_TABLE);
+    this.store.getTable(VOLUME_TABLE, String.class,
+        String.class);
+    volumeTable =
+        this.store.getTable(VOLUME_TABLE, String.class, OmVolumeArgs.class);
+    checkTableStatus(volumeTable, VOLUME_TABLE);
+
+    bucketTable =
+        this.store.getTable(BUCKET_TABLE, String.class, OmBucketInfo.class);
+    checkTableStatus(bucketTable, BUCKET_TABLE);
+
+    keyTable = this.store.getTable(KEY_TABLE, String.class, OmKeyInfo.class);
+    checkTableStatus(keyTable, KEY_TABLE);
+
+    deletedTable =
+        this.store.getTable(DELETED_TABLE, String.class, OmKeyInfo.class);
+    checkTableStatus(deletedTable, DELETED_TABLE);
+
+    openKeyTable =
+        this.store.getTable(OPEN_KEY_TABLE, String.class, OmKeyInfo.class);
+    checkTableStatus(openKeyTable, OPEN_KEY_TABLE);
+
+    s3Table = this.store.getTable(S3_TABLE);
+    checkTableStatus(s3Table, S3_TABLE);
+
+    multipartInfoTable = this.store.getTable(MULTIPARTINFO_TABLE,
+        String.class, OmMultipartKeyInfo.class);
+    checkTableStatus(multipartInfoTable, MULTIPARTINFO_TABLE);
+
+    dTokenTable = this.store.getTable(DELEGATION_TOKEN_TABLE,
+        OzoneTokenIdentifier.class, Long.class);
+    checkTableStatus(dTokenTable, DELEGATION_TOKEN_TABLE);
+
+    s3SecretTable = this.store.getTable(S3_SECRET_TABLE);
+    checkTableStatus(s3SecretTable, S3_SECRET_TABLE);
+  }
+
   /**
    * Stop metadata manager.
    */
@@ -683,4 +704,14 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
   public Table<byte[], byte[]> getS3SecretTable() {
     return s3SecretTable;
   }
+
+  /**
+   * Update store used by subclass.
+   *
+   * @param store DB store.
+   */
+  protected void setStore(DBStore store) {
+    this.store = store;
+  }
+
 }
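(The protected no-arg constructor, addOMTablesAndCodecs,
initializeOmTables and setStore form a template-method seam: a subclass
can swap the backing store while reusing OM's table layout. A
hypothetical sketch; Recon's real subclass appears later in this patch.)

    class SnapshotOmMetadataManager extends OmMetadataManagerImpl {
      void loadSnapshot(File dbDir, OzoneConfiguration conf)
          throws IOException {
        DBStoreBuilder builder = DBStoreBuilder.newBuilder(conf)
            .setReadOnly(true)
            .setName(dbDir.getName())
            .setPath(dbDir.toPath().getParent());
        setStore(addOMTablesAndCodecs(builder).build());
        initializeOmTables();
      }
    }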
diff --git a/hadoop-ozone/ozone-recon/pom.xml b/hadoop-ozone/ozone-recon/pom.xml
index 50ffefa..bcb85e1 100644
--- a/hadoop-ozone/ozone-recon/pom.xml
+++ b/hadoop-ozone/ozone-recon/pom.xml
@@ -29,6 +29,10 @@
       <artifactId>hadoop-ozone-common</artifactId>
     </dependency>
     <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-ozone-ozone-manager</artifactId>
+    </dependency>
+    <dependency>
       <groupId>com.google.inject</groupId>
       <artifactId>guice</artifactId>
       <version>${guice.version}</version>
@@ -57,9 +61,27 @@
     </dependency>
     <dependency>
       <groupId>org.mockito</groupId>
-      <artifactId>mockito-all</artifactId>
-      <version>1.10.19</version>
+      <artifactId>mockito-core</artifactId>
+      <version>2.8.9</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.powermock</groupId>
+      <artifactId>powermock-module-junit4</artifactId>
+      <version>1.7.4</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.powermock</groupId>
+      <artifactId>powermock-api-mockito2</artifactId>
+      <version>1.7.4</version>
       <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.mockito</groupId>
+          <artifactId>mockito-core</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
   </dependencies>
 </project>
\ No newline at end of file
diff --git a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/ReconConstants.java b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/ReconConstants.java
index 1ea132a..dd399fd 100644
--- a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/ReconConstants.java
+++ b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/ReconConstants.java
@@ -31,4 +31,7 @@ public final class ReconConstants {
 
   public static final String RECON_CONTAINER_DB = "recon-" +
       CONTAINER_DB_SUFFIX;
+
+  public static final String RECON_OM_SNAPSHOT_DB =
+      "om.snapshot.db";
 }
diff --git a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java
index f282b9a..cc0d8a1 100644
--- a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java
+++ b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java
@@ -18,9 +18,13 @@
 package org.apache.hadoop.ozone.recon;
 
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
+import org.apache.hadoop.ozone.recon.recovery.ReconOmMetadataManagerImpl;
+import org.apache.hadoop.ozone.recon.spi.OzoneManagerServiceProvider;
 import org.apache.hadoop.ozone.recon.spi.ReconContainerDBProvider;
 import org.apache.hadoop.ozone.recon.spi.ContainerDBServiceProvider;
 import org.apache.hadoop.ozone.recon.spi.impl.ContainerDBServiceProviderImpl;
+import org.apache.hadoop.ozone.recon.spi.impl.OzoneManagerServiceProviderImpl;
 import org.apache.hadoop.utils.MetadataStore;
 
 import com.google.inject.AbstractModule;
@@ -34,10 +38,14 @@ public class ReconControllerModule extends AbstractModule {
   protected void configure() {
     bind(OzoneConfiguration.class).toProvider(OzoneConfigurationProvider.class);
     bind(ReconHttpServer.class).in(Singleton.class);
-    bind(MetadataStore.class).toProvider(ReconContainerDBProvider.class);
+    bind(MetadataStore.class)
+        .toProvider(ReconContainerDBProvider.class).in(Singleton.class);
+    bind(ReconOMMetadataManager.class)
+      .to(ReconOmMetadataManagerImpl.class).in(Singleton.class);
     bind(ContainerDBServiceProvider.class)
-        .to(ContainerDBServiceProviderImpl.class);
+      .to(ContainerDBServiceProviderImpl.class).in(Singleton.class);
+    bind(OzoneManagerServiceProvider.class)
+      .to(OzoneManagerServiceProviderImpl.class).in(Singleton.class);
   }
 
-
 }
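(A sketch of how these bindings are consumed; the injector creation is
assumed to live in Recon's server entry point, which this patch does not
touch.)

    Injector injector = Guice.createInjector(new ReconControllerModule());
    OzoneManagerServiceProvider omProvider =
        injector.getInstance(OzoneManagerServiceProvider.class);
    // Singleton scope: repeated lookups return the same instance.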
diff --git a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java
index 5e4b732..1abc513 100644
--- a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java
+++ b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.ozone.recon;
 
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_STORE_IMPL_ROCKSDB;
+
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 
@@ -50,7 +52,40 @@ public final class ReconServerConfigKeys {
       "ozone.recon.container.db.cache.size.mb";
   public static final int OZONE_RECON_CONTAINER_DB_CACHE_SIZE_DEFAULT = 128;
 
-  public static final String OZONE_RECON_DB_DIRS = "ozone.recon.db.dirs";
+  public static final String OZONE_RECON_DB_DIR = "ozone.recon.db.dir";
+
+  public static final String OZONE_RECON_OM_SNAPSHOT_DB_DIR =
+      "ozone.recon.om.db.dir";
+
+  public static final String RECON_OM_SOCKET_TIMEOUT =
+      "recon.om.socket.timeout";
+  public static final String RECON_OM_SOCKET_TIMEOUT_DEFAULT = "5s";
+
+  public static final String RECON_OM_CONNECTION_TIMEOUT =
+      "recon.om.connection.timeout";
+  public static final String RECON_OM_CONNECTION_TIMEOUT_DEFAULT = "5s";
+
+  public static final String RECON_OM_CONNECTION_REQUEST_TIMEOUT =
+      "recon.om.connection.request.timeout";
+  public static final String RECON_OM_CONNECTION_REQUEST_TIMEOUT_DEFAULT = "5s";
+
+  public static final String RECON_OM_SNAPSHOT_TASK_INITIAL_DELAY =
+      "recon.om.snapshot.task.initial.delay";
+  public static final String
+      RECON_OM_SNAPSHOT_TASK_INITIAL_DELAY_DEFAULT = "1m";
+
+  public static final String OZONE_RECON_CONTAINER_DB_STORE_IMPL =
+      "ozone.recon.container.db.impl";
+  public static final String OZONE_RECON_CONTAINER_DB_STORE_IMPL_DEFAULT =
+      OZONE_METADATA_STORE_IMPL_ROCKSDB;
+
+  public static final String RECON_OM_SNAPSHOT_TASK_INTERVAL =
+      "recon.om.snapshot.task.interval.delay";
+  public static final String RECON_OM_SNAPSHOT_TASK_INTERVAL_DEFAULT
+      = "10m";
+
+  public static final String RECON_OM_SNAPSHOT_TASK_FLUSH_PARAM =
+      "recon.om.snapshot.task.flush.param";
 
   /**
    * Private constructor for utility class.
diff --git a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/ReconUtils.java b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/ReconUtils.java
new file mode 100644
index 0000000..7bbbd92
--- /dev/null
+++ b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/ReconUtils.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon;
+
+import static java.net.HttpURLConnection.HTTP_CREATED;
+import static java.net.HttpURLConnection.HTTP_OK;
+import static org.apache.hadoop.hdds.server.ServerUtils.getDirectoryFromConfig;
+import static org.apache.hadoop.hdds.server.ServerUtils.getOzoneMetaDirPath;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
+import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+
+import org.apache.http.util.EntityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Recon Utility class.
+ */
+public final class ReconUtils {
+
+  private static final int WRITE_BUFFER = 1048576; // 1 MB
+
+  private ReconUtils() {
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      ReconUtils.class);
+
+  /**
+   * Get the configured Recon DB directory value from the given key. If
+   * absent, fall back to ozone.metadata.dirs.
+   *
+   * @param conf         configuration bag
+   * @param dirConfigKey key to check
+   * @return File based on the configured or fallback value.
+   */
+  public static File getReconDbDir(Configuration conf, String dirConfigKey) {
+
+    File metadataDir = getDirectoryFromConfig(conf, dirConfigKey,
+        "Recon");
+    if (metadataDir != null) {
+      return metadataDir;
+    }
+
+    LOG.warn("{} is not configured. We recommend adding this setting. " +
+            "Falling back to {} instead.",
+        dirConfigKey, HddsConfigKeys.OZONE_METADATA_DIRS);
+    return getOzoneMetaDirPath(conf);
+  }
+
+  /**
+   * Untar DB snapshot tar file to recon OM snapshot directory.
+   *
+   * @param tarFile  source tar file
+   * @param destPath destination path to untar to.
+   * @throws IOException ioException
+   */
+  public static void untarCheckpointFile(File tarFile, Path destPath)
+      throws IOException {
+
+    FileInputStream fileInputStream = null;
+    BufferedInputStream buffIn = null;
+    GzipCompressorInputStream gzIn = null;
+    try {
+      fileInputStream = new FileInputStream(tarFile);
+      buffIn = new BufferedInputStream(fileInputStream);
+      gzIn = new GzipCompressorInputStream(buffIn);
+
+      //Create destination directory if it does not exist.
+      if (!destPath.toFile().exists()) {
+        boolean success = destPath.toFile().mkdirs();
+        if (!success) {
+          throw new IOException("Unable to create destination directory.");
+        }
+      }
+
+      try (TarArchiveInputStream tarInStream =
+               new TarArchiveInputStream(gzIn)) {
+        TarArchiveEntry entry = null;
+
+        while ((entry = (TarArchiveEntry) tarInStream.getNextEntry()) != null) {
+          //If directory, create a directory.
+          if (entry.isDirectory()) {
+            File f = new File(Paths.get(destPath.toString(),
+                entry.getName()).toString());
+            boolean success = f.mkdirs();
+            if (!success) {
+              LOG.error("Unable to create directory found in tar.");
+            }
+          } else {
+            //Write contents of file in archive to a new file.
+            int count;
+            byte[] data = new byte[WRITE_BUFFER];
+
+            FileOutputStream fos = new FileOutputStream(
+                Paths.get(destPath.toString(), entry.getName()).toString());
+            try (BufferedOutputStream dest =
+                     new BufferedOutputStream(fos, WRITE_BUFFER)) {
+              while ((count =
+                  tarInStream.read(data, 0, WRITE_BUFFER)) != -1) {
+                dest.write(data, 0, count);
+              }
+            }
+          }
+        }
+      }
+    } finally {
+      IOUtils.closeStream(gzIn);
+      IOUtils.closeStream(buffIn);
+      IOUtils.closeStream(fileInputStream);
+    }
+  }
+
+  /**
+   * Make HTTP GET call on the URL and return inputstream to the response.
+   * @param httpClient HttpClient to use.
+   * @param url url to call
+   * @return Inputstream to the response of the HTTP call.
+   * @throws IOException While reading the response.
+   */
+  public static InputStream makeHttpCall(CloseableHttpClient httpClient,
+                                         String url)
+      throws IOException {
+
+    HttpGet httpGet = new HttpGet(url);
+    HttpResponse response = httpClient.execute(httpGet);
+    int errorCode = response.getStatusLine().getStatusCode();
+    HttpEntity entity = response.getEntity();
+
+    if ((errorCode == HTTP_OK) || (errorCode == HTTP_CREATED)) {
+      return entity.getContent();
+    }
+
+    if (entity != null) {
+      throw new IOException("Unexpected response when trying to reach " +
+          "Ozone Manager, " + EntityUtils.toString(entity));
+    } else {
+      throw new IOException("Unexpected null in HTTP payload while " +
+          "processing request.");
+    }
+  }
+
+}
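(Taken together, the two helpers support the snapshot fetch flow used by
OzoneManagerServiceProviderImpl later in this patch. Roughly, with
hypothetical paths:)

    File tarFile = new File("/tmp/om.db.tar.gz");
    try (InputStream in = ReconUtils.makeHttpCall(httpClient, url)) {
      FileUtils.copyInputStreamToFile(in, tarFile);  // commons-io
    }
    ReconUtils.untarCheckpointFile(tarFile, Paths.get("/tmp/om.db"));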
diff --git a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/ReconConstants.java b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/recovery/ReconOMMetadataManager.java
similarity index 61%
copy from hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/ReconConstants.java
copy to hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/recovery/ReconOMMetadataManager.java
index 1ea132a..fcfcaa5 100644
--- a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/ReconConstants.java
+++ b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/recovery/ReconOMMetadataManager.java
@@ -16,19 +16,23 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.ozone.recon;
+package org.apache.hadoop.ozone.recon.recovery;
 
-import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_DB_SUFFIX;
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.hadoop.ozone.om.OMMetadataManager;
 
 /**
- * Recon Server constants file.
+ * Interface for the OM Metadata Manager + DB store maintained by
+ * Recon.
  */
-public final class ReconConstants {
-
-  private ReconConstants() {
-    // Never Constructed
-  }
+public interface ReconOMMetadataManager extends OMMetadataManager {
 
-  public static final String RECON_CONTAINER_DB = "recon-" +
-      CONTAINER_DB_SUFFIX;
+  /**
+   * Refresh the DB instance to point to a new location. Get rid of the old
+   * DB instance.
+   * @param dbLocation New location of the OM Snapshot DB.
+   */
+  void updateOmDB(File dbLocation) throws IOException;
 }
diff --git a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/recovery/ReconOmMetadataManagerImpl.java b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/recovery/ReconOmMetadataManagerImpl.java
new file mode 100644
index 0000000..e868314
--- /dev/null
+++ b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/recovery/ReconOmMetadataManagerImpl.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.recovery;
+
+import java.io.File;
+import java.io.IOException;
+
+import javax.inject.Inject;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.utils.db.DBStore;
+import org.apache.hadoop.utils.db.DBStoreBuilder;
+import org.apache.hadoop.utils.db.RDBStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Recon's implementation of the OM Metadata manager. By extending and
+ * relying on the OmMetadataManagerImpl, we can make sure all changes made to
+ * schema in OM will be automatically picked up by Recon.
+ */
+public class ReconOmMetadataManagerImpl extends OmMetadataManagerImpl
+    implements ReconOMMetadataManager {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ReconOmMetadataManagerImpl.class);
+
+  @Inject
+  private OzoneConfiguration ozoneConfiguration;
+
+  @Inject
+  public ReconOmMetadataManagerImpl(OzoneConfiguration configuration) {
+    this.ozoneConfiguration = configuration;
+  }
+
+  @Override
+  public void start(OzoneConfiguration configuration) throws IOException {
+    LOG.info("Starting ReconOMMetadataManagerImpl");
+  }
+
+  /**
+   * Replace existing DB instance with new one.
+   *
+   * @param dbFile new DB file location.
+   */
+  private void initializeNewRdbStore(File dbFile) throws IOException {
+    try {
+      DBStoreBuilder dbStoreBuilder =
+          DBStoreBuilder.newBuilder(ozoneConfiguration)
+          .setReadOnly(true)
+          .setName(dbFile.getName())
+          .setPath(dbFile.toPath().getParent());
+      addOMTablesAndCodecs(dbStoreBuilder);
+      DBStore newStore = dbStoreBuilder.build();
+      setStore(newStore);
+      LOG.info("Created new OM DB snapshot at {}.",
+          dbFile.getAbsolutePath());
+    } catch (IOException ioEx) {
+      LOG.error("Unable to initialize Recon OM DB snapshot store.",
+          ioEx);
+    }
+    if (getStore() != null) {
+      initializeOmTables();
+    }
+  }
+
+  @Override
+  public void updateOmDB(File newDbLocation) throws IOException {
+    if (getStore() != null) {
+      RDBStore rdbStore = (RDBStore) getStore();
+      File oldDBLocation = rdbStore.getDbLocation();
+      if (oldDBLocation.exists()) {
+        LOG.info("Cleaning up old OM snapshot db at {}.",
+            oldDBLocation.getAbsolutePath());
+        FileUtils.deleteQuietly(oldDBLocation);
+      }
+    }
+    initializeNewRdbStore(newDbLocation);
+  }
+
+}
\ No newline at end of file
diff --git a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/spi/OzoneManagerServiceProvider.java b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/spi/OzoneManagerServiceProvider.java
index 7dae610..cdc87a0 100644
--- a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/spi/OzoneManagerServiceProvider.java
+++ b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/spi/OzoneManagerServiceProvider.java
@@ -18,8 +18,23 @@ package org.apache.hadoop.ozone.recon.spi;
  * limitations under the License.
  */
 
+import java.io.IOException;
+
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+
 /**
  * Interface to access OM endpoints.
  */
 public interface OzoneManagerServiceProvider {
+
+  /**
+   * Start taking OM Snapshots.
+   */
+  void start() throws IOException;
+
+  /**
+   * Return instance of OM Metadata manager.
+   * @return OM metadata manager instance.
+   */
+  OMMetadataManager getOMMetadataManagerInstance();
 }
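(From a caller's perspective, a sketch; the caller class is hypothetical
and injection follows the bindings in ReconControllerModule.)

    @Inject
    private OzoneManagerServiceProvider omServiceProvider;

    void init() throws IOException {
      omServiceProvider.start();  // begins periodic OM snapshot sync
      OMMetadataManager om =
          omServiceProvider.getOMMetadataManagerInstance();
      // Query OM tables for Recon's reporting needs.
    }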
diff --git a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/spi/ReconContainerDBProvider.java b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/spi/ReconContainerDBProvider.java
index 2227d49..1b61d92 100644
--- a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/spi/ReconContainerDBProvider.java
+++ b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/spi/ReconContainerDBProvider.java
@@ -18,20 +18,18 @@
 
 package org.apache.hadoop.ozone.recon.spi;
 
-import static org.apache.hadoop.ozone.recon.ReconConstants.
-    RECON_CONTAINER_DB;
-import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.
-    OZONE_RECON_CONTAINER_DB_CACHE_SIZE_DEFAULT;
-import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.
-    OZONE_RECON_CONTAINER_DB_CACHE_SIZE_MB;
-import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.
-    OZONE_RECON_DB_DIRS;
+import static org.apache.hadoop.ozone.recon.ReconConstants.RECON_CONTAINER_DB;
+import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_CONTAINER_DB_CACHE_SIZE_DEFAULT;
+import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_CONTAINER_DB_CACHE_SIZE_MB;
+import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_CONTAINER_DB_STORE_IMPL;
+import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_CONTAINER_DB_STORE_IMPL_DEFAULT;
+import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_DB_DIR;
+import static org.apache.hadoop.ozone.recon.ReconUtils.getReconDbDir;
 
 import java.io.File;
 import java.io.IOException;
 
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.server.ServerUtils;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.utils.MetadataStore;
 import org.apache.hadoop.utils.MetadataStoreBuilder;
@@ -41,6 +39,7 @@ import org.slf4j.LoggerFactory;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.inject.Inject;
 import com.google.inject.Provider;
+import com.google.inject.ProvisionException;
 
 /**
  * Provider for the Recon container DB (Metadata store).
@@ -57,21 +56,29 @@ public class ReconContainerDBProvider implements
 
   @Override
   public MetadataStore get() {
-    File metaDir = ServerUtils.getDirWithFallBackToOzoneMetadata(configuration,
-        OZONE_RECON_DB_DIRS, "Recon");
-    File containerDBPath = new File(metaDir, RECON_CONTAINER_DB);
+    File metaDir = getReconDbDir(configuration, OZONE_RECON_DB_DIR);
+    File containerDBPath = new File(metaDir,
+        RECON_CONTAINER_DB);
     int cacheSize = configuration.getInt(OZONE_RECON_CONTAINER_DB_CACHE_SIZE_MB,
         OZONE_RECON_CONTAINER_DB_CACHE_SIZE_DEFAULT);
 
+    String dbType = configuration.get(OZONE_RECON_CONTAINER_DB_STORE_IMPL,
+        OZONE_RECON_CONTAINER_DB_STORE_IMPL_DEFAULT);
+    MetadataStore metadataStore = null;
     try {
-      return MetadataStoreBuilder.newBuilder()
+      metadataStore = MetadataStoreBuilder.newBuilder()
           .setConf(configuration)
+          .setDBType(dbType)
           .setDbFile(containerDBPath)
           .setCacheSize(cacheSize * OzoneConsts.MB)
           .build();
     } catch (IOException ioEx) {
       LOG.error("Unable to initialize Recon container metadata store.", ioEx);
     }
-    return null;
+    if (metadataStore == null) {
+      throw new ProvisionException("Unable to provide instance of Metadata " +
+          "store.");
+    }
+    return metadataStore;
   }
 }
diff --git a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java
new file mode 100644
index 0000000..4a2670d
--- /dev/null
+++ b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.spi.impl;
+
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SECURITY_ENABLED_KEY;
+import static org.apache.hadoop.ozone.OzoneConsts.OZONE_DB_CHECKPOINT_REQUEST_FLUSH;
+import static org.apache.hadoop.ozone.recon.ReconConstants.RECON_OM_SNAPSHOT_DB;
+import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_OM_SNAPSHOT_DB_DIR;
+import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_CONNECTION_REQUEST_TIMEOUT;
+import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_CONNECTION_REQUEST_TIMEOUT_DEFAULT;
+import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_CONNECTION_TIMEOUT;
+import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_CONNECTION_TIMEOUT_DEFAULT;
+import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_SNAPSHOT_TASK_FLUSH_PARAM;
+import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_SNAPSHOT_TASK_INITIAL_DELAY;
+import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_SNAPSHOT_TASK_INITIAL_DELAY_DEFAULT;
+import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_SNAPSHOT_TASK_INTERVAL;
+import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_SNAPSHOT_TASK_INTERVAL_DEFAULT;
+import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_SOCKET_TIMEOUT;
+import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_SOCKET_TIMEOUT_DEFAULT;
+import static org.apache.hadoop.ozone.recon.ReconUtils.getReconDbDir;
+import static org.apache.hadoop.ozone.recon.ReconUtils.makeHttpCall;
+import static org.apache.hadoop.ozone.recon.ReconUtils.untarCheckpointFile;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import javax.inject.Inject;
+import javax.inject.Singleton;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.om.OMConfigKeys;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
+import org.apache.hadoop.ozone.recon.spi.OzoneManagerServiceProvider;
+import org.apache.hadoop.utils.db.DBCheckpoint;
+import org.apache.hadoop.utils.db.RocksDBCheckpoint;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Implementation of the OzoneManager Service provider.
+ */
+@Singleton
+public class OzoneManagerServiceProviderImpl
+    implements OzoneManagerServiceProvider {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(OzoneManagerServiceProviderImpl.class);
+
+  private ScheduledExecutorService executorService;
+  private final String dbCheckpointEndPoint = "/dbCheckpoint";
+  private final CloseableHttpClient httpClient;
+  private File omSnapshotDBParentDir = null;
+  private String omDBSnapshotUrl;
+
+  @Inject
+  private OzoneConfiguration configuration;
+
+  @Inject
+  private ReconOMMetadataManager omMetadataManager;
+
+  @Inject
+  public OzoneManagerServiceProviderImpl(OzoneConfiguration configuration) {
+    executorService = Executors.newSingleThreadScheduledExecutor();
+
+    String ozoneManagerHttpAddress = configuration.get(OMConfigKeys
+        .OZONE_OM_HTTP_ADDRESS_KEY);
+
+    String ozoneManagerHttpsAddress = configuration.get(OMConfigKeys
+        .OZONE_OM_HTTPS_ADDRESS_KEY);
+
+    omSnapshotDBParentDir = getReconDbDir(configuration,
+        OZONE_RECON_OM_SNAPSHOT_DB_DIR);
+
+    boolean ozoneSecurityEnabled = configuration.getBoolean(
+        OZONE_SECURITY_ENABLED_KEY, false);
+
+    int socketTimeout = (int) configuration.getTimeDuration(
+        RECON_OM_SOCKET_TIMEOUT, RECON_OM_SOCKET_TIMEOUT_DEFAULT,
+            TimeUnit.MILLISECONDS);
+    int connectionTimeout = (int) configuration.getTimeDuration(
+        RECON_OM_CONNECTION_TIMEOUT,
+        RECON_OM_CONNECTION_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS);
+    int connectionRequestTimeout = (int)configuration.getTimeDuration(
+        RECON_OM_CONNECTION_REQUEST_TIMEOUT,
+        RECON_OM_CONNECTION_REQUEST_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS);
+
+    RequestConfig config = RequestConfig.custom()
+        .setConnectTimeout(connectionTimeout)
+        .setConnectionRequestTimeout(connectionRequestTimeout)
+        .setSocketTimeout(socketTimeout).build();
+
+    httpClient = HttpClientBuilder
+        .create()
+        .setDefaultRequestConfig(config)
+        .build();
+
+    omDBSnapshotUrl = "http://" + ozoneManagerHttpAddress +
+        dbCheckpointEndPoint;
+
+    if (ozoneSecurityEnabled) {
+      omDBSnapshotUrl = "https://" + ozoneManagerHttpsAddress +
+          dbCheckpointEndPoint;
+    }
+
+    boolean flushParam = configuration.getBoolean(
+        RECON_OM_SNAPSHOT_TASK_FLUSH_PARAM, false);
+
+    if (flushParam) {
+      omDBSnapshotUrl += "?" + OZONE_DB_CHECKPOINT_REQUEST_FLUSH + "=true";
+    }
+
+  }
+
+  @Override
+  public void start() throws IOException {
+
+    //Schedule a task to periodically obtain the DB snapshot from OM and
+    //update Recon's local copy of the OM metadata DB.
+    long initialDelay = configuration.getTimeDuration(
+        RECON_OM_SNAPSHOT_TASK_INITIAL_DELAY,
+        RECON_OM_SNAPSHOT_TASK_INITIAL_DELAY_DEFAULT,
+        TimeUnit.MILLISECONDS);
+    long interval = configuration.getTimeDuration(
+        RECON_OM_SNAPSHOT_TASK_INTERVAL,
+        RECON_OM_SNAPSHOT_TASK_INTERVAL_DEFAULT,
+        TimeUnit.MILLISECONDS);
+
+    LOG.info("Starting thread to get OM DB Snapshot.");
+    executorService.scheduleAtFixedRate(() -> {
+      DBCheckpoint dbSnapshot = getOzoneManagerDBSnapshot();
+      if (dbSnapshot != null && dbSnapshot.getCheckpointLocation() != null) {
+        try {
+          omMetadataManager.updateOmDB(dbSnapshot.getCheckpointLocation()
+              .toFile());
+        } catch (IOException e) {
+          LOG.error("Unable to refresh Recon OM DB Snapshot. ", e);
+        }
+      } else {
+        //Avoid dereferencing dbSnapshot here since it may be null.
+        LOG.error("Failed to obtain a valid DB snapshot from OM. "
+            + "Snapshot received: {}", dbSnapshot);
+      }
+    }, initialDelay, interval, TimeUnit.MILLISECONDS);
+  }
+
+  @Override
+  public OMMetadataManager getOMMetadataManagerInstance() {
+    return omMetadataManager;
+  }
+
+  /**
+   * Method to obtain current OM DB Snapshot.
+   * @return DBCheckpoint instance.
+   */
+  @VisibleForTesting
+  protected DBCheckpoint getOzoneManagerDBSnapshot() {
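+    // Suffix the snapshot name with the current time so that successive
+    // snapshot pulls do not collide on disk.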
+    String snapshotFileName = RECON_OM_SNAPSHOT_DB + "_" + System
+        .currentTimeMillis();
+    File targetFile = new File(omSnapshotDBParentDir, snapshotFileName +
+        ".tar.gz");
+    try {
+      try (InputStream inputStream = makeHttpCall(httpClient,
+          omDBSnapshotUrl)) {
+        FileUtils.copyInputStreamToFile(inputStream, targetFile);
+      }
+
+      //Untar the checkpoint file.
+      Path untarredDbDir = Paths.get(omSnapshotDBParentDir.getAbsolutePath(),
+          snapshotFileName);
+      untarCheckpointFile(targetFile, untarredDbDir);
+      FileUtils.deleteQuietly(targetFile);
+
+      //TODO Create Checkpoint based on OM DB type.
+      // Currently, OM DB type is not configurable. Hence, defaulting to
+      // RocksDB.
+      return new RocksDBCheckpoint(untarredDbDir);
+    } catch (IOException e) {
+      LOG.error("Unable to obtain Ozone Manager DB Snapshot. ", e);
+    }
+    return null;
+  }
+}
+
diff --git a/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/TestReconUtils.java b/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/TestReconUtils.java
new file mode 100644
index 0000000..170e109
--- /dev/null
+++ b/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/TestReconUtils.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.nio.file.Paths;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.OmUtils;
+import org.apache.http.HttpEntity;
+import org.apache.http.StatusLine;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * Test Recon Utility methods.
+ */
+public class TestReconUtils {
+
+  @Rule
+  public TemporaryFolder folder = new TemporaryFolder();
+
+  @Test
+  public void testGetReconDbDir() throws Exception {
+
+    String filePath = folder.getRoot().getAbsolutePath();
+    OzoneConfiguration configuration = new OzoneConfiguration();
+    configuration.set("TEST_DB_DIR", filePath);
+
+    File file = ReconUtils.getReconDbDir(configuration,
+        "TEST_DB_DIR");
+    Assert.assertEquals(filePath, file.getAbsolutePath());
+  }
+
+  @Test
+  public void testUntarCheckpointFile() throws Exception {
+
+    File newDir = folder.newFolder();
+
+    File file1 = Paths.get(newDir.getAbsolutePath(), "file1")
+        .toFile();
+    String str = "File1 Contents";
+    BufferedWriter writer = new BufferedWriter(new FileWriter(
+        file1.getAbsolutePath()));
+    writer.write(str);
+    writer.close();
+
+    File file2 = Paths.get(newDir.getAbsolutePath(), "file2")
+        .toFile();
+    str = "File2 Contents";
+    writer = new BufferedWriter(new FileWriter(file2.getAbsolutePath()));
+    writer.write(str);
+    writer.close();
+
+    //Create test tar file.
+    File tarFile = OmUtils.createTarFile(newDir.toPath());
+    File outputDir = folder.newFolder();
+    ReconUtils.untarCheckpointFile(tarFile, outputDir.toPath());
+
+    assertTrue(outputDir.isDirectory());
+    assertEquals(2, outputDir.listFiles().length);
+  }
+
+  @Test
+  public void testMakeHttpCall() throws Exception {
+
+    CloseableHttpClient httpClientMock = mock(CloseableHttpClient.class);
+    String url = "http://localhost:9874/dbCheckpoint";
+
+    CloseableHttpResponse httpResponseMock = mock(CloseableHttpResponse.class);
+    when(httpClientMock.execute(any(HttpGet.class)))
+        .thenReturn(httpResponseMock);
+
+    StatusLine statusLineMock = mock(StatusLine.class);
+    when(statusLineMock.getStatusCode()).thenReturn(200);
+    when(httpResponseMock.getStatusLine()).thenReturn(statusLineMock);
+
+    HttpEntity httpEntityMock = mock(HttpEntity.class);
+    when(httpResponseMock.getEntity()).thenReturn(httpEntityMock);
+    File file1 = Paths.get(folder.getRoot().getPath(), "file1")
+        .toFile();
+    BufferedWriter writer = new BufferedWriter(new FileWriter(
+        file1.getAbsolutePath()));
+    writer.write("File 1 Contents");
+    writer.close();
+    InputStream fileInputStream = new FileInputStream(file1);
+
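+    // Wrap the file stream so that the mocked HttpEntity hands back a
+    // readable InputStream, as HttpEntity#getContent would.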
+    when(httpEntityMock.getContent()).thenReturn(new InputStream() {
+      @Override
+      public int read() throws IOException {
+        return fileInputStream.read();
+      }
+    });
+
+    InputStream inputStream = ReconUtils.makeHttpCall(httpClientMock, url);
+    String contents = IOUtils.toString(inputStream, Charset.defaultCharset());
+
+    assertEquals("File 1 Contents", contents);
+  }
+
+}
\ No newline at end of file
diff --git a/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/recovery/TestReconOmMetadataManagerImpl.java b/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/recovery/TestReconOmMetadataManagerImpl.java
new file mode 100644
index 0000000..ba2dd0b
--- /dev/null
+++ b/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/recovery/TestReconOmMetadataManagerImpl.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.recovery;
+
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_DB_DIRS;
+import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_OM_SNAPSHOT_DB_DIR;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.ozone.om.BucketManager;
+import org.apache.hadoop.ozone.om.BucketManagerImpl;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
+import org.apache.hadoop.utils.db.DBCheckpoint;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * Test Recon OM Metadata Manager implementation.
+ */
+public class TestReconOmMetadataManagerImpl {
+
+  @Rule
+  public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+  @Test
+  public void testUpdateOmDB() throws Exception {
+
+    //Create a new OM Metadata Manager instance + DB.
+    File omDbDir = temporaryFolder.newFolder();
+    OzoneConfiguration omConfiguration = new OzoneConfiguration();
+    omConfiguration.set(OZONE_OM_DB_DIRS,
+        omDbDir.getAbsolutePath());
+    OMMetadataManager omMetadataManager = new OmMetadataManagerImpl(
+        omConfiguration);
+
+    //Create a volume + bucket + 2 keys.
+    String volumeKey = omMetadataManager.getVolumeKey("sampleVol");
+    OmVolumeArgs args =
+        OmVolumeArgs.newBuilder()
+            .setVolume("sampleVol")
+            .setAdminName("TestUser")
+            .setOwnerName("TestUser")
+            .build();
+    omMetadataManager.getVolumeTable().put(volumeKey, args);
+
+    BucketManager bucketManager = new BucketManagerImpl(omMetadataManager);
+    OmBucketInfo bucketInfo = OmBucketInfo.newBuilder()
+        .setVolumeName("sampleVol")
+        .setBucketName("bucketOne")
+        .build();
+    bucketManager.createBucket(bucketInfo);
+
+    omMetadataManager.getKeyTable().put("/sampleVol/bucketOne/key_one",
+        new OmKeyInfo.Builder()
+            .setBucketName("bucketOne")
+            .setVolumeName("sampleVol")
+            .setKeyName("key_one")
+            .setReplicationFactor(HddsProtos.ReplicationFactor.ONE)
+            .setReplicationType(HddsProtos.ReplicationType.STAND_ALONE)
+            .build());
+    omMetadataManager.getKeyTable().put("/sampleVol/bucketOne/key_two",
+        new OmKeyInfo.Builder()
+            .setBucketName("bucketOne")
+            .setVolumeName("sampleVol")
+            .setKeyName("key_two")
+            .setReplicationFactor(HddsProtos.ReplicationFactor.ONE)
+            .setReplicationType(HddsProtos.ReplicationType.STAND_ALONE)
+            .build());
+
+    //Make sure OM Metadata reflects the keys that were inserted.
+    Assert.assertNotNull(omMetadataManager.getKeyTable()
+        .get("/sampleVol/bucketOne/key_one"));
+    Assert.assertNotNull(omMetadataManager.getKeyTable()
+        .get("/sampleVol/bucketOne/key_two"));
+
+    //Take checkpoint of OM DB.
+    DBCheckpoint checkpoint = omMetadataManager.getStore()
+        .getCheckpoint(true);
+    Assert.assertNotNull(checkpoint.getCheckpointLocation());
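+    // This checkpoint directory is what Recon will later re-open, read-only,
+    // as its local copy of the OM DB.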
+
+    //Create new Recon OM Metadata manager instance.
+    File reconOmDbDir = temporaryFolder.newFolder();
+    OzoneConfiguration configuration = new OzoneConfiguration();
+    configuration.set(OZONE_RECON_OM_SNAPSHOT_DB_DIR, reconOmDbDir
+        .getAbsolutePath());
+    ReconOMMetadataManager reconOMMetadataManager =
+        new ReconOmMetadataManagerImpl(configuration);
+    reconOMMetadataManager.start(configuration);
+
+    //Before accepting a snapshot, the metadata should have null tables.
+    Assert.assertNull(reconOMMetadataManager.getBucketTable());
+
+    //Update Recon OM DB with the OM DB checkpoint location.
+    reconOMMetadataManager.updateOmDB(
+        checkpoint.getCheckpointLocation().toFile());
+
+    //Now, the tables should have been initialized.
+    Assert.assertNotNull(reconOMMetadataManager.getBucketTable());
+
+    //Verify Keys inserted in OM DB are available in Recon OM DB.
+    Assert.assertNotNull(reconOMMetadataManager.getKeyTable()
+        .get("/sampleVol/bucketOne/key_one"));
+    Assert.assertNotNull(reconOMMetadataManager.getKeyTable()
+        .get("/sampleVol/bucketOne/key_two"));
+
+    //Verify that we cannot write data to Recon OM DB (Read Only)
+    try {
+      reconOMMetadataManager.getKeyTable().put(
+          "/sampleVol/bucketOne/fail_key", new OmKeyInfo.Builder()
+              .setBucketName("bucketOne")
+              .setVolumeName("sampleVol")
+              .setKeyName("fail_key")
+              .setReplicationFactor(HddsProtos.ReplicationFactor.ONE)
+              .setReplicationType(HddsProtos.ReplicationType.STAND_ALONE)
+              .build());
+      Assert.fail("Write to the read-only Recon OM DB should have failed.");
+    } catch (IOException e) {
+      Assert.assertTrue(e.getMessage()
+          .contains("Not supported operation in read only mode"));
+    }
+  }
+
+}
\ No newline at end of file
diff --git a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/spi/OzoneManagerServiceProvider.java b/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/recovery/package-info.java
similarity index 86%
copy from hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/spi/OzoneManagerServiceProvider.java
copy to hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/recovery/package-info.java
index 7dae610..c3b0b34 100644
--- a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/spi/OzoneManagerServiceProvider.java
+++ b/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/recovery/package-info.java
@@ -1,5 +1,3 @@
-package org.apache.hadoop.ozone.recon.spi;
-
 /**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -17,9 +15,7 @@ package org.apache.hadoop.ozone.recon.spi;
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 /**
- * Interface to access OM endpoints.
+ * Package for recon server - OM service specific tests.
  */
-public interface OzoneManagerServiceProvider {
-}
+package org.apache.hadoop.ozone.recon.recovery;
\ No newline at end of file
diff --git a/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestContainerDBServiceProviderImpl.java b/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestContainerDBServiceProviderImpl.java
index 2fc0642..9e5aa70 100644
--- a/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestContainerDBServiceProviderImpl.java
+++ b/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestContainerDBServiceProviderImpl.java
@@ -36,8 +36,6 @@ import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.mockito.runners.MockitoJUnitRunner;
 
 import com.google.inject.AbstractModule;
 import com.google.inject.Guice;
@@ -46,7 +44,6 @@ import com.google.inject.Injector;
 /**
  * Unit Tests for ContainerDBServiceProviderImpl.
  */
-@RunWith(MockitoJUnitRunner.class)
 public class TestContainerDBServiceProviderImpl {
 
   @Rule
diff --git a/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestOzoneManagerServiceProviderImpl.java b/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestOzoneManagerServiceProviderImpl.java
new file mode 100644
index 0000000..e91f67c
--- /dev/null
+++ b/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestOzoneManagerServiceProviderImpl.java
@@ -0,0 +1,276 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.spi.impl;
+
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_DB_DIRS;
+import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_OM_SNAPSHOT_DB_DIR;
+import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_SNAPSHOT_TASK_INITIAL_DELAY;
+import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_SNAPSHOT_TASK_INTERVAL;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Paths;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.ozone.OmUtils;
+import org.apache.hadoop.ozone.om.BucketManager;
+import org.apache.hadoop.ozone.om.BucketManagerImpl;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
+import org.apache.hadoop.ozone.recon.ReconUtils;
+import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
+import org.apache.hadoop.ozone.recon.recovery.ReconOmMetadataManagerImpl;
+import org.apache.hadoop.ozone.recon.spi.OzoneManagerServiceProvider;
+import org.apache.hadoop.utils.db.DBCheckpoint;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+
+/**
+ * Class to test Ozone Manager Service Provider Implementation.
+ */
+@RunWith(PowerMockRunner.class)
+@PowerMockIgnore({"javax.management.*", "javax.net.ssl.*"})
+@PrepareForTest(ReconUtils.class)
+public class TestOzoneManagerServiceProviderImpl {
+
+  private OMMetadataManager omMetadataManager;
+  private ReconOMMetadataManager reconOMMetadataManager;
+  private Injector injector;
+  private OzoneManagerServiceProviderImpl ozoneManagerServiceProvider;
+
+  @Rule
+  public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+  @Before
+  public void setUp() throws Exception {
+    injector = Guice.createInjector(new AbstractModule() {
+      @Override
+      protected void configure() {
+        try {
+          initializeNewOmMetadataManager();
+          bind(OzoneConfiguration.class).toInstance(
+              getTestOzoneConfiguration());
+          reconOMMetadataManager = getTestMetadataManager();
+          bind(ReconOMMetadataManager.class).toInstance(reconOMMetadataManager);
+          ozoneManagerServiceProvider = new OzoneManagerServiceProviderImpl(
+              getTestOzoneConfiguration());
+          bind(OzoneManagerServiceProvider.class)
+              .toInstance(ozoneManagerServiceProvider);
+        } catch (IOException e) {
+          Assert.fail();
+        }
+      }
+    });
+
+  }
+
+  @Test(timeout = 60000)
+  public void testStart() throws Exception {
+
+    Assert.assertNotNull(reconOMMetadataManager.getKeyTable()
+        .get("/sampleVol/bucketOne/key_one"));
+    Assert.assertNull(reconOMMetadataManager.getKeyTable()
+        .get("/sampleVol/bucketOne/key_two"));
+
+    writeDataToOm();
+    DBCheckpoint checkpoint = omMetadataManager.getStore()
+        .getCheckpoint(true);
+    File tarFile = OmUtils.createTarFile(checkpoint.getCheckpointLocation());
+    InputStream inputStream = new FileInputStream(tarFile);
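+    // Stub ReconUtils#makeHttpCall so that the provider "downloads" the
+    // locally created checkpoint tarball instead of contacting a live OM.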
+    PowerMockito.stub(PowerMockito.method(ReconUtils.class,
+        "makeHttpCall",
+        CloseableHttpClient.class, String.class))
+        .toReturn(inputStream);
+
+    ozoneManagerServiceProvider.start();
+    Thread.sleep(TimeUnit.SECONDS.toMillis(10));
+
+    Assert.assertNotNull(reconOMMetadataManager.getKeyTable()
+        .get("/sampleVol/bucketOne/key_one"));
+    Assert.assertNotNull(reconOMMetadataManager.getKeyTable()
+        .get("/sampleVol/bucketOne/key_two"));
+  }
+
+  @Test
+  public void testGetOMMetadataManagerInstance() throws Exception {
+    OMMetadataManager omMetaMgr = ozoneManagerServiceProvider
+        .getOMMetadataManagerInstance();
+    assertNotNull(omMetaMgr);
+  }
+
+  @Test
+  public void testGetOzoneManagerDBSnapshot() throws Exception {
+
+    File reconOmSnapshotDbDir = temporaryFolder.newFolder();
+
+    File checkpointDir = Paths.get(reconOmSnapshotDbDir.getAbsolutePath(),
+        "testGetOzoneManagerDBSnapshot").toFile();
+    checkpointDir.mkdir();
+
+    File file1 = Paths.get(checkpointDir.getAbsolutePath(), "file1")
+        .toFile();
+    String str = "File1 Contents";
+    BufferedWriter writer = new BufferedWriter(new FileWriter(
+        file1.getAbsolutePath()));
+    writer.write(str);
+    writer.close();
+
+    File file2 = Paths.get(checkpointDir.getAbsolutePath(), "file2")
+        .toFile();
+    str = "File2 Contents";
+    writer = new BufferedWriter(new FileWriter(file2.getAbsolutePath()));
+    writer.write(str);
+    writer.close();
+
+    //Create test tar file.
+    File tarFile = OmUtils.createTarFile(checkpointDir.toPath());
+
+    InputStream fileInputStream = new FileInputStream(tarFile);
+    PowerMockito.stub(PowerMockito.method(ReconUtils.class,
+        "makeHttpCall",
+        CloseableHttpClient.class, String.class))
+        .toReturn(fileInputStream);
+
+    DBCheckpoint checkpoint = ozoneManagerServiceProvider
+        .getOzoneManagerDBSnapshot();
+    assertNotNull(checkpoint);
+    assertTrue(checkpoint.getCheckpointLocation().toFile().isDirectory());
+    Assert.assertEquals(2, checkpoint.getCheckpointLocation().toFile()
+        .listFiles().length);
+  }
+
+  /**
+   * Get Test OzoneConfiguration instance.
+   * @return OzoneConfiguration
+   * @throws IOException ioEx.
+   */
+  private OzoneConfiguration getTestOzoneConfiguration() throws IOException {
+    OzoneConfiguration configuration = new OzoneConfiguration();
+    configuration.set(RECON_OM_SNAPSHOT_TASK_INITIAL_DELAY,
+        "0m");
+    configuration.set(RECON_OM_SNAPSHOT_TASK_INTERVAL, "1m");
+    configuration.set(OZONE_RECON_OM_SNAPSHOT_DB_DIR,
+        temporaryFolder.newFolder().getAbsolutePath());
+    return configuration;
+  }
+
+  /**
+   * Create a new OM Metadata manager instance.
+   * @throws IOException ioEx
+   */
+  private void initializeNewOmMetadataManager() throws IOException {
+    File omDbDir = temporaryFolder.newFolder();
+    OzoneConfiguration omConfiguration = new OzoneConfiguration();
+    omConfiguration.set(OZONE_OM_DB_DIRS,
+        omDbDir.getAbsolutePath());
+    omMetadataManager = new OmMetadataManagerImpl(omConfiguration);
+
+    String volumeKey = omMetadataManager.getVolumeKey("sampleVol");
+    OmVolumeArgs args =
+        OmVolumeArgs.newBuilder()
+            .setVolume("sampleVol")
+            .setAdminName("TestUser")
+            .setOwnerName("TestUser")
+            .build();
+    omMetadataManager.getVolumeTable().put(volumeKey, args);
+
+    BucketManager bucketManager = new BucketManagerImpl(omMetadataManager);
+    OmBucketInfo bucketInfo = OmBucketInfo.newBuilder()
+        .setVolumeName("sampleVol")
+        .setBucketName("bucketOne")
+        .build();
+    bucketManager.createBucket(bucketInfo);
+
+    omMetadataManager.getKeyTable().put("/sampleVol/bucketOne/key_one",
+        new OmKeyInfo.Builder()
+            .setBucketName("bucketOne")
+            .setVolumeName("sampleVol")
+            .setKeyName("key_one")
+            .setReplicationFactor(HddsProtos.ReplicationFactor.ONE)
+            .setReplicationType(HddsProtos.ReplicationType.STAND_ALONE)
+            .build());
+  }
+
+  /**
+   * Get an instance of Recon OM Metadata manager.
+   * @return ReconOMMetadataManager
+   * @throws IOException if the RocksDB instance cannot be created.
+   */
+  private ReconOMMetadataManager getTestMetadataManager() throws IOException {
+
+    DBCheckpoint checkpoint = omMetadataManager.getStore()
+        .getCheckpoint(true);
+    assertNotNull(checkpoint.getCheckpointLocation());
+
+    File reconOmDbDir = temporaryFolder.newFolder();
+    OzoneConfiguration configuration = new OzoneConfiguration();
+    configuration.set(OZONE_RECON_OM_SNAPSHOT_DB_DIR, reconOmDbDir
+        .getAbsolutePath());
+
+    ReconOMMetadataManager reconOMMetaMgr =
+        new ReconOmMetadataManagerImpl(configuration);
+    reconOMMetaMgr.start(configuration);
+
+    reconOMMetaMgr.updateOmDB(
+        checkpoint.getCheckpointLocation().toFile());
+    return reconOMMetaMgr;
+  }
+
+  /**
+   * Write a key to OM instance.
+   * @throws IOException while writing.
+   */
+  private void writeDataToOm() throws IOException {
+    omMetadataManager.getKeyTable().put("/sampleVol/bucketOne/key_two",
+        new OmKeyInfo.Builder()
+            .setBucketName("bucketOne")
+            .setVolumeName("sampleVol")
+            .setKeyName("key_two")
+            .setReplicationFactor(HddsProtos.ReplicationFactor.ONE)
+            .setReplicationType(HddsProtos.ReplicationType.STAND_ALONE)
+            .build());
+  }
+
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org