Posted to commits@ozone.apache.org by av...@apache.org on 2020/08/12 14:25:27 UTC

[hadoop-ozone] branch master updated: HDDS-4009. Recon Overview page: The volume, bucket and key counts are not accurate (#1305)

This is an automated email from the ASF dual-hosted git repository.

avijayan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new eb70d9e  HDDS-4009. Recon Overview page: The volume, bucket and key counts are not accurate (#1305)
eb70d9e is described below

commit eb70d9ee8482f5e2deb4dabef0d801b147dc980f
Author: Vivek Ratnavel Subramanian <vi...@gmail.com>
AuthorDate: Wed Aug 12 07:25:16 2020 -0700

    HDDS-4009. Recon Overview page: The volume, bucket and key counts are not accurate (#1305)
---
 .../apache/hadoop/hdds/utils/db/DBDefinition.java  |  27 +++
 .../org/apache/hadoop/hdds/utils/db/RDBTable.java  |  13 ++
 .../org/apache/hadoop/hdds/utils/db/Table.java     |  13 ++
 .../apache/hadoop/hdds/utils/db/TypedTable.java    |  13 ++
 .../ozone/recon/schema/StatsSchemaDefinition.java  |  12 +-
 .../recon/schema/UtilizationSchemaDefinition.java  |  13 +-
 .../apache/hadoop/ozone/recon/ReconConstants.java  |   8 +-
 .../hadoop/ozone/recon/ReconControllerModule.java  |   2 +
 .../org/apache/hadoop/ozone/recon/ReconUtils.java  |  32 ++++
 .../ozone/recon/api/ClusterStateEndpoint.java      |  47 +++---
 .../hadoop/ozone/recon/api/ContainerEndpoint.java  |   2 +-
 .../spi/impl/ContainerDBServiceProviderImpl.java   |  33 +---
 .../ozone/recon/tasks/FileSizeCountTask.java       |   4 +-
 .../hadoop/ozone/recon/tasks/OMDBUpdateEvent.java  |   2 +-
 .../ozone/recon/tasks/OMDBUpdatesHandler.java      |  92 +++++-----
 .../hadoop/ozone/recon/tasks/TableCountTask.java   | 185 +++++++++++++++++++++
 .../src/views/overview/overview.tsx                |   2 +-
 .../hadoop/ozone/recon/api/TestEndpoints.java      |  36 +++-
 .../ozone/recon/tasks/TestOMDBUpdatesHandler.java  |  54 +++++-
 .../ozone/recon/tasks/TestTableCountTask.java      | 178 ++++++++++++++++++++
 20 files changed, 633 insertions(+), 135 deletions(-)

diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBDefinition.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBDefinition.java
index 3058261..6d7d09e 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBDefinition.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBDefinition.java
@@ -21,6 +21,10 @@ package org.apache.hadoop.hdds.utils.db;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Arrays;
+import java.util.Optional;
+import java.util.function.Function;
+
 /**
  * Simple interface to provide information to create a DBStore.
  */
@@ -43,4 +47,27 @@ public interface DBDefinition {
    */
   DBColumnFamilyDefinition[] getColumnFamilies();
 
+  /**
+   * Get the key type class for the given table.
+   * @param table table name
+   * @return the class of the key type of the given table, wrapped in an
+   * {@link Optional}
+   */
+  default Optional<Class> getKeyType(String table) {
+    return Arrays.stream(getColumnFamilies()).filter(cf -> cf.getName().equals(
+        table)).map((Function<DBColumnFamilyDefinition, Class>)
+        DBColumnFamilyDefinition::getKeyType).findAny();
+  }
+
+  /**
+   * Get the value type class for the given table.
+   * @param table table name
+   * @return the class of the value type of the given table, wrapped in an
+   * {@link Optional}
+   */
+  default Optional<Class> getValueType(String table) {
+    return Arrays.stream(getColumnFamilies()).filter(cf -> cf.getName().equals(
+        table)).map((Function<DBColumnFamilyDefinition, Class>)
+        DBColumnFamilyDefinition::getValueType).findAny();
+  }
 }
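
For context, the new default methods let callers resolve a table's key and
value classes generically from a schema definition. A minimal usage sketch,
assuming the OMDBDefinition used later in this commit and the OM key table
name "keyTable" (both taken from the diffs below):

    OMDBDefinition omdbDefinition = new OMDBDefinition();
    // Resolve the value class registered for the "keyTable" column family.
    Optional<Class> valueType = omdbDefinition.getValueType("keyTable");
    // For the OM key table this yields OmKeyInfo.class; an unknown table
    // name yields Optional.empty(), which callers are expected to skip.
    valueType.ifPresent(clazz -> System.out.println(clazz.getName()));
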
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java
index 4dbb59a..8f2a1f8 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java
@@ -157,6 +157,19 @@ class RDBTable implements Table<byte[], byte[]> {
     }
   }
 
+  /**
+   * Skip the cache and get the value mapped to the given key as a byte
+   * array, or return null if the key is not found.
+   *
+   * @param bytes metadata key
+   * @return value as a byte array, or null if the key is not found.
+   * @throws IOException on failure
+   */
+  @Override
+  public byte[] getSkipCache(byte[] bytes) throws IOException {
+    return get(bytes);
+  }
+
   @Override
   public byte[] getIfExist(byte[] key) throws IOException {
     try {
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java
index 72111f9..0aa852f 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java
@@ -83,6 +83,19 @@ public interface Table<KEY, VALUE> extends AutoCloseable {
 
 
   /**
+   * Skip the cache and get the value mapped to the given key, or return
+   * null if the key is not found.
+   *
+   * @param key metadata key
+   * @return the value mapped to the given key, or null if not found.
+   * @throws IOException on failure
+   */
+  default VALUE getSkipCache(KEY key) throws IOException {
+    throw new NotImplementedException("getSkipCache is not implemented");
+  }
+
+
+  /**
    * Returns the value mapped to the given key in byte array or returns null
    * if the key is not found.
    *
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java
index 1451946..0d22c3f 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java
@@ -178,6 +178,19 @@ public class TypedTable<KEY, VALUE> implements Table<KEY, VALUE> {
   }
 
   /**
+   * Skip the cache and get the value mapped to the given key, or return
+   * null if the key is not found.
+   *
+   * @param key metadata key
+   * @return the value mapped to the given key, or null if not found.
+   * @throws IOException on failure
+   */
+  @Override
+  public VALUE getSkipCache(KEY key) throws IOException {
+    return getFromTable(key);
+  }
+
+  /**
    * This method returns the value if it exists in the cache; if it
    * does not, it gets the value from the underlying RocksDB table. If it
    * exists in the cache, it returns the same reference of the cached value.
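
The get/getSkipCache distinction added across these files can be summarized
with a short sketch, assuming a TypedTable<String, OmKeyInfo> named keyTable
(the variable and key names are illustrative):

    // get() consults the table cache first and falls back to RocksDB:
    OmKeyInfo maybeCached = keyTable.get("/vol1/bucket1/key1");
    // getSkipCache() always reads straight from RocksDB:
    OmKeyInfo onDisk = keyTable.getSkipCache("/vol1/bucket1/key1");
    // Recon never populates the OM table cache, so reading through
    // getSkipCache avoids consulting a cache that is never filled.
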
diff --git a/hadoop-ozone/recon-codegen/src/main/java/org/hadoop/ozone/recon/schema/StatsSchemaDefinition.java b/hadoop-ozone/recon-codegen/src/main/java/org/hadoop/ozone/recon/schema/StatsSchemaDefinition.java
index adfaca6..55f3b93 100644
--- a/hadoop-ozone/recon-codegen/src/main/java/org/hadoop/ozone/recon/schema/StatsSchemaDefinition.java
+++ b/hadoop-ozone/recon-codegen/src/main/java/org/hadoop/ozone/recon/schema/StatsSchemaDefinition.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -22,6 +22,7 @@ import static org.hadoop.ozone.recon.codegen.SqlDbUtils.TABLE_EXISTS_CHECK;
 
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
+import org.jooq.DSLContext;
 import org.jooq.impl.DSL;
 import org.jooq.impl.SQLDataType;
 
@@ -36,6 +37,7 @@ import java.sql.SQLException;
 public class StatsSchemaDefinition implements ReconSchemaDefinition {
 
   public static final String GLOBAL_STATS_TABLE_NAME = "GLOBAL_STATS";
+  private DSLContext dslContext;
   private final DataSource dataSource;
 
   @Inject
@@ -46,17 +48,17 @@ public class StatsSchemaDefinition implements ReconSchemaDefinition {
   @Override
   public void initializeSchema() throws SQLException {
     Connection conn = dataSource.getConnection();
+    dslContext = DSL.using(conn);
     if (!TABLE_EXISTS_CHECK.test(conn, GLOBAL_STATS_TABLE_NAME)) {
-      createGlobalStatsTable(conn);
+      createGlobalStatsTable();
     }
   }
 
   /**
    * Create the Ozone Global Stats table.
-   * @param conn connection
    */
-  private void createGlobalStatsTable(Connection conn) {
-    DSL.using(conn).createTableIfNotExists(GLOBAL_STATS_TABLE_NAME)
+  private void createGlobalStatsTable() {
+    dslContext.createTableIfNotExists(GLOBAL_STATS_TABLE_NAME)
         .column("key", SQLDataType.VARCHAR(255))
         .column("value", SQLDataType.BIGINT)
         .column("last_updated_timestamp", SQLDataType.TIMESTAMP)
diff --git a/hadoop-ozone/recon-codegen/src/main/java/org/hadoop/ozone/recon/schema/UtilizationSchemaDefinition.java b/hadoop-ozone/recon-codegen/src/main/java/org/hadoop/ozone/recon/schema/UtilizationSchemaDefinition.java
index 92de19e..b2b2881 100644
--- a/hadoop-ozone/recon-codegen/src/main/java/org/hadoop/ozone/recon/schema/UtilizationSchemaDefinition.java
+++ b/hadoop-ozone/recon-codegen/src/main/java/org/hadoop/ozone/recon/schema/UtilizationSchemaDefinition.java
@@ -63,14 +63,14 @@ public class UtilizationSchemaDefinition implements ReconSchemaDefinition {
     Connection conn = dataSource.getConnection();
     dslContext = DSL.using(conn);
     if (!TABLE_EXISTS_CHECK.test(conn, FILE_COUNT_BY_SIZE_TABLE_NAME)) {
-      createFileSizeCountTable(conn);
+      createFileSizeCountTable();
     }
     if (!TABLE_EXISTS_CHECK.test(conn, CLUSTER_GROWTH_DAILY_TABLE_NAME)) {
-      createClusterGrowthTable(conn);
+      createClusterGrowthTable();
     }
   }
 
-  private void createClusterGrowthTable(Connection conn) {
+  private void createClusterGrowthTable() {
     dslContext.createTableIfNotExists(CLUSTER_GROWTH_DAILY_TABLE_NAME)
         .column("timestamp", SQLDataType.TIMESTAMP)
         .column("datanode_id", SQLDataType.INTEGER)
@@ -85,7 +85,7 @@ public class UtilizationSchemaDefinition implements ReconSchemaDefinition {
         .execute();
   }
 
-  private void createFileSizeCountTable(Connection conn) {
+  private void createFileSizeCountTable() {
     dslContext.createTableIfNotExists(FILE_COUNT_BY_SIZE_TABLE_NAME)
         .column("volume", SQLDataType.VARCHAR(64))
         .column("bucket", SQLDataType.VARCHAR(64))
@@ -96,6 +96,11 @@ public class UtilizationSchemaDefinition implements ReconSchemaDefinition {
         .execute();
   }
 
+  /**
+   * Returns the DSL context.
+   *
+   * @return dslContext
+   */
   public DSLContext getDSLContext() {
     return dslContext;
   }
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconConstants.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconConstants.java
index 972399f..0cbc61f 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconConstants.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconConstants.java
@@ -32,13 +32,11 @@ public final class ReconConstants {
 
   public static final String RECON_CONTAINER_KEY_DB = "recon-container-key.db";
 
-  public static final String CONTAINER_COUNT_KEY = "totalCount";
+  public static final String CONTAINER_COUNT_KEY = "containerCount";
 
-  public static final String RECON_OM_SNAPSHOT_DB =
-      "om.snapshot.db";
+  public static final String RECON_OM_SNAPSHOT_DB = "om.snapshot.db";
 
-  public static final String CONTAINER_KEY_TABLE =
-      "containerKeyTable";
+  public static final String CONTAINER_KEY_TABLE = "containerKeyTable";
 
   public static final String CONTAINER_KEY_COUNT_TABLE =
       "containerKeyCountTable";
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java
index 1199630..8991767 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.ozone.recon.spi.impl.ReconContainerDBProvider;
 import org.apache.hadoop.ozone.recon.spi.impl.StorageContainerServiceProviderImpl;
 import org.apache.hadoop.ozone.recon.tasks.ContainerKeyMapperTask;
 import org.apache.hadoop.ozone.recon.tasks.FileSizeCountTask;
+import org.apache.hadoop.ozone.recon.tasks.TableCountTask;
 import org.apache.hadoop.ozone.recon.tasks.ReconOmTask;
 import org.apache.hadoop.ozone.recon.tasks.ReconTaskController;
 import org.apache.hadoop.ozone.recon.tasks.ReconTaskControllerImpl;
@@ -116,6 +117,7 @@ public class ReconControllerModule extends AbstractModule {
           Multibinder.newSetBinder(binder(), ReconOmTask.class);
       taskBinder.addBinding().to(ContainerKeyMapperTask.class);
       taskBinder.addBinding().to(FileSizeCountTask.class);
+      taskBinder.addBinding().to(TableCountTask.class);
     }
   }
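
The Multibinder above exposes the registered tasks as an injectable
Set<ReconOmTask>. A hedged sketch of the consumption side (the constructor
shown is illustrative of the pattern, not the exact ReconTaskControllerImpl
signature):

    @Inject
    public ReconTaskControllerImpl(Set<ReconOmTask> tasks) {
      // With the new binding, this set contains ContainerKeyMapperTask,
      // FileSizeCountTask and TableCountTask instances.
      for (ReconOmTask task : tasks) {
        registerTask(task); // registerTask is an illustrative method name
      }
    }
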
 
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconUtils.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconUtils.java
index cba7428..b6e4d7c 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconUtils.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconUtils.java
@@ -29,6 +29,7 @@ import java.net.URL;
 import java.net.URLConnection;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.sql.Timestamp;
 import java.util.zip.GZIPOutputStream;
 
 import org.apache.hadoop.hdds.HddsConfigKeys;
@@ -44,8 +45,14 @@ import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
 import static org.apache.hadoop.hdds.server.ServerUtils.getDirectoryFromConfig;
 import static org.apache.hadoop.hdds.server.ServerUtils.getOzoneMetaDirPath;
 import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_DB_DIR;
+import static org.jooq.impl.DSL.currentTimestamp;
+import static org.jooq.impl.DSL.select;
+import static org.jooq.impl.DSL.using;
 
 import org.apache.hadoop.security.authentication.client.AuthenticationException;
+import org.hadoop.ozone.recon.schema.tables.daos.GlobalStatsDao;
+import org.hadoop.ozone.recon.schema.tables.pojos.GlobalStats;
+import org.jooq.Configuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -266,4 +273,29 @@ public class ReconUtils {
         new File(reconDbDir.getPath(), lastKnownSnapshotFileName);
   }
 
+  /**
+   * Upsert a row in the GlobalStats table.
+   *
+   * @param sqlConfiguration jOOQ configuration used to fetch the current
+   *                         timestamp
+   * @param globalStatsDao DAO for the GlobalStats table
+   * @param key row key to upsert
+   * @param count count value to store
+   */
+  public static void upsertGlobalStatsTable(Configuration sqlConfiguration,
+                                            GlobalStatsDao globalStatsDao,
+                                            String key,
+                                            Long count) {
+    // Get the current timestamp
+    Timestamp now =
+        using(sqlConfiguration).fetchValue(select(currentTimestamp()));
+    GlobalStats record = globalStatsDao.fetchOneByKey(key);
+    GlobalStats newRecord = new GlobalStats(key, count, now);
+
+    // Insert a new record for key if it does not exist
+    if (record == null) {
+      globalStatsDao.insert(newRecord);
+    } else {
+      globalStatsDao.update(newRecord);
+    }
+  }
 }
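
A minimal usage sketch of the new helper, mirroring the storeContainerCount()
call site further below (the count value is illustrative):

    // Upserts the ("containerCount", 42) row in GLOBAL_STATS: inserts it if
    // absent, otherwise updates the value and last_updated_timestamp.
    ReconUtils.upsertGlobalStatsTable(sqlConfiguration, globalStatsDao,
        "containerCount", 42L);
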
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ClusterStateEndpoint.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ClusterStateEndpoint.java
index 918ee18..de0028c 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ClusterStateEndpoint.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ClusterStateEndpoint.java
@@ -24,10 +24,12 @@ import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
 import org.apache.hadoop.hdds.scm.server.OzoneStorageContainerManager;
 import org.apache.hadoop.ozone.recon.api.types.ClusterStateResponse;
 import org.apache.hadoop.ozone.recon.api.types.DatanodeStorageReport;
-import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
 import org.apache.hadoop.ozone.recon.scm.ReconContainerManager;
 import org.apache.hadoop.ozone.recon.scm.ReconNodeManager;
 import org.apache.hadoop.ozone.recon.scm.ReconPipelineManager;
+import org.apache.hadoop.ozone.recon.tasks.TableCountTask;
+import org.hadoop.ozone.recon.schema.tables.daos.GlobalStatsDao;
+import org.hadoop.ozone.recon.schema.tables.pojos.GlobalStats;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,6 +41,10 @@ import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import java.util.List;
 
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.BUCKET_TABLE;
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.KEY_TABLE;
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.VOLUME_TABLE;
+
 /**
  * Endpoint to fetch current state of ozone cluster.
  */
@@ -52,17 +58,17 @@ public class ClusterStateEndpoint {
   private ReconNodeManager nodeManager;
   private ReconPipelineManager pipelineManager;
   private ReconContainerManager containerManager;
-  private ReconOMMetadataManager omMetadataManager;
+  private GlobalStatsDao globalStatsDao;
 
   @Inject
   ClusterStateEndpoint(OzoneStorageContainerManager reconSCM,
-                       ReconOMMetadataManager omMetadataManager) {
+                       GlobalStatsDao globalStatsDao) {
     this.nodeManager =
         (ReconNodeManager) reconSCM.getScmNodeManager();
     this.pipelineManager = (ReconPipelineManager) reconSCM.getPipelineManager();
     this.containerManager =
         (ReconContainerManager) reconSCM.getContainerManager();
-    this.omMetadataManager = omMetadataManager;
+    this.globalStatsDao = globalStatsDao;
   }
 
   /**
@@ -80,25 +86,20 @@ public class ClusterStateEndpoint {
         new DatanodeStorageReport(stats.getCapacity().get(),
             stats.getScmUsed().get(), stats.getRemaining().get());
     ClusterStateResponse.Builder builder = ClusterStateResponse.newBuilder();
-    if (omMetadataManager.isOmTablesInitialized()) {
-      try {
-        builder.setVolumes(
-            omMetadataManager.getVolumeTable().getEstimatedKeyCount());
-      } catch (Exception ex) {
-        LOG.error("Unable to get Volumes count in ClusterStateResponse.", ex);
-      }
-      try {
-        builder.setBuckets(
-            omMetadataManager.getBucketTable().getEstimatedKeyCount());
-      } catch (Exception ex) {
-        LOG.error("Unable to get Buckets count in ClusterStateResponse.", ex);
-      }
-      try {
-        builder.setKeys(
-            omMetadataManager.getKeyTable().getEstimatedKeyCount());
-      } catch (Exception ex) {
-        LOG.error("Unable to get Keys count in ClusterStateResponse.", ex);
-      }
+    GlobalStats volumeRecord = globalStatsDao.findById(
+        TableCountTask.getRowKeyFromTable(VOLUME_TABLE));
+    GlobalStats bucketRecord = globalStatsDao.findById(
+        TableCountTask.getRowKeyFromTable(BUCKET_TABLE));
+    GlobalStats keyRecord = globalStatsDao.findById(
+        TableCountTask.getRowKeyFromTable(KEY_TABLE));
+    if (volumeRecord != null) {
+      builder.setVolumes(volumeRecord.getValue());
+    }
+    if (bucketRecord != null) {
+      builder.setBuckets(bucketRecord.getValue());
+    }
+    if (keyRecord != null) {
+      builder.setKeys(keyRecord.getValue());
     }
     ClusterStateResponse response = builder
         .setStorageReport(storageReport)
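
The lookups above rely on the row-key convention defined in TableCountTask,
which appends "Count" to the OM table name. Assuming the standard OM table
names "volumeTable", "bucketTable" and "keyTable", the queried keys are:

    TableCountTask.getRowKeyFromTable(VOLUME_TABLE); // "volumeTableCount"
    TableCountTask.getRowKeyFromTable(BUCKET_TABLE); // "bucketTableCount"
    TableCountTask.getRowKeyFromTable(KEY_TABLE);    // "keyTableCount"
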
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ContainerEndpoint.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ContainerEndpoint.java
index c534062..1778b84 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ContainerEndpoint.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ContainerEndpoint.java
@@ -157,7 +157,7 @@ public class ContainerEndpoint {
         // Directly calling get() on the Key table instead of iterating since
         // only full keys are supported now. When we change to using a prefix
         // of the key, this needs to change to prefix seek.
-        OmKeyInfo omKeyInfo = omMetadataManager.getKeyTable().get(
+        OmKeyInfo omKeyInfo = omMetadataManager.getKeyTable().getSkipCache(
             containerKeyPrefix.getKeyPrefix());
         if (null != omKeyInfo) {
           // Filter keys by version.
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/ContainerDBServiceProviderImpl.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/ContainerDBServiceProviderImpl.java
index aeefeef..6360cf2 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/ContainerDBServiceProviderImpl.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/ContainerDBServiceProviderImpl.java
@@ -22,13 +22,9 @@ import static org.apache.hadoop.ozone.recon.ReconConstants.CONTAINER_COUNT_KEY;
 import static org.apache.hadoop.ozone.recon.spi.impl.ReconContainerDBProvider.getNewDBStore;
 import static org.apache.hadoop.ozone.recon.spi.impl.ReconDBDefinition.CONTAINER_KEY;
 import static org.apache.hadoop.ozone.recon.spi.impl.ReconDBDefinition.CONTAINER_KEY_COUNT;
-import static org.jooq.impl.DSL.currentTimestamp;
-import static org.jooq.impl.DSL.select;
-import static org.jooq.impl.DSL.using;
 
 import java.io.File;
 import java.io.IOException;
-import java.sql.Timestamp;
 import java.util.LinkedHashMap;
 import java.util.Map;
 
@@ -38,6 +34,7 @@ import javax.inject.Singleton;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.recon.ReconUtils;
 import org.apache.hadoop.ozone.recon.api.types.ContainerKeyPrefix;
 import org.apache.hadoop.ozone.recon.api.types.ContainerMetadata;
 import org.apache.hadoop.ozone.recon.spi.ContainerDBServiceProvider;
@@ -152,7 +149,7 @@ public class ContainerDBServiceProviderImpl
    *
    * @param containerKeyPrefix the containerID, key-prefix tuple.
    * @param count Count of the keys matching that prefix.
-   * @throws IOException
+   * @throws IOException on failure.
    */
   @Override
   public void storeContainerKeyMapping(ContainerKeyPrefix containerKeyPrefix,
@@ -166,7 +163,7 @@ public class ContainerDBServiceProviderImpl
    *
    * @param containerID the containerID.
    * @param count count of the keys within the given containerID.
-   * @throws IOException
+   * @throws IOException on failure.
    */
   @Override
   public void storeContainerKeyCount(Long containerID, Long count)
@@ -179,7 +176,7 @@ public class ContainerDBServiceProviderImpl
    *
    * @param containerID the given containerID.
    * @return count of keys within the given containerID.
-   * @throws IOException
+   * @throws IOException on failure.
    */
   @Override
   public long getKeyCountForContainer(Long containerID) throws IOException {
@@ -192,7 +189,7 @@ public class ContainerDBServiceProviderImpl
    *
    * @param containerID the given containerID.
    * @return if the given ContainerID exists or not.
-   * @throws IOException
+   * @throws IOException on failure.
    */
   @Override
   public boolean doesContainerExists(Long containerID) throws IOException {
@@ -205,7 +202,7 @@ public class ContainerDBServiceProviderImpl
    *
    * @param containerKeyPrefix the containerID, key-prefix tuple.
    * @return count of keys matching the containerID, key-prefix.
-   * @throws IOException
+   * @throws IOException on failure.
    */
   @Override
   public Integer getCountForContainerKeyPrefix(
@@ -305,7 +302,7 @@ public class ContainerDBServiceProviderImpl
    * @param prevContainer containerID after which the
    *                      list of containers are scanned.
    * @return Map of containerID -> containerMetadata.
-   * @throws IOException
+   * @throws IOException on failure.
    */
   @Override
   public Map<Long, ContainerMetadata> getContainers(int limit,
@@ -385,20 +382,8 @@ public class ContainerDBServiceProviderImpl
    */
   @Override
   public void storeContainerCount(Long count) {
-    // Get the current timestamp
-    Timestamp now =
-        using(sqlConfiguration).fetchValue(select(currentTimestamp()));
-    GlobalStats containerCountRecord =
-        globalStatsDao.fetchOneByKey(CONTAINER_COUNT_KEY);
-    GlobalStats globalStatsRecord =
-        new GlobalStats(CONTAINER_COUNT_KEY, count, now);
-
-    // Insert a new record for CONTAINER_COUNT_KEY if it does not exist
-    if (containerCountRecord == null) {
-      globalStatsDao.insert(globalStatsRecord);
-    } else {
-      globalStatsDao.update(globalStatsRecord);
-    }
+    ReconUtils.upsertGlobalStatsTable(sqlConfiguration, globalStatsDao,
+        CONTAINER_COUNT_KEY, count);
   }
 
   /**
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTask.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTask.java
index 80b2526..7092c54 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTask.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTask.java
@@ -243,8 +243,8 @@ public class FileSizeCountTask implements ReconOmTask {
                                     Map<FileSizeCountKey, Long>
                                         fileSizeCountMap) {
     if (omKeyInfo == null) {
-      LOG.warn("Unexpected error while handling DELETE key event. Key not " +
-          "found in Recon OM DB : {}", key);
+      LOG.warn("Deleting a key not found while handling DELETE key event. Key" +
+          " not found in Recon OM DB : {}", key);
     } else {
       FileSizeCountKey countKey = getFileSizeCountKey(omKeyInfo);
       Long count = fileSizeCountMap.containsKey(countKey) ?
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OMDBUpdateEvent.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OMDBUpdateEvent.java
index 949439c..f32b04e 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OMDBUpdateEvent.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OMDBUpdateEvent.java
@@ -22,7 +22,7 @@ import java.util.Objects;
 
 /**
  * A class used to encapsulate a single OM DB update event.
- * Currently only PUT and DELETE are supported.
+ * Currently PUT, DELETE and UPDATE are supported.
  * @param <KEY> Type of Key.
  * @param <VALUE> Type of Value.
  */
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OMDBUpdatesHandler.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OMDBUpdatesHandler.java
index 82e1ae8..34c4c33 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OMDBUpdatesHandler.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OMDBUpdatesHandler.java
@@ -18,9 +18,6 @@
 
 package org.apache.hadoop.ozone.recon.tasks;
 
-import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.BUCKET_TABLE;
-import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.KEY_TABLE;
-import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.VOLUME_TABLE;
 import static org.apache.hadoop.ozone.recon.tasks.OMDBUpdateEvent.OMDBUpdateAction.DELETE;
 import static org.apache.hadoop.ozone.recon.tasks.OMDBUpdateEvent.OMDBUpdateAction.PUT;
 import static org.apache.hadoop.ozone.recon.tasks.OMDBUpdateEvent.OMDBUpdateAction.UPDATE;
@@ -29,13 +26,12 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
+import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
-import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
-import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
-import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
+import org.apache.hadoop.ozone.om.codec.OMDBDefinition;
 import org.apache.hadoop.hdds.utils.db.CodecRegistry;
-import org.apache.ratis.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.rocksdb.RocksDBException;
 import org.rocksdb.WriteBatch;
 import org.slf4j.Logger;
@@ -53,16 +49,17 @@ public class OMDBUpdatesHandler extends WriteBatch.Handler {
   private CodecRegistry codecRegistry;
   private OMMetadataManager omMetadataManager;
   private List<OMDBUpdateEvent> omdbUpdateEvents = new ArrayList<>();
+  private OMDBDefinition omdbDefinition;
 
   public OMDBUpdatesHandler(OMMetadataManager metadataManager) {
     omMetadataManager = metadataManager;
     tablesNames = metadataManager.getStore().getTableNames();
     codecRegistry = metadataManager.getStore().getCodecRegistry();
+    omdbDefinition = new OMDBDefinition();
   }
 
   @Override
-  public void put(int cfIndex, byte[] keyBytes, byte[] valueBytes) throws
-      RocksDBException {
+  public void put(int cfIndex, byte[] keyBytes, byte[] valueBytes) {
     try {
       processEvent(cfIndex, keyBytes, valueBytes,
           OMDBUpdateEvent.OMDBUpdateAction.PUT);
@@ -72,7 +69,7 @@ public class OMDBUpdatesHandler extends WriteBatch.Handler {
   }
 
   @Override
-  public void delete(int cfIndex, byte[] keyBytes) throws RocksDBException {
+  public void delete(int cfIndex, byte[] keyBytes) {
     try {
       processEvent(cfIndex, keyBytes, null,
           OMDBUpdateEvent.OMDBUpdateAction.DELETE);
@@ -93,41 +90,44 @@ public class OMDBUpdatesHandler extends WriteBatch.Handler {
       valueBytes, OMDBUpdateEvent.OMDBUpdateAction action)
       throws IOException {
     String tableName = tablesNames.get(cfIndex);
-    Class<String> keyType = getKeyType();
-    Class valueType = getValueType(tableName);
-    if (valueType != null) {
+    Optional<Class> keyType = omdbDefinition.getKeyType(tableName);
+    Optional<Class> valueType = omdbDefinition.getValueType(tableName);
+    if (keyType.isPresent() && valueType.isPresent()) {
       OMDBUpdateEvent.OMUpdateEventBuilder builder =
           new OMDBUpdateEvent.OMUpdateEventBuilder<>();
       builder.setTable(tableName);
       builder.setAction(action);
-
-      String key = codecRegistry.asObject(keyBytes, keyType);
+      String key = (String) codecRegistry.asObject(keyBytes, keyType.get());
       builder.setKey(key);
 
+      // A PUT on an existing key is reclassified as an UPDATE event; a
+      // DELETE event carries the old value (null if the key does not exist).
+      Table table = omMetadataManager.getTable(tableName);
+      // Recon does not add entries to the table cache, so it is safer to
+      // always use getSkipCache in Recon.
+      Object oldValue = table.getSkipCache(key);
       if (action == PUT) {
-        Object value = codecRegistry.asObject(valueBytes, valueType);
+        Object value = codecRegistry.asObject(valueBytes, valueType.get());
         builder.setValue(value);
-        // If a PUT key operation happens on an existing Key, it is tagged
+        // If a PUT operation happens on an existing Key, it is tagged
         // as an "UPDATE" event.
-        if (tableName.equalsIgnoreCase(KEY_TABLE)) {
-          if (omMetadataManager.getKeyTable().isExist(key)) {
-            OmKeyInfo omKeyInfo = omMetadataManager.getKeyTable().get(key);
-            builder.setOldValue(omKeyInfo);
-            builder.setAction(UPDATE);
-          }
+        if (oldValue != null) {
+          builder.setOldValue(oldValue);
+          builder.setAction(UPDATE);
         }
       } else if (action.equals(DELETE)) {
-        // When you delete a Key, we add the old OmKeyInfo to the event so that
+        // When a key is deleted, the old value is added to the event so that
         // a downstream task can use it.
-        if (tableName.equalsIgnoreCase(KEY_TABLE)) {
-          OmKeyInfo omKeyInfo = omMetadataManager.getKeyTable().get(key);
-          builder.setValue(omKeyInfo);
-        }
+        builder.setValue(oldValue);
       }
 
       OMDBUpdateEvent event = builder.build();
-      LOG.debug("Generated OM update Event for table : " + event.getTable()
-          + ", Key = " + event.getKey() + ", action = " + event.getAction());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(String.format("Generated OM update Event for table : %s, " +
+            "action = %s", tableName, action));
+      }
       if (omdbUpdateEvents.contains(event)) {
         // If the same event is part of this batch, the last one only holds.
         // For example, if there are 2 PUT key1 events, then the first one
@@ -135,6 +135,13 @@ public class OMDBUpdatesHandler extends WriteBatch.Handler {
         omdbUpdateEvents.remove(event);
       }
       omdbUpdateEvents.add(event);
+    } else {
+      // The key type or value type cannot be determined for this table.
+      // Log a warning and ignore the update.
+      if (LOG.isWarnEnabled()) {
+        LOG.warn(String.format("KeyType or ValueType could not be determined" +
+            " for table %s. Ignoring the event.", tableName));
+      }
     }
   }
 
@@ -262,35 +269,10 @@ public class OMDBUpdatesHandler extends WriteBatch.Handler {
   }
 
   /**
-   * Return Key type class for a given table name.
-   * @param name table name.
-   * @return String.class by default.
-   */
-  private Class<String> getKeyType() {
-    return String.class;
-  }
-
-  /**
-   * Return Value type class for a given table.
-   * @param name table name
-   * @return Value type based on table name.
-   */
-  @VisibleForTesting
-  protected Class getValueType(String name) {
-    switch (name) {
-    case KEY_TABLE : return OmKeyInfo.class;
-    case VOLUME_TABLE : return OmVolumeArgs.class;
-    case BUCKET_TABLE : return OmBucketInfo.class;
-    default: return null;
-    }
-  }
-
-  /**
    * Get List of events.
    * @return List of events.
    */
   public List<OMDBUpdateEvent> getEvents() {
     return omdbUpdateEvents;
   }
-
 }
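
Putting the pieces together, the handler's classification reduces to a single
lookup plus a small decision table. A condensed sketch (oldValue mirrors the
variable in processEvent above):

    Object oldValue = table.getSkipCache(key);
    // action == PUT,    oldValue == null -> PUT event    (new key)
    // action == PUT,    oldValue != null -> UPDATE event (old value attached)
    // action == DELETE, oldValue != null -> DELETE event (old value attached)
    // action == DELETE, oldValue == null -> DELETE event with a null value,
    //                                       which downstream tasks treat as
    //                                       a no-op
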
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/TableCountTask.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/TableCountTask.java
new file mode 100644
index 0000000..2621529
--- /dev/null
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/TableCountTask.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.tasks;
+
+import com.google.inject.Inject;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.recon.ReconUtils;
+import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
+import org.hadoop.ozone.recon.schema.tables.daos.GlobalStatsDao;
+import org.hadoop.ozone.recon.schema.tables.pojos.GlobalStats;
+import org.jooq.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map.Entry;
+
+/**
+ * Class to iterate over the OM DB and store the total counts of volumes,
+ * buckets, keys, open keys, deleted keys, etc.
+ */
+public class TableCountTask implements ReconOmTask {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TableCountTask.class);
+
+  private GlobalStatsDao globalStatsDao;
+  private Configuration sqlConfiguration;
+  private ReconOMMetadataManager reconOMMetadataManager;
+
+  @Inject
+  public TableCountTask(GlobalStatsDao globalStatsDao,
+                        Configuration sqlConfiguration,
+                        ReconOMMetadataManager reconOMMetadataManager) {
+    this.globalStatsDao = globalStatsDao;
+    this.sqlConfiguration = sqlConfiguration;
+    this.reconOMMetadataManager = reconOMMetadataManager;
+  }
+
+  /**
+   * Iterate over the rows of each table in the OM snapshot DB and
+   * calculate the count for each table.
+   *
+   * @param omMetadataManager OM Metadata instance.
+   * @return Pair of task name and success flag.
+   */
+  @Override
+  public Pair<String, Boolean> reprocess(OMMetadataManager omMetadataManager) {
+    for (String tableName : getTaskTables()) {
+      Table table = omMetadataManager.getTable(tableName);
+      try (TableIterator keyIter = table.iterator()) {
+        long count = getCount(keyIter);
+        ReconUtils.upsertGlobalStatsTable(sqlConfiguration, globalStatsDao,
+            getRowKeyFromTable(tableName),
+            count);
+      } catch (IOException ioEx) {
+        LOG.error("Unable to populate Table Count in Recon DB.", ioEx);
+        return new ImmutablePair<>(getTaskName(), false);
+      }
+    }
+    LOG.info("Completed a 'reprocess' run of TableCountTask.");
+    return new ImmutablePair<>(getTaskName(), true);
+  }
+
+  private long getCount(Iterator iterator) {
+    long count = 0L;
+    while (iterator.hasNext()) {
+      count++;
+      iterator.next();
+    }
+    return count;
+  }
+
+  @Override
+  public String getTaskName() {
+    return "TableCountTask";
+  }
+
+  @Override
+  public Collection<String> getTaskTables() {
+    return new ArrayList<>(reconOMMetadataManager.listTableNames());
+  }
+
+  /**
+   * Read the update events and update the count of the respective object
+   * (volume, bucket, key, etc.) based on the action (PUT or DELETE).
+   *
+   * @param events Update events - PUT, DELETE and UPDATE.
+   * @return Pair of task name and success flag.
+   */
+  @Override
+  public Pair<String, Boolean> process(OMUpdateEventBatch events) {
+    Iterator<OMDBUpdateEvent> eventIterator = events.getIterator();
+
+    HashMap<String, Long> objectCountMap = initializeCountMap();
+
+    while (eventIterator.hasNext()) {
+      OMDBUpdateEvent<String, Object> omdbUpdateEvent = eventIterator.next();
+      String rowKey = getRowKeyFromTable(omdbUpdateEvent.getTable());
+      try {
+        switch (omdbUpdateEvent.getAction()) {
+        case PUT:
+          objectCountMap.computeIfPresent(rowKey, (k, count) -> count + 1L);
+          break;
+
+        case DELETE:
+          // If the value is null, the volume / bucket / key is already
+          // deleted and no longer exists in the OM database.
+          if (omdbUpdateEvent.getValue() != null) {
+            String key = getRowKeyFromTable(omdbUpdateEvent.getTable());
+            objectCountMap.computeIfPresent(key,
+                (k, count) -> count > 0 ? count - 1L : 0L);
+          }
+          break;
+
+        default: LOG.trace("Skipping DB update event : Table: {}, Action: {}",
+            omdbUpdateEvent.getTable(), omdbUpdateEvent.getAction());
+        }
+      } catch (Exception e) {
+        LOG.error("Unexpected exception while processing the table {}, " +
+                "Action: {}", omdbUpdateEvent.getTable(),
+            omdbUpdateEvent.getAction(), e);
+        return new ImmutablePair<>(getTaskName(), false);
+      }
+    }
+    for (Entry<String, Long> entry: objectCountMap.entrySet()) {
+      ReconUtils.upsertGlobalStatsTable(sqlConfiguration, globalStatsDao,
+          entry.getKey(),
+          entry.getValue());
+    }
+
+    LOG.info("Completed a 'process' run of TableCountTask.");
+    return new ImmutablePair<>(getTaskName(), true);
+  }
+
+  private HashMap<String, Long> initializeCountMap() {
+    Collection<String> tables = getTaskTables();
+    HashMap<String, Long> objectCountMap = new HashMap<>(tables.size());
+    for (String tableName: tables) {
+      String key = getRowKeyFromTable(tableName);
+      objectCountMap.put(key, getCountForKey(key));
+    }
+    return objectCountMap;
+  }
+
+  public static String getRowKeyFromTable(String tableName) {
+    return tableName + "Count";
+  }
+
+  /**
+   * Get the count stored for the given key from Global Stats table.
+   * Return 0 if record not found.
+   *
+   * @param key Key in the global stats table
+   * @return count
+   */
+  private long getCountForKey(String key) {
+    GlobalStats record = globalStatsDao.fetchOneByKey(key);
+
+    return (record == null) ? 0L : record.getValue();
+  }
+}
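
A hedged end-to-end sketch of the task in use, following the wiring shown in
the tests below (variable names are illustrative):

    TableCountTask task = new TableCountTask(globalStatsDao, sqlConfiguration,
        reconOMMetadataManager);
    // Full snapshot pass: counts every row of every OM table.
    Pair<String, Boolean> result = task.reprocess(omMetadataManager);
    // On success, GLOBAL_STATS holds one row per table, e.g.
    // ("keyTableCount", 3), ("volumeTableCount", 2), ("bucketTableCount", 2).
    // Incremental pass: applies +1/-1 deltas from an OM update batch.
    task.process(omUpdateEventBatch);
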
diff --git a/hadoop-ozone/recon/src/main/resources/webapps/recon/ozone-recon-web/src/views/overview/overview.tsx b/hadoop-ozone/recon/src/main/resources/webapps/recon/ozone-recon-web/src/views/overview/overview.tsx
index 4497d71..bd92f5f 100644
--- a/hadoop-ozone/recon/src/main/resources/webapps/recon/ozone-recon-web/src/views/overview/overview.tsx
+++ b/hadoop-ozone/recon/src/main/resources/webapps/recon/ozone-recon-web/src/views/overview/overview.tsx
@@ -180,7 +180,7 @@ export class Overview extends React.Component<Record<string, object>, IOverviewS
             <OverviewCard loading={loading} title='Buckets' data={buckets.toString()} icon='folder-open'/>
           </Col>
           <Col xs={24} sm={18} md={12} lg={12} xl={6}>
-            <OverviewCard loading={loading} title='Keys (Estimated)' data={keys.toString()} icon='file-text'/>
+            <OverviewCard loading={loading} title='Keys' data={keys.toString()} icon='file-text'/>
           </Col>
         </Row>
       </div>
diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestEndpoints.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestEndpoints.java
index a592119..f1350a9 100644
--- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestEndpoints.java
+++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestEndpoints.java
@@ -59,10 +59,14 @@ import org.apache.hadoop.ozone.recon.spi.StorageContainerServiceProvider;
 import org.apache.hadoop.ozone.recon.spi.impl.OzoneManagerServiceProviderImpl;
 import org.apache.hadoop.ozone.recon.spi.impl.StorageContainerServiceProviderImpl;
 import org.apache.hadoop.ozone.recon.tasks.FileSizeCountTask;
+import org.apache.hadoop.ozone.recon.tasks.TableCountTask;
 import org.apache.hadoop.test.LambdaTestUtils;
 import org.hadoop.ozone.recon.schema.UtilizationSchemaDefinition;
 import org.hadoop.ozone.recon.schema.tables.daos.FileCountBySizeDao;
+import org.hadoop.ozone.recon.schema.tables.daos.GlobalStatsDao;
 import org.hadoop.ozone.recon.schema.tables.pojos.FileCountBySize;
+import org.jooq.Configuration;
+import org.jooq.DSLContext;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
@@ -74,6 +78,7 @@ import static org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.getRandom
 import static org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.getTestReconOmMetadataManager;
 import static org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.initializeNewOmMetadataManager;
 import static org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.writeDataToOm;
+import static org.hadoop.ozone.recon.schema.tables.GlobalStatsTable.GLOBAL_STATS;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.BDDMockito.given;
@@ -97,6 +102,7 @@ public class TestEndpoints extends AbstractReconSqlDBTest {
   private UtilizationEndpoint utilizationEndpoint;
   private ReconOMMetadataManager reconOMMetadataManager;
   private FileSizeCountTask fileSizeCountTask;
+  private TableCountTask tableCountTask;
   private ReconStorageContainerManagerFacade reconScm;
   private boolean isSetupDone = false;
   private String pipelineId;
@@ -107,6 +113,7 @@ public class TestEndpoints extends AbstractReconSqlDBTest {
   private DatanodeDetailsProto datanodeDetailsProto;
   private Pipeline pipeline;
   private FileCountBySizeDao fileCountBySizeDao;
+  private DSLContext dslContext;
   private final String host1 = "host1.datanode";
   private final String host2 = "host2.datanode";
   private final String ip1 = "1.1.1.1";
@@ -166,17 +173,23 @@ public class TestEndpoints extends AbstractReconSqlDBTest {
 
     nodeEndpoint = reconTestInjector.getInstance(NodeEndpoint.class);
     pipelineEndpoint = reconTestInjector.getInstance(PipelineEndpoint.class);
-    clusterStateEndpoint =
-        reconTestInjector.getInstance(ClusterStateEndpoint.class);
     fileCountBySizeDao = getDao(FileCountBySizeDao.class);
+    GlobalStatsDao globalStatsDao = getDao(GlobalStatsDao.class);
     UtilizationSchemaDefinition utilizationSchemaDefinition =
         getSchemaDefinition(UtilizationSchemaDefinition.class);
+    Configuration sqlConfiguration =
+        reconTestInjector.getInstance(Configuration.class);
     utilizationEndpoint = new UtilizationEndpoint(
         fileCountBySizeDao, utilizationSchemaDefinition);
     fileSizeCountTask =
         new FileSizeCountTask(fileCountBySizeDao, utilizationSchemaDefinition);
+    tableCountTask = new TableCountTask(
+        globalStatsDao, sqlConfiguration, reconOMMetadataManager);
     reconScm = (ReconStorageContainerManagerFacade)
         reconTestInjector.getInstance(OzoneStorageContainerManager.class);
+    clusterStateEndpoint =
+        new ClusterStateEndpoint(reconScm, globalStatsDao);
+    dslContext = getDslContext();
   }
 
   @Before
@@ -305,6 +318,9 @@ public class TestEndpoints extends AbstractReconSqlDBTest {
 
     // key = key_three
     writeDataToOm(reconOMMetadataManager, "key_three");
+
+    // Truncate global stats table before running each test
+    dslContext.truncate(GLOBAL_STATS);
   }
 
   private void testDatanodeResponse(DatanodeMetadata datanodeMetadata)
@@ -415,9 +431,9 @@ public class TestEndpoints extends AbstractReconSqlDBTest {
         (ClusterStateResponse) response.getEntity();
 
     Assert.assertEquals(1, clusterStateResponse.getPipelines());
-    Assert.assertEquals(2, clusterStateResponse.getVolumes());
-    Assert.assertEquals(2, clusterStateResponse.getBuckets());
-    Assert.assertEquals(3, clusterStateResponse.getKeys());
+    Assert.assertEquals(0, clusterStateResponse.getVolumes());
+    Assert.assertEquals(0, clusterStateResponse.getBuckets());
+    Assert.assertEquals(0, clusterStateResponse.getKeys());
     Assert.assertEquals(2, clusterStateResponse.getTotalDatanodes());
     Assert.assertEquals(2, clusterStateResponse.getHealthyDatanodes());
 
@@ -427,6 +443,16 @@ public class TestEndpoints extends AbstractReconSqlDBTest {
           (ClusterStateResponse) response1.getEntity();
       return (clusterStateResponse1.getContainers() == 1);
     });
+
+    // check volume, bucket and key count after running table count task
+    Pair<String, Boolean> result =
+        tableCountTask.reprocess(reconOMMetadataManager);
+    assertTrue(result.getRight());
+    response = clusterStateEndpoint.getClusterState();
+    clusterStateResponse = (ClusterStateResponse) response.getEntity();
+    Assert.assertEquals(2, clusterStateResponse.getVolumes());
+    Assert.assertEquals(2, clusterStateResponse.getBuckets());
+    Assert.assertEquals(3, clusterStateResponse.getKeys());
   }
 
   @Test
diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestOMDBUpdatesHandler.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestOMDBUpdatesHandler.java
index d1cb1e9..92c797b 100644
--- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestOMDBUpdatesHandler.java
+++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestOMDBUpdatesHandler.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -35,10 +35,12 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.server.ServerUtils;
 import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.codec.OMDBDefinition;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
 import org.apache.hadoop.hdds.utils.db.RDBStore;
+import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -54,6 +56,8 @@ public class TestOMDBUpdatesHandler {
   @Rule
   public TemporaryFolder folder = new TemporaryFolder();
 
+  private OMDBDefinition omdbDefinition = new OMDBDefinition();
+
   private OzoneConfiguration createNewTestPath() throws IOException {
     OzoneConfiguration configuration = new OzoneConfiguration();
     File newFolder = folder.newFolder();
@@ -149,6 +153,7 @@ public class TestOMDBUpdatesHandler {
 
     // Write 1 volume, 1 key into source and target OM DBs.
     String volumeKey = metaMgr.getVolumeKey("sampleVol");
+    String nonExistVolumeKey = metaMgr.getVolumeKey("nonExistingVolume");
     OmVolumeArgs args =
         OmVolumeArgs.newBuilder()
             .setVolume("sampleVol")
@@ -165,6 +170,9 @@ public class TestOMDBUpdatesHandler {
     // Delete the volume and key from target DB.
     metaMgr.getKeyTable().delete("/sampleVol/bucketOne/key_one");
     metaMgr.getVolumeTable().delete(volumeKey);
+    // Delete a non-existing volume and key
+    metaMgr.getKeyTable().delete("/sampleVol/bucketOne/key_two");
+    metaMgr.getVolumeTable().delete(metaMgr.getVolumeKey("nonExistingVolume"));
 
     RDBStore rdbStore = (RDBStore) metaMgr.getStore();
     RocksDB rocksDB = rdbStore.getDb();
@@ -191,7 +199,7 @@ public class TestOMDBUpdatesHandler {
     }
 
     List<OMDBUpdateEvent> events = omdbUpdatesHandler.getEvents();
-    assertTrue(events.size() == 2);
+    assertEquals(4, events.size());
 
     OMDBUpdateEvent keyEvent = events.get(0);
     assertEquals(OMDBUpdateEvent.OMDBUpdateAction.DELETE, keyEvent.getAction());
@@ -201,7 +209,35 @@ public class TestOMDBUpdatesHandler {
     OMDBUpdateEvent volEvent = events.get(1);
     assertEquals(OMDBUpdateEvent.OMDBUpdateAction.DELETE, volEvent.getAction());
     assertEquals(volumeKey, volEvent.getKey());
-    assertNull(volEvent.getValue());
+    assertNotNull(volEvent.getValue());
+    OmVolumeArgs volumeInfo = (OmVolumeArgs) volEvent.getValue();
+    assertEquals("sampleVol", volumeInfo.getVolume());
+
+    // Assert the values of non-existent keys are set to null.
+    OMDBUpdateEvent nonExistKey = events.get(2);
+    assertEquals(OMDBUpdateEvent.OMDBUpdateAction.DELETE,
+        nonExistKey.getAction());
+    assertEquals("/sampleVol/bucketOne/key_two", nonExistKey.getKey());
+    assertNull(nonExistKey.getValue());
+
+    OMDBUpdateEvent nonExistVolume = events.get(3);
+    assertEquals(OMDBUpdateEvent.OMDBUpdateAction.DELETE,
+        nonExistVolume.getAction());
+    assertEquals(nonExistVolumeKey, nonExistVolume.getKey());
+    assertNull(nonExistVolume.getValue());
+  }
+
+  @Test
+  public void testGetKeyType() throws IOException {
+    OzoneConfiguration configuration = createNewTestPath();
+    OmMetadataManagerImpl metaMgr = new OmMetadataManagerImpl(configuration);
+    OMDBUpdatesHandler omdbUpdatesHandler =
+        new OMDBUpdatesHandler(metaMgr);
+
+    assertEquals(String.class, omdbDefinition.getKeyType(
+        metaMgr.getKeyTable().getName()).get());
+    assertEquals(OzoneTokenIdentifier.class, omdbDefinition.getKeyType(
+        metaMgr.getDelegationTokenTable().getName()).get());
   }
 
   @Test
@@ -211,12 +247,12 @@ public class TestOMDBUpdatesHandler {
     OMDBUpdatesHandler omdbUpdatesHandler =
         new OMDBUpdatesHandler(metaMgr);
 
-    assertEquals(OmKeyInfo.class, omdbUpdatesHandler.getValueType(
-        metaMgr.getKeyTable().getName()));
-    assertEquals(OmVolumeArgs.class, omdbUpdatesHandler.getValueType(
-        metaMgr.getVolumeTable().getName()));
-    assertEquals(OmBucketInfo.class, omdbUpdatesHandler.getValueType(
-        metaMgr.getBucketTable().getName()));
+    assertEquals(OmKeyInfo.class, omdbDefinition.getValueType(
+        metaMgr.getKeyTable().getName()).get());
+    assertEquals(OmVolumeArgs.class, omdbDefinition.getValueType(
+        metaMgr.getVolumeTable().getName()).get());
+    assertEquals(OmBucketInfo.class, omdbDefinition.getValueType(
+        metaMgr.getBucketTable().getName()).get());
   }
 
   private OmKeyInfo getOmKeyInfo(String volumeName, String bucketName,
diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestTableCountTask.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestTableCountTask.java
new file mode 100644
index 0000000..94d7673
--- /dev/null
+++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestTableCountTask.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.tasks;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.utils.db.TypedTable;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.recon.persistence.AbstractReconSqlDBTest;
+import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
+import org.apache.hadoop.ozone.recon.tasks.OMDBUpdateEvent.OMUpdateEventBuilder;
+
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.BUCKET_TABLE;
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.DELETED_TABLE;
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.KEY_TABLE;
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.OPEN_KEY_TABLE;
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.VOLUME_TABLE;
+import org.hadoop.ozone.recon.schema.tables.daos.GlobalStatsDao;
+import org.jooq.DSLContext;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import static org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.getTestReconOmMetadataManager;
+import static org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.initializeNewOmMetadataManager;
+import static org.apache.hadoop.ozone.recon.tasks.OMDBUpdateEvent.OMDBUpdateAction.DELETE;
+import static org.apache.hadoop.ozone.recon.tasks.OMDBUpdateEvent.OMDBUpdateAction.PUT;
+import static org.apache.hadoop.ozone.recon.tasks.OMDBUpdateEvent.OMDBUpdateAction.UPDATE;
+import static org.hadoop.ozone.recon.schema.tables.GlobalStatsTable.GLOBAL_STATS;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit test for TableCountTask.
+ */
+public class TestTableCountTask extends AbstractReconSqlDBTest {
+
+  private GlobalStatsDao globalStatsDao;
+  private TableCountTask tableCountTask;
+  private DSLContext dslContext;
+  private boolean isSetupDone = false;
+
+  @Rule
+  public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+  private void initializeInjector() throws IOException {
+    ReconOMMetadataManager omMetadataManager = getTestReconOmMetadataManager(
+        initializeNewOmMetadataManager(temporaryFolder.newFolder()),
+        temporaryFolder.newFolder());
+    globalStatsDao = getDao(GlobalStatsDao.class);
+    tableCountTask = new TableCountTask(globalStatsDao, getConfiguration(),
+            omMetadataManager);
+    dslContext = getDslContext();
+  }
+
+  @Before
+  public void setUp() throws IOException {
+    // The following setup runs only once
+    if (!isSetupDone) {
+      initializeInjector();
+      isSetupDone = true;
+    }
+    // Truncate table before running each test
+    dslContext.truncate(GLOBAL_STATS);
+  }
+
+  @Test
+  public void testReprocess() {
+    OMMetadataManager omMetadataManager = mock(OmMetadataManagerImpl.class);
+    // Mock 5 rows in each table and test the count
+    for (String tableName: tableCountTask.getTaskTables()) {
+      TypedTable<String, Object> table = mock(TypedTable.class);
+      TypedTable.TypedTableIterator mockIter = mock(TypedTable
+          .TypedTableIterator.class);
+      when(table.iterator()).thenReturn(mockIter);
+      when(omMetadataManager.getTable(tableName)).thenReturn(table);
+      when(mockIter.hasNext())
+          .thenReturn(true)
+          .thenReturn(true)
+          .thenReturn(true)
+          .thenReturn(true)
+          .thenReturn(true)
+          .thenReturn(false);
+    }
+
+    Pair<String, Boolean> result = tableCountTask.reprocess(omMetadataManager);
+    assertTrue(result.getRight());
+
+    assertEquals(5L, getCountForTable(KEY_TABLE));
+    assertEquals(5L, getCountForTable(VOLUME_TABLE));
+    assertEquals(5L, getCountForTable(BUCKET_TABLE));
+    assertEquals(5L, getCountForTable(OPEN_KEY_TABLE));
+    assertEquals(5L, getCountForTable(DELETED_TABLE));
+  }
+
+  @Test
+  public void testProcess() {
+    ArrayList<OMDBUpdateEvent> events = new ArrayList<>();
+    // Create 5 put, 1 delete and 1 update event for each table
+    for (String tableName: tableCountTask.getTaskTables()) {
+      for (int i = 0; i < 5; i++) {
+        events.add(getOMUpdateEvent("item" + i, null, tableName, PUT));
+      }
+      // For a DELETE event, if the value is null the counter is not
+      // decremented, because a null value means the item does not exist in
+      // the database and there is nothing to delete.
+      events.add(getOMUpdateEvent("item0", mock(OmKeyInfo.class), tableName,
+          DELETE));
+      events.add(getOMUpdateEvent("item1", null, tableName, UPDATE));
+    }
+    OMUpdateEventBatch omUpdateEventBatch = new OMUpdateEventBatch(events);
+    tableCountTask.process(omUpdateEventBatch);
+
+    // Verify 4 items in each table. (5 puts - 1 delete + 0 update)
+    assertEquals(4L, getCountForTable(KEY_TABLE));
+    assertEquals(4L, getCountForTable(VOLUME_TABLE));
+    assertEquals(4L, getCountForTable(BUCKET_TABLE));
+    assertEquals(4L, getCountForTable(OPEN_KEY_TABLE));
+    assertEquals(4L, getCountForTable(DELETED_TABLE));
+
+    // add a new key and simulate delete on non-existing item (value: null)
+    ArrayList<OMDBUpdateEvent> newEvents = new ArrayList<>();
+    for (String tableName: tableCountTask.getTaskTables()) {
+      newEvents.add(getOMUpdateEvent("item5", null, tableName, PUT));
+      // This delete event should be a noop since value is null
+      newEvents.add(getOMUpdateEvent("item0", null, tableName, DELETE));
+    }
+
+    omUpdateEventBatch = new OMUpdateEventBatch(newEvents);
+    tableCountTask.process(omUpdateEventBatch);
+
+    // Verify 5 items in each table. (4 existing + 1 new put; the delete
+    // was a no-op)
+    assertEquals(5L, getCountForTable(KEY_TABLE));
+    assertEquals(5L, getCountForTable(VOLUME_TABLE));
+    assertEquals(5L, getCountForTable(BUCKET_TABLE));
+    assertEquals(5L, getCountForTable(OPEN_KEY_TABLE));
+    assertEquals(5L, getCountForTable(DELETED_TABLE));
+  }
+
+  private OMDBUpdateEvent getOMUpdateEvent(String name, Object value,
+                                           String table,
+                           OMDBUpdateEvent.OMDBUpdateAction action) {
+    return new OMUpdateEventBuilder()
+        .setAction(action)
+        .setKey(name)
+        .setValue(value)
+        .setTable(table)
+        .build();
+  }
+
+  private long getCountForTable(String tableName) {
+    String key = TableCountTask.getRowKeyFromTable(tableName);
+    return globalStatsDao.findById(key).getValue();
+  }
+}

