You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ha...@apache.org on 2019/08/19 17:47:58 UTC

[hadoop] branch trunk updated: HDDS-1105 : Add mechanism in Recon to obtain DB snapshot 'delta' updates from Ozone Manager (#1259)

This is an automated email from the ASF dual-hosted git repository.

hanishakoneru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d69a1a0  HDDS-1105 : Add mechanism in Recon to obtain DB snapshot 'delta' updates from Ozone Manager (#1259)
d69a1a0 is described below

commit d69a1a0aa49614c084fa4b9546aceeeee65aebe4
Author: avijayanhwx <14...@users.noreply.github.com>
AuthorDate: Mon Aug 19 10:47:49 2019 -0700

    HDDS-1105 : Add mechanism in Recon to obtain DB snapshot 'delta' updates from Ozone Manager (#1259)
---
 .../apache/hadoop/utils/db/DBUpdatesWrapper.java   |   4 +
 .../apache/hadoop/utils/db/RDBBatchOperation.java  |   4 +
 .../ozone/om/protocol/OzoneManagerProtocol.java    |  13 +
 ...OzoneManagerProtocolClientSideTranslatorPB.java |  28 ++-
 .../org/apache/hadoop/ozone/om/OzoneManager.java   |   1 +
 .../protocolPB/OzoneManagerRequestHandler.java     |   5 +-
 hadoop-ozone/ozone-recon/pom.xml                   |  24 --
 .../hadoop/ozone/recon/ReconControllerModule.java  |  36 ++-
 .../org/apache/hadoop/ozone/recon/ReconServer.java |  77 ++----
 .../hadoop/ozone/recon/ReconServerConfigKeys.java  |   2 +-
 ...ceProvider.java => ReconTaskBindingModule.java} |  37 ++-
 .../org/apache/hadoop/ozone/recon/ReconUtils.java  |  10 +-
 .../recon/spi/OzoneManagerServiceProvider.java     |  10 +-
 .../spi/impl/ContainerDBServiceProviderImpl.java   |   7 +-
 .../spi/impl/OzoneManagerServiceProviderImpl.java  | 216 +++++++++++++---
 .../recon/spi/impl/ReconContainerDBProvider.java   |  13 +-
 .../ozone/recon/tasks/ContainerKeyMapperTask.java  |  34 +--
 .../ozone/recon/tasks/FileSizeCountTask.java       |  36 ++-
 .../hadoop/ozone/recon/tasks/OMDBUpdateEvent.java  |  43 +---
 .../ozone/recon/tasks/OMDBUpdatesHandler.java      |   9 +-
 .../ozone/recon/tasks/OMUpdateEventBatch.java      |  16 +-
 .../ozone/recon/tasks/ReconDBUpdateTask.java       |  20 +-
 .../ozone/recon/tasks/ReconTaskController.java     |  25 +-
 .../ozone/recon/tasks/ReconTaskControllerImpl.java | 137 +++++++----
 .../ozone/recon/AbstractOMMetadataManagerTest.java |  15 +-
 .../apache/hadoop/ozone/recon/TestReconUtils.java  |   7 +-
 .../ozone/recon/api/TestContainerKeyService.java   |  73 ++----
 .../ozone/recon/api/TestUtilizationService.java    |  15 +-
 .../impl/TestOzoneManagerServiceProviderImpl.java  | 274 ++++++++++++++++-----
 .../hadoop/ozone/recon/tasks/DummyReconDBTask.java |  18 +-
 .../recon/tasks/TestContainerKeyMapperTask.java    |  48 ++--
 .../ozone/recon/tasks/TestFileSizeCountTask.java   |  16 +-
 .../recon/tasks/TestReconTaskControllerImpl.java   |  94 ++++---
 33 files changed, 865 insertions(+), 502 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/DBUpdatesWrapper.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/DBUpdatesWrapper.java
index 54ebc7c..c4dca6a 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/DBUpdatesWrapper.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/DBUpdatesWrapper.java
@@ -41,6 +41,10 @@ public class DBUpdatesWrapper {
     return dataList;
   }
 
+  public void setCurrentSequenceNumber(long sequenceNumber) {
+    this.currentSequenceNumber = sequenceNumber;
+  }
+
   public long getCurrentSequenceNumber() {
     return currentSequenceNumber;
   }
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RDBBatchOperation.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RDBBatchOperation.java
index a8b78ed..19699f5 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RDBBatchOperation.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RDBBatchOperation.java
@@ -37,6 +37,10 @@ public class RDBBatchOperation implements BatchOperation {
     writeBatch = new WriteBatch();
   }
 
+  public RDBBatchOperation(WriteBatch writeBatch) {
+    this.writeBatch = writeBatch;
+  }
+
   public void commit(RocksDB db, WriteOptions writeOptions) throws IOException {
     try {
       db.write(writeOptions, writeBatch);
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
index 3d4dd93..f6a4e64 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
 import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
 import org.apache.hadoop.ozone.om.helpers.ServiceInfo;
 import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneAclInfo;
 
 import java.io.Closeable;
@@ -49,6 +50,8 @@ import org.apache.hadoop.ozone.security.OzoneDelegationTokenSelector;
 import org.apache.hadoop.ozone.security.acl.OzoneObj;
 import org.apache.hadoop.security.KerberosInfo;
 import org.apache.hadoop.security.token.TokenInfo;
+import org.apache.hadoop.utils.db.DBUpdatesWrapper;
+import org.apache.hadoop.utils.db.SequenceNumberNotFoundException;
 
 /**
  * Protocol to talk to OM.
@@ -505,4 +508,14 @@ public interface OzoneManagerProtocol
    * */
   List<OzoneAcl> getAcl(OzoneObj obj) throws IOException;
 
+  /**
+   * Get DB updates since a specific sequence number.
+   * @param dbUpdatesRequest request that encapsulates a sequence number.
+   * @return Wrapper containing the updates.
+   * @throws SequenceNumberNotFoundException if db is unable to read the data.
+   */
+  DBUpdatesWrapper getDBUpdates(
+      OzoneManagerProtocolProtos.DBUpdatesRequest dbUpdatesRequest)
+      throws IOException;
+
 }
\ No newline at end of file
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
index f761548..83c5316 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
@@ -66,7 +66,9 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateF
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateFileResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ListStatusRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ListStatusResponse;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SetAclResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DBUpdatesRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DBUpdatesResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.AddAclRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateDirectoryRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.GetFileStatusResponse;
@@ -138,11 +140,14 @@ import org.apache.hadoop.security.proto.SecurityProtos.CancelDelegationTokenRequ
 import org.apache.hadoop.security.proto.SecurityProtos.GetDelegationTokenRequestProto;
 import org.apache.hadoop.security.proto.SecurityProtos.RenewDelegationTokenRequestProto;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.utils.db.DBUpdatesWrapper;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
+import com.google.protobuf.ByteString;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -1396,7 +1401,7 @@ public final class OzoneManagerProtocolClientSideTranslatorPB
     OMRequest omRequest = createOMRequest(Type.SetAcl)
         .setSetAclRequest(builder.build())
         .build();
-    OzoneManagerProtocolProtos.SetAclResponse response =
+    SetAclResponse response =
         handleError(submitRequest(omRequest)).getSetAclResponse();
 
     return response.getResponse();
@@ -1426,6 +1431,25 @@ public final class OzoneManagerProtocolClientSideTranslatorPB
   }
 
   @Override
+  public DBUpdatesWrapper getDBUpdates(DBUpdatesRequest dbUpdatesRequest)
+      throws IOException {
+    OMRequest omRequest = createOMRequest(Type.DBUpdates)
+        .setDbUpdatesRequest(dbUpdatesRequest)
+        .build();
+
+    DBUpdatesResponse dbUpdatesResponse =
+        handleError(submitRequest(omRequest)).getDbUpdatesResponse();
+
+    DBUpdatesWrapper dbUpdatesWrapper = new DBUpdatesWrapper();
+    for (ByteString byteString : dbUpdatesResponse.getDataList()) {
+      dbUpdatesWrapper.addWriteBatch(byteString.toByteArray(), 0L);
+    }
+    dbUpdatesWrapper.setCurrentSequenceNumber(
+        dbUpdatesResponse.getSequenceNumber());
+    return dbUpdatesWrapper;
+  }
+
+  @Override
   public OpenKeySession createFile(OmKeyArgs args,
       boolean overWrite, boolean recursive) throws IOException {
     KeyArgs keyArgs = KeyArgs.newBuilder()
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index a81b0de..dbd5d39 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -3422,6 +3422,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
    * @return Wrapper containing the updates.
    * @throws SequenceNumberNotFoundException if db is unable to read the data.
    */
+  @Override
   public DBUpdatesWrapper getDBUpdates(
       DBUpdatesRequest dbUpdatesRequest)
       throws SequenceNumberNotFoundException {
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
index 7c9f126..c2d6227 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
@@ -394,9 +394,10 @@ public class OzoneManagerRequestHandler implements RequestHandler {
     DBUpdatesWrapper dbUpdatesWrapper =
         impl.getDBUpdates(dbUpdatesRequest);
     for (int i = 0; i < dbUpdatesWrapper.getData().size(); i++) {
-      builder.setData(i,
-          OMPBHelper.getByteString(dbUpdatesWrapper.getData().get(i)));
+      builder.addData(OMPBHelper.getByteString(
+          dbUpdatesWrapper.getData().get(i)));
     }
+    builder.setSequenceNumber(dbUpdatesWrapper.getCurrentSequenceNumber());
     return builder.build();
   }
 
diff --git a/hadoop-ozone/ozone-recon/pom.xml b/hadoop-ozone/ozone-recon/pom.xml
index 9672796..130ad35 100644
--- a/hadoop-ozone/ozone-recon/pom.xml
+++ b/hadoop-ozone/ozone-recon/pom.xml
@@ -273,30 +273,6 @@
       <scope>test</scope>
     </dependency>
     <dependency>
-      <groupId>org.powermock</groupId>
-      <artifactId>powermock-module-junit4</artifactId>
-      <version>1.7.4</version>
-      <scope>test</scope>
-      <exclusions>
-        <exclusion>
-          <groupId>org.javassist</groupId>
-          <artifactId>javassist</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-    <dependency>
-      <groupId>org.powermock</groupId>
-      <artifactId>powermock-api-mockito2</artifactId>
-      <version>1.7.4</version>
-      <scope>test</scope>
-      <exclusions>
-        <exclusion>
-          <groupId>org.mockito</groupId>
-          <artifactId>mockito-core</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-    <dependency>
       <groupId>org.jooq</groupId>
       <artifactId>jooq</artifactId>
       <version>${jooq.version}</version>
diff --git a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java
index e7c20f0..21bc5be 100644
--- a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java
+++ b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java
@@ -29,8 +29,13 @@ import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SQ
 import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SQL_MAX_IDLE_CONNECTION_AGE;
 import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SQL_MAX_IDLE_CONNECTION_TEST_STMT;
 
+import java.io.IOException;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
+import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolClientSideTranslatorPB;
 import org.apache.hadoop.ozone.recon.persistence.DataSourceConfiguration;
 import org.apache.hadoop.ozone.recon.persistence.JooqPersistenceModule;
 import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
@@ -40,9 +45,15 @@ import org.apache.hadoop.ozone.recon.spi.OzoneManagerServiceProvider;
 import org.apache.hadoop.ozone.recon.spi.impl.ReconContainerDBProvider;
 import org.apache.hadoop.ozone.recon.spi.impl.ContainerDBServiceProviderImpl;
 import org.apache.hadoop.ozone.recon.spi.impl.OzoneManagerServiceProviderImpl;
+import org.apache.hadoop.ozone.recon.tasks.ContainerKeyMapperTask;
+import org.apache.hadoop.ozone.recon.tasks.FileSizeCountTask;
 import org.apache.hadoop.ozone.recon.tasks.ReconTaskController;
 import org.apache.hadoop.ozone.recon.tasks.ReconTaskControllerImpl;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.utils.db.DBStore;
+import org.apache.ratis.protocol.ClientId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.inject.AbstractModule;
 import com.google.inject.Provides;
@@ -52,6 +63,9 @@ import com.google.inject.Singleton;
  * Guice controller that defines concrete bindings.
  */
 public class ReconControllerModule extends AbstractModule {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ReconControllerModule.class);
+
   @Override
   protected void configure() {
     bind(Configuration.class).toProvider(ConfigurationProvider.class);
@@ -60,17 +74,37 @@ public class ReconControllerModule extends AbstractModule {
         .toProvider(ReconContainerDBProvider.class).in(Singleton.class);
     bind(ReconOMMetadataManager.class)
         .to(ReconOmMetadataManagerImpl.class).in(Singleton.class);
+    bind(OMMetadataManager.class).to(ReconOmMetadataManagerImpl.class)
+        .in(Singleton.class);
     bind(ContainerDBServiceProvider.class)
         .to(ContainerDBServiceProviderImpl.class).in(Singleton.class);
     bind(OzoneManagerServiceProvider.class)
         .to(OzoneManagerServiceProviderImpl.class).in(Singleton.class);
-
+    bind(ReconUtils.class).in(Singleton.class);
     // Persistence - inject configuration provider
     install(new JooqPersistenceModule(
         getProvider(DataSourceConfiguration.class)));
 
     bind(ReconTaskController.class)
         .to(ReconTaskControllerImpl.class).in(Singleton.class);
+    bind(ContainerKeyMapperTask.class);
+    bind(FileSizeCountTask.class);
+  }
+
+  @Provides
+  OzoneManagerProtocol getOzoneManagerProtocol(
+      final OzoneConfiguration ozoneConfiguration) {
+    OzoneManagerProtocol ozoneManagerClient = null;
+    try {
+      ClientId clientId = ClientId.randomId();
+      UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+      ozoneManagerClient = new
+          OzoneManagerProtocolClientSideTranslatorPB(
+          ozoneConfiguration, clientId.toString(), ugi);
+    } catch (IOException ioEx) {
+      LOG.error("Error in provisioning OzoneManagerProtocol ", ioEx);
+    }
+    return ozoneManagerClient;
   }
 
   @Provides
diff --git a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServer.java b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServer.java
index a11cb5f..1aaf887 100644
--- a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServer.java
+++ b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServer.java
@@ -18,26 +18,15 @@
 
 package org.apache.hadoop.ozone.recon;
 
-import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_SNAPSHOT_TASK_INITIAL_DELAY;
-import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_SNAPSHOT_TASK_INITIAL_DELAY_DEFAULT;
-import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_SNAPSHOT_TASK_INTERVAL;
-import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_SNAPSHOT_TASK_INTERVAL_DEFAULT;
-
-import java.io.IOException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.hdds.cli.GenericCli;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.ozone.recon.spi.ContainerDBServiceProvider;
 import org.apache.hadoop.ozone.recon.spi.OzoneManagerServiceProvider;
-import org.apache.hadoop.ozone.recon.tasks.ContainerKeyMapperTask;
-import org.apache.hadoop.ozone.recon.tasks.FileSizeCountTask;
 import org.hadoop.ozone.recon.schema.ReconInternalSchemaDefinition;
 import org.hadoop.ozone.recon.schema.StatsSchemaDefinition;
 import org.hadoop.ozone.recon.schema.UtilizationSchemaDefinition;
-import org.jooq.Configuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -69,13 +58,15 @@ public class ReconServer extends GenericCli {
     ConfigurationProvider.setConfiguration(ozoneConfiguration);
 
     injector =  Guice.createInjector(new
-        ReconControllerModule(), new ReconRestServletModule() {
+        ReconControllerModule(),
+        new ReconRestServletModule() {
           @Override
           protected void configureServlets() {
             rest("/api/*")
               .packages("org.apache.hadoop.ozone.recon.api");
           }
-        });
+        },
+        new ReconTaskBindingModule());
 
     //Pass on injector to listener that does the Guice - Jersey HK2 bridging.
     ReconGuiceServletContextListener.setInjector(injector);
@@ -95,14 +86,20 @@ public class ReconServer extends GenericCli {
       reconInternalSchemaDefinition.initializeSchema();
 
       LOG.info("Recon server initialized successfully!");
+
+      httpServer = injector.getInstance(ReconHttpServer.class);
+      LOG.info("Starting Recon server");
+      httpServer.start();
+
+      //Start Ozone Manager Service that pulls data from OM.
+      OzoneManagerServiceProvider ozoneManagerServiceProvider = injector
+          .getInstance(OzoneManagerServiceProvider.class);
+      ozoneManagerServiceProvider.start();
     } catch (Exception e) {
       LOG.error("Error during initializing Recon server.", e);
+      stop();
     }
 
-    httpServer = injector.getInstance(ReconHttpServer.class);
-    LOG.info("Starting Recon server");
-    httpServer.start();
-    scheduleReconTasks();
     Runtime.getRuntime().addShutdownHook(new Thread(() -> {
       try {
         stop();
@@ -113,52 +110,6 @@ public class ReconServer extends GenericCli {
     return null;
   }
 
-  /**
-   * Schedule the tasks that is required by Recon to keep its metadata up to
-   * date.
-   */
-  private void scheduleReconTasks() {
-    OzoneConfiguration configuration = injector.getInstance(
-        OzoneConfiguration.class);
-    ContainerDBServiceProvider containerDBServiceProvider = injector
-        .getInstance(ContainerDBServiceProvider.class);
-    OzoneManagerServiceProvider ozoneManagerServiceProvider = injector
-        .getInstance(OzoneManagerServiceProvider.class);
-    Configuration sqlConfiguration = injector.getInstance(Configuration.class);
-    long initialDelay = configuration.getTimeDuration(
-        RECON_OM_SNAPSHOT_TASK_INITIAL_DELAY,
-        RECON_OM_SNAPSHOT_TASK_INITIAL_DELAY_DEFAULT,
-        TimeUnit.MILLISECONDS);
-    long interval = configuration.getTimeDuration(
-        RECON_OM_SNAPSHOT_TASK_INTERVAL,
-        RECON_OM_SNAPSHOT_TASK_INTERVAL_DEFAULT,
-        TimeUnit.MILLISECONDS);
-
-
-    scheduler.scheduleWithFixedDelay(() -> {
-      try {
-        ozoneManagerServiceProvider.updateReconOmDBWithNewSnapshot();
-        // Schedule the task to read OM DB and write the reverse mapping to
-        // Recon container DB.
-        ContainerKeyMapperTask containerKeyMapperTask =
-            new ContainerKeyMapperTask(containerDBServiceProvider,
-                ozoneManagerServiceProvider.getOMMetadataManagerInstance());
-        containerKeyMapperTask.reprocess(
-            ozoneManagerServiceProvider.getOMMetadataManagerInstance());
-        FileSizeCountTask fileSizeCountTask = new
-            FileSizeCountTask(
-                ozoneManagerServiceProvider.getOMMetadataManagerInstance(),
-            sqlConfiguration);
-        fileSizeCountTask.reprocess(
-            ozoneManagerServiceProvider.getOMMetadataManagerInstance());
-
-      } catch (IOException e) {
-        LOG.error("Unable to get OM " +
-            "Snapshot", e);
-      }
-    }, initialDelay, interval, TimeUnit.MILLISECONDS);
-  }
-
   void stop() throws Exception {
     LOG.info("Stopping Recon server");
     httpServer.stop();
diff --git a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java
index 0501093..034af4a 100644
--- a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java
+++ b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java
@@ -114,7 +114,7 @@ public final class ReconServerConfigKeys {
 
   public static final String OZONE_RECON_TASK_THREAD_COUNT_KEY =
       "ozone.recon.task.thread.count";
-  public static final int OZONE_RECON_TASK_THREAD_COUNT_DEFAULT = 1;
+  public static final int OZONE_RECON_TASK_THREAD_COUNT_DEFAULT = 5;
 
   /**
    * Private constructor for utility class.
diff --git a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/spi/OzoneManagerServiceProvider.java b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/ReconTaskBindingModule.java
similarity index 54%
copy from hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/spi/OzoneManagerServiceProvider.java
copy to hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/ReconTaskBindingModule.java
index 420f333..19cc0da 100644
--- a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/spi/OzoneManagerServiceProvider.java
+++ b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/ReconTaskBindingModule.java
@@ -1,5 +1,3 @@
-package org.apache.hadoop.ozone.recon.spi;
-
 /**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -18,28 +16,25 @@ package org.apache.hadoop.ozone.recon.spi;
  * limitations under the License.
  */
 
-import java.io.IOException;
+package org.apache.hadoop.ozone.recon;
+
+import org.apache.hadoop.ozone.recon.tasks.ContainerKeyMapperTask;
+import org.apache.hadoop.ozone.recon.tasks.FileSizeCountTask;
+import org.apache.hadoop.ozone.recon.tasks.ReconDBUpdateTask;
 
-import org.apache.hadoop.ozone.om.OMMetadataManager;
+import com.google.inject.AbstractModule;
+import com.google.inject.multibindings.Multibinder;
 
 /**
- * Interface to access OM endpoints.
+ * Binds the various Recon Tasks.
  */
-public interface OzoneManagerServiceProvider {
-
-  /**
-   * Initialize Ozone Manager Service Provider Impl.
-   */
-  void init() throws IOException;
-
-  /**
-   * Update Recon OM DB with new snapshot from OM.
-   */
-  void updateReconOmDBWithNewSnapshot() throws IOException;
+public class ReconTaskBindingModule extends AbstractModule {
 
-  /**
-   * Return instance of OM Metadata manager.
-   * @return OM metadata manager instance.
-   */
-  OMMetadataManager getOMMetadataManagerInstance();
+  @Override
+  protected void configure() {
+    Multibinder<ReconDBUpdateTask> taskBinder =
+        Multibinder.newSetBinder(binder(), ReconDBUpdateTask.class);
+    taskBinder.addBinding().to(ContainerKeyMapperTask.class);
+    taskBinder.addBinding().to(FileSizeCountTask.class);
+  }
 }
diff --git a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/ReconUtils.java b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/ReconUtils.java
index 324a369..95e6f9b 100644
--- a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/ReconUtils.java
+++ b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/ReconUtils.java
@@ -51,11 +51,11 @@ import org.slf4j.LoggerFactory;
 /**
  * Recon Utility class.
  */
-public final class ReconUtils {
+public class ReconUtils {
 
   private final static int WRITE_BUFFER = 1048576; //1MB
 
-  private ReconUtils() {
+  public ReconUtils() {
   }
 
   private static final Logger LOG = LoggerFactory.getLogger(
@@ -69,7 +69,7 @@ public final class ReconUtils {
    * @param dirConfigKey key to check
    * @return Return File based on configured or fallback value.
    */
-  public static File getReconDbDir(Configuration conf, String dirConfigKey) {
+  public File getReconDbDir(Configuration conf, String dirConfigKey) {
 
     File metadataDir = getDirectoryFromConfig(conf, dirConfigKey,
         "Recon");
@@ -90,7 +90,7 @@ public final class ReconUtils {
    * @param destPath destination path to untar to.
    * @throws IOException ioException
    */
-  public static void untarCheckpointFile(File tarFile, Path destPath)
+  public void untarCheckpointFile(File tarFile, Path destPath)
       throws IOException {
 
     FileInputStream fileInputStream = null;
@@ -153,7 +153,7 @@ public final class ReconUtils {
    * @return Inputstream to the response of the HTTP call.
    * @throws IOException While reading the response.
    */
-  public static InputStream makeHttpCall(CloseableHttpClient httpClient,
+  public InputStream makeHttpCall(CloseableHttpClient httpClient,
                                          String url)
       throws IOException {
 
diff --git a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/spi/OzoneManagerServiceProvider.java b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/spi/OzoneManagerServiceProvider.java
index 420f333..3f57af6 100644
--- a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/spi/OzoneManagerServiceProvider.java
+++ b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/spi/OzoneManagerServiceProvider.java
@@ -18,8 +18,6 @@ package org.apache.hadoop.ozone.recon.spi;
  * limitations under the License.
  */
 
-import java.io.IOException;
-
 import org.apache.hadoop.ozone.om.OMMetadataManager;
 
 /**
@@ -28,14 +26,14 @@ import org.apache.hadoop.ozone.om.OMMetadataManager;
 public interface OzoneManagerServiceProvider {
 
   /**
-   * Initialize Ozone Manager Service Provider Impl.
+   * Start a task to sync data from OM.
    */
-  void init() throws IOException;
+  void start();
 
   /**
-   * Update Recon OM DB with new snapshot from OM.
+   * Stop the task that syncs data from OM.
    */
-  void updateReconOmDBWithNewSnapshot() throws IOException;
+  void stop();
 
   /**
    * Return instance of OM Metadata manager.
diff --git a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/ContainerDBServiceProviderImpl.java b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/ContainerDBServiceProviderImpl.java
index 42aab2e..b1532fa 100644
--- a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/ContainerDBServiceProviderImpl.java
+++ b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/ContainerDBServiceProviderImpl.java
@@ -37,6 +37,7 @@ import javax.inject.Singleton;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.recon.ReconUtils;
 import org.apache.hadoop.ozone.recon.api.types.ContainerKeyPrefix;
 import org.apache.hadoop.ozone.recon.api.types.ContainerMetadata;
 import org.apache.hadoop.ozone.recon.spi.ContainerDBServiceProvider;
@@ -74,6 +75,9 @@ public class ContainerDBServiceProviderImpl
   private Configuration sqlConfiguration;
 
   @Inject
+  private ReconUtils reconUtils;
+
+  @Inject
   public ContainerDBServiceProviderImpl(DBStore dbStore,
                                         Configuration sqlConfiguration) {
     globalStatsDao = new GlobalStatsDao(sqlConfiguration);
@@ -101,7 +105,8 @@ public class ContainerDBServiceProviderImpl
       throws IOException {
 
     File oldDBLocation = containerDbStore.getDbLocation();
-    containerDbStore = ReconContainerDBProvider.getNewDBStore(configuration);
+    containerDbStore = ReconContainerDBProvider
+        .getNewDBStore(configuration, reconUtils);
     containerKeyTable = containerDbStore.getTable(CONTAINER_KEY_TABLE,
         ContainerKeyPrefix.class, Integer.class);
 
diff --git a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java
index 389be1b..e7da3a0 100644
--- a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java
+++ b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java
@@ -27,35 +27,54 @@ import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_CONNE
 import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_CONNECTION_TIMEOUT;
 import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_CONNECTION_TIMEOUT_DEFAULT;
 import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_SNAPSHOT_TASK_FLUSH_PARAM;
+import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_SNAPSHOT_TASK_INITIAL_DELAY;
+import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_SNAPSHOT_TASK_INITIAL_DELAY_DEFAULT;
+import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_SNAPSHOT_TASK_INTERVAL;
+import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_SNAPSHOT_TASK_INTERVAL_DEFAULT;
 import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_SOCKET_TIMEOUT;
 import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_SOCKET_TIMEOUT_DEFAULT;
-import static org.apache.hadoop.ozone.recon.ReconUtils.getReconDbDir;
-import static org.apache.hadoop.ozone.recon.ReconUtils.makeHttpCall;
-import static org.apache.hadoop.ozone.recon.ReconUtils.untarCheckpointFile;
 
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import javax.inject.Inject;
 import javax.inject.Singleton;
 
 import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.http.HttpConfig;
 import org.apache.hadoop.ozone.om.OMConfigKeys;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DBUpdatesRequest;
+import org.apache.hadoop.ozone.recon.ReconUtils;
 import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
 import org.apache.hadoop.ozone.recon.spi.OzoneManagerServiceProvider;
+import org.apache.hadoop.ozone.recon.tasks.OMDBUpdatesHandler;
+import org.apache.hadoop.ozone.recon.tasks.OMUpdateEventBatch;
+import org.apache.hadoop.ozone.recon.tasks.ReconTaskController;
 import org.apache.hadoop.utils.db.DBCheckpoint;
+import org.apache.hadoop.utils.db.DBUpdatesWrapper;
+import org.apache.hadoop.utils.db.RDBBatchOperation;
+import org.apache.hadoop.utils.db.RDBStore;
 import org.apache.hadoop.utils.db.RocksDBCheckpoint;
 import org.apache.http.client.config.RequestConfig;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.ratis.protocol.ClientId;
+import org.hadoop.ozone.recon.schema.tables.daos.ReconTaskStatusDao;
+import org.hadoop.ozone.recon.schema.tables.pojos.ReconTaskStatus;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -75,11 +94,28 @@ public class OzoneManagerServiceProviderImpl
   private File omSnapshotDBParentDir = null;
   private String omDBSnapshotUrl;
 
-  @Inject
+  private OzoneManagerProtocol ozoneManagerClient;
+  private final ClientId clientId = ClientId.randomId();
+  private final OzoneConfiguration configuration;
+  private final ScheduledExecutorService scheduler =
+      Executors.newScheduledThreadPool(1);
+
   private ReconOMMetadataManager omMetadataManager;
+  private ReconTaskController reconTaskController;
+  private ReconTaskStatusDao reconTaskStatusDao;
+  private ReconUtils reconUtils;
+  private enum OmSnapshotTaskName {
+    OM_DB_FULL_SNAPSHOT, // task name stored after a full snapshot sync
+    OM_DB_DELTA_UPDATES // task name stored after a delta updates sync
+  }
 
   @Inject
-  public OzoneManagerServiceProviderImpl(Configuration configuration) {
+  public OzoneManagerServiceProviderImpl(
+      OzoneConfiguration configuration,
+      ReconOMMetadataManager omMetadataManager,
+      ReconTaskController reconTaskController,
+      ReconUtils reconUtils,
+      OzoneManagerProtocol ozoneManagerClient) throws IOException {
 
     String ozoneManagerHttpAddress = configuration.get(OMConfigKeys
         .OZONE_OM_HTTP_ADDRESS_KEY);
@@ -87,7 +123,7 @@ public class OzoneManagerServiceProviderImpl
     String ozoneManagerHttpsAddress = configuration.get(OMConfigKeys
         .OZONE_OM_HTTPS_ADDRESS_KEY);
 
-    omSnapshotDBParentDir = getReconDbDir(configuration,
+    omSnapshotDBParentDir = reconUtils.getReconDbDir(configuration,
         OZONE_RECON_OM_SNAPSHOT_DB_DIR);
 
     HttpConfig.Policy policy = DFSUtil.getHttpPolicy(configuration);
@@ -127,33 +163,39 @@ public class OzoneManagerServiceProviderImpl
       omDBSnapshotUrl += "?" + OZONE_DB_CHECKPOINT_REQUEST_FLUSH + "=true";
     }
 
+    this.reconUtils = reconUtils;
+    this.omMetadataManager = omMetadataManager;
+    this.reconTaskController = reconTaskController;
+    this.reconTaskStatusDao = reconTaskController.getReconTaskStatusDao();
+    this.ozoneManagerClient = ozoneManagerClient;
+    this.configuration = configuration;
   }
 
   @Override
-  public void init() throws IOException {
-    updateReconOmDBWithNewSnapshot();
+  public OMMetadataManager getOMMetadataManagerInstance() {
+    return omMetadataManager;
   }
 
   @Override
-  public void updateReconOmDBWithNewSnapshot() throws IOException {
-    //Obtain the current DB snapshot from OM and
-    //update the in house OM metadata managed DB instance.
-    DBCheckpoint dbSnapshot = getOzoneManagerDBSnapshot();
-    if (dbSnapshot != null && dbSnapshot.getCheckpointLocation() != null) {
-      try {
-        omMetadataManager.updateOmDB(dbSnapshot.getCheckpointLocation()
-            .toFile());
-      } catch (IOException e) {
-        LOG.error("Unable to refresh Recon OM DB Snapshot. ", e);
-      }
-    } else {
-      LOG.error("Null snapshot location got from OM.");
-    }
+  public void start() {
+    long initialDelay = configuration.getTimeDuration(
+        RECON_OM_SNAPSHOT_TASK_INITIAL_DELAY,
+        RECON_OM_SNAPSHOT_TASK_INITIAL_DELAY_DEFAULT,
+        TimeUnit.MILLISECONDS);
+    long interval = configuration.getTimeDuration(
+        RECON_OM_SNAPSHOT_TASK_INTERVAL,
+        RECON_OM_SNAPSHOT_TASK_INTERVAL_DEFAULT,
+        TimeUnit.MILLISECONDS);
+    scheduler.scheduleWithFixedDelay(this::syncDataFromOM,
+        initialDelay,
+        interval,
+        TimeUnit.MILLISECONDS);
   }
 
   @Override
-  public OMMetadataManager getOMMetadataManagerInstance() {
-    return omMetadataManager;
+  public void stop() {
+    reconTaskController.stop();
+    scheduler.shutdownNow();
   }
 
   /**
@@ -161,24 +203,24 @@ public class OzoneManagerServiceProviderImpl
    * @return DBCheckpoint instance.
    */
   @VisibleForTesting
-  protected DBCheckpoint getOzoneManagerDBSnapshot() {
+  DBCheckpoint getOzoneManagerDBSnapshot() {
     String snapshotFileName = RECON_OM_SNAPSHOT_DB + "_" + System
         .currentTimeMillis();
     File targetFile = new File(omSnapshotDBParentDir, snapshotFileName +
         ".tar.gz");
     try {
-      try (InputStream inputStream = makeHttpCall(httpClient,
+      try (InputStream inputStream = reconUtils.makeHttpCall(httpClient,
           omDBSnapshotUrl)) {
         FileUtils.copyInputStreamToFile(inputStream, targetFile);
       }
 
-      //Untar the checkpoint file.
+      // Untar the checkpoint file.
       Path untarredDbDir = Paths.get(omSnapshotDBParentDir.getAbsolutePath(),
           snapshotFileName);
-      untarCheckpointFile(targetFile, untarredDbDir);
+      reconUtils.untarCheckpointFile(targetFile, untarredDbDir);
       FileUtils.deleteQuietly(targetFile);
 
-      //TODO Create Checkpoint based on OM DB type.
+      // TODO Create Checkpoint based on OM DB type.
       // Currently, OM DB type is not configurable. Hence, defaulting to
       // RocksDB.
       return new RocksDBCheckpoint(untarredDbDir);
@@ -187,5 +229,119 @@ public class OzoneManagerServiceProviderImpl
     }
     return null;
   }
+
+  /**
+   * Update Local OM DB with new OM DB snapshot.
+   * Fetches a fresh checkpoint from OM and hands its location to the Recon
+   * OM metadata manager. Failures are propagated rather than swallowed so
+   * that a caller does not treat a failed refresh as a successful one.
+   * @throws IOException if no snapshot location could be obtained from OM,
+   * or if the local OM DB could not be updated with the new snapshot.
+   */
+  @VisibleForTesting
+  void updateReconOmDBWithNewSnapshot() throws IOException {
+    // Obtain the current DB snapshot from OM and
+    // update the in house OM metadata managed DB instance.
+    DBCheckpoint dbSnapshot = getOzoneManagerDBSnapshot();
+    if (dbSnapshot == null || dbSnapshot.getCheckpointLocation() == null) {
+      throw new IOException("Null snapshot location got from OM.");
+    }
+    try {
+      omMetadataManager.updateOmDB(
+          dbSnapshot.getCheckpointLocation().toFile());
+    } catch (IOException e) {
+      // Keep the cause so the caller's log shows why the refresh failed.
+      throw new IOException("Unable to refresh Recon OM DB Snapshot. ", e);
+    }
+  }
+
+  /**
+   * Get Delta updates from OM through RPC call and apply to local OM DB as
+   * well as accumulate in a buffer.
+   * Every byte array returned by OM is turned into a RocksDB WriteBatch,
+   * iterated with the given handler and then committed to the local DB.
+   * @param fromSequenceNumber from sequence number to request from.
+   * @param omdbUpdatesHandler OM DB updates handler to buffer updates.
+   * @throws IOException when OM RPC request fails.
+   * @throws RocksDBException when writing to RocksDB fails.
+   */
+  @VisibleForTesting
+  void getAndApplyDeltaUpdatesFromOM(
+      long fromSequenceNumber, OMDBUpdatesHandler omdbUpdatesHandler)
+      throws IOException, RocksDBException {
+    DBUpdatesRequest dbUpdatesRequest = DBUpdatesRequest.newBuilder()
+        .setSequenceNumber(fromSequenceNumber).build();
+    DBUpdatesWrapper dbUpdates = ozoneManagerClient.getDBUpdates(
+        dbUpdatesRequest);
+    if (null == dbUpdates) {
+      return;
+    }
+    RDBStore rocksDBStore = (RDBStore)omMetadataManager.getStore();
+    RocksDB rocksDB = rocksDBStore.getDb();
+    LOG.debug("Number of updates received from OM : {}",
+        dbUpdates.getData().size());
+    // WriteOptions and WriteBatch wrap native RocksDB handles; close them
+    // explicitly so the native memory is released after each use.
+    try (WriteOptions writeOptions = new WriteOptions()) {
+      for (byte[] data : dbUpdates.getData()) {
+        try (WriteBatch writeBatch = new WriteBatch(data)) {
+          writeBatch.iterate(omdbUpdatesHandler);
+          RDBBatchOperation rdbBatchOperation =
+              new RDBBatchOperation(writeBatch);
+          rdbBatchOperation.commit(rocksDB, writeOptions);
+        }
+      }
+    }
+  }
+
+  /**
+   * Based on current state of Recon's OM DB, we either get delta updates or
+   * full snapshot from Ozone Manager.
+   * A full snapshot is requested when the local sequence number is not
+   * positive, or when getting/applying delta updates fails. This method is
+   * handed to the scheduler as a periodic task, so no exception is allowed
+   * to escape: an uncaught exception would cancel all subsequent runs.
+   */
+  @VisibleForTesting
+  void syncDataFromOM() {
+    try {
+      long currentSequenceNumber = getCurrentOMDBSequenceNumber();
+      boolean fullSnapshot = false;
+
+      if (currentSequenceNumber <= 0) {
+        fullSnapshot = true;
+      } else {
+        OMDBUpdatesHandler omdbUpdatesHandler =
+            new OMDBUpdatesHandler(omMetadataManager);
+        try {
+          // Get updates from OM and apply to local Recon OM DB.
+          getAndApplyDeltaUpdatesFromOM(currentSequenceNumber,
+              omdbUpdatesHandler);
+          // Update timestamp of successful delta updates query.
+          ReconTaskStatus reconTaskStatusRecord = new ReconTaskStatus(
+              OmSnapshotTaskName.OM_DB_DELTA_UPDATES.name(),
+              System.currentTimeMillis(), getCurrentOMDBSequenceNumber());
+          reconTaskStatusDao.update(reconTaskStatusRecord);
+          // Pass on DB update events to tasks that are listening.
+          reconTaskController.consumeOMEvents(new OMUpdateEventBatch(
+              omdbUpdatesHandler.getEvents()), omMetadataManager);
+        } catch (InterruptedException e) {
+          // Restore the interrupt flag and abandon this run instead of
+          // starting a full snapshot download on an interrupted thread.
+          Thread.currentThread().interrupt();
+          LOG.warn("Interrupted while applying delta updates from OM.", e);
+          return;
+        } catch (IOException | RocksDBException e) {
+          LOG.warn("Unable to get and apply delta updates from OM.", e);
+          fullSnapshot = true;
+        }
+      }
+
+      if (fullSnapshot) {
+        try {
+          // Update local Recon OM DB to new snapshot.
+          updateReconOmDBWithNewSnapshot();
+          // Update timestamp of successful full snapshot sync.
+          ReconTaskStatus reconTaskStatusRecord = new ReconTaskStatus(
+              OmSnapshotTaskName.OM_DB_FULL_SNAPSHOT.name(),
+              System.currentTimeMillis(), getCurrentOMDBSequenceNumber());
+          reconTaskStatusDao.update(reconTaskStatusRecord);
+          // Reinitialize tasks that are listening.
+          reconTaskController.reInitializeTasks(omMetadataManager);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          LOG.error("Unable to update Recon's OM DB with new snapshot ", e);
+        } catch (IOException e) {
+          LOG.error("Unable to update Recon's OM DB with new snapshot ", e);
+        }
+      }
+    } catch (RuntimeException e) {
+      // Boundary of the scheduled task: log and keep the schedule alive.
+      LOG.error("Unexpected error while syncing data from OM.", e);
+    }
+  }
+
+  /**
+   * Look up the latest sequence number of the RocksDB backing Recon's
+   * OM metadata manager.
+   * @return latest sequence number, or 0 when no DB store is available.
+   */
+  private long getCurrentOMDBSequenceNumber() {
+    RDBStore store = (RDBStore)omMetadataManager.getStore();
+    return (store == null)
+        ? 0 : store.getDb().getLatestSequenceNumber();
+  }
 }
 
diff --git a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/ReconContainerDBProvider.java b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/ReconContainerDBProvider.java
index de5c030..9b99f70 100644
--- a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/ReconContainerDBProvider.java
+++ b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/ReconContainerDBProvider.java
@@ -22,11 +22,11 @@ import static org.apache.hadoop.ozone.recon.ReconConstants.CONTAINER_KEY_COUNT_T
 import static org.apache.hadoop.ozone.recon.ReconConstants.RECON_CONTAINER_DB;
 import static org.apache.hadoop.ozone.recon.ReconConstants.CONTAINER_KEY_TABLE;
 import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_DB_DIR;
-import static org.apache.hadoop.ozone.recon.ReconUtils.getReconDbDir;
 
 import java.nio.file.Path;
 
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.recon.ReconUtils;
 import org.apache.hadoop.ozone.recon.api.types.ContainerKeyPrefix;
 import org.apache.hadoop.utils.db.DBStore;
 import org.apache.hadoop.utils.db.DBStoreBuilder;
@@ -52,9 +52,12 @@ public class ReconContainerDBProvider implements Provider<DBStore> {
   @Inject
   private OzoneConfiguration configuration;
 
+  @Inject
+  private ReconUtils reconUtils;
+
   @Override
   public DBStore get() {
-    DBStore dbStore = getNewDBStore(configuration);
+    DBStore dbStore = getNewDBStore(configuration, reconUtils);
     if (dbStore == null) {
       throw new ProvisionException("Unable to provide instance of DBStore " +
           "store.");
@@ -62,11 +65,13 @@ public class ReconContainerDBProvider implements Provider<DBStore> {
     return dbStore;
   }
 
-  public static DBStore getNewDBStore(OzoneConfiguration configuration) {
+  public static DBStore getNewDBStore(OzoneConfiguration configuration,
+                                      ReconUtils reconUtils) {
     DBStore dbStore = null;
     String dbName = RECON_CONTAINER_DB + "_" + System.currentTimeMillis();
     try {
-      Path metaDir = getReconDbDir(configuration, OZONE_RECON_DB_DIR).toPath();
+      Path metaDir = reconUtils.getReconDbDir(
+          configuration, OZONE_RECON_DB_DIR).toPath();
       dbStore = DBStoreBuilder.newBuilder(configuration)
           .setPath(metaDir)
           .setName(dbName)
diff --git a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperTask.java b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperTask.java
index ec8695c..18a1e38 100644
--- a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperTask.java
+++ b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperTask.java
@@ -18,11 +18,13 @@
 
 package org.apache.hadoop.ozone.recon.tasks;
 
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.KEY_TABLE;
+
 import java.io.IOException;
 import java.time.Duration;
 import java.time.Instant;
-import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -41,28 +43,23 @@ import org.apache.hadoop.utils.db.TableIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.inject.Inject;
+
 /**
  * Class to iterate over the OM DB and populate the Recon container DB with
  * the container -> Key reverse mapping.
  */
-public class ContainerKeyMapperTask extends ReconDBUpdateTask {
+public class ContainerKeyMapperTask implements ReconDBUpdateTask {
 
   private static final Logger LOG =
       LoggerFactory.getLogger(ContainerKeyMapperTask.class);
 
   private ContainerDBServiceProvider containerDBServiceProvider;
-  private Collection<String> tables = new ArrayList<>();
 
+  @Inject
   public ContainerKeyMapperTask(ContainerDBServiceProvider
-                                    containerDBServiceProvider,
-                                OMMetadataManager omMetadataManager) {
-    super("ContainerKeyMapperTask");
+                                    containerDBServiceProvider) {
     this.containerDBServiceProvider = containerDBServiceProvider;
-    try {
-      tables.add(omMetadataManager.getKeyTable().getName());
-    } catch (IOException ioEx) {
-      LOG.error("Unable to listen on Key Table updates ", ioEx);
-    }
   }
 
   /**
@@ -103,13 +100,19 @@ public class ContainerKeyMapperTask extends ReconDBUpdateTask {
   }
 
   @Override
-  protected Collection<String> getTaskTables() {
-    return tables;
+  public String getTaskName() {
+    return "ContainerKeyMapperTask";
+  }
+
+  @Override
+  public Collection<String> getTaskTables() {
+    return Collections.singletonList(KEY_TABLE);
   }
 
   @Override
-  Pair<String, Boolean> process(OMUpdateEventBatch events) {
+  public Pair<String, Boolean> process(OMUpdateEventBatch events) {
     Iterator<OMDBUpdateEvent> eventIterator = events.getIterator();
+    int eventCount = 0;
     while (eventIterator.hasNext()) {
       OMDBUpdateEvent<String, OmKeyInfo> omdbUpdateEvent = eventIterator.next();
       String updatedKey = omdbUpdateEvent.getKey();
@@ -127,12 +130,15 @@ public class ContainerKeyMapperTask extends ReconDBUpdateTask {
         default: LOG.debug("Skipping DB update event : " + omdbUpdateEvent
             .getAction());
         }
+        eventCount++;
       } catch (IOException e) {
         LOG.error("Unexpected exception while updating key data : {} ",
             updatedKey, e);
         return new ImmutablePair<>(getTaskName(), false);
       }
     }
+    LOG.info("{} successfully processed {} OM DB update event(s).",
+        getTaskName(), eventCount);
     return new ImmutablePair<>(getTaskName(), true);
   }
 
diff --git a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTask.java b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTask.java
index a09eaff..7432392 100644
--- a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTask.java
+++ b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTask.java
@@ -33,11 +33,12 @@ import org.slf4j.LoggerFactory;
 
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.KEY_TABLE;
 import static org.apache.hadoop.ozone.recon.tasks.
     OMDBUpdateEvent.OMDBUpdateAction.DELETE;
 import static org.apache.hadoop.ozone.recon.tasks.
@@ -48,7 +49,7 @@ import static org.apache.hadoop.ozone.recon.tasks.
  * files binned into ranges (1KB, 2Kb..,4MB,.., 1TB,..1PB) to the Recon
  * fileSize DB.
  */
-public class FileSizeCountTask extends ReconDBUpdateTask {
+public class FileSizeCountTask implements ReconDBUpdateTask {
   private static final Logger LOG =
       LoggerFactory.getLogger(FileSizeCountTask.class);
 
@@ -56,19 +57,11 @@ public class FileSizeCountTask extends ReconDBUpdateTask {
   private long maxFileSizeUpperBound = 1125899906842624L; // 1 PB
   private long[] upperBoundCount;
   private long oneKb = 1024L;
-  private Collection<String> tables = new ArrayList<>();
   private FileCountBySizeDao fileCountBySizeDao;
 
   @Inject
-  public FileSizeCountTask(OMMetadataManager omMetadataManager,
-      Configuration sqlConfiguration) {
-    super("FileSizeCountTask");
-    try {
-      tables.add(omMetadataManager.getKeyTable().getName());
-      fileCountBySizeDao = new FileCountBySizeDao(sqlConfiguration);
-    } catch (Exception e) {
-      LOG.error("Unable to fetch Key Table updates ", e);
-    }
+  public FileSizeCountTask(Configuration sqlConfiguration) {
+    fileCountBySizeDao = new FileCountBySizeDao(sqlConfiguration);
     upperBoundCount = new long[getMaxBinSize()];
   }
 
@@ -98,7 +91,6 @@ public class FileSizeCountTask extends ReconDBUpdateTask {
    */
   @Override
   public Pair<String, Boolean> reprocess(OMMetadataManager omMetadataManager) {
-    LOG.info("Starting a 'reprocess' run of FileSizeCountTask.");
     Table<String, OmKeyInfo> omKeyInfoTable = omMetadataManager.getKeyTable();
     try (TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>>
         keyIter = omKeyInfoTable.iterator()) {
@@ -119,8 +111,13 @@ public class FileSizeCountTask extends ReconDBUpdateTask {
   }
 
   @Override
-  protected Collection<String> getTaskTables() {
-    return tables;
+  public String getTaskName() {
+    return "FileSizeCountTask";
+  }
+
+  @Override
+  public Collection<String> getTaskTables() {
+    return Collections.singletonList(KEY_TABLE);
   }
 
   private void updateCountFromDB() {
@@ -144,8 +141,7 @@ public class FileSizeCountTask extends ReconDBUpdateTask {
    * @return Pair
    */
   @Override
-  Pair<String, Boolean> process(OMUpdateEventBatch events) {
-    LOG.info("Starting a 'process' run of FileSizeCountTask.");
+  public Pair<String, Boolean> process(OMUpdateEventBatch events) {
     Iterator<OMDBUpdateEvent> eventIterator = events.getIterator();
 
     //update array with file size count from DB
@@ -246,9 +242,9 @@ public class FileSizeCountTask extends ReconDBUpdateTask {
         //decrement only if it had files before, default DB value is 0
         upperBoundCount[binIndex]--;
       } else {
-        LOG.debug("Cannot decrement count. Default value is 0 (zero).");
-        throw new IOException("Cannot decrement count. "
-            + "Default value is 0 (zero).");
+        LOG.warn("Unexpected error while updating bin count. Found 0 count " +
+            "for index : " + binIndex + " while processing DELETE event for "
+            + omKeyInfo.getKeyName());
       }
     }
   }
diff --git a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OMDBUpdateEvent.java b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OMDBUpdateEvent.java
index 82b7a35..0fcabcc 100644
--- a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OMDBUpdateEvent.java
+++ b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OMDBUpdateEvent.java
@@ -30,18 +30,18 @@ public final class OMDBUpdateEvent<KEY, VALUE> {
   private final String table;
   private final KEY updatedKey;
   private final VALUE updatedValue;
-  private final EventInfo eventInfo;
+  private final long sequenceNumber;
 
   private OMDBUpdateEvent(OMDBUpdateAction action,
                           String table,
                           KEY updatedKey,
                           VALUE updatedValue,
-                          EventInfo eventInfo) {
+                          long sequenceNumber) {
     this.action = action;
     this.table = table;
     this.updatedKey = updatedKey;
     this.updatedValue = updatedValue;
-    this.eventInfo = eventInfo;
+    this.sequenceNumber = sequenceNumber;
   }
 
   public OMDBUpdateAction getAction() {
@@ -60,8 +60,8 @@ public final class OMDBUpdateEvent<KEY, VALUE> {
     return updatedValue;
   }
 
-  public EventInfo getEventInfo() {
-    return eventInfo;
+  public long getSequenceNumber() {
+    return sequenceNumber;
   }
 
   /**
@@ -75,7 +75,7 @@ public final class OMDBUpdateEvent<KEY, VALUE> {
     private String table;
     private KEY updatedKey;
     private VALUE updatedValue;
-    private EventInfo eventInfo;
+    private long lastSequenceNumber;
 
     OMUpdateEventBuilder setAction(OMDBUpdateAction omdbUpdateAction) {
       this.action = omdbUpdateAction;
@@ -97,10 +97,8 @@ public final class OMDBUpdateEvent<KEY, VALUE> {
       return this;
     }
 
-    OMUpdateEventBuilder setEventInfo(long sequenceNumber,
-                                      long eventTimestampMillis) {
-      this.eventInfo = new EventInfo(sequenceNumber,
-          eventTimestampMillis);
+    OMUpdateEventBuilder setSequenceNumber(long sequenceNumber) {
+      this.lastSequenceNumber = sequenceNumber;
       return this;
     }
 
@@ -114,30 +112,7 @@ public final class OMDBUpdateEvent<KEY, VALUE> {
           table,
           updatedKey,
           updatedValue,
-          eventInfo);
-    }
-  }
-
-  /**
-   * Class used to hold timing information for an event. (Seq number and
-   * timestamp)
-   */
-  public static class EventInfo {
-    private long sequenceNumber;
-    private long eventTimestampMillis;
-
-    public EventInfo(long sequenceNumber,
-                     long eventTimestampMillis) {
-      this.sequenceNumber = sequenceNumber;
-      this.eventTimestampMillis = eventTimestampMillis;
-    }
-
-    public long getSequenceNumber() {
-      return sequenceNumber;
-    }
-
-    public long getEventTimestampMillis() {
-      return eventTimestampMillis;
+          lastSequenceNumber);
     }
   }
 
diff --git a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OMDBUpdatesHandler.java b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OMDBUpdatesHandler.java
index d2d11b2..00bbade 100644
--- a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OMDBUpdatesHandler.java
+++ b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OMDBUpdatesHandler.java
@@ -78,6 +78,11 @@ public class OMDBUpdatesHandler extends WriteBatch.Handler {
 
   /**
    *
+   * @param cfIndex
+   * @param keyBytes
+   * @param valueBytes
+   * @param action
+   * @throws IOException
    */
   private void processEvent(int cfIndex, byte[] keyBytes, byte[]
       valueBytes, OMDBUpdateEvent.OMDBUpdateAction action)
@@ -100,8 +105,8 @@ public class OMDBUpdatesHandler extends WriteBatch.Handler {
 
       builder.setAction(action);
       OMDBUpdateEvent event = builder.build();
-      LOG.info("Generated OM update Event for table : " + event.getTable()
-          + ", Key = " + event.getKey());
+      LOG.debug("Generated OM update Event for table : " + event.getTable()
+          + ", Key = " + event.getKey() + ", action = " + event.getAction());
       // Temporarily adding to an event buffer for testing. In subsequent JIRAs,
       // a Recon side class will be implemented that requests delta updates
       // from OM and calls on this handler. In that case, we will fill up
diff --git a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OMUpdateEventBatch.java b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OMUpdateEventBatch.java
index 3b7cc5b..f137418 100644
--- a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OMUpdateEventBatch.java
+++ b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OMUpdateEventBatch.java
@@ -31,7 +31,7 @@ public class OMUpdateEventBatch {
 
   private List<OMDBUpdateEvent> events;
 
-  OMUpdateEventBatch(Collection<OMDBUpdateEvent> e) {
+  public OMUpdateEventBatch(Collection<OMDBUpdateEvent> e) {
     events = new ArrayList<>(e);
   }
 
@@ -39,11 +39,11 @@ public class OMUpdateEventBatch {
    * Get Sequence Number and timestamp of last event in this batch.
    * @return Event Info instance.
    */
-  OMDBUpdateEvent.EventInfo getLastEventInfo() {
+  long getLastSequenceNumber() {
     if (events.isEmpty()) {
-      return new OMDBUpdateEvent.EventInfo(-1, -1);
+      return -1;
     } else {
-      return events.get(events.size() - 1).getEventInfo();
+      return events.get(events.size() - 1).getSequenceNumber();
     }
   }
 
@@ -66,4 +66,12 @@ public class OMUpdateEventBatch {
         .filter(e -> tables.contains(e.getTable()))
         .collect(Collectors.toList()));
   }
+
+  /**
+   * Check whether this batch holds no events, i.e. whether the iterator
+   * returned by getIterator() has no element to offer.
+   * @return true if empty, else false.
+   */
+  public boolean isEmpty() {
+    return !getIterator().hasNext();
+  }
 }
diff --git a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconDBUpdateTask.java b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconDBUpdateTask.java
index d828577..426e0ae 100644
--- a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconDBUpdateTask.java
+++ b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconDBUpdateTask.java
@@ -24,43 +24,35 @@ import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
 
 /**
- * Abstract class used to denote a Recon task that needs to act on OM DB events.
+ * Interface used to denote a Recon task that needs to act on OM DB events.
  */
-public abstract class ReconDBUpdateTask {
-
-  private String taskName;
-
-  protected ReconDBUpdateTask(String taskName) {
-    this.taskName = taskName;
-  }
+public interface ReconDBUpdateTask {
 
   /**
    * Return task name.
    * @return task name
    */
-  public String getTaskName() {
-    return taskName;
-  }
+  String getTaskName();
 
   /**
    * Return the list of tables that the task is listening on.
    * Empty list means the task is NOT listening on any tables.
    * @return Collection of Tables.
    */
-  protected abstract Collection<String> getTaskTables();
+  Collection<String> getTaskTables();
 
   /**
    * Process a set of OM events on tables that the task is listening on.
    * @param events Set of events to be processed by the task.
    * @return Pair of task name -> task success.
    */
-  abstract Pair<String, Boolean> process(OMUpdateEventBatch events);
+  Pair<String, Boolean> process(OMUpdateEventBatch events);
 
   /**
    * Process a  on tables that the task is listening on.
    * @param omMetadataManager OM Metadata manager instance.
    * @return Pair of task name -> task success.
    */
-  abstract Pair<String, Boolean> reprocess(OMMetadataManager omMetadataManager);
+  Pair<String, Boolean> reprocess(OMMetadataManager omMetadataManager);
 
 }
diff --git a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskController.java b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskController.java
index 7548cc9..728a199 100644
--- a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskController.java
+++ b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskController.java
@@ -20,6 +20,9 @@ package org.apache.hadoop.ozone.recon.tasks;
 
 import java.util.Map;
 
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.hadoop.ozone.recon.schema.tables.daos.ReconTaskStatusDao;
+
 /**
  * Controller used by Recon to manage Tasks that are waiting on Recon events.
  */
@@ -36,11 +39,31 @@ public interface ReconTaskController {
    * @param events set of events
    * @throws InterruptedException InterruptedException
    */
-  void consumeOMEvents(OMUpdateEventBatch events) throws InterruptedException;
+  void consumeOMEvents(OMUpdateEventBatch events,
+                       OMMetadataManager omMetadataManager)
+      throws InterruptedException;
+
+  /**
+   * Pass on the handle to a new OM DB instance to the registered tasks.
+   * @param omMetadataManager OM Metadata Manager instance
+   */
+  void reInitializeTasks(OMMetadataManager omMetadataManager)
+      throws InterruptedException;
 
   /**
    * Get set of registered tasks.
    * @return Map of Task name -> Task.
    */
   Map<String, ReconDBUpdateTask> getRegisteredTasks();
+
+  /**
+   * Get instance of ReconTaskStatusDao.
+   * @return instance of ReconTaskStatusDao
+   */
+  ReconTaskStatusDao getReconTaskStatusDao();
+
+  /**
+   * Stop the tasks. Start API is not needed since it is implicit.
+   */
+  void stop();
 }
diff --git a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskControllerImpl.java b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskControllerImpl.java
index 3fd7d96..9135705 100644
--- a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskControllerImpl.java
+++ b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskControllerImpl.java
@@ -26,6 +26,7 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -36,7 +37,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.hadoop.ozone.recon.schema.tables.daos.ReconTaskStatusDao;
 import org.hadoop.ozone.recon.schema.tables.pojos.ReconTaskStatus;
 import org.jooq.Configuration;
@@ -57,21 +58,22 @@ public class ReconTaskControllerImpl implements ReconTaskController {
   private ExecutorService executorService;
   private int threadCount = 1;
   private final Semaphore taskSemaphore = new Semaphore(1);
-  private final ReconOMMetadataManager omMetadataManager;
   private Map<String, AtomicInteger> taskFailureCounter = new HashMap<>();
   private static final int TASK_FAILURE_THRESHOLD = 2;
   private ReconTaskStatusDao reconTaskStatusDao;
 
   @Inject
   public ReconTaskControllerImpl(OzoneConfiguration configuration,
-                                 ReconOMMetadataManager omMetadataManager,
-                                 Configuration sqlConfiguration) {
-    this.omMetadataManager = omMetadataManager;
+                                 Configuration sqlConfiguration,
+                                 Set<ReconDBUpdateTask> tasks) {
     reconDBUpdateTasks = new HashMap<>();
     threadCount = configuration.getInt(OZONE_RECON_TASK_THREAD_COUNT_KEY,
         OZONE_RECON_TASK_THREAD_COUNT_DEFAULT);
     executorService = Executors.newFixedThreadPool(threadCount);
     reconTaskStatusDao = new ReconTaskStatusDao(sqlConfiguration);
+    for (ReconDBUpdateTask task : tasks) {
+      registerTask(task);
+    }
   }
 
   @Override
@@ -86,7 +88,9 @@ public class ReconTaskControllerImpl implements ReconTaskController {
     // Create DB record for the task.
     ReconTaskStatus reconTaskStatusRecord = new ReconTaskStatus(taskName,
         0L, 0L);
-    reconTaskStatusDao.insert(reconTaskStatusRecord);
+    if (!reconTaskStatusDao.existsById(taskName)) {
+      reconTaskStatusDao.insert(reconTaskStatusRecord);
+    }
   }
 
   /**
@@ -98,7 +102,69 @@ public class ReconTaskControllerImpl implements ReconTaskController {
    * @throws InterruptedException
    */
   @Override
-  public void consumeOMEvents(OMUpdateEventBatch events)
+  public void consumeOMEvents(OMUpdateEventBatch events,
+                              OMMetadataManager omMetadataManager)
+      throws InterruptedException {
+    taskSemaphore.acquire();
+
+    try {
+      if (!events.isEmpty()) {
+        Collection<Callable<Pair>> tasks = new ArrayList<>();
+        for (Map.Entry<String, ReconDBUpdateTask> taskEntry :
+            reconDBUpdateTasks.entrySet()) {
+          ReconDBUpdateTask task = taskEntry.getValue();
+          Collection<String> tables = task.getTaskTables();
+          tasks.add(() -> task.process(events.filter(tables)));
+        }
+
+        List<Future<Pair>> results = executorService.invokeAll(tasks);
+        List<String> failedTasks = processTaskResults(results, events);
+
+        // Retry once, on the same events, the tasks that just failed.
+        List<String> retryFailedTasks = new ArrayList<>();
+        if (!failedTasks.isEmpty()) {
+          tasks.clear();
+          for (String taskName : failedTasks) {
+            ReconDBUpdateTask task = reconDBUpdateTasks.get(taskName);
+            Collection<String> tables = task.getTaskTables();
+            tasks.add(() -> task.process(events.filter(tables)));
+          }
+          results = executorService.invokeAll(tasks);
+          retryFailedTasks = processTaskResults(results, events);
+        }
+
+        // Reprocess only the tasks that failed the retry as well.
+        // TODO Move to a separate task queue since reprocess may be a heavy
+        // operation for large OM DB instances
+        if (!retryFailedTasks.isEmpty()) {
+          tasks.clear();
+          for (String taskName : retryFailedTasks) {
+            ReconDBUpdateTask task = reconDBUpdateTasks.get(taskName);
+            tasks.add(() -> task.reprocess(omMetadataManager));
+          }
+          results = executorService.invokeAll(tasks);
+          List<String> reprocessFailedTasks =
+              processTaskResults(results, events);
+          for (String taskName : reprocessFailedTasks) {
+            LOG.info("Reprocess step failed for task : " + taskName);
+            if (taskFailureCounter.get(taskName).incrementAndGet() >
+                TASK_FAILURE_THRESHOLD) {
+              LOG.info("Blacklisting Task since it failed retry and " +
+                  "reprocess more than " + TASK_FAILURE_THRESHOLD + " times.");
+              reconDBUpdateTasks.remove(taskName);
+            }
+          }
+        }
+      }
+    } catch (ExecutionException e) {
+      LOG.error("Unexpected error : ", e);
+    } finally {
+      taskSemaphore.release();
+    }
+  }
+
+  @Override
+  public void reInitializeTasks(OMMetadataManager omMetadataManager)
       throws InterruptedException {
     taskSemaphore.acquire();
 
@@ -107,43 +173,14 @@ public class ReconTaskControllerImpl implements ReconTaskController {
       for (Map.Entry<String, ReconDBUpdateTask> taskEntry :
           reconDBUpdateTasks.entrySet()) {
         ReconDBUpdateTask task = taskEntry.getValue();
-        tasks.add(() -> task.process(events));
+        tasks.add(() -> task.reprocess(omMetadataManager));
       }
 
       List<Future<Pair>> results = executorService.invokeAll(tasks);
-      List<String> failedTasks = processTaskResults(results, events);
-
-      //Retry
-      List<String> retryFailedTasks = new ArrayList<>();
-      if (!failedTasks.isEmpty()) {
-        tasks.clear();
-        for (String taskName : failedTasks) {
-          ReconDBUpdateTask task = reconDBUpdateTasks.get(taskName);
-          tasks.add(() -> task.process(events));
-        }
-        results = executorService.invokeAll(tasks);
-        retryFailedTasks = processTaskResults(results, events);
-      }
-
-      //Reprocess
-      //TODO Move to a separate task queue since reprocess may be a heavy
-      //operation for large OM DB instances
-      if (!retryFailedTasks.isEmpty()) {
-        tasks.clear();
-        for (String taskName : failedTasks) {
-          ReconDBUpdateTask task = reconDBUpdateTasks.get(taskName);
-          tasks.add(() -> task.reprocess(omMetadataManager));
-        }
-        results = executorService.invokeAll(tasks);
-        List<String> reprocessFailedTasks = processTaskResults(results, events);
-        for (String taskName : reprocessFailedTasks) {
-          LOG.info("Reprocess step failed for task : " + taskName);
-          if (taskFailureCounter.get(taskName).incrementAndGet() >
-              TASK_FAILURE_THRESHOLD) {
-            LOG.info("Blacklisting Task since it failed retry and " +
-                "reprocess more than " + TASK_FAILURE_THRESHOLD + " times.");
-            reconDBUpdateTasks.remove(taskName);
-          }
+      for (Future<Pair> f : results) {
+        String taskName = f.get().getLeft().toString();
+        if (!(Boolean)f.get().getRight()) {
+          LOG.info("Init failed for task : " + taskName);
         }
       }
     } catch (ExecutionException e) {
@@ -157,12 +194,12 @@ public class ReconTaskControllerImpl implements ReconTaskController {
    * Store the last completed event sequence number and timestamp to the DB
    * for that task.
    * @param taskName taskname to be updated.
-   * @param eventInfo contains the new sequence number and timestamp.
+   * @param lastSequenceNumber contains the new sequence number.
    */
   private void storeLastCompletedTransaction(
-      String taskName, OMDBUpdateEvent.EventInfo eventInfo) {
+      String taskName, long lastSequenceNumber) {
     ReconTaskStatus reconTaskStatusRecord = new ReconTaskStatus(taskName,
-        eventInfo.getEventTimestampMillis(), eventInfo.getSequenceNumber());
+        System.currentTimeMillis(), lastSequenceNumber);
     reconTaskStatusDao.update(reconTaskStatusRecord);
   }
 
@@ -171,6 +208,16 @@ public class ReconTaskControllerImpl implements ReconTaskController {
     return reconDBUpdateTasks;
   }
 
+  @Override
+  public ReconTaskStatusDao getReconTaskStatusDao() {
+    return reconTaskStatusDao;
+  }
+
+  @Override
+  public void stop() {
+    this.executorService.shutdownNow();
+  }
+
   /**
    * Wait on results of all tasks.
    * @param results Set of Futures.
@@ -190,7 +237,7 @@ public class ReconTaskControllerImpl implements ReconTaskController {
         failedTasks.add(f.get().getLeft().toString());
       } else {
         taskFailureCounter.get(taskName).set(0);
-        storeLastCompletedTransaction(taskName, events.getLastEventInfo());
+        storeLastCompletedTransaction(taskName, events.getLastSequenceNumber());
       }
     }
     return failedTasks;
diff --git a/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/AbstractOMMetadataManagerTest.java b/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/AbstractOMMetadataManagerTest.java
index 7dc987d..fe2cf49 100644
--- a/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/AbstractOMMetadataManagerTest.java
+++ b/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/AbstractOMMetadataManagerTest.java
@@ -56,7 +56,7 @@ public abstract class AbstractOMMetadataManagerTest {
   public TemporaryFolder temporaryFolder = new TemporaryFolder();
 
   /**
-   * Create a new OM Metadata manager instance.
+   * Create a new OM Metadata manager instance with default volume and bucket.
    * @throws IOException ioEx
    */
   protected OMMetadataManager initializeNewOmMetadataManager()
@@ -88,6 +88,19 @@ public abstract class AbstractOMMetadataManagerTest {
   }
 
   /**
+   * Create an empty OM Metadata manager instance.
+   * @throws IOException ioEx
+   */
+  protected OMMetadataManager initializeEmptyOmMetadataManager()
+      throws IOException {
+    File omDbDir = temporaryFolder.newFolder();
+    OzoneConfiguration omConfiguration = new OzoneConfiguration();
+    omConfiguration.set(OZONE_OM_DB_DIRS,
+        omDbDir.getAbsolutePath());
+    return new OmMetadataManagerImpl(omConfiguration);
+  }
+
+  /**
    * Get an instance of Recon OM Metadata manager.
    * @return ReconOMMetadataManager
    * @throws IOException when creating the RocksDB instance.
diff --git a/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/TestReconUtils.java b/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/TestReconUtils.java
index f531bb2..ad04837 100644
--- a/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/TestReconUtils.java
+++ b/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/TestReconUtils.java
@@ -61,7 +61,7 @@ public class TestReconUtils {
     OzoneConfiguration configuration = new OzoneConfiguration();
     configuration.set("TEST_DB_DIR", filePath);
 
-    File file = ReconUtils.getReconDbDir(configuration,
+    File file = new ReconUtils().getReconDbDir(configuration,
         "TEST_DB_DIR");
     Assert.assertEquals(filePath, file.getAbsolutePath());
   }
@@ -89,7 +89,7 @@ public class TestReconUtils {
     //Create test tar file.
     File tarFile = OmUtils.createTarFile(newDir.toPath());
     File outputDir = folder.newFolder();
-    ReconUtils.untarCheckpointFile(tarFile, outputDir.toPath());
+    new ReconUtils().untarCheckpointFile(tarFile, outputDir.toPath());
 
     assertTrue(outputDir.isDirectory());
     assertTrue(outputDir.listFiles().length == 2);
@@ -126,7 +126,8 @@ public class TestReconUtils {
       }
     });
 
-    InputStream inputStream = ReconUtils.makeHttpCall(httpClientMock, url);
+    InputStream inputStream = new ReconUtils()
+        .makeHttpCall(httpClientMock, url);
     String contents = IOUtils.toString(inputStream, Charset.defaultCharset());
 
     assertEquals("File 1 Contents", contents);
diff --git a/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestContainerKeyService.java b/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestContainerKeyService.java
index 3ae39a6..3bef4a0 100644
--- a/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestContainerKeyService.java
+++ b/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestContainerKeyService.java
@@ -20,10 +20,9 @@ package org.apache.hadoop.ozone.recon.api;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -36,15 +35,12 @@ import javax.ws.rs.core.Response;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.hdds.client.BlockID;
-import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
-import org.apache.hadoop.ozone.OmUtils;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
 import org.apache.hadoop.ozone.recon.AbstractOMMetadataManagerTest;
 import org.apache.hadoop.ozone.recon.GuiceInjectorUtilsForTestsImpl;
-import org.apache.hadoop.ozone.recon.ReconUtils;
 import org.apache.hadoop.ozone.recon.api.types.ContainerMetadata;
 import org.apache.hadoop.ozone.recon.api.types.ContainersResponse;
 import org.apache.hadoop.ozone.recon.api.types.KeyMetadata;
@@ -53,53 +49,33 @@ import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
 import org.apache.hadoop.ozone.recon.spi.ContainerDBServiceProvider;
 import org.apache.hadoop.ozone.recon.spi.impl.OzoneManagerServiceProviderImpl;
 import org.apache.hadoop.ozone.recon.tasks.ContainerKeyMapperTask;
-import org.apache.hadoop.utils.db.DBCheckpoint;
-import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.hadoop.utils.db.Table;
 import org.hadoop.ozone.recon.schema.StatsSchemaDefinition;
 import org.jooq.impl.DSL;
 import org.jooq.impl.DefaultConfiguration;
 import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
 
 import com.google.inject.AbstractModule;
 import com.google.inject.Injector;
 
-import org.junit.rules.TemporaryFolder;
-
 /**
  * Test for container key service.
  */
-@RunWith(PowerMockRunner.class)
-@PowerMockIgnore({"javax.management.*", "javax.net.ssl.*"})
-@PrepareForTest(ReconUtils.class)
 public class TestContainerKeyService extends AbstractOMMetadataManagerTest {
 
-  @Rule
-  public TemporaryFolder temporaryFolder = new TemporaryFolder();
   private ContainerDBServiceProvider containerDbServiceProvider;
-  private OMMetadataManager omMetadataManager;
   private Injector injector;
   private OzoneManagerServiceProviderImpl ozoneManagerServiceProvider;
   private ContainerKeyService containerKeyService;
   private GuiceInjectorUtilsForTestsImpl guiceInjectorTest =
       new GuiceInjectorUtilsForTestsImpl();
   private boolean isSetupDone = false;
-
+  private ReconOMMetadataManager reconOMMetadataManager;
   private void initializeInjector() throws Exception {
-    omMetadataManager = initializeNewOmMetadataManager();
-    OzoneConfiguration configuration =
-        guiceInjectorTest.getTestOzoneConfiguration(temporaryFolder);
-
-    ozoneManagerServiceProvider = new OzoneManagerServiceProviderImpl(
-        configuration);
-    ReconOMMetadataManager reconOMMetadataManager =
-        getTestMetadataManager(omMetadataManager);
+    reconOMMetadataManager = getTestMetadataManager(
+        initializeNewOmMetadataManager());
+    ozoneManagerServiceProvider = getMockOzoneManagerServiceProvider();
 
     Injector parentInjector = guiceInjectorTest.getInjector(
         ozoneManagerServiceProvider, reconOMMetadataManager, temporaryFolder);
@@ -150,7 +126,7 @@ public class TestContainerKeyService extends AbstractOMMetadataManagerTest {
         OmKeyLocationInfoGroup(0, omKeyLocationInfoList);
 
     //key = key_one, Blocks = [ {CID = 1, LID = 101}, {CID = 2, LID = 102} ]
-    writeDataToOm(omMetadataManager,
+    writeDataToOm(reconOMMetadataManager,
         "key_one", "bucketOne", "sampleVol",
         Collections.singletonList(omKeyLocationInfoGroup));
 
@@ -174,7 +150,7 @@ public class TestContainerKeyService extends AbstractOMMetadataManagerTest {
         omKeyLocationInfoListNew));
 
     //key = key_two, Blocks = [ {CID = 1, LID = 103}, {CID = 1, LID = 104} ]
-    writeDataToOm(omMetadataManager,
+    writeDataToOm(reconOMMetadataManager,
         "key_two", "bucketOne", "sampleVol", infoGroups);
 
     List<OmKeyLocationInfo> omKeyLocationInfoList2 = new ArrayList<>();
@@ -192,27 +168,18 @@ public class TestContainerKeyService extends AbstractOMMetadataManagerTest {
         OmKeyLocationInfoGroup(0, omKeyLocationInfoList2);
 
     //key = key_three, Blocks = [ {CID = 2, LID = 2}, {CID = 2, LID = 3} ]
-    writeDataToOm(omMetadataManager,
+    writeDataToOm(reconOMMetadataManager,
         "key_three", "bucketOne", "sampleVol",
         Collections.singletonList(omKeyLocationInfoGroup2));
 
-    //Take snapshot of OM DB and copy over to Recon OM DB.
-    DBCheckpoint checkpoint = omMetadataManager.getStore()
-        .getCheckpoint(true);
-    File tarFile = OmUtils.createTarFile(checkpoint.getCheckpointLocation());
-    InputStream inputStream = new FileInputStream(tarFile);
-    PowerMockito.stub(PowerMockito.method(ReconUtils.class,
-        "makeHttpCall",
-        CloseableHttpClient.class, String.class))
-        .toReturn(inputStream);
-
     //Generate Recon container DB data.
-    ContainerKeyMapperTask containerKeyMapperTask = new ContainerKeyMapperTask(
-        containerDbServiceProvider,
-        ozoneManagerServiceProvider.getOMMetadataManagerInstance());
-    ozoneManagerServiceProvider.updateReconOmDBWithNewSnapshot();
-    containerKeyMapperTask.reprocess(ozoneManagerServiceProvider
-        .getOMMetadataManagerInstance());
+    OMMetadataManager omMetadataManagerMock = mock(OMMetadataManager.class);
+    Table tableMock = mock(Table.class);
+    when(tableMock.getName()).thenReturn("KeyTable");
+    when(omMetadataManagerMock.getKeyTable()).thenReturn(tableMock);
+    ContainerKeyMapperTask containerKeyMapperTask  =
+        new ContainerKeyMapperTask(containerDbServiceProvider);
+    containerKeyMapperTask.reprocess(reconOMMetadataManager);
   }
 
   @Test
@@ -397,4 +364,10 @@ public class TestContainerKeyService extends AbstractOMMetadataManagerTest {
     assertEquals(2, containers.size());
     assertEquals(2, data.getTotalCount());
   }
+
+  private OzoneManagerServiceProviderImpl getMockOzoneManagerServiceProvider() {
+    OzoneManagerServiceProviderImpl omServiceProviderMock =
+        mock(OzoneManagerServiceProviderImpl.class);
+    return omServiceProviderMock;
+  }
 }
\ No newline at end of file
diff --git a/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestUtilizationService.java b/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestUtilizationService.java
index a5c7263..a3265b8 100644
--- a/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestUtilizationService.java
+++ b/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestUtilizationService.java
@@ -18,35 +18,25 @@
 
 package org.apache.hadoop.ozone.recon.api;
 
-import org.apache.hadoop.ozone.recon.ReconUtils;
 import org.hadoop.ozone.recon.schema.tables.daos.FileCountBySizeDao;
 import org.hadoop.ozone.recon.schema.tables.pojos.FileCountBySize;
 import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mock;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
 
 import javax.ws.rs.core.Response;
 import java.util.ArrayList;
 import java.util.List;
 
 import static org.junit.Assert.assertEquals;
-import static org.powermock.api.mockito.PowerMockito.mock;
-import static org.powermock.api.mockito.PowerMockito.when;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 /**
  * Test for File size count service.
  */
-@RunWith(PowerMockRunner.class)
-@PowerMockIgnore({"javax.management.*", "javax.net.ssl.*"})
-@PrepareForTest(ReconUtils.class)
 public class TestUtilizationService {
   private UtilizationService utilizationService;
-  @Mock private FileCountBySizeDao fileCountBySizeDao;
   private int maxBinSize = 42;
 
   private List<FileCountBySize> setUpResultList() {
@@ -68,6 +58,7 @@ public class TestUtilizationService {
   public void testGetFileCounts() {
     List<FileCountBySize> resultList = setUpResultList();
 
+    FileCountBySizeDao fileCountBySizeDao = mock(FileCountBySizeDao.class);
     utilizationService = mock(UtilizationService.class);
     when(utilizationService.getFileCounts()).thenCallRealMethod();
     when(utilizationService.getDao()).thenReturn(fileCountBySizeDao);
diff --git a/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestOzoneManagerServiceProviderImpl.java b/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestOzoneManagerServiceProviderImpl.java
index fde8142..c2a6dd8 100644
--- a/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestOzoneManagerServiceProviderImpl.java
+++ b/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestOzoneManagerServiceProviderImpl.java
@@ -18,104 +18,108 @@
 
 package org.apache.hadoop.ozone.recon.spi.impl;
 
+import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_DB_DIR;
+import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_OM_SNAPSHOT_DB_DIR;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doCallRealMethod;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 import java.io.BufferedWriter;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileWriter;
+import java.io.IOException;
 import java.io.InputStream;
 import java.nio.file.Paths;
 
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.ozone.OmUtils;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import org.apache.hadoop.ozone.recon.AbstractOMMetadataManagerTest;
-import org.apache.hadoop.ozone.recon.GuiceInjectorUtilsForTestsImpl;
 import org.apache.hadoop.ozone.recon.ReconUtils;
 import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
+import org.apache.hadoop.ozone.recon.tasks.OMDBUpdatesHandler;
+import org.apache.hadoop.ozone.recon.tasks.OMUpdateEventBatch;
+import org.apache.hadoop.ozone.recon.tasks.ReconTaskController;
 import org.apache.hadoop.utils.db.DBCheckpoint;
-import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.hadoop.utils.db.DBUpdatesWrapper;
+import org.apache.hadoop.utils.db.RDBStore;
+import org.hadoop.ozone.recon.schema.tables.daos.ReconTaskStatusDao;
+import org.hadoop.ozone.recon.schema.tables.pojos.ReconTaskStatus;
 import org.junit.Assert;
 import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import com.google.inject.Injector;
+import org.mockito.ArgumentCaptor;
+import org.rocksdb.RocksDB;
+import org.rocksdb.TransactionLogIterator;
+import org.rocksdb.WriteBatch;
 
 /**
  * Class to test Ozone Manager Service Provider Implementation.
  */
-@RunWith(PowerMockRunner.class)
-@PowerMockIgnore({"javax.management.*", "javax.net.ssl.*"})
-@PrepareForTest(ReconUtils.class)
 public class TestOzoneManagerServiceProviderImpl extends
     AbstractOMMetadataManagerTest {
 
-  private OMMetadataManager omMetadataManager;
-  private ReconOMMetadataManager reconOMMetadataManager;
-  private Injector injector;
-  private GuiceInjectorUtilsForTestsImpl guiceInjectorTest =
-      new GuiceInjectorUtilsForTestsImpl();
-  private OzoneManagerServiceProviderImpl ozoneManagerServiceProvider;
-  private boolean isSetupDone = false;
-
-  @Rule
-  public TemporaryFolder temporaryFolder = new TemporaryFolder();
+  private OzoneConfiguration configuration;
+  private OzoneManagerProtocol ozoneManagerProtocol;
 
   @Before
   public void setUp() throws Exception {
-    omMetadataManager = initializeNewOmMetadataManager();
-    writeDataToOm(omMetadataManager, "key_one");
-    reconOMMetadataManager = getTestMetadataManager(omMetadataManager);
-    ozoneManagerServiceProvider =
-        new OzoneManagerServiceProviderImpl(
-            guiceInjectorTest.getTestOzoneConfiguration(temporaryFolder));
-    if (!isSetupDone) {
-      injector = guiceInjectorTest.getInjector(ozoneManagerServiceProvider,
-          reconOMMetadataManager, temporaryFolder);
-
-      isSetupDone = true;
-    }
+    configuration = new OzoneConfiguration();
+    configuration.set(OZONE_RECON_OM_SNAPSHOT_DB_DIR,
+        temporaryFolder.newFolder().getAbsolutePath());
+    configuration.set(OZONE_RECON_DB_DIR,
+        temporaryFolder.newFolder().getAbsolutePath());
+    configuration.set("ozone.om.address", "localhost:9862");
+    ozoneManagerProtocol = getMockOzoneManagerClient(new DBUpdatesWrapper());
   }
 
   @Test
-  public void testInit() throws Exception {
+  public void testUpdateReconOmDBWithNewSnapshot() throws Exception {
 
-    Assert.assertNotNull(reconOMMetadataManager.getKeyTable()
-        .get("/sampleVol/bucketOne/key_one"));
-    Assert.assertNull(reconOMMetadataManager.getKeyTable()
-        .get("/sampleVol/bucketOne/key_two"));
+    OMMetadataManager omMetadataManager = initializeNewOmMetadataManager();
+    ReconOMMetadataManager reconOMMetadataManager =
+        getTestMetadataManager(omMetadataManager);
 
+    writeDataToOm(omMetadataManager, "key_one");
     writeDataToOm(omMetadataManager, "key_two");
+
     DBCheckpoint checkpoint = omMetadataManager.getStore()
         .getCheckpoint(true);
     File tarFile = OmUtils.createTarFile(checkpoint.getCheckpointLocation());
     InputStream inputStream = new FileInputStream(tarFile);
-    PowerMockito.stub(PowerMockito.method(ReconUtils.class,
-        "makeHttpCall",
-        CloseableHttpClient.class, String.class))
-        .toReturn(inputStream);
+    ReconUtils reconUtilsMock = getMockReconUtils();
+    when(reconUtilsMock.makeHttpCall(any(), anyString()))
+        .thenReturn(inputStream);
 
-    ozoneManagerServiceProvider.init();
+    ReconTaskController reconTaskController = getMockTaskController();
 
-    Assert.assertNotNull(reconOMMetadataManager.getKeyTable()
+    OzoneManagerServiceProviderImpl ozoneManagerServiceProvider =
+        new OzoneManagerServiceProviderImpl(configuration,
+            reconOMMetadataManager, reconTaskController, reconUtilsMock,
+            ozoneManagerProtocol);
+
+    Assert.assertNull(reconOMMetadataManager.getKeyTable()
         .get("/sampleVol/bucketOne/key_one"));
-    Assert.assertNotNull(reconOMMetadataManager.getKeyTable()
+    Assert.assertNull(reconOMMetadataManager.getKeyTable()
         .get("/sampleVol/bucketOne/key_two"));
-  }
 
-  @Test
-  public void testGetOMMetadataManagerInstance() throws Exception {
-    OMMetadataManager omMetaMgr = ozoneManagerServiceProvider
-        .getOMMetadataManagerInstance();
-    assertNotNull(omMetaMgr);
+    ozoneManagerServiceProvider.updateReconOmDBWithNewSnapshot();
+
+    assertNotNull(reconOMMetadataManager.getKeyTable()
+        .get("/sampleVol/bucketOne/key_one"));
+    assertNotNull(reconOMMetadataManager.getKeyTable()
+        .get("/sampleVol/bucketOne/key_two"));
   }
 
   @Test
@@ -144,12 +148,18 @@ public class TestOzoneManagerServiceProviderImpl extends
 
     //Create test tar file.
     File tarFile = OmUtils.createTarFile(checkpointDir.toPath());
-
     InputStream fileInputStream = new FileInputStream(tarFile);
-    PowerMockito.stub(PowerMockito.method(ReconUtils.class,
-        "makeHttpCall",
-        CloseableHttpClient.class, String.class))
-        .toReturn(fileInputStream);
+    ReconUtils reconUtilsMock = getMockReconUtils();
+    when(reconUtilsMock.makeHttpCall(any(), anyString()))
+        .thenReturn(fileInputStream);
+
+    ReconOMMetadataManager reconOMMetadataManager =
+        mock(ReconOMMetadataManager.class);
+    ReconTaskController reconTaskController = getMockTaskController();
+    OzoneManagerServiceProviderImpl ozoneManagerServiceProvider =
+        new OzoneManagerServiceProviderImpl(configuration,
+            reconOMMetadataManager, reconTaskController, reconUtilsMock,
+            ozoneManagerProtocol);
 
     DBCheckpoint checkpoint = ozoneManagerServiceProvider
         .getOzoneManagerDBSnapshot();
@@ -158,4 +168,150 @@ public class TestOzoneManagerServiceProviderImpl extends
     assertTrue(checkpoint.getCheckpointLocation().toFile()
         .listFiles().length == 2);
   }
+
+  /** Delta updates fetched from OM must be captured and applied locally. */
+  @Test
+  public void testGetAndApplyDeltaUpdatesFromOM() throws Exception {
+    // Writing 2 Keys into a source OM DB and collecting it in a
+    // DBUpdatesWrapper.
+    OMMetadataManager sourceOMMetadataMgr = initializeNewOmMetadataManager();
+    writeDataToOm(sourceOMMetadataMgr, "key_one");
+    writeDataToOm(sourceOMMetadataMgr, "key_two");
+
+    RocksDB rocksDB = ((RDBStore)sourceOMMetadataMgr.getStore()).getDb();
+    DBUpdatesWrapper dbUpdatesWrapper = new DBUpdatesWrapper();
+    // try-with-resources releases the RocksDB log iterator after the copy.
+    try (TransactionLogIterator logIterator = rocksDB.getUpdatesSince(0L)) {
+      while (logIterator.isValid()) {
+        TransactionLogIterator.BatchResult result = logIterator.getBatch();
+        WriteBatch batch = result.writeBatch();
+        batch.markWalTerminationPoint();
+        dbUpdatesWrapper.addWriteBatch(batch.data(), result.sequenceNumber());
+        logIterator.next();
+      }
+    }
+
+    // OM Service Provider's Metadata Manager.
+    OMMetadataManager omMetadataManager = initializeNewOmMetadataManager();
+
+    OzoneManagerServiceProviderImpl ozoneManagerServiceProvider =
+        new OzoneManagerServiceProviderImpl(configuration,
+            getTestMetadataManager(omMetadataManager),
+            getMockTaskController(), new ReconUtils(),
+            getMockOzoneManagerClient(dbUpdatesWrapper));
+
+    OMDBUpdatesHandler updatesHandler =
+        new OMDBUpdatesHandler(omMetadataManager);
+    ozoneManagerServiceProvider.getAndApplyDeltaUpdatesFromOM(
+        0L, updatesHandler);
+
+    // This test has to assert both the "GET" path and the "APPLY" path.
+
+    // Assert GET path --> verify if the OMDBUpdatesHandler picked up the 4
+    // events ( 1 Vol PUT + 1 Bucket PUT + 2 Key PUTs).
+    assertEquals(4, updatesHandler.getEvents().size());
+
+    // Assert APPLY path --> Verify if the OM service provider's RocksDB got
+    // the changes.
+    String fullKey = omMetadataManager.getOzoneKey("sampleVol",
+        "bucketOne", "key_one");
+    assertTrue(ozoneManagerServiceProvider.getOMMetadataManagerInstance()
+        .getKeyTable().isExist(fullKey));
+    fullKey = omMetadataManager.getOzoneKey("sampleVol",
+        "bucketOne", "key_two");
+    assertTrue(ozoneManagerServiceProvider.getOMMetadataManagerInstance()
+        .getKeyTable().isExist(fullKey));
+  }
+
+  /** An empty Recon OM DB must make syncDataFromOM take a full snapshot. */
+  @Test
+  public void testSyncDataFromOMFullSnapshot() throws Exception {
+    // Empty OM DB to start with.
+    ReconOMMetadataManager omMetadataManager = getTestMetadataManager(
+        initializeEmptyOmMetadataManager());
+    ReconTaskStatusDao reconTaskStatusDaoMock =
+        mock(ReconTaskStatusDao.class);
+    doNothing().when(reconTaskStatusDaoMock)
+        .update(any(ReconTaskStatus.class));
+
+    ReconTaskController reconTaskControllerMock = getMockTaskController();
+    when(reconTaskControllerMock.getReconTaskStatusDao())
+        .thenReturn(reconTaskStatusDaoMock);
+    doNothing().when(reconTaskControllerMock)
+        .reInitializeTasks(omMetadataManager);
+
+    OzoneManagerServiceProviderImpl ozoneManagerServiceProvider =
+        new OzoneManagerServiceProviderImpl(configuration, omMetadataManager,
+            reconTaskControllerMock, new ReconUtils(), ozoneManagerProtocol);
+
+    // Should trigger full snapshot request.
+    ozoneManagerServiceProvider.syncDataFromOM();
+
+    // The status DAO must be updated once, under the full-snapshot name.
+    ArgumentCaptor<ReconTaskStatus> captor =
+        ArgumentCaptor.forClass(ReconTaskStatus.class);
+    verify(reconTaskStatusDaoMock, times(1)).update(captor.capture());
+    assertEquals("OM_DB_FULL_SNAPSHOT", captor.getValue().getTaskName());
+    verify(reconTaskControllerMock, times(1))
+        .reInitializeTasks(omMetadataManager);
+  }
+
+  /** A populated Recon OM DB must make syncDataFromOM fetch delta updates. */
+  @Test
+  public void testSyncDataFromOMDeltaUpdates() throws Exception {
+    // Non-Empty OM DB to start with.
+    ReconOMMetadataManager omMetadataManager = getTestMetadataManager(
+        initializeNewOmMetadataManager());
+    ReconTaskStatusDao reconTaskStatusDaoMock =
+        mock(ReconTaskStatusDao.class);
+    doNothing().when(reconTaskStatusDaoMock)
+        .update(any(ReconTaskStatus.class));
+
+    ReconTaskController reconTaskControllerMock = getMockTaskController();
+    when(reconTaskControllerMock.getReconTaskStatusDao())
+        .thenReturn(reconTaskStatusDaoMock);
+    doNothing().when(reconTaskControllerMock)
+        .consumeOMEvents(any(OMUpdateEventBatch.class),
+            any(OMMetadataManager.class));
+
+    OzoneManagerServiceProviderImpl ozoneManagerServiceProvider =
+        new OzoneManagerServiceProviderImpl(configuration, omMetadataManager,
+            reconTaskControllerMock, new ReconUtils(), ozoneManagerProtocol);
+
+    // Should trigger delta updates.
+    ozoneManagerServiceProvider.syncDataFromOM();
+
+    // The status DAO must be updated once, under the delta-updates name.
+    ArgumentCaptor<ReconTaskStatus> captor =
+        ArgumentCaptor.forClass(ReconTaskStatus.class);
+    verify(reconTaskStatusDaoMock, times(1)).update(captor.capture());
+    assertEquals("OM_DB_DELTA_UPDATES", captor.getValue().getTaskName());
+
+    verify(reconTaskControllerMock, times(1))
+        .consumeOMEvents(any(OMUpdateEventBatch.class),
+            any(OMMetadataManager.class));
+  }
+
+  /** Builds a Mockito mock of ReconTaskController with nothing stubbed. */
+  private ReconTaskController getMockTaskController() {
+    // Callers add whatever stubbing their scenario needs.
+    return mock(ReconTaskController.class);
+  }
+
+  private ReconUtils getMockReconUtils() throws IOException {
+    ReconUtils utils = mock(ReconUtils.class); // all else stays mocked
+    doCallRealMethod().when(utils).untarCheckpointFile(any(), any());
+    doCallRealMethod().when(utils).getReconDbDir(any(), anyString());
+    return utils;
+  }
+
+  /** Mocks an OM client that returns the wrapper for any DBUpdatesRequest. */
+  private OzoneManagerProtocol getMockOzoneManagerClient(
+      DBUpdatesWrapper dbUpdatesWrapper) throws IOException {
+    OzoneManagerProtocol omClient = mock(OzoneManagerProtocol.class);
+    when(omClient.getDBUpdates(any(OzoneManagerProtocolProtos
+        .DBUpdatesRequest.class))).thenReturn(dbUpdatesWrapper);
+    return omClient;
+  }
+
 }
\ No newline at end of file
diff --git a/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/DummyReconDBTask.java b/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/DummyReconDBTask.java
index 3073907..66be41e 100644
--- a/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/DummyReconDBTask.java
+++ b/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/DummyReconDBTask.java
@@ -29,13 +29,14 @@ import org.apache.hadoop.ozone.om.OMMetadataManager;
  * Dummy Recon task that has 3 modes of operations.
  * ALWAYS_FAIL / FAIL_ONCE / ALWAYS_PASS
  */
-public class DummyReconDBTask extends ReconDBUpdateTask {
+public class DummyReconDBTask implements ReconDBUpdateTask {
 
   private int numFailuresAllowed = Integer.MIN_VALUE;
   private int callCtr = 0;
+  private String taskName;
 
-  public DummyReconDBTask(String taskName, TaskType taskType) {
-    super(taskName);
+  DummyReconDBTask(String taskName, TaskType taskType) {
+    this.taskName = taskName;
     if (taskType.equals(TaskType.FAIL_ONCE)) {
       numFailuresAllowed = 1;
     } else if (taskType.equals(TaskType.ALWAYS_FAIL)) {
@@ -44,12 +45,17 @@ public class DummyReconDBTask extends ReconDBUpdateTask {
   }
 
   @Override
-  protected Collection<String> getTaskTables() {
+  public String getTaskName() {
+    return taskName;
+  }
+
+  @Override
+  public Collection<String> getTaskTables() {
     return Collections.singletonList("volumeTable");
   }
 
   @Override
-  Pair<String, Boolean> process(OMUpdateEventBatch events) {
+  public Pair<String, Boolean> process(OMUpdateEventBatch events) {
     if (++callCtr <= numFailuresAllowed) {
       return new ImmutablePair<>(getTaskName(), false);
     } else {
@@ -58,7 +64,7 @@ public class DummyReconDBTask extends ReconDBUpdateTask {
   }
 
   @Override
-  Pair<String, Boolean> reprocess(OMMetadataManager omMetadataManager) {
+  public Pair<String, Boolean> reprocess(OMMetadataManager omMetadataManager) {
     if (++callCtr <= numFailuresAllowed) {
       return new ImmutablePair<>(getTaskName(), false);
     } else {
diff --git a/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestContainerKeyMapperTask.java b/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestContainerKeyMapperTask.java
index 5cc9a48..383797e 100644
--- a/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestContainerKeyMapperTask.java
+++ b/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestContainerKeyMapperTask.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.ozone.recon.tasks;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -28,7 +30,6 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.hdds.client.BlockID;
-import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
@@ -37,31 +38,22 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
 import org.apache.hadoop.ozone.recon.AbstractOMMetadataManagerTest;
 import org.apache.hadoop.ozone.recon.GuiceInjectorUtilsForTestsImpl;
-import org.apache.hadoop.ozone.recon.ReconUtils;
 import org.apache.hadoop.ozone.recon.api.types.ContainerKeyPrefix;
 import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
 import org.apache.hadoop.ozone.recon.spi.ContainerDBServiceProvider;
 import org.apache.hadoop.ozone.recon.spi.impl.OzoneManagerServiceProviderImpl;
+import org.apache.hadoop.utils.db.Table;
 import org.hadoop.ozone.recon.schema.StatsSchemaDefinition;
 import org.jooq.impl.DSL;
 import org.jooq.impl.DefaultConfiguration;
 import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
 import com.google.inject.Injector;
 import javax.sql.DataSource;
 
 /**
  * Unit test for Container Key mapper task.
  */
-@RunWith(PowerMockRunner.class)
-@PowerMockIgnore({"javax.management.*", "javax.net.ssl.*"})
-@PrepareForTest(ReconUtils.class)
 public class TestContainerKeyMapperTask extends AbstractOMMetadataManagerTest {
 
   private ContainerDBServiceProvider containerDbServiceProvider;
@@ -77,16 +69,9 @@ public class TestContainerKeyMapperTask extends AbstractOMMetadataManagerTest {
     return injector;
   }
 
-  @Rule
-  TemporaryFolder temporaryFolder = new TemporaryFolder();
-
   private void initializeInjector() throws Exception {
     omMetadataManager = initializeNewOmMetadataManager();
-    OzoneConfiguration configuration =
-        guiceInjectorTest.getTestOzoneConfiguration(temporaryFolder);
-
-    ozoneManagerServiceProvider = new OzoneManagerServiceProviderImpl(
-        configuration);
+    ozoneManagerServiceProvider = getMockOzoneManagerServiceProvider();
     reconOMMetadataManager = getTestMetadataManager(omMetadataManager);
 
     injector = guiceInjectorTest.getInjector(
@@ -151,10 +136,8 @@ public class TestContainerKeyMapperTask extends AbstractOMMetadataManagerTest {
         Collections.singletonList(omKeyLocationInfoGroup));
 
     ContainerKeyMapperTask containerKeyMapperTask =
-        new ContainerKeyMapperTask(containerDbServiceProvider,
-        ozoneManagerServiceProvider.getOMMetadataManagerInstance());
-    containerKeyMapperTask.reprocess(ozoneManagerServiceProvider
-        .getOMMetadataManagerInstance());
+        new ContainerKeyMapperTask(containerDbServiceProvider);
+    containerKeyMapperTask.reprocess(reconOMMetadataManager);
 
     keyPrefixesForContainer =
         containerDbServiceProvider.getKeyPrefixesForContainer(1);
@@ -258,10 +241,8 @@ public class TestContainerKeyMapperTask extends AbstractOMMetadataManagerTest {
         }});
 
     ContainerKeyMapperTask containerKeyMapperTask =
-        new ContainerKeyMapperTask(containerDbServiceProvider,
-            ozoneManagerServiceProvider.getOMMetadataManagerInstance());
-    containerKeyMapperTask.reprocess(ozoneManagerServiceProvider
-        .getOMMetadataManagerInstance());
+        new ContainerKeyMapperTask(containerDbServiceProvider);
+    containerKeyMapperTask.reprocess(reconOMMetadataManager);
 
     keyPrefixesForContainer = containerDbServiceProvider
         .getKeyPrefixesForContainer(1);
@@ -317,4 +298,17 @@ public class TestContainerKeyMapperTask extends AbstractOMMetadataManagerTest {
             omKeyLocationInfoGroup))
         .build();
   }
+
+  /** Provider mock whose metadata manager exposes a "keyTable" mock table. */
+  private OzoneManagerServiceProviderImpl getMockOzoneManagerServiceProvider()
+      throws IOException {
+    Table keyTable = mock(Table.class);
+    when(keyTable.getName()).thenReturn("keyTable");
+    OMMetadataManager metadataManager = mock(OMMetadataManager.class);
+    when(metadataManager.getKeyTable()).thenReturn(keyTable);
+    OzoneManagerServiceProviderImpl provider =
+        mock(OzoneManagerServiceProviderImpl.class);
+    when(provider.getOMMetadataManagerInstance()).thenReturn(metadataManager);
+    return provider;
+  }
 }
\ No newline at end of file
diff --git a/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestFileSizeCountTask.java b/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestFileSizeCountTask.java
index 47a5d6f..4d21e4b 100644
--- a/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestFileSizeCountTask.java
+++ b/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestFileSizeCountTask.java
@@ -24,31 +24,21 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.utils.db.TypedTable;
 import org.junit.Test;
 
-import org.junit.runner.RunWith;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
 import java.io.IOException;
 
-import static org.apache.hadoop.ozone.recon.tasks.
-    OMDBUpdateEvent.OMDBUpdateAction.PUT;
+import static org.apache.hadoop.ozone.recon.tasks.OMDBUpdateEvent.OMDBUpdateAction.PUT;
 import static org.junit.Assert.assertEquals;
 
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.BDDMockito.given;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.times;
-import static org.powermock.api.mockito.PowerMockito.mock;
-import static org.powermock.api.mockito.PowerMockito.when;
+import static org.mockito.Mockito.when;
 
 /**
  * Unit test for File Size Count Task.
  */
-@RunWith(PowerMockRunner.class)
-@PowerMockIgnore({"javax.management.*", "javax.net.ssl.*"})
-@PrepareForTest(OmKeyInfo.class)
-
 public class TestFileSizeCountTask {
   @Test
   public void testCalculateBinIndex() {
diff --git a/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestReconTaskControllerImpl.java b/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestReconTaskControllerImpl.java
index b587c89..6760869 100644
--- a/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestReconTaskControllerImpl.java
+++ b/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestReconTaskControllerImpl.java
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.ozone.recon.tasks;
 
-import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_DB_DIRS;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -28,14 +27,13 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-import java.io.File;
 import java.util.Collections;
+import java.util.HashSet;
 
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.recon.persistence.AbstractSqlDatabaseTest;
-import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
-import org.apache.hadoop.ozone.recon.recovery.ReconOmMetadataManagerImpl;
 import org.hadoop.ozone.recon.schema.ReconInternalSchemaDefinition;
 import org.hadoop.ozone.recon.schema.tables.daos.ReconTaskStatusDao;
 import org.hadoop.ozone.recon.schema.tables.pojos.ReconTaskStatus;
@@ -50,16 +48,12 @@ import org.junit.Test;
 public class TestReconTaskControllerImpl extends AbstractSqlDatabaseTest {
 
   private ReconTaskController reconTaskController;
-
   private Configuration sqlConfiguration;
+
   @Before
   public void setUp() throws Exception {
 
-    File omDbDir = temporaryFolder.newFolder();
     OzoneConfiguration ozoneConfiguration = new OzoneConfiguration();
-    ozoneConfiguration.set(OZONE_OM_DB_DIRS, omDbDir.getAbsolutePath());
-    ReconOMMetadataManager omMetadataManager = new ReconOmMetadataManagerImpl(
-        ozoneConfiguration);
 
     sqlConfiguration = getInjector()
         .getInstance(Configuration.class);
@@ -69,7 +63,7 @@ public class TestReconTaskControllerImpl extends AbstractSqlDatabaseTest {
     schemaDefinition.initializeSchema();
 
     reconTaskController = new ReconTaskControllerImpl(ozoneConfiguration,
-        omMetadataManager, sqlConfiguration);
+        sqlConfiguration, new HashSet<>());
   }
 
   @Test
@@ -86,15 +80,17 @@ public class TestReconTaskControllerImpl extends AbstractSqlDatabaseTest {
   @Test
   public void testConsumeOMEvents() throws Exception {
 
-    ReconDBUpdateTask reconDBUpdateTaskMock = mock(ReconDBUpdateTask.class);
-    when(reconDBUpdateTaskMock.getTaskTables()).thenReturn(Collections
-        .EMPTY_LIST);
-    when(reconDBUpdateTaskMock.getTaskName()).thenReturn("MockTask");
+    ReconDBUpdateTask reconDBUpdateTaskMock = getMockTask("MockTask");
     when(reconDBUpdateTaskMock.process(any(OMUpdateEventBatch.class)))
         .thenReturn(new ImmutablePair<>("MockTask", true));
     reconTaskController.registerTask(reconDBUpdateTaskMock);
+    OMUpdateEventBatch omUpdateEventBatchMock = mock(OMUpdateEventBatch.class);
+    when(omUpdateEventBatchMock.isEmpty()).thenReturn(false);
+    when(omUpdateEventBatchMock.filter(Collections.singleton("MockTable")))
+        .thenReturn(omUpdateEventBatchMock);
     reconTaskController.consumeOMEvents(
-        new OMUpdateEventBatch(Collections.emptyList()));
+        omUpdateEventBatchMock,
+        mock(OMMetadataManager.class));
 
     verify(reconDBUpdateTaskMock, times(1))
         .process(any());
@@ -107,17 +103,13 @@ public class TestReconTaskControllerImpl extends AbstractSqlDatabaseTest {
         new DummyReconDBTask(taskName, DummyReconDBTask.TaskType.FAIL_ONCE);
     reconTaskController.registerTask(dummyReconDBTask);
 
-
-    long currentTime = System.nanoTime();
-    OMDBUpdateEvent.EventInfo eventInfoMock = mock(
-        OMDBUpdateEvent.EventInfo.class);
-    when(eventInfoMock.getSequenceNumber()).thenReturn(100L);
-    when(eventInfoMock.getEventTimestampMillis()).thenReturn(currentTime);
-
+    long currentTime = System.currentTimeMillis();
     OMUpdateEventBatch omUpdateEventBatchMock = mock(OMUpdateEventBatch.class);
-    when(omUpdateEventBatchMock.getLastEventInfo()).thenReturn(eventInfoMock);
+    when(omUpdateEventBatchMock.isEmpty()).thenReturn(false);
+    when(omUpdateEventBatchMock.getLastSequenceNumber()).thenReturn(100L);
 
-    reconTaskController.consumeOMEvents(omUpdateEventBatchMock);
+    reconTaskController.consumeOMEvents(omUpdateEventBatchMock,
+        mock(OMMetadataManager.class));
     assertFalse(reconTaskController.getRegisteredTasks().isEmpty());
     assertEquals(dummyReconDBTask, reconTaskController.getRegisteredTasks()
         .get(dummyReconDBTask.getTaskName()));
@@ -126,8 +118,8 @@ public class TestReconTaskControllerImpl extends AbstractSqlDatabaseTest {
     ReconTaskStatus dbRecord = dao.findById(taskName);
 
     Assert.assertEquals(taskName, dbRecord.getTaskName());
-    Assert.assertEquals(Long.valueOf(currentTime),
-        dbRecord.getLastUpdatedTimestamp());
+    Assert.assertTrue(
+        dbRecord.getLastUpdatedTimestamp() > currentTime);
     Assert.assertEquals(Long.valueOf(100L), dbRecord.getLastUpdatedSeqNumber());
   }
 
@@ -138,18 +130,14 @@ public class TestReconTaskControllerImpl extends AbstractSqlDatabaseTest {
         new DummyReconDBTask(taskName, DummyReconDBTask.TaskType.ALWAYS_FAIL);
     reconTaskController.registerTask(dummyReconDBTask);
 
-
-    long currentTime = System.nanoTime();
-    OMDBUpdateEvent.EventInfo eventInfoMock =
-        mock(OMDBUpdateEvent.EventInfo.class);
-    when(eventInfoMock.getSequenceNumber()).thenReturn(100L);
-    when(eventInfoMock.getEventTimestampMillis()).thenReturn(currentTime);
-
     OMUpdateEventBatch omUpdateEventBatchMock = mock(OMUpdateEventBatch.class);
-    when(omUpdateEventBatchMock.getLastEventInfo()).thenReturn(eventInfoMock);
+    when(omUpdateEventBatchMock.isEmpty()).thenReturn(false);
+    when(omUpdateEventBatchMock.getLastSequenceNumber()).thenReturn(100L);
 
+    OMMetadataManager omMetadataManagerMock = mock(OMMetadataManager.class);
     for (int i = 0; i < 2; i++) {
-      reconTaskController.consumeOMEvents(omUpdateEventBatchMock);
+      reconTaskController.consumeOMEvents(omUpdateEventBatchMock,
+          omMetadataManagerMock);
 
       assertFalse(reconTaskController.getRegisteredTasks().isEmpty());
       assertEquals(dummyReconDBTask, reconTaskController.getRegisteredTasks()
@@ -157,8 +145,8 @@ public class TestReconTaskControllerImpl extends AbstractSqlDatabaseTest {
     }
 
     //Should be blacklisted now.
-    reconTaskController.consumeOMEvents(
-        new OMUpdateEventBatch(Collections.emptyList()));
+    reconTaskController.consumeOMEvents(omUpdateEventBatchMock,
+        omMetadataManagerMock);
     assertTrue(reconTaskController.getRegisteredTasks().isEmpty());
 
     ReconTaskStatusDao dao = new ReconTaskStatusDao(sqlConfiguration);
@@ -168,4 +156,36 @@ public class TestReconTaskControllerImpl extends AbstractSqlDatabaseTest {
     Assert.assertEquals(Long.valueOf(0L), dbRecord.getLastUpdatedTimestamp());
     Assert.assertEquals(Long.valueOf(0L), dbRecord.getLastUpdatedSeqNumber());
   }
+
+
+  /** reInitializeTasks must call reprocess once on a registered task. */
+  @Test
+  public void testReInitializeTasks() throws Exception {
+    final String taskName = "MockTask2";
+    OMMetadataManager omMetadataManagerMock = mock(OMMetadataManager.class);
+    ReconDBUpdateTask task = getMockTask(taskName);
+    when(task.reprocess(omMetadataManagerMock))
+        .thenReturn(new ImmutablePair<>(taskName, true));
+
+    reconTaskController.registerTask(task);
+    reconTaskController.reInitializeTasks(omMetadataManagerMock);
+
+    // The controller should hand the metadata manager to the task once.
+    verify(task, times(1)).reprocess(omMetadataManagerMock);
+  }
+
+  /**
+   * Helper method for getting a mocked Task that reports the single table
+   * "MockTable" from getTaskTables().
+   * @param taskName name the mock returns from getTaskName().
+   * @return mocked instance of ReconDBUpdateTask.
+   */
+  private ReconDBUpdateTask getMockTask(String taskName) {
+    ReconDBUpdateTask reconDBUpdateTaskMock = mock(ReconDBUpdateTask.class);
+    when(reconDBUpdateTaskMock.getTaskName()).thenReturn(taskName);
+    // Stub the table list once; testConsumeOMEvents filters on "MockTable".
+    when(reconDBUpdateTaskMock.getTaskTables())
+        .thenReturn(Collections.singleton("MockTable"));
+    return reconDBUpdateTaskMock;
+  }
 }
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org