Posted to commits@ozone.apache.org by vi...@apache.org on 2020/06/03 22:18:34 UTC

[hadoop-ozone] branch master updated: HDDS-3681. Recon: Add support to store file size counts in each volume/bucket (#988)

This is an automated email from the ASF dual-hosted git repository.

vivekratnavel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new aec7a93  HDDS-3681. Recon: Add support to store file size counts in each volume/bucket (#988)
aec7a93 is described below

commit aec7a9345e8cecdacc1367562fdd82dd4dfc34df
Author: Vivek Ratnavel Subramanian <vi...@gmail.com>
AuthorDate: Wed Jun 3 15:18:25 2020 -0700

    HDDS-3681. Recon: Add support to store file size counts in each volume/bucket (#988)
---
 .../recon/schema/UtilizationSchemaDefinition.java  |  19 +-
 .../apache/hadoop/ozone/recon/ReconConstants.java  |   3 +
 .../hadoop/ozone/recon/ReconControllerModule.java  |   3 +-
 .../ozone/recon/api/UtilizationEndpoint.java       | 101 ++++++
 .../hadoop/ozone/recon/api/UtilizationService.java |  52 ----
 .../ozone/recon/tasks/FileSizeCountTask.java       | 213 +++++++------
 .../hadoop/ozone/recon/api/TestEndpoints.java      | 122 ++++++++
 .../TestUtilizationSchemaDefinition.java           |  26 +-
 .../ozone/recon/tasks/TestFileSizeCountTask.java   | 339 ++++++++++++++++-----
 9 files changed, 643 insertions(+), 235 deletions(-)

diff --git a/hadoop-ozone/recon-codegen/src/main/java/org/hadoop/ozone/recon/schema/UtilizationSchemaDefinition.java b/hadoop-ozone/recon-codegen/src/main/java/org/hadoop/ozone/recon/schema/UtilizationSchemaDefinition.java
index 941a3c6..92de19e 100644
--- a/hadoop-ozone/recon-codegen/src/main/java/org/hadoop/ozone/recon/schema/UtilizationSchemaDefinition.java
+++ b/hadoop-ozone/recon-codegen/src/main/java/org/hadoop/ozone/recon/schema/UtilizationSchemaDefinition.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -26,6 +26,7 @@ import javax.sql.DataSource;
 
 import com.google.inject.Singleton;
 
+import org.jooq.DSLContext;
 import org.jooq.impl.DSL;
 import org.jooq.impl.SQLDataType;
 import org.slf4j.Logger;
@@ -44,6 +45,7 @@ public class UtilizationSchemaDefinition implements ReconSchemaDefinition {
       LoggerFactory.getLogger(UtilizationSchemaDefinition.class);
 
   private final DataSource dataSource;
+  private DSLContext dslContext;
 
   public static final String CLUSTER_GROWTH_DAILY_TABLE_NAME =
       "CLUSTER_GROWTH_DAILY";
@@ -59,6 +61,7 @@ public class UtilizationSchemaDefinition implements ReconSchemaDefinition {
   @Transactional
   public void initializeSchema() throws SQLException {
     Connection conn = dataSource.getConnection();
+    dslContext = DSL.using(conn);
     if (!TABLE_EXISTS_CHECK.test(conn, FILE_COUNT_BY_SIZE_TABLE_NAME)) {
       createFileSizeCountTable(conn);
     }
@@ -68,7 +71,7 @@ public class UtilizationSchemaDefinition implements ReconSchemaDefinition {
   }
 
   private void createClusterGrowthTable(Connection conn) {
-    DSL.using(conn).createTableIfNotExists(CLUSTER_GROWTH_DAILY_TABLE_NAME)
+    dslContext.createTableIfNotExists(CLUSTER_GROWTH_DAILY_TABLE_NAME)
         .column("timestamp", SQLDataType.TIMESTAMP)
         .column("datanode_id", SQLDataType.INTEGER)
         .column("datanode_host", SQLDataType.VARCHAR(1024))
@@ -83,11 +86,17 @@ public class UtilizationSchemaDefinition implements ReconSchemaDefinition {
   }
 
   private void createFileSizeCountTable(Connection conn) {
-    DSL.using(conn).createTableIfNotExists(FILE_COUNT_BY_SIZE_TABLE_NAME)
+    dslContext.createTableIfNotExists(FILE_COUNT_BY_SIZE_TABLE_NAME)
+        .column("volume", SQLDataType.VARCHAR(64))
+        .column("bucket", SQLDataType.VARCHAR(64))
         .column("file_size", SQLDataType.BIGINT)
         .column("count", SQLDataType.BIGINT)
-        .constraint(DSL.constraint("pk_file_size")
-            .primaryKey("file_size"))
+        .constraint(DSL.constraint("pk_volume_bucket_file_size")
+            .primaryKey("volume", "bucket", "file_size"))
         .execute();
   }
+
+  public DSLContext getDSLContext() {
+    return dslContext;
+  }
 }
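
For reference: FILE_COUNT_BY_SIZE now has a composite primary key of
(volume, bucket, file_size), so the jOOQ-generated DAO lookups take a Record3
id instead of the old single Long file_size. A minimal lookup sketch, assuming
the generated DAO/table classes from this patch and a DSLContext obtained via
UtilizationSchemaDefinition#getDSLContext():

    // Hedged sketch, not part of the patch: single-row lookup against the
    // new composite key (volume, bucket, file_size).
    import org.hadoop.ozone.recon.schema.tables.daos.FileCountBySizeDao;
    import org.hadoop.ozone.recon.schema.tables.pojos.FileCountBySize;
    import org.jooq.DSLContext;
    import org.jooq.Record3;

    import static org.hadoop.ozone.recon.schema.tables.FileCountBySizeTable.FILE_COUNT_BY_SIZE;

    public class FileCountLookupSketch {
      // Old schema: fileCountBySizeDao.findById(1024L)
      // New schema: the id is the composite (volume, bucket, file_size).
      static FileCountBySize findCount(DSLContext dslContext,
          FileCountBySizeDao dao, String volume, String bucket, long sizeBound) {
        Record3<String, String, Long> id = dslContext
            .newRecord(FILE_COUNT_BY_SIZE.VOLUME,
                FILE_COUNT_BY_SIZE.BUCKET,
                FILE_COUNT_BY_SIZE.FILE_SIZE)
            .value1(volume)
            .value2(bucket)
            .value3(sizeBound);
        return dao.findById(id);  // null when no row exists for this key
      }
    }
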
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconConstants.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconConstants.java
index bf3457a..b0a5c0b 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconConstants.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconConstants.java
@@ -48,6 +48,9 @@ public final class ReconConstants {
   public static final String RECON_QUERY_PREVKEY = "prevKey";
   public static final String PREV_CONTAINER_ID_DEFAULT_VALUE = "0";
   public static final String RECON_QUERY_LIMIT = "limit";
+  public static final String RECON_QUERY_VOLUME = "volume";
+  public static final String RECON_QUERY_BUCKET = "bucket";
+  public static final String RECON_QUERY_FILE_SIZE = "fileSize";
 
   public static final String RECON_SCM_CONTAINER_DB =
       "recon-" + CONTAINER_DB_SUFFIX;
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java
index 5ec0e3b..050ad03 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java
@@ -136,7 +136,8 @@ public class ReconControllerModule extends AbstractModule {
       RECON_DAO_LIST.forEach(aClass -> {
         try {
           bind(aClass).toConstructor(
-              (Constructor) aClass.getConstructor(Configuration.class));
+              (Constructor) aClass.getConstructor(Configuration.class))
+              .in(Singleton.class);
         } catch (NoSuchMethodException e) {
           LOG.error("Error creating DAO {} ", aClass.getSimpleName(), e);
         }
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/UtilizationEndpoint.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/UtilizationEndpoint.java
new file mode 100644
index 0000000..55f9055
--- /dev/null
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/UtilizationEndpoint.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.api;
+
+import javax.inject.Inject;
+
+import org.hadoop.ozone.recon.schema.UtilizationSchemaDefinition;
+import org.hadoop.ozone.recon.schema.tables.daos.FileCountBySizeDao;
+import org.hadoop.ozone.recon.schema.tables.pojos.FileCountBySize;
+import org.jooq.DSLContext;
+import org.jooq.Record3;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.hadoop.ozone.recon.ReconConstants.RECON_QUERY_BUCKET;
+import static org.apache.hadoop.ozone.recon.ReconConstants.RECON_QUERY_FILE_SIZE;
+import static org.apache.hadoop.ozone.recon.ReconConstants.RECON_QUERY_VOLUME;
+import static org.hadoop.ozone.recon.schema.tables.FileCountBySizeTable.FILE_COUNT_BY_SIZE;
+
+/**
+ * Endpoint for querying the counts of a certain file Size.
+ */
+@Path("/utilization")
+@Produces(MediaType.APPLICATION_JSON)
+public class UtilizationEndpoint {
+
+  private FileCountBySizeDao fileCountBySizeDao;
+  private UtilizationSchemaDefinition utilizationSchemaDefinition;
+
+  @Inject
+  public UtilizationEndpoint(FileCountBySizeDao fileCountBySizeDao,
+                             UtilizationSchemaDefinition
+                                 utilizationSchemaDefinition) {
+    this.utilizationSchemaDefinition = utilizationSchemaDefinition;
+    this.fileCountBySizeDao = fileCountBySizeDao;
+  }
+
+  /**
+   * Return the file counts from Recon DB.
+   * @return {@link Response}
+   */
+  @GET
+  @Path("/fileCount")
+  public Response getFileCounts(
+      @QueryParam(RECON_QUERY_VOLUME)
+          String volume,
+      @QueryParam(RECON_QUERY_BUCKET)
+          String bucket,
+      @QueryParam(RECON_QUERY_FILE_SIZE)
+          long fileSize
+  ) {
+    DSLContext dslContext = utilizationSchemaDefinition.getDSLContext();
+    List<FileCountBySize> resultSet;
+    if (volume != null && bucket != null && fileSize > 0) {
+      Record3<String, String, Long> recordToFind = dslContext
+          .newRecord(FILE_COUNT_BY_SIZE.VOLUME,
+              FILE_COUNT_BY_SIZE.BUCKET,
+              FILE_COUNT_BY_SIZE.FILE_SIZE)
+          .value1(volume)
+          .value2(bucket)
+          .value3(fileSize);
+      FileCountBySize record = fileCountBySizeDao.findById(recordToFind);
+      resultSet = record != null ?
+          Collections.singletonList(record) : Collections.emptyList();
+    } else if (volume != null && bucket != null) {
+      resultSet = dslContext.select().from(FILE_COUNT_BY_SIZE)
+          .where(FILE_COUNT_BY_SIZE.VOLUME.eq(volume))
+          .and(FILE_COUNT_BY_SIZE.BUCKET.eq(bucket))
+          .fetchInto(FileCountBySize.class);
+    } else if (volume != null) {
+      resultSet = fileCountBySizeDao.fetchByVolume(volume);
+    } else {
+      // fetch all records
+      resultSet = fileCountBySizeDao.findAll();
+    }
+    return Response.ok(resultSet).build();
+  }
+}
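
The new endpoint keeps the old /utilization/fileCount path but narrows its
result set by the optional volume, bucket and fileSize query parameters (all
rows, one volume, one volume plus bucket, or a single row). A minimal client
sketch; the host, port and API base path below are assumptions, only the
resource path and query parameter names come from this patch:

    // Hedged client sketch: host/port/base path are assumed, not taken from
    // the patch; adjust to the Recon HTTP address of your cluster.
    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;

    public class FileCountClientSketch {
      public static void main(String[] args) throws Exception {
        String base = "http://localhost:9888/api/v1";  // assumed Recon address
        String path = "/utilization/fileCount?volume=vol1&bucket=bucket1&fileSize=131072";
        HttpRequest request = HttpRequest.newBuilder()
            .uri(URI.create(base + path))
            .GET()
            .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
            .send(request, HttpResponse.BodyHandlers.ofString());
        // Expected shape: a JSON array of {volume, bucket, fileSize, count} rows.
        System.out.println(response.body());
      }
    }
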
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/UtilizationService.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/UtilizationService.java
deleted file mode 100644
index 5801f8f..0000000
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/UtilizationService.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.ozone.recon.api;
-
-import javax.inject.Inject;
-import org.hadoop.ozone.recon.schema.tables.daos.FileCountBySizeDao;
-import org.hadoop.ozone.recon.schema.tables.pojos.FileCountBySize;
-
-import javax.ws.rs.GET;
-import javax.ws.rs.Path;
-import javax.ws.rs.Produces;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-import java.util.List;
-
-/**
- * Endpoint for querying the counts of a certain file Size.
- */
-@Path("/utilization")
-@Produces(MediaType.APPLICATION_JSON)
-public class UtilizationService {
-
-  @Inject
-  private FileCountBySizeDao fileCountBySizeDao;
-
-  /**
-   * Return the file counts from Recon DB.
-   * @return {@link Response}
-   */
-  @GET
-  @Path("/fileCount")
-  public Response getFileCounts() {
-    List<FileCountBySize> resultSet = fileCountBySizeDao.findAll();
-    return Response.ok(resultSet).build();
-  }
-}
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTask.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTask.java
index d855747..80b2526 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTask.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTask.java
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.ozone.recon.tasks;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.inject.Inject;
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
@@ -26,19 +25,23 @@ import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.hadoop.ozone.recon.schema.UtilizationSchemaDefinition;
 import org.hadoop.ozone.recon.schema.tables.daos.FileCountBySizeDao;
 import org.hadoop.ozone.recon.schema.tables.pojos.FileCountBySize;
+import org.jooq.DSLContext;
+import org.jooq.Record3;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
-import java.util.List;
+import java.util.Map;
 
 import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.KEY_TABLE;
+import static org.hadoop.ozone.recon.schema.tables.FileCountBySizeTable.FILE_COUNT_BY_SIZE;
 
 /**
  * Class to iterate over the OM DB and store the counts of existing/new
@@ -49,33 +52,26 @@ public class FileSizeCountTask implements ReconOmTask {
   private static final Logger LOG =
       LoggerFactory.getLogger(FileSizeCountTask.class);
 
-  private int maxBinSize = -1;
-  private long maxFileSizeUpperBound = 1125899906842624L; // 1 PB
-  private long[] upperBoundCount;
-  private long oneKb = 1024L;
+  // 1125899906842624L = 1PB
+  private static final long MAX_FILE_SIZE_UPPER_BOUND = 1125899906842624L;
   private FileCountBySizeDao fileCountBySizeDao;
+  private DSLContext dslContext;
 
   @Inject
-  public FileSizeCountTask(FileCountBySizeDao fileCountBySizeDao) {
+  public FileSizeCountTask(FileCountBySizeDao fileCountBySizeDao,
+                           UtilizationSchemaDefinition
+                               utilizationSchemaDefinition) {
     this.fileCountBySizeDao = fileCountBySizeDao;
-    upperBoundCount = new long[getMaxBinSize()];
-  }
-
-  long getOneKB() {
-    return oneKb;
-  }
-
-  long getMaxFileSizeUpperBound() {
-    return maxFileSizeUpperBound;
+    this.dslContext = utilizationSchemaDefinition.getDSLContext();
   }
 
-  int getMaxBinSize() {
-    if (maxBinSize == -1) {
-      // extra bin to add files > 1PB.
-      // 1 KB (2 ^ 10) is the smallest tracked file.
-      maxBinSize = nextClosestPowerIndexOfTwo(maxFileSizeUpperBound) - 10 + 1;
+  private static int nextClosestPowerIndexOfTwo(long dataSize) {
+    int index = 0;
+    while(dataSize != 0) {
+      dataSize >>= 1;
+      index += 1;
     }
-    return maxBinSize;
+    return index;
   }
 
   /**
@@ -88,17 +84,20 @@ public class FileSizeCountTask implements ReconOmTask {
   @Override
   public Pair<String, Boolean> reprocess(OMMetadataManager omMetadataManager) {
     Table<String, OmKeyInfo> omKeyInfoTable = omMetadataManager.getKeyTable();
+    Map<FileSizeCountKey, Long> fileSizeCountMap = new HashMap<>();
     try (TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>>
         keyIter = omKeyInfoTable.iterator()) {
       while (keyIter.hasNext()) {
         Table.KeyValue<String, OmKeyInfo> kv = keyIter.next();
-        handlePutKeyEvent(kv.getValue());
+        handlePutKeyEvent(kv.getValue(), fileSizeCountMap);
       }
     } catch (IOException ioEx) {
       LOG.error("Unable to populate File Size Count in Recon DB. ", ioEx);
       return new ImmutablePair<>(getTaskName(), false);
     }
-    writeCountsToDB();
+    // Truncate table before inserting new rows
+    dslContext.truncate(FILE_COUNT_BY_SIZE);
+    writeCountsToDB(true, fileSizeCountMap);
 
     LOG.info("Completed a 'reprocess' run of FileSizeCountTask.");
     return new ImmutablePair<>(getTaskName(), true);
@@ -114,19 +113,6 @@ public class FileSizeCountTask implements ReconOmTask {
     return Collections.singletonList(KEY_TABLE);
   }
 
-  private void readCountsFromDB() {
-    // Read - Write operations to DB are in ascending order
-    // of file size upper bounds.
-    List<FileCountBySize> resultSet = fileCountBySizeDao.findAll();
-    int index = 0;
-    if (resultSet != null) {
-      for (FileCountBySize row : resultSet) {
-        upperBoundCount[index] = row.getCount();
-        index++;
-      }
-    }
-  }
-
   /**
    * Read the Keys from update events and update the count of files
    * pertaining to a certain upper bound.
@@ -137,9 +123,8 @@ public class FileSizeCountTask implements ReconOmTask {
   @Override
   public Pair<String, Boolean> process(OMUpdateEventBatch events) {
     Iterator<OMDBUpdateEvent> eventIterator = events.getIterator();
+    Map<FileSizeCountKey, Long> fileSizeCountMap = new HashMap<>();
 
-    //update array with file size count from DB
-    readCountsFromDB();
     while (eventIterator.hasNext()) {
       OMDBUpdateEvent<String, OmKeyInfo> omdbUpdateEvent = eventIterator.next();
       String updatedKey = omdbUpdateEvent.getKey();
@@ -148,16 +133,17 @@ public class FileSizeCountTask implements ReconOmTask {
       try{
         switch (omdbUpdateEvent.getAction()) {
         case PUT:
-          handlePutKeyEvent(omKeyInfo);
+          handlePutKeyEvent(omKeyInfo, fileSizeCountMap);
           break;
 
         case DELETE:
-          handleDeleteKeyEvent(updatedKey, omKeyInfo);
+          handleDeleteKeyEvent(updatedKey, omKeyInfo, fileSizeCountMap);
           break;
 
         case UPDATE:
-          handleDeleteKeyEvent(updatedKey, omdbUpdateEvent.getOldValue());
-          handlePutKeyEvent(omKeyInfo);
+          handleDeleteKeyEvent(updatedKey, omdbUpdateEvent.getOldValue(),
+              fileSizeCountMap);
+          handlePutKeyEvent(omKeyInfo, fileSizeCountMap);
           break;
 
         default: LOG.trace("Skipping DB update event : {}",
@@ -169,36 +155,20 @@ public class FileSizeCountTask implements ReconOmTask {
         return new ImmutablePair<>(getTaskName(), false);
       }
     }
-    writeCountsToDB();
+    writeCountsToDB(false, fileSizeCountMap);
     LOG.info("Completed a 'process' run of FileSizeCountTask.");
     return new ImmutablePair<>(getTaskName(), true);
   }
 
-  /**
-   * Calculate the bin index based on size of the Key.
-   * index is calculated as the number of right shifts
-   * needed until dataSize becomes zero.
-   *
-   * @param dataSize Size of the key.
-   * @return int bin index in upperBoundCount
-   */
-  public int calculateBinIndex(long dataSize) {
-    if (dataSize >= getMaxFileSizeUpperBound()) {
-      return getMaxBinSize() - 1;
+  private long getFileSizeUpperBound(long fileSize) {
+    if (fileSize >= MAX_FILE_SIZE_UPPER_BOUND) {
+      return Long.MAX_VALUE;
     }
-    int index = nextClosestPowerIndexOfTwo(dataSize);
+    int index = nextClosestPowerIndexOfTwo(fileSize);
     // The smallest file size being tracked for count
     // is 1 KB i.e. 1024 = 2 ^ 10.
-    return index < 10 ? 0 : index - 10;
-  }
-
-  int nextClosestPowerIndexOfTwo(long dataSize) {
-    int index = 0;
-    while(dataSize != 0) {
-      dataSize >>= 1;
-      index += 1;
-    }
-    return index;
+    int binIndex = index < 10 ? 0 : index - 10;
+    return (long) Math.pow(2, (10 + binIndex));
   }
 
   /**
@@ -206,61 +176,108 @@ public class FileSizeCountTask implements ReconOmTask {
    * using the dao.
    *
    */
-  void writeCountsToDB() {
-    for (int i = 0; i < upperBoundCount.length; i++) {
-      long fileSizeUpperBound = (i == upperBoundCount.length - 1) ?
-          Long.MAX_VALUE : (long) Math.pow(2, (10 + i));
-      FileCountBySize fileCountRecord =
-          fileCountBySizeDao.findById(fileSizeUpperBound);
-      FileCountBySize newRecord = new
-          FileCountBySize(fileSizeUpperBound, upperBoundCount[i]);
-      if (fileCountRecord == null) {
+  private void writeCountsToDB(boolean isDbTruncated,
+                               Map<FileSizeCountKey, Long> fileSizeCountMap) {
+    fileSizeCountMap.keySet().forEach((FileSizeCountKey key) -> {
+      FileCountBySize newRecord = new FileCountBySize();
+      newRecord.setVolume(key.volume);
+      newRecord.setBucket(key.bucket);
+      newRecord.setFileSize(key.fileSizeUpperBound);
+      newRecord.setCount(fileSizeCountMap.get(key));
+      if (!isDbTruncated) {
+        // Get the current count from database and update
+        Record3<String, String, Long> recordToFind =
+            dslContext.newRecord(
+                FILE_COUNT_BY_SIZE.VOLUME,
+                FILE_COUNT_BY_SIZE.BUCKET,
+                FILE_COUNT_BY_SIZE.FILE_SIZE)
+                .value1(key.volume)
+                .value2(key.bucket)
+                .value3(key.fileSizeUpperBound);
+        FileCountBySize fileCountRecord =
+            fileCountBySizeDao.findById(recordToFind);
+        if (fileCountRecord == null && newRecord.getCount() > 0L) {
+          // insert new row only for non-zero counts.
+          fileCountBySizeDao.insert(newRecord);
+        } else if (fileCountRecord != null) {
+          newRecord.setCount(fileCountRecord.getCount() +
+              fileSizeCountMap.get(key));
+          fileCountBySizeDao.update(newRecord);
+        }
+      } else if (newRecord.getCount() > 0) {
+        // insert new row only for non-zero counts.
         fileCountBySizeDao.insert(newRecord);
-      } else {
-        fileCountBySizeDao.update(newRecord);
       }
-    }
+    });
+  }
+
+  private FileSizeCountKey getFileSizeCountKey(OmKeyInfo omKeyInfo) {
+    return new FileSizeCountKey(omKeyInfo.getVolumeName(),
+        omKeyInfo.getBucketName(),
+        getFileSizeUpperBound(omKeyInfo.getDataSize()));
   }
 
   /**
    * Calculate and update the count of files being tracked by
-   * upperBoundCount[].
+   * fileSizeCountMap.
    * Used by reprocess() and process().
    *
    * @param omKeyInfo OmKey being updated for count
    */
-  void handlePutKeyEvent(OmKeyInfo omKeyInfo) {
-    int binIndex = calculateBinIndex(omKeyInfo.getDataSize());
-    upperBoundCount[binIndex]++;
+  private void handlePutKeyEvent(OmKeyInfo omKeyInfo,
+                                 Map<FileSizeCountKey, Long> fileSizeCountMap) {
+    FileSizeCountKey key = getFileSizeCountKey(omKeyInfo);
+    Long count = fileSizeCountMap.containsKey(key) ?
+        fileSizeCountMap.get(key) + 1L : 1L;
+    fileSizeCountMap.put(key, count);
   }
 
   /**
    * Calculate and update the count of files being tracked by
-   * upperBoundCount[].
+   * fileSizeCountMap.
    * Used by reprocess() and process().
    *
    * @param omKeyInfo OmKey being updated for count
    */
-  void handleDeleteKeyEvent(String key, OmKeyInfo omKeyInfo) {
+  private void handleDeleteKeyEvent(String key, OmKeyInfo omKeyInfo,
+                                    Map<FileSizeCountKey, Long>
+                                        fileSizeCountMap) {
     if (omKeyInfo == null) {
       LOG.warn("Unexpected error while handling DELETE key event. Key not " +
           "found in Recon OM DB : {}", key);
     } else {
-      int binIndex = calculateBinIndex(omKeyInfo.getDataSize());
-      if (upperBoundCount[binIndex] > 0) {
-        //decrement only if it had files before, default DB value is 0
-        upperBoundCount[binIndex]--;
-      } else {
-        LOG.warn("Unexpected error while updating bin count. Found 0 count " +
-            "for index : {} while processing DELETE event for {}", binIndex,
-            omKeyInfo.getKeyName());
-      }
+      FileSizeCountKey countKey = getFileSizeCountKey(omKeyInfo);
+      Long count = fileSizeCountMap.containsKey(countKey) ?
+          fileSizeCountMap.get(countKey) - 1L : -1L;
+      fileSizeCountMap.put(countKey, count);
     }
   }
 
-  @VisibleForTesting
-  protected long[] getUpperBoundCount() {
-    return upperBoundCount;
-  }
+  private static class FileSizeCountKey {
+    private String volume;
+    private String bucket;
+    private Long fileSizeUpperBound;
 
+    FileSizeCountKey(String volume, String bucket,
+                     Long fileSizeUpperBound) {
+      this.volume = volume;
+      this.bucket = bucket;
+      this.fileSizeUpperBound = fileSizeUpperBound;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if(obj instanceof FileSizeCountKey) {
+        FileSizeCountKey s = (FileSizeCountKey) obj;
+        return volume.equals(s.volume) && bucket.equals(s.bucket) &&
+            fileSizeUpperBound.equals(s.fileSizeUpperBound);
+      }
+      return false;
+    }
+
+    @Override
+    public int hashCode() {
+      return (volume  + bucket + fileSizeUpperBound).hashCode();
+    }
+  }
 }
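
The task now keys its in-memory counts by (volume, bucket, fileSizeUpperBound),
where the upper bound is the next power of two above the key's data size, with
a floor of 1 KB and collapsed to Long.MAX_VALUE for anything of 1 PB or more.
A small standalone illustration of that mapping (reimplementing the private
getFileSizeUpperBound/nextClosestPowerIndexOfTwo methods above for clarity):

    // Standalone sketch of the bin upper-bound calculation used by
    // FileSizeCountTask; mirrors the private methods in the patch.
    public class FileSizeBinSketch {
      private static final long MAX_FILE_SIZE_UPPER_BOUND = 1125899906842624L; // 1 PB

      // Number of right shifts until dataSize becomes zero.
      static int nextClosestPowerIndexOfTwo(long dataSize) {
        int index = 0;
        while (dataSize != 0) {
          dataSize >>= 1;
          index += 1;
        }
        return index;
      }

      static long fileSizeUpperBound(long fileSize) {
        if (fileSize >= MAX_FILE_SIZE_UPPER_BOUND) {
          return Long.MAX_VALUE;                       // last bin: >= 1 PB
        }
        int index = nextClosestPowerIndexOfTwo(fileSize);
        int binIndex = index < 10 ? 0 : index - 10;    // 1 KB (2^10) is the smallest bin
        return (long) Math.pow(2, 10 + binIndex);
      }

      public static void main(String[] args) {
        System.out.println(fileSizeUpperBound(1000L));                  // 1024
        System.out.println(fileSizeUpperBound(100000L));                // 131072
        System.out.println(fileSizeUpperBound(1125899906842624L * 4));  // Long.MAX_VALUE (4 PB)
      }
    }
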
diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestEndpoints.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestEndpoints.java
index 9234131..40d3ba6 100644
--- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestEndpoints.java
+++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestEndpoints.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.ozone.recon.api;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
@@ -37,7 +38,11 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
 import org.apache.hadoop.hdds.scm.server.OzoneStorageContainerManager;
+import org.apache.hadoop.hdds.utils.db.TypedTable;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
 import org.apache.hadoop.ozone.recon.ReconTestInjector;
 import org.apache.hadoop.ozone.recon.api.types.ClusterStateResponse;
@@ -52,7 +57,11 @@ import org.apache.hadoop.ozone.recon.scm.ReconStorageContainerManagerFacade;
 import org.apache.hadoop.ozone.recon.spi.StorageContainerServiceProvider;
 import org.apache.hadoop.ozone.recon.spi.impl.OzoneManagerServiceProviderImpl;
 import org.apache.hadoop.ozone.recon.spi.impl.StorageContainerServiceProviderImpl;
+import org.apache.hadoop.ozone.recon.tasks.FileSizeCountTask;
 import org.apache.hadoop.test.LambdaTestUtils;
+import org.hadoop.ozone.recon.schema.UtilizationSchemaDefinition;
+import org.hadoop.ozone.recon.schema.tables.daos.FileCountBySizeDao;
+import org.hadoop.ozone.recon.schema.tables.pojos.FileCountBySize;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
@@ -64,12 +73,16 @@ import static org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.getRandom
 import static org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.getTestReconOmMetadataManager;
 import static org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.initializeNewOmMetadataManager;
 import static org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.writeDataToOm;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.BDDMockito.given;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import javax.ws.rs.core.Response;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 
@@ -80,7 +93,9 @@ public class TestEndpoints extends AbstractReconSqlDBTest {
   private NodeEndpoint nodeEndpoint;
   private PipelineEndpoint pipelineEndpoint;
   private ClusterStateEndpoint clusterStateEndpoint;
+  private UtilizationEndpoint utilizationEndpoint;
   private ReconOMMetadataManager reconOMMetadataManager;
+  private FileSizeCountTask fileSizeCountTask;
   private ReconStorageContainerManagerFacade reconScm;
   private boolean isSetupDone = false;
   private String pipelineId;
@@ -90,6 +105,7 @@ public class TestEndpoints extends AbstractReconSqlDBTest {
   private ContainerReportsProto containerReportsProto;
   private DatanodeDetailsProto datanodeDetailsProto;
   private Pipeline pipeline;
+  private FileCountBySizeDao fileCountBySizeDao;
   private final String host1 = "host1.datanode";
   private final String host2 = "host2.datanode";
   private final String ip1 = "1.1.1.1";
@@ -143,6 +159,7 @@ public class TestEndpoints extends AbstractReconSqlDBTest {
             .addBinding(ClusterStateEndpoint.class)
             .addBinding(NodeEndpoint.class)
             .addBinding(ContainerSchemaManager.class)
+            .addBinding(UtilizationEndpoint.class)
             .addBinding(StorageContainerLocationProtocol.class, mockScmClient)
             .build();
 
@@ -150,6 +167,13 @@ public class TestEndpoints extends AbstractReconSqlDBTest {
     pipelineEndpoint = reconTestInjector.getInstance(PipelineEndpoint.class);
     clusterStateEndpoint =
         reconTestInjector.getInstance(ClusterStateEndpoint.class);
+    fileCountBySizeDao = getDao(FileCountBySizeDao.class);
+    UtilizationSchemaDefinition utilizationSchemaDefinition =
+        getSchemaDefinition(UtilizationSchemaDefinition.class);
+    utilizationEndpoint = new UtilizationEndpoint(
+        fileCountBySizeDao, utilizationSchemaDefinition);
+    fileSizeCountTask =
+        new FileSizeCountTask(fileCountBySizeDao, utilizationSchemaDefinition);
     reconScm = (ReconStorageContainerManagerFacade)
         reconTestInjector.getInstance(OzoneStorageContainerManager.class);
   }
@@ -397,6 +421,104 @@ public class TestEndpoints extends AbstractReconSqlDBTest {
     });
   }
 
+  @Test
+  public void testGetFileCounts() throws Exception {
+    OmKeyInfo omKeyInfo1 = mock(OmKeyInfo.class);
+    given(omKeyInfo1.getKeyName()).willReturn("key1");
+    given(omKeyInfo1.getVolumeName()).willReturn("vol1");
+    given(omKeyInfo1.getBucketName()).willReturn("bucket1");
+    given(omKeyInfo1.getDataSize()).willReturn(1000L);
+
+    OmKeyInfo omKeyInfo2 = mock(OmKeyInfo.class);
+    given(omKeyInfo2.getKeyName()).willReturn("key2");
+    given(omKeyInfo2.getVolumeName()).willReturn("vol1");
+    given(omKeyInfo2.getBucketName()).willReturn("bucket1");
+    given(omKeyInfo2.getDataSize()).willReturn(100000L);
+
+    OmKeyInfo omKeyInfo3 = mock(OmKeyInfo.class);
+    given(omKeyInfo3.getKeyName()).willReturn("key1");
+    given(omKeyInfo3.getVolumeName()).willReturn("vol2");
+    given(omKeyInfo3.getBucketName()).willReturn("bucket1");
+    given(omKeyInfo3.getDataSize()).willReturn(1000L);
+
+    OMMetadataManager omMetadataManager = mock(OmMetadataManagerImpl.class);
+    TypedTable<String, OmKeyInfo> keyTable = mock(TypedTable.class);
+
+    TypedTable.TypedTableIterator mockKeyIter = mock(TypedTable
+        .TypedTableIterator.class);
+    TypedTable.TypedKeyValue mockKeyValue = mock(
+        TypedTable.TypedKeyValue.class);
+
+    when(keyTable.iterator()).thenReturn(mockKeyIter);
+    when(omMetadataManager.getKeyTable()).thenReturn(keyTable);
+    when(mockKeyIter.hasNext())
+        .thenReturn(true)
+        .thenReturn(true)
+        .thenReturn(true)
+        .thenReturn(false);
+    when(mockKeyIter.next()).thenReturn(mockKeyValue);
+    when(mockKeyValue.getValue())
+        .thenReturn(omKeyInfo1)
+        .thenReturn(omKeyInfo2)
+        .thenReturn(omKeyInfo3);
+
+    Pair<String, Boolean> result =
+        fileSizeCountTask.reprocess(omMetadataManager);
+    assertTrue(result.getRight());
+
+    assertEquals(3, fileCountBySizeDao.count());
+    Response response = utilizationEndpoint.getFileCounts(null, null, 0);
+    List<FileCountBySize> resultSet =
+        (List<FileCountBySize>) response.getEntity();
+    assertEquals(3, resultSet.size());
+    assertTrue(resultSet.stream().anyMatch(o -> o.getVolume().equals("vol1") &&
+        o.getBucket().equals("bucket1") && o.getFileSize() == 1024L &&
+        o.getCount() == 1L));
+    assertTrue(resultSet.stream().anyMatch(o -> o.getVolume().equals("vol1") &&
+        o.getBucket().equals("bucket1") && o.getFileSize() == 131072 &&
+        o.getCount() == 1L));
+    assertTrue(resultSet.stream().anyMatch(o -> o.getVolume().equals("vol2") &&
+        o.getBucket().equals("bucket1") && o.getFileSize() == 1024L &&
+        o.getCount() == 1L));
+
+    // Test for "volume" query param
+    response = utilizationEndpoint.getFileCounts("vol1", null, 0);
+    resultSet = (List<FileCountBySize>) response.getEntity();
+    assertEquals(2, resultSet.size());
+    assertTrue(resultSet.stream().allMatch(o -> o.getVolume().equals("vol1")));
+
+    // Test for non-existent volume
+    response = utilizationEndpoint.getFileCounts("vol", null, 0);
+    resultSet = (List<FileCountBySize>) response.getEntity();
+    assertEquals(0, resultSet.size());
+
+    // Test for "volume" + "bucket" query param
+    response = utilizationEndpoint.getFileCounts("vol1", "bucket1", 0);
+    resultSet = (List<FileCountBySize>) response.getEntity();
+    assertEquals(2, resultSet.size());
+    assertTrue(resultSet.stream().allMatch(o -> o.getVolume().equals("vol1") &&
+        o.getBucket().equals("bucket1")));
+
+    // Test for non-existent bucket
+    response = utilizationEndpoint.getFileCounts("vol1", "bucket", 0);
+    resultSet = (List<FileCountBySize>) response.getEntity();
+    assertEquals(0, resultSet.size());
+
+    // Test for "volume" + "bucket" + "fileSize" query params
+    response = utilizationEndpoint.getFileCounts("vol1", "bucket1", 131072);
+    resultSet = (List<FileCountBySize>) response.getEntity();
+    assertEquals(1, resultSet.size());
+    FileCountBySize o = resultSet.get(0);
+    assertTrue(o.getVolume().equals("vol1") && o.getBucket().equals(
+        "bucket1") && o.getFileSize() == 131072);
+
+    // Test for non-existent fileSize
+    response = utilizationEndpoint.getFileCounts("vol1", "bucket1", 1310725);
+    resultSet = (List<FileCountBySize>) response.getEntity();
+    assertEquals(0, resultSet.size());
+  }
+
+
   private void waitAndCheckConditionAfterHeartbeat(Callable<Boolean> check)
       throws Exception {
     // if container report is processed first, and pipeline does not exist
diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/persistence/TestUtilizationSchemaDefinition.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/persistence/TestUtilizationSchemaDefinition.java
index 9e781da..ab21a35 100644
--- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/persistence/TestUtilizationSchemaDefinition.java
+++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/persistence/TestUtilizationSchemaDefinition.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -20,6 +20,7 @@ package org.apache.hadoop.ozone.recon.persistence;
 import static org.hadoop.ozone.recon.schema.UtilizationSchemaDefinition.CLUSTER_GROWTH_DAILY_TABLE_NAME;
 import static org.hadoop.ozone.recon.schema.UtilizationSchemaDefinition.FILE_COUNT_BY_SIZE_TABLE_NAME;
 import static org.hadoop.ozone.recon.schema.tables.ClusterGrowthDailyTable.CLUSTER_GROWTH_DAILY;
+import static org.hadoop.ozone.recon.schema.tables.FileCountBySizeTable.FILE_COUNT_BY_SIZE;
 import static org.junit.Assert.assertEquals;
 
 import java.sql.Connection;
@@ -33,11 +34,13 @@ import java.util.List;
 
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
+import org.hadoop.ozone.recon.schema.UtilizationSchemaDefinition;
 import org.hadoop.ozone.recon.schema.tables.daos.ClusterGrowthDailyDao;
 import org.hadoop.ozone.recon.schema.tables.daos.FileCountBySizeDao;
 import org.hadoop.ozone.recon.schema.tables.pojos.ClusterGrowthDaily;
 import org.hadoop.ozone.recon.schema.tables.pojos.FileCountBySize;
 import org.hadoop.ozone.recon.schema.tables.records.FileCountBySizeRecord;
+import org.jooq.Record3;
 import org.jooq.Table;
 import org.jooq.UniqueKey;
 import org.junit.Assert;
@@ -82,6 +85,10 @@ public class TestUtilizationSchemaDefinition extends AbstractReconSqlDBTest {
 
     List<Pair<String, Integer>> expectedPairsFileCount = new ArrayList<>();
     expectedPairsFileCount.add(
+        new ImmutablePair<>("volume", Types.VARCHAR));
+    expectedPairsFileCount.add(
+        new ImmutablePair<>("bucket", Types.VARCHAR));
+    expectedPairsFileCount.add(
         new ImmutablePair<>("file_size", Types.BIGINT));
     expectedPairsFileCount.add(
         new ImmutablePair<>("count", Types.BIGINT));
@@ -93,7 +100,7 @@ public class TestUtilizationSchemaDefinition extends AbstractReconSqlDBTest {
               "DATA_TYPE")));
     }
     assertEquals("Unexpected number of columns",
-        2, actualPairsFileCount.size());
+        4, actualPairsFileCount.size());
     assertEquals("Columns Do not Match ",
         expectedPairsFileCount, actualPairsFileCount);
   }
@@ -182,20 +189,31 @@ public class TestUtilizationSchemaDefinition extends AbstractReconSqlDBTest {
     }
 
     FileCountBySizeDao fileCountBySizeDao = getDao(FileCountBySizeDao.class);
+    UtilizationSchemaDefinition utilizationSchemaDefinition =
+        getSchemaDefinition(UtilizationSchemaDefinition.class);
 
     FileCountBySize newRecord = new FileCountBySize();
+    newRecord.setVolume("vol1");
+    newRecord.setBucket("bucket1");
     newRecord.setFileSize(1024L);
     newRecord.setCount(1L);
 
     fileCountBySizeDao.insert(newRecord);
 
-    FileCountBySize dbRecord = fileCountBySizeDao.findById(1024L);
+    Record3<String, String, Long> recordToFind = utilizationSchemaDefinition
+        .getDSLContext().newRecord(FILE_COUNT_BY_SIZE.VOLUME,
+            FILE_COUNT_BY_SIZE.BUCKET,
+            FILE_COUNT_BY_SIZE.FILE_SIZE)
+        .value1("vol1")
+        .value2("bucket1")
+        .value3(1024L);
+    FileCountBySize dbRecord = fileCountBySizeDao.findById(recordToFind);
     assertEquals(Long.valueOf(1), dbRecord.getCount());
 
     dbRecord.setCount(2L);
     fileCountBySizeDao.update(dbRecord);
 
-    dbRecord = fileCountBySizeDao.findById(1024L);
+    dbRecord = fileCountBySizeDao.findById(recordToFind);
     assertEquals(Long.valueOf(2), dbRecord.getCount());
 
     Table<FileCountBySizeRecord> fileCountBySizeRecordTable =
diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestFileSizeCountTask.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestFileSizeCountTask.java
index e0e37b5..1cfc0ad 100644
--- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestFileSizeCountTask.java
+++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestFileSizeCountTask.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -25,20 +25,26 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.hdds.utils.db.TypedTable;
 import org.apache.hadoop.ozone.recon.persistence.AbstractReconSqlDBTest;
 import org.apache.hadoop.ozone.recon.tasks.OMDBUpdateEvent.OMUpdateEventBuilder;
+import org.hadoop.ozone.recon.schema.UtilizationSchemaDefinition;
 import org.hadoop.ozone.recon.schema.tables.daos.FileCountBySizeDao;
+import org.jooq.DSLContext;
+import org.jooq.Record3;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.AdditionalAnswers;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.List;
 
 import static org.apache.hadoop.ozone.recon.tasks.OMDBUpdateEvent.OMDBUpdateAction.DELETE;
 import static org.apache.hadoop.ozone.recon.tasks.OMDBUpdateEvent.OMDBUpdateAction.PUT;
 import static org.apache.hadoop.ozone.recon.tasks.OMDBUpdateEvent.OMDBUpdateAction.UPDATE;
+import static org.hadoop.ozone.recon.schema.tables.FileCountBySizeTable.FILE_COUNT_BY_SIZE;
 import static org.junit.Assert.assertEquals;
 
 import static org.junit.Assert.assertTrue;
-import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.BDDMockito.given;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -49,71 +55,41 @@ import static org.mockito.Mockito.when;
 public class TestFileSizeCountTask extends AbstractReconSqlDBTest {
 
   private FileCountBySizeDao fileCountBySizeDao;
+  private FileSizeCountTask fileSizeCountTask;
+  private DSLContext dslContext;
 
   @Before
   public void setUp() {
     fileCountBySizeDao = getDao(FileCountBySizeDao.class);
-  }
-
-  @Test
-  public void testCalculateBinIndex() {
-    FileSizeCountTask fileSizeCountTask = mock(FileSizeCountTask.class);
-
-    when(fileSizeCountTask.getMaxFileSizeUpperBound()).
-        thenReturn(1125899906842624L);    // 1 PB
-    when(fileSizeCountTask.getOneKB()).thenReturn(1024L);
-    when(fileSizeCountTask.getMaxBinSize()).thenReturn(42);
-    when(fileSizeCountTask.calculateBinIndex(anyLong())).thenCallRealMethod();
-    when(fileSizeCountTask.nextClosestPowerIndexOfTwo(
-        anyLong())).thenCallRealMethod();
-
-    long fileSize = 1024L;            // 1 KB
-    int binIndex = fileSizeCountTask.calculateBinIndex(fileSize);
-    assertEquals(1, binIndex);
-
-    fileSize = 1023L;                // 1KB - 1B
-    binIndex = fileSizeCountTask.calculateBinIndex(fileSize);
-    assertEquals(0, binIndex);
-
-    fileSize = 562949953421312L;      // 512 TB
-    binIndex = fileSizeCountTask.calculateBinIndex(fileSize);
-    assertEquals(40, binIndex);
-
-    fileSize = 562949953421313L;      // (512 TB + 1B)
-    binIndex = fileSizeCountTask.calculateBinIndex(fileSize);
-    assertEquals(40, binIndex);
-
-    fileSize = 562949953421311L;      // (512 TB - 1B)
-    binIndex = fileSizeCountTask.calculateBinIndex(fileSize);
-    assertEquals(39, binIndex);
-
-    fileSize = 1125899906842624L;      // 1 PB - last (extra) bin
-    binIndex = fileSizeCountTask.calculateBinIndex(fileSize);
-    assertEquals(41, binIndex);
-
-    fileSize = 100000L;
-    binIndex = fileSizeCountTask.calculateBinIndex(fileSize);
-    assertEquals(7, binIndex);
-
-    fileSize = 1125899906842623L;      // (1 PB - 1B)
-    binIndex = fileSizeCountTask.calculateBinIndex(fileSize);
-    assertEquals(40, binIndex);
-
-    fileSize = 1125899906842624L * 4;      // 4 PB - last extra bin
-    binIndex = fileSizeCountTask.calculateBinIndex(fileSize);
-    assertEquals(41, binIndex);
-
-    fileSize = Long.MAX_VALUE;        // extra bin
-    binIndex = fileSizeCountTask.calculateBinIndex(fileSize);
-    assertEquals(41, binIndex);
+    UtilizationSchemaDefinition utilizationSchemaDefinition =
+        getSchemaDefinition(UtilizationSchemaDefinition.class);
+    fileSizeCountTask =
+        new FileSizeCountTask(fileCountBySizeDao, utilizationSchemaDefinition);
+    dslContext = utilizationSchemaDefinition.getDSLContext();
+    // Truncate table before running each test
+    dslContext.truncate(FILE_COUNT_BY_SIZE);
   }
 
   @Test
   public void testReprocess() throws IOException {
     OmKeyInfo omKeyInfo1 = mock(OmKeyInfo.class);
     given(omKeyInfo1.getKeyName()).willReturn("key1");
+    given(omKeyInfo1.getVolumeName()).willReturn("vol1");
+    given(omKeyInfo1.getBucketName()).willReturn("bucket1");
     given(omKeyInfo1.getDataSize()).willReturn(1000L);
 
+    OmKeyInfo omKeyInfo2 = mock(OmKeyInfo.class);
+    given(omKeyInfo2.getKeyName()).willReturn("key2");
+    given(omKeyInfo2.getVolumeName()).willReturn("vol1");
+    given(omKeyInfo2.getBucketName()).willReturn("bucket1");
+    given(omKeyInfo2.getDataSize()).willReturn(100000L);
+
+    OmKeyInfo omKeyInfo3 = mock(OmKeyInfo.class);
+    given(omKeyInfo3.getKeyName()).willReturn("key3");
+    given(omKeyInfo3.getVolumeName()).willReturn("vol1");
+    given(omKeyInfo3.getBucketName()).willReturn("bucket1");
+    given(omKeyInfo3.getDataSize()).willReturn(1125899906842624L * 4); // 4PB
+
     OMMetadataManager omMetadataManager = mock(OmMetadataManagerImpl.class);
     TypedTable<String, OmKeyInfo> keyTable = mock(TypedTable.class);
 
@@ -124,30 +100,47 @@ public class TestFileSizeCountTask extends AbstractReconSqlDBTest {
 
     when(keyTable.iterator()).thenReturn(mockKeyIter);
     when(omMetadataManager.getKeyTable()).thenReturn(keyTable);
-    when(mockKeyIter.hasNext()).thenReturn(true).thenReturn(false);
+    when(mockKeyIter.hasNext())
+        .thenReturn(true)
+        .thenReturn(true)
+        .thenReturn(true)
+        .thenReturn(false);
     when(mockKeyIter.next()).thenReturn(mockKeyValue);
-    when(mockKeyValue.getValue()).thenReturn(omKeyInfo1);
+    when(mockKeyValue.getValue())
+        .thenReturn(omKeyInfo1)
+        .thenReturn(omKeyInfo2)
+        .thenReturn(omKeyInfo3);
 
-    FileSizeCountTask fileSizeCountTask =
-        new FileSizeCountTask(fileCountBySizeDao);
     Pair<String, Boolean> result =
         fileSizeCountTask.reprocess(omMetadataManager);
     assertTrue(result.getRight());
 
-    long[] upperBoundCount = fileSizeCountTask.getUpperBoundCount();
-    assertEquals(1, upperBoundCount[0]);
-    for (int i = 1; i < upperBoundCount.length; i++) {
-      assertEquals(0, upperBoundCount[i]);
-    }
+    assertEquals(3, fileCountBySizeDao.count());
+    Record3<String, String, Long> recordToFind = dslContext
+        .newRecord(FILE_COUNT_BY_SIZE.VOLUME,
+        FILE_COUNT_BY_SIZE.BUCKET,
+        FILE_COUNT_BY_SIZE.FILE_SIZE)
+        .value1("vol1")
+        .value2("bucket1")
+        .value3(1024L);
+    assertEquals(1L,
+        fileCountBySizeDao.findById(recordToFind).getCount().longValue());
+    // file size upper bound for 100000L is 131072L (next highest power of 2)
+    recordToFind.value3(131072L);
+    assertEquals(1L,
+        fileCountBySizeDao.findById(recordToFind).getCount().longValue());
+    // file size upper bound for 4PB is Long.MAX_VALUE
+    recordToFind.value3(Long.MAX_VALUE);
+    assertEquals(1L,
+        fileCountBySizeDao.findById(recordToFind).getCount().longValue());
   }
 
   @Test
   public void testProcess() {
-    FileSizeCountTask fileSizeCountTask =
-        new FileSizeCountTask(fileCountBySizeDao);
-
     // Write 2 keys.
     OmKeyInfo toBeDeletedKey = mock(OmKeyInfo.class);
+    given(toBeDeletedKey.getVolumeName()).willReturn("vol1");
+    given(toBeDeletedKey.getBucketName()).willReturn("bucket1");
     given(toBeDeletedKey.getKeyName()).willReturn("deletedKey");
     given(toBeDeletedKey.getDataSize()).willReturn(2000L); // Bin 1
     OMDBUpdateEvent event = new OMUpdateEventBuilder()
@@ -157,6 +150,8 @@ public class TestFileSizeCountTask extends AbstractReconSqlDBTest {
         .build();
 
     OmKeyInfo toBeUpdatedKey = mock(OmKeyInfo.class);
+    given(toBeUpdatedKey.getVolumeName()).willReturn("vol1");
+    given(toBeUpdatedKey.getBucketName()).willReturn("bucket1");
     given(toBeUpdatedKey.getKeyName()).willReturn("updatedKey");
     given(toBeUpdatedKey.getDataSize()).willReturn(10000L); // Bin 4
     OMDBUpdateEvent event2 = new OMUpdateEventBuilder()
@@ -170,12 +165,25 @@ public class TestFileSizeCountTask extends AbstractReconSqlDBTest {
     fileSizeCountTask.process(omUpdateEventBatch);
 
     // Verify 2 keys are in correct bins.
-    long[] upperBoundCount = fileSizeCountTask.getUpperBoundCount();
-    assertEquals(1, upperBoundCount[4]); // updatedKey
-    assertEquals(1, upperBoundCount[1]); // deletedKey
+    assertEquals(2, fileCountBySizeDao.count());
+    Record3<String, String, Long> recordToFind = dslContext
+        .newRecord(FILE_COUNT_BY_SIZE.VOLUME,
+            FILE_COUNT_BY_SIZE.BUCKET,
+            FILE_COUNT_BY_SIZE.FILE_SIZE)
+        .value1("vol1")
+        .value2("bucket1")
+        .value3(2048L);
+    assertEquals(1L,
+        fileCountBySizeDao.findById(recordToFind).getCount().longValue());
+    // file size upper bound for 10000L is 16384L (next highest power of 2)
+    recordToFind.value3(16384L);
+    assertEquals(1L,
+        fileCountBySizeDao.findById(recordToFind).getCount().longValue());
 
     // Add new key.
     OmKeyInfo newKey = mock(OmKeyInfo.class);
+    given(newKey.getVolumeName()).willReturn("vol1");
+    given(newKey.getBucketName()).willReturn("bucket1");
     given(newKey.getKeyName()).willReturn("newKey");
     given(newKey.getDataSize()).willReturn(1000L); // Bin 0
     OMDBUpdateEvent putEvent = new OMUpdateEventBuilder()
@@ -186,6 +194,8 @@ public class TestFileSizeCountTask extends AbstractReconSqlDBTest {
 
     // Update existing key.
     OmKeyInfo updatedKey = mock(OmKeyInfo.class);
+    given(updatedKey.getVolumeName()).willReturn("vol1");
+    given(updatedKey.getBucketName()).willReturn("bucket1");
     given(updatedKey.getKeyName()).willReturn("updatedKey");
     given(updatedKey.getDataSize()).willReturn(50000L); // Bin 6
     OMDBUpdateEvent updateEvent = new OMUpdateEventBuilder()
@@ -206,10 +216,189 @@ public class TestFileSizeCountTask extends AbstractReconSqlDBTest {
         Arrays.asList(updateEvent, putEvent, deleteEvent));
     fileSizeCountTask.process(omUpdateEventBatch);
 
-    upperBoundCount = fileSizeCountTask.getUpperBoundCount();
-    assertEquals(1, upperBoundCount[0]); // newKey
-    assertEquals(0, upperBoundCount[1]); // deletedKey
-    assertEquals(0, upperBoundCount[4]); // updatedKey old
-    assertEquals(1, upperBoundCount[6]); // updatedKey new
+    assertEquals(4, fileCountBySizeDao.count());
+    recordToFind.value3(1024L);
+    assertEquals(1, fileCountBySizeDao.findById(recordToFind)
+        .getCount().longValue());
+    recordToFind.value3(2048L);
+    assertEquals(0, fileCountBySizeDao.findById(recordToFind)
+        .getCount().longValue());
+    recordToFind.value3(16384L);
+    assertEquals(0, fileCountBySizeDao.findById(recordToFind)
+        .getCount().longValue());
+    recordToFind.value3(65536L);
+    assertEquals(1, fileCountBySizeDao.findById(recordToFind)
+        .getCount().longValue());
+  }
+
+  @Test
+  public void testReprocessAtScale() throws IOException {
+    // generate mocks for 2 volumes, 500 buckets each volume
+    // and 42 keys in each bucket.
+    List<OmKeyInfo> omKeyInfoList = new ArrayList<>();
+    List<Boolean> hasNextAnswer = new ArrayList<>();
+    for (int volIndex = 1; volIndex <= 2; volIndex++) {
+      for (int bktIndex = 1; bktIndex <= 500; bktIndex++) {
+        for (int keyIndex = 1; keyIndex <= 42; keyIndex++) {
+          OmKeyInfo omKeyInfo = mock(OmKeyInfo.class);
+          given(omKeyInfo.getKeyName()).willReturn("key" + keyIndex);
+          given(omKeyInfo.getVolumeName()).willReturn("vol" + volIndex);
+          given(omKeyInfo.getBucketName()).willReturn("bucket" + bktIndex);
+          // Place keys in each bin
+          long fileSize = (long)Math.pow(2, keyIndex + 9) - 1L;
+          given(omKeyInfo.getDataSize()).willReturn(fileSize);
+          omKeyInfoList.add(omKeyInfo);
+          hasNextAnswer.add(true);
+        }
+      }
+    }
+    hasNextAnswer.add(false);
+
+    OMMetadataManager omMetadataManager = mock(OmMetadataManagerImpl.class);
+    TypedTable<String, OmKeyInfo> keyTable = mock(TypedTable.class);
+
+    TypedTable.TypedTableIterator mockKeyIter = mock(TypedTable
+        .TypedTableIterator.class);
+    TypedTable.TypedKeyValue mockKeyValue = mock(
+        TypedTable.TypedKeyValue.class);
+
+    when(keyTable.iterator()).thenReturn(mockKeyIter);
+    when(omMetadataManager.getKeyTable()).thenReturn(keyTable);
+    when(mockKeyIter.hasNext())
+        .thenAnswer(AdditionalAnswers.returnsElementsOf(hasNextAnswer));
+    when(mockKeyIter.next()).thenReturn(mockKeyValue);
+    when(mockKeyValue.getValue())
+        .thenAnswer(AdditionalAnswers.returnsElementsOf(omKeyInfoList));
+
+    Pair<String, Boolean> result =
+        fileSizeCountTask.reprocess(omMetadataManager);
+    assertTrue(result.getRight());
+
+    // 2 volumes * 500 buckets * 42 bins = 42000 rows
+    assertEquals(42000, fileCountBySizeDao.count());
+    Record3<String, String, Long> recordToFind = dslContext
+        .newRecord(FILE_COUNT_BY_SIZE.VOLUME,
+            FILE_COUNT_BY_SIZE.BUCKET,
+            FILE_COUNT_BY_SIZE.FILE_SIZE)
+        .value1("vol1")
+        .value2("bucket1")
+        .value3(1024L);
+    assertEquals(1L,
+        fileCountBySizeDao.findById(recordToFind).getCount().longValue());
+    // file size upper bound for 100000L is 131072L (next highest power of 2)
+    recordToFind.value1("vol1");
+    recordToFind.value3(131072L);
+    assertEquals(1L,
+        fileCountBySizeDao.findById(recordToFind).getCount().longValue());
+    recordToFind.value2("bucket500");
+    recordToFind.value3(Long.MAX_VALUE);
+    assertEquals(1L,
+        fileCountBySizeDao.findById(recordToFind).getCount().longValue());
+  }
+
+  @Test
+  public void testProcessAtScale() {
+    // Write 10000 keys.
+    List<OMDBUpdateEvent> omDbEventList = new ArrayList<>();
+    List<OmKeyInfo> omKeyInfoList = new ArrayList<>();
+    for (int volIndex = 1; volIndex <= 10; volIndex++) {
+      for (int bktIndex = 1; bktIndex <= 100; bktIndex++) {
+        for (int keyIndex = 1; keyIndex <= 10; keyIndex++) {
+          OmKeyInfo omKeyInfo = mock(OmKeyInfo.class);
+          given(omKeyInfo.getKeyName()).willReturn("key" + keyIndex);
+          given(omKeyInfo.getVolumeName()).willReturn("vol" + volIndex);
+          given(omKeyInfo.getBucketName()).willReturn("bucket" + bktIndex);
+          // Place keys in each bin
+          long fileSize = (long)Math.pow(2, keyIndex + 9) - 1L;
+          given(omKeyInfo.getDataSize()).willReturn(fileSize);
+          omKeyInfoList.add(omKeyInfo);
+          omDbEventList.add(new OMUpdateEventBuilder()
+              .setAction(PUT)
+              .setKey("key" + keyIndex)
+              .setValue(omKeyInfo)
+              .build());
+        }
+      }
+    }
+
+    OMUpdateEventBatch omUpdateEventBatch =
+        new OMUpdateEventBatch(omDbEventList);
+    fileSizeCountTask.process(omUpdateEventBatch);
+
+    // Verify 2 keys are in correct bins.
+    assertEquals(10000, fileCountBySizeDao.count());
+    Record3<String, String, Long> recordToFind = dslContext
+        .newRecord(FILE_COUNT_BY_SIZE.VOLUME,
+            FILE_COUNT_BY_SIZE.BUCKET,
+            FILE_COUNT_BY_SIZE.FILE_SIZE)
+        .value1("vol1")
+        .value2("bucket1")
+        .value3(2048L);
+    assertEquals(1L,
+        fileCountBySizeDao.findById(recordToFind).getCount().longValue());
+    recordToFind.value1("vol10");
+    recordToFind.value2("bucket100");
+    // file size upper bound for 10000L is 16384L (next highest power of 2)
+    recordToFind.value3(16384L);
+    assertEquals(1L,
+        fileCountBySizeDao.findById(recordToFind).getCount().longValue());
+
+    // Process 500 deletes and 500 updates
+    omDbEventList = new ArrayList<>();
+    for (int volIndex = 1; volIndex <= 1; volIndex++) {
+      for (int bktIndex = 1; bktIndex <= 100; bktIndex++) {
+        for (int keyIndex = 1; keyIndex <= 10; keyIndex++) {
+          OmKeyInfo omKeyInfo = mock(OmKeyInfo.class);
+          given(omKeyInfo.getKeyName()).willReturn("key" + keyIndex);
+          given(omKeyInfo.getVolumeName()).willReturn("vol" + volIndex);
+          given(omKeyInfo.getBucketName()).willReturn("bucket" + bktIndex);
+          if (keyIndex <= 5) {
+            long fileSize = (long)Math.pow(2, keyIndex + 9) - 1L;
+            given(omKeyInfo.getDataSize()).willReturn(fileSize);
+            omDbEventList.add(new OMUpdateEventBuilder()
+                .setAction(DELETE)
+                .setKey("key" + keyIndex)
+                .setValue(omKeyInfo)
+                .build());
+          } else {
+            // update all the files with keyIndex > 5 to filesize 1023L
+            // so that they get into first bin
+            given(omKeyInfo.getDataSize()).willReturn(1023L);
+            omDbEventList.add(new OMUpdateEventBuilder()
+                .setAction(UPDATE)
+                .setKey("key" + keyIndex)
+                .setValue(omKeyInfo)
+                .setOldValue(
+                    omKeyInfoList.get((volIndex * bktIndex) + keyIndex))
+                .build());
+          }
+        }
+      }
+    }
+
+    omUpdateEventBatch = new OMUpdateEventBatch(omDbEventList);
+    fileSizeCountTask.process(omUpdateEventBatch);
+
+    assertEquals(10000, fileCountBySizeDao.count());
+    recordToFind = dslContext
+        .newRecord(FILE_COUNT_BY_SIZE.VOLUME,
+            FILE_COUNT_BY_SIZE.BUCKET,
+            FILE_COUNT_BY_SIZE.FILE_SIZE)
+        .value1("vol1")
+        .value2("bucket1")
+        .value3(1024L);
+    // The update events on keys 6-10 should now put them under first bin 1024L
+    assertEquals(5, fileCountBySizeDao.findById(recordToFind)
+        .getCount().longValue());
+    recordToFind.value2("bucket100");
+    assertEquals(5, fileCountBySizeDao.findById(recordToFind)
+        .getCount().longValue());
+    recordToFind.value3(2048L);
+    assertEquals(0, fileCountBySizeDao.findById(recordToFind)
+        .getCount().longValue());
+    // Volumes 2 - 10 should not be affected by this process
+    recordToFind.value1("vol2");
+    assertEquals(1, fileCountBySizeDao.findById(recordToFind)
+        .getCount().longValue());
   }
 }

