Posted to commits@ozone.apache.org by vi...@apache.org on 2020/06/03 22:18:34 UTC
[hadoop-ozone] branch master updated: HDDS-3681. Recon: Add support to store file size counts in each volume/bucket (#988)
This is an automated email from the ASF dual-hosted git repository.
vivekratnavel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git
The following commit(s) were added to refs/heads/master by this push:
new aec7a93 HDDS-3681. Recon: Add support to store file size counts in each volume/bucket (#988)
aec7a93 is described below
commit aec7a9345e8cecdacc1367562fdd82dd4dfc34df
Author: Vivek Ratnavel Subramanian <vi...@gmail.com>
AuthorDate: Wed Jun 3 15:18:25 2020 -0700
HDDS-3681. Recon: Add support to store file size counts in each volume/bucket (#988)
---
.../recon/schema/UtilizationSchemaDefinition.java | 19 +-
.../apache/hadoop/ozone/recon/ReconConstants.java | 3 +
.../hadoop/ozone/recon/ReconControllerModule.java | 3 +-
.../ozone/recon/api/UtilizationEndpoint.java | 101 ++++++
.../hadoop/ozone/recon/api/UtilizationService.java | 52 ----
.../ozone/recon/tasks/FileSizeCountTask.java | 213 +++++++------
.../hadoop/ozone/recon/api/TestEndpoints.java | 122 ++++++++
.../TestUtilizationSchemaDefinition.java | 26 +-
.../ozone/recon/tasks/TestFileSizeCountTask.java | 339 ++++++++++++++++-----
9 files changed, 643 insertions(+), 235 deletions(-)
diff --git a/hadoop-ozone/recon-codegen/src/main/java/org/hadoop/ozone/recon/schema/UtilizationSchemaDefinition.java b/hadoop-ozone/recon-codegen/src/main/java/org/hadoop/ozone/recon/schema/UtilizationSchemaDefinition.java
index 941a3c6..92de19e 100644
--- a/hadoop-ozone/recon-codegen/src/main/java/org/hadoop/ozone/recon/schema/UtilizationSchemaDefinition.java
+++ b/hadoop-ozone/recon-codegen/src/main/java/org/hadoop/ozone/recon/schema/UtilizationSchemaDefinition.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -26,6 +26,7 @@ import javax.sql.DataSource;
import com.google.inject.Singleton;
+import org.jooq.DSLContext;
import org.jooq.impl.DSL;
import org.jooq.impl.SQLDataType;
import org.slf4j.Logger;
@@ -44,6 +45,7 @@ public class UtilizationSchemaDefinition implements ReconSchemaDefinition {
LoggerFactory.getLogger(UtilizationSchemaDefinition.class);
private final DataSource dataSource;
+ private DSLContext dslContext;
public static final String CLUSTER_GROWTH_DAILY_TABLE_NAME =
"CLUSTER_GROWTH_DAILY";
@@ -59,6 +61,7 @@ public class UtilizationSchemaDefinition implements ReconSchemaDefinition {
@Transactional
public void initializeSchema() throws SQLException {
Connection conn = dataSource.getConnection();
+ dslContext = DSL.using(conn);
if (!TABLE_EXISTS_CHECK.test(conn, FILE_COUNT_BY_SIZE_TABLE_NAME)) {
createFileSizeCountTable(conn);
}
@@ -68,7 +71,7 @@ public class UtilizationSchemaDefinition implements ReconSchemaDefinition {
}
private void createClusterGrowthTable(Connection conn) {
- DSL.using(conn).createTableIfNotExists(CLUSTER_GROWTH_DAILY_TABLE_NAME)
+ dslContext.createTableIfNotExists(CLUSTER_GROWTH_DAILY_TABLE_NAME)
.column("timestamp", SQLDataType.TIMESTAMP)
.column("datanode_id", SQLDataType.INTEGER)
.column("datanode_host", SQLDataType.VARCHAR(1024))
@@ -83,11 +86,17 @@ public class UtilizationSchemaDefinition implements ReconSchemaDefinition {
}
private void createFileSizeCountTable(Connection conn) {
- DSL.using(conn).createTableIfNotExists(FILE_COUNT_BY_SIZE_TABLE_NAME)
+ dslContext.createTableIfNotExists(FILE_COUNT_BY_SIZE_TABLE_NAME)
+ .column("volume", SQLDataType.VARCHAR(64))
+ .column("bucket", SQLDataType.VARCHAR(64))
.column("file_size", SQLDataType.BIGINT)
.column("count", SQLDataType.BIGINT)
- .constraint(DSL.constraint("pk_file_size")
- .primaryKey("file_size"))
+ .constraint(DSL.constraint("pk_volume_bucket_file_size")
+ .primaryKey("volume", "bucket", "file_size"))
.execute();
}
+
+ public DSLContext getDSLContext() {
+ return dslContext;
+ }
}
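For reference, the reshaped table can be reproduced outside Recon with the same jOOQ calls. Below is a self-contained sketch against an in-memory H2 database (the H2 dependency, JDBC URL, and class name are assumptions; the columns and the composite pk_volume_bucket_file_size key mirror createFileSizeCountTable above):

    import java.sql.Connection;
    import java.sql.DriverManager;

    import org.jooq.DSLContext;
    import org.jooq.impl.DSL;
    import org.jooq.impl.SQLDataType;

    public class FileCountTableSketch {
      public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection("jdbc:h2:mem:recon")) {
          DSLContext dslContext = DSL.using(conn);
          // One row per (volume, bucket, file-size-bin) combination.
          dslContext.createTableIfNotExists("FILE_COUNT_BY_SIZE")
              .column("volume", SQLDataType.VARCHAR(64))
              .column("bucket", SQLDataType.VARCHAR(64))
              .column("file_size", SQLDataType.BIGINT)
              .column("count", SQLDataType.BIGINT)
              .constraint(DSL.constraint("pk_volume_bucket_file_size")
                  .primaryKey("volume", "bucket", "file_size"))
              .execute();
          dslContext.execute(
              "INSERT INTO FILE_COUNT_BY_SIZE VALUES ('vol1', 'bucket1', 1024, 1)");
          System.out.println(dslContext.fetch("SELECT * FROM FILE_COUNT_BY_SIZE"));
        }
      }
    }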
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconConstants.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconConstants.java
index bf3457a..b0a5c0b 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconConstants.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconConstants.java
@@ -48,6 +48,9 @@ public final class ReconConstants {
public static final String RECON_QUERY_PREVKEY = "prevKey";
public static final String PREV_CONTAINER_ID_DEFAULT_VALUE = "0";
public static final String RECON_QUERY_LIMIT = "limit";
+ public static final String RECON_QUERY_VOLUME = "volume";
+ public static final String RECON_QUERY_BUCKET = "bucket";
+ public static final String RECON_QUERY_FILE_SIZE = "fileSize";
public static final String RECON_SCM_CONTAINER_DB =
"recon-" + CONTAINER_DB_SUFFIX;
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java
index 5ec0e3b..050ad03 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java
@@ -136,7 +136,8 @@ public class ReconControllerModule extends AbstractModule {
RECON_DAO_LIST.forEach(aClass -> {
try {
bind(aClass).toConstructor(
- (Constructor) aClass.getConstructor(Configuration.class));
+ (Constructor) aClass.getConstructor(Configuration.class))
+ .in(Singleton.class);
} catch (NoSuchMethodException e) {
LOG.error("Error creating DAO {} ", aClass.getSimpleName(), e);
}
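The binding change above adds Singleton scope to the generated DAO constructor bindings, so every injection point receives the same DAO instance. A minimal Guice sketch of that pattern, using hypothetical stand-ins (Config and DemoDao) rather than the real jOOQ classes:

    import com.google.inject.AbstractModule;
    import com.google.inject.Guice;
    import com.google.inject.Injector;
    import com.google.inject.Singleton;

    public class SingletonBindingSketch {
      static class Config { }
      static class DemoDao {
        public DemoDao(Config config) { }
      }

      public static void main(String[] args) {
        Injector injector = Guice.createInjector(new AbstractModule() {
          @Override
          protected void configure() {
            bind(Config.class).toInstance(new Config());
            try {
              // Same pattern as ReconControllerModule: constructor binding
              // plus Singleton scope.
              bind(DemoDao.class)
                  .toConstructor(DemoDao.class.getConstructor(Config.class))
                  .in(Singleton.class);
            } catch (NoSuchMethodException e) {
              throw new RuntimeException(e);
            }
          }
        });
        // Both lookups return the same instance because of .in(Singleton.class).
        System.out.println(injector.getInstance(DemoDao.class)
            == injector.getInstance(DemoDao.class)); // true
      }
    }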
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/UtilizationEndpoint.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/UtilizationEndpoint.java
new file mode 100644
index 0000000..55f9055
--- /dev/null
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/UtilizationEndpoint.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.api;
+
+import javax.inject.Inject;
+
+import org.hadoop.ozone.recon.schema.UtilizationSchemaDefinition;
+import org.hadoop.ozone.recon.schema.tables.daos.FileCountBySizeDao;
+import org.hadoop.ozone.recon.schema.tables.pojos.FileCountBySize;
+import org.jooq.DSLContext;
+import org.jooq.Record3;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.hadoop.ozone.recon.ReconConstants.RECON_QUERY_BUCKET;
+import static org.apache.hadoop.ozone.recon.ReconConstants.RECON_QUERY_FILE_SIZE;
+import static org.apache.hadoop.ozone.recon.ReconConstants.RECON_QUERY_VOLUME;
+import static org.hadoop.ozone.recon.schema.tables.FileCountBySizeTable.FILE_COUNT_BY_SIZE;
+
+/**
+ * Endpoint for querying the counts of a certain file Size.
+ */
+@Path("/utilization")
+@Produces(MediaType.APPLICATION_JSON)
+public class UtilizationEndpoint {
+
+ private FileCountBySizeDao fileCountBySizeDao;
+ private UtilizationSchemaDefinition utilizationSchemaDefinition;
+
+ @Inject
+ public UtilizationEndpoint(FileCountBySizeDao fileCountBySizeDao,
+ UtilizationSchemaDefinition
+ utilizationSchemaDefinition) {
+ this.utilizationSchemaDefinition = utilizationSchemaDefinition;
+ this.fileCountBySizeDao = fileCountBySizeDao;
+ }
+
+ /**
+ * Return the file counts from Recon DB.
+ * @return {@link Response}
+ */
+ @GET
+ @Path("/fileCount")
+ public Response getFileCounts(
+ @QueryParam(RECON_QUERY_VOLUME)
+ String volume,
+ @QueryParam(RECON_QUERY_BUCKET)
+ String bucket,
+ @QueryParam(RECON_QUERY_FILE_SIZE)
+ long fileSize
+ ) {
+ DSLContext dslContext = utilizationSchemaDefinition.getDSLContext();
+ List<FileCountBySize> resultSet;
+ if (volume != null && bucket != null && fileSize > 0) {
+ Record3<String, String, Long> recordToFind = dslContext
+ .newRecord(FILE_COUNT_BY_SIZE.VOLUME,
+ FILE_COUNT_BY_SIZE.BUCKET,
+ FILE_COUNT_BY_SIZE.FILE_SIZE)
+ .value1(volume)
+ .value2(bucket)
+ .value3(fileSize);
+ FileCountBySize record = fileCountBySizeDao.findById(recordToFind);
+ resultSet = record != null ?
+ Collections.singletonList(record) : Collections.emptyList();
+ } else if (volume != null && bucket != null) {
+ resultSet = dslContext.select().from(FILE_COUNT_BY_SIZE)
+ .where(FILE_COUNT_BY_SIZE.VOLUME.eq(volume))
+ .and(FILE_COUNT_BY_SIZE.BUCKET.eq(bucket))
+ .fetchInto(FileCountBySize.class);
+ } else if (volume != null) {
+ resultSet = fileCountBySizeDao.fetchByVolume(volume);
+ } else {
+ // fetch all records
+ resultSet = fileCountBySizeDao.findAll();
+ }
+ return Response.ok(resultSet).build();
+ }
+}
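A client-side sketch of exercising the reworked endpoint with a plain JAX-RS client. The host, port, and /api/v1 prefix are assumptions about a running Recon server; only the /utilization/fileCount path and the volume, bucket, and fileSize query parameters come from the code above. Omitting parameters widens the result, matching the branches in getFileCounts():

    import javax.ws.rs.client.Client;
    import javax.ws.rs.client.ClientBuilder;

    public class FileCountClientSketch {
      public static void main(String[] args) {
        Client client = ClientBuilder.newClient();
        try {
          // Fetch the count for one (volume, bucket, file-size-bin) row.
          String json = client
              .target("http://localhost:9888/api/v1/utilization/fileCount")
              .queryParam("volume", "vol1")
              .queryParam("bucket", "bucket1")
              .queryParam("fileSize", 131072L)
              .request()
              .get(String.class);
          System.out.println(json); // JSON-serialized FileCountBySize rows
        } finally {
          client.close();
        }
      }
    }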
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/UtilizationService.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/UtilizationService.java
deleted file mode 100644
index 5801f8f..0000000
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/UtilizationService.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.ozone.recon.api;
-
-import javax.inject.Inject;
-import org.hadoop.ozone.recon.schema.tables.daos.FileCountBySizeDao;
-import org.hadoop.ozone.recon.schema.tables.pojos.FileCountBySize;
-
-import javax.ws.rs.GET;
-import javax.ws.rs.Path;
-import javax.ws.rs.Produces;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-import java.util.List;
-
-/**
- * Endpoint for querying the counts of a certain file Size.
- */
-@Path("/utilization")
-@Produces(MediaType.APPLICATION_JSON)
-public class UtilizationService {
-
- @Inject
- private FileCountBySizeDao fileCountBySizeDao;
-
- /**
- * Return the file counts from Recon DB.
- * @return {@link Response}
- */
- @GET
- @Path("/fileCount")
- public Response getFileCounts() {
- List<FileCountBySize> resultSet = fileCountBySizeDao.findAll();
- return Response.ok(resultSet).build();
- }
-}
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTask.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTask.java
index d855747..80b2526 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTask.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTask.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.ozone.recon.tasks;
-import com.google.common.annotations.VisibleForTesting;
import com.google.inject.Inject;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
@@ -26,19 +25,23 @@ import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.hadoop.ozone.recon.schema.UtilizationSchemaDefinition;
import org.hadoop.ozone.recon.schema.tables.daos.FileCountBySizeDao;
import org.hadoop.ozone.recon.schema.tables.pojos.FileCountBySize;
+import org.jooq.DSLContext;
+import org.jooq.Record3;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Iterator;
-import java.util.List;
+import java.util.Map;
import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.KEY_TABLE;
+import static org.hadoop.ozone.recon.schema.tables.FileCountBySizeTable.FILE_COUNT_BY_SIZE;
/**
* Class to iterate over the OM DB and store the counts of existing/new
@@ -49,33 +52,26 @@ public class FileSizeCountTask implements ReconOmTask {
private static final Logger LOG =
LoggerFactory.getLogger(FileSizeCountTask.class);
- private int maxBinSize = -1;
- private long maxFileSizeUpperBound = 1125899906842624L; // 1 PB
- private long[] upperBoundCount;
- private long oneKb = 1024L;
+ // 1125899906842624L = 1PB
+ private static final long MAX_FILE_SIZE_UPPER_BOUND = 1125899906842624L;
private FileCountBySizeDao fileCountBySizeDao;
+ private DSLContext dslContext;
@Inject
- public FileSizeCountTask(FileCountBySizeDao fileCountBySizeDao) {
+ public FileSizeCountTask(FileCountBySizeDao fileCountBySizeDao,
+ UtilizationSchemaDefinition
+ utilizationSchemaDefinition) {
this.fileCountBySizeDao = fileCountBySizeDao;
- upperBoundCount = new long[getMaxBinSize()];
- }
-
- long getOneKB() {
- return oneKb;
- }
-
- long getMaxFileSizeUpperBound() {
- return maxFileSizeUpperBound;
+ this.dslContext = utilizationSchemaDefinition.getDSLContext();
}
- int getMaxBinSize() {
- if (maxBinSize == -1) {
- // extra bin to add files > 1PB.
- // 1 KB (2 ^ 10) is the smallest tracked file.
- maxBinSize = nextClosestPowerIndexOfTwo(maxFileSizeUpperBound) - 10 + 1;
+ private static int nextClosestPowerIndexOfTwo(long dataSize) {
+ int index = 0;
+ while(dataSize != 0) {
+ dataSize >>= 1;
+ index += 1;
}
- return maxBinSize;
+ return index;
}
/**
@@ -88,17 +84,20 @@ public class FileSizeCountTask implements ReconOmTask {
@Override
public Pair<String, Boolean> reprocess(OMMetadataManager omMetadataManager) {
Table<String, OmKeyInfo> omKeyInfoTable = omMetadataManager.getKeyTable();
+ Map<FileSizeCountKey, Long> fileSizeCountMap = new HashMap<>();
try (TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>>
keyIter = omKeyInfoTable.iterator()) {
while (keyIter.hasNext()) {
Table.KeyValue<String, OmKeyInfo> kv = keyIter.next();
- handlePutKeyEvent(kv.getValue());
+ handlePutKeyEvent(kv.getValue(), fileSizeCountMap);
}
} catch (IOException ioEx) {
LOG.error("Unable to populate File Size Count in Recon DB. ", ioEx);
return new ImmutablePair<>(getTaskName(), false);
}
- writeCountsToDB();
+ // Truncate table before inserting new rows
+ dslContext.truncate(FILE_COUNT_BY_SIZE);
+ writeCountsToDB(true, fileSizeCountMap);
LOG.info("Completed a 'reprocess' run of FileSizeCountTask.");
return new ImmutablePair<>(getTaskName(), true);
@@ -114,19 +113,6 @@ public class FileSizeCountTask implements ReconOmTask {
return Collections.singletonList(KEY_TABLE);
}
- private void readCountsFromDB() {
- // Read - Write operations to DB are in ascending order
- // of file size upper bounds.
- List<FileCountBySize> resultSet = fileCountBySizeDao.findAll();
- int index = 0;
- if (resultSet != null) {
- for (FileCountBySize row : resultSet) {
- upperBoundCount[index] = row.getCount();
- index++;
- }
- }
- }
-
/**
* Read the Keys from update events and update the count of files
* pertaining to a certain upper bound.
@@ -137,9 +123,8 @@ public class FileSizeCountTask implements ReconOmTask {
@Override
public Pair<String, Boolean> process(OMUpdateEventBatch events) {
Iterator<OMDBUpdateEvent> eventIterator = events.getIterator();
+ Map<FileSizeCountKey, Long> fileSizeCountMap = new HashMap<>();
- //update array with file size count from DB
- readCountsFromDB();
while (eventIterator.hasNext()) {
OMDBUpdateEvent<String, OmKeyInfo> omdbUpdateEvent = eventIterator.next();
String updatedKey = omdbUpdateEvent.getKey();
@@ -148,16 +133,17 @@ public class FileSizeCountTask implements ReconOmTask {
try{
switch (omdbUpdateEvent.getAction()) {
case PUT:
- handlePutKeyEvent(omKeyInfo);
+ handlePutKeyEvent(omKeyInfo, fileSizeCountMap);
break;
case DELETE:
- handleDeleteKeyEvent(updatedKey, omKeyInfo);
+ handleDeleteKeyEvent(updatedKey, omKeyInfo, fileSizeCountMap);
break;
case UPDATE:
- handleDeleteKeyEvent(updatedKey, omdbUpdateEvent.getOldValue());
- handlePutKeyEvent(omKeyInfo);
+ handleDeleteKeyEvent(updatedKey, omdbUpdateEvent.getOldValue(),
+ fileSizeCountMap);
+ handlePutKeyEvent(omKeyInfo, fileSizeCountMap);
break;
default: LOG.trace("Skipping DB update event : {}",
@@ -169,36 +155,20 @@ public class FileSizeCountTask implements ReconOmTask {
return new ImmutablePair<>(getTaskName(), false);
}
}
- writeCountsToDB();
+ writeCountsToDB(false, fileSizeCountMap);
LOG.info("Completed a 'process' run of FileSizeCountTask.");
return new ImmutablePair<>(getTaskName(), true);
}
- /**
- * Calculate the bin index based on size of the Key.
- * index is calculated as the number of right shifts
- * needed until dataSize becomes zero.
- *
- * @param dataSize Size of the key.
- * @return int bin index in upperBoundCount
- */
- public int calculateBinIndex(long dataSize) {
- if (dataSize >= getMaxFileSizeUpperBound()) {
- return getMaxBinSize() - 1;
+ private long getFileSizeUpperBound(long fileSize) {
+ if (fileSize >= MAX_FILE_SIZE_UPPER_BOUND) {
+ return Long.MAX_VALUE;
}
- int index = nextClosestPowerIndexOfTwo(dataSize);
+ int index = nextClosestPowerIndexOfTwo(fileSize);
// The smallest file size being tracked for count
// is 1 KB i.e. 1024 = 2 ^ 10.
- return index < 10 ? 0 : index - 10;
- }
-
- int nextClosestPowerIndexOfTwo(long dataSize) {
- int index = 0;
- while(dataSize != 0) {
- dataSize >>= 1;
- index += 1;
- }
- return index;
+ int binIndex = index < 10 ? 0 : index - 10;
+ return (long) Math.pow(2, (10 + binIndex));
}
/**
@@ -206,61 +176,108 @@ public class FileSizeCountTask implements ReconOmTask {
* using the dao.
*
*/
- void writeCountsToDB() {
- for (int i = 0; i < upperBoundCount.length; i++) {
- long fileSizeUpperBound = (i == upperBoundCount.length - 1) ?
- Long.MAX_VALUE : (long) Math.pow(2, (10 + i));
- FileCountBySize fileCountRecord =
- fileCountBySizeDao.findById(fileSizeUpperBound);
- FileCountBySize newRecord = new
- FileCountBySize(fileSizeUpperBound, upperBoundCount[i]);
- if (fileCountRecord == null) {
+ private void writeCountsToDB(boolean isDbTruncated,
+ Map<FileSizeCountKey, Long> fileSizeCountMap) {
+ fileSizeCountMap.keySet().forEach((FileSizeCountKey key) -> {
+ FileCountBySize newRecord = new FileCountBySize();
+ newRecord.setVolume(key.volume);
+ newRecord.setBucket(key.bucket);
+ newRecord.setFileSize(key.fileSizeUpperBound);
+ newRecord.setCount(fileSizeCountMap.get(key));
+ if (!isDbTruncated) {
+ // Get the current count from database and update
+ Record3<String, String, Long> recordToFind =
+ dslContext.newRecord(
+ FILE_COUNT_BY_SIZE.VOLUME,
+ FILE_COUNT_BY_SIZE.BUCKET,
+ FILE_COUNT_BY_SIZE.FILE_SIZE)
+ .value1(key.volume)
+ .value2(key.bucket)
+ .value3(key.fileSizeUpperBound);
+ FileCountBySize fileCountRecord =
+ fileCountBySizeDao.findById(recordToFind);
+ if (fileCountRecord == null && newRecord.getCount() > 0L) {
+ // insert new row only for non-zero counts.
+ fileCountBySizeDao.insert(newRecord);
+ } else if (fileCountRecord != null) {
+ newRecord.setCount(fileCountRecord.getCount() +
+ fileSizeCountMap.get(key));
+ fileCountBySizeDao.update(newRecord);
+ }
+ } else if (newRecord.getCount() > 0) {
+ // insert new row only for non-zero counts.
fileCountBySizeDao.insert(newRecord);
- } else {
- fileCountBySizeDao.update(newRecord);
}
- }
+ });
+ }
+
+ private FileSizeCountKey getFileSizeCountKey(OmKeyInfo omKeyInfo) {
+ return new FileSizeCountKey(omKeyInfo.getVolumeName(),
+ omKeyInfo.getBucketName(),
+ getFileSizeUpperBound(omKeyInfo.getDataSize()));
}
/**
* Calculate and update the count of files being tracked by
- * upperBoundCount[].
+ * fileSizeCountMap.
* Used by reprocess() and process().
*
* @param omKeyInfo OmKey being updated for count
*/
- void handlePutKeyEvent(OmKeyInfo omKeyInfo) {
- int binIndex = calculateBinIndex(omKeyInfo.getDataSize());
- upperBoundCount[binIndex]++;
+ private void handlePutKeyEvent(OmKeyInfo omKeyInfo,
+ Map<FileSizeCountKey, Long> fileSizeCountMap) {
+ FileSizeCountKey key = getFileSizeCountKey(omKeyInfo);
+ Long count = fileSizeCountMap.containsKey(key) ?
+ fileSizeCountMap.get(key) + 1L : 1L;
+ fileSizeCountMap.put(key, count);
}
/**
* Calculate and update the count of files being tracked by
- * upperBoundCount[].
+ * fileSizeCountMap.
* Used by reprocess() and process().
*
* @param omKeyInfo OmKey being updated for count
*/
- void handleDeleteKeyEvent(String key, OmKeyInfo omKeyInfo) {
+ private void handleDeleteKeyEvent(String key, OmKeyInfo omKeyInfo,
+ Map<FileSizeCountKey, Long>
+ fileSizeCountMap) {
if (omKeyInfo == null) {
LOG.warn("Unexpected error while handling DELETE key event. Key not " +
"found in Recon OM DB : {}", key);
} else {
- int binIndex = calculateBinIndex(omKeyInfo.getDataSize());
- if (upperBoundCount[binIndex] > 0) {
- //decrement only if it had files before, default DB value is 0
- upperBoundCount[binIndex]--;
- } else {
- LOG.warn("Unexpected error while updating bin count. Found 0 count " +
- "for index : {} while processing DELETE event for {}", binIndex,
- omKeyInfo.getKeyName());
- }
+ FileSizeCountKey countKey = getFileSizeCountKey(omKeyInfo);
+ Long count = fileSizeCountMap.containsKey(countKey) ?
+ fileSizeCountMap.get(countKey) - 1L : -1L;
+ fileSizeCountMap.put(countKey, count);
}
}
- @VisibleForTesting
- protected long[] getUpperBoundCount() {
- return upperBoundCount;
- }
+ private static class FileSizeCountKey {
+ private String volume;
+ private String bucket;
+ private Long fileSizeUpperBound;
+ FileSizeCountKey(String volume, String bucket,
+ Long fileSizeUpperBound) {
+ this.volume = volume;
+ this.bucket = bucket;
+ this.fileSizeUpperBound = fileSizeUpperBound;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if(obj instanceof FileSizeCountKey) {
+ FileSizeCountKey s = (FileSizeCountKey) obj;
+ return volume.equals(s.volume) && bucket.equals(s.bucket) &&
+ fileSizeUpperBound.equals(s.fileSizeUpperBound);
+ }
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return (volume + bucket + fileSizeUpperBound).hashCode();
+ }
+ }
}
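The rewritten task replaces the old bin-index array with per-(volume, bucket) map entries keyed by a file-size upper bound: the smallest power of two strictly greater than the key's data size, floored at 1 KB, with Long.MAX_VALUE as the catch-all for sizes of 1 PB and above. A standalone sketch of that mapping (class name and method visibility are illustrative; the arithmetic mirrors getFileSizeUpperBound and nextClosestPowerIndexOfTwo above):

    public final class FileSizeBinSketch {
      private static final long MAX_FILE_SIZE_UPPER_BOUND = 1125899906842624L; // 1 PB

      // Bit length of dataSize: right shifts needed until it reaches zero.
      private static int nextClosestPowerIndexOfTwo(long dataSize) {
        int index = 0;
        while (dataSize != 0) {
          dataSize >>= 1;
          index += 1;
        }
        return index;
      }

      static long getFileSizeUpperBound(long fileSize) {
        if (fileSize >= MAX_FILE_SIZE_UPPER_BOUND) {
          return Long.MAX_VALUE; // catch-all bin for >= 1 PB
        }
        // 1 KB (2^10) is the smallest tracked bin.
        int index = nextClosestPowerIndexOfTwo(fileSize);
        int binIndex = index < 10 ? 0 : index - 10;
        return (long) Math.pow(2, 10 + binIndex);
      }

      public static void main(String[] args) {
        System.out.println(getFileSizeUpperBound(1000L));    // 1024
        System.out.println(getFileSizeUpperBound(1024L));    // 2048
        System.out.println(getFileSizeUpperBound(100000L));  // 131072
        System.out.println(getFileSizeUpperBound(4 * MAX_FILE_SIZE_UPPER_BOUND));
        // prints 9223372036854775807 (Long.MAX_VALUE)
      }
    }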
diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestEndpoints.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestEndpoints.java
index 9234131..40d3ba6 100644
--- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestEndpoints.java
+++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestEndpoints.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.ozone.recon.api;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
@@ -37,7 +38,11 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
import org.apache.hadoop.hdds.scm.server.OzoneStorageContainerManager;
+import org.apache.hadoop.hdds.utils.db.TypedTable;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
import org.apache.hadoop.ozone.recon.ReconTestInjector;
import org.apache.hadoop.ozone.recon.api.types.ClusterStateResponse;
@@ -52,7 +57,11 @@ import org.apache.hadoop.ozone.recon.scm.ReconStorageContainerManagerFacade;
import org.apache.hadoop.ozone.recon.spi.StorageContainerServiceProvider;
import org.apache.hadoop.ozone.recon.spi.impl.OzoneManagerServiceProviderImpl;
import org.apache.hadoop.ozone.recon.spi.impl.StorageContainerServiceProviderImpl;
+import org.apache.hadoop.ozone.recon.tasks.FileSizeCountTask;
import org.apache.hadoop.test.LambdaTestUtils;
+import org.hadoop.ozone.recon.schema.UtilizationSchemaDefinition;
+import org.hadoop.ozone.recon.schema.tables.daos.FileCountBySizeDao;
+import org.hadoop.ozone.recon.schema.tables.pojos.FileCountBySize;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
@@ -64,12 +73,16 @@ import static org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.getRandom
import static org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.getTestReconOmMetadataManager;
import static org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.initializeNewOmMetadataManager;
import static org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.writeDataToOm;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import javax.ws.rs.core.Response;
import java.io.IOException;
+import java.util.List;
import java.util.UUID;
import java.util.concurrent.Callable;
@@ -80,7 +93,9 @@ public class TestEndpoints extends AbstractReconSqlDBTest {
private NodeEndpoint nodeEndpoint;
private PipelineEndpoint pipelineEndpoint;
private ClusterStateEndpoint clusterStateEndpoint;
+ private UtilizationEndpoint utilizationEndpoint;
private ReconOMMetadataManager reconOMMetadataManager;
+ private FileSizeCountTask fileSizeCountTask;
private ReconStorageContainerManagerFacade reconScm;
private boolean isSetupDone = false;
private String pipelineId;
@@ -90,6 +105,7 @@ public class TestEndpoints extends AbstractReconSqlDBTest {
private ContainerReportsProto containerReportsProto;
private DatanodeDetailsProto datanodeDetailsProto;
private Pipeline pipeline;
+ private FileCountBySizeDao fileCountBySizeDao;
private final String host1 = "host1.datanode";
private final String host2 = "host2.datanode";
private final String ip1 = "1.1.1.1";
@@ -143,6 +159,7 @@ public class TestEndpoints extends AbstractReconSqlDBTest {
.addBinding(ClusterStateEndpoint.class)
.addBinding(NodeEndpoint.class)
.addBinding(ContainerSchemaManager.class)
+ .addBinding(UtilizationEndpoint.class)
.addBinding(StorageContainerLocationProtocol.class, mockScmClient)
.build();
@@ -150,6 +167,13 @@ public class TestEndpoints extends AbstractReconSqlDBTest {
pipelineEndpoint = reconTestInjector.getInstance(PipelineEndpoint.class);
clusterStateEndpoint =
reconTestInjector.getInstance(ClusterStateEndpoint.class);
+ fileCountBySizeDao = getDao(FileCountBySizeDao.class);
+ UtilizationSchemaDefinition utilizationSchemaDefinition =
+ getSchemaDefinition(UtilizationSchemaDefinition.class);
+ utilizationEndpoint = new UtilizationEndpoint(
+ fileCountBySizeDao, utilizationSchemaDefinition);
+ fileSizeCountTask =
+ new FileSizeCountTask(fileCountBySizeDao, utilizationSchemaDefinition);
reconScm = (ReconStorageContainerManagerFacade)
reconTestInjector.getInstance(OzoneStorageContainerManager.class);
}
@@ -397,6 +421,104 @@ public class TestEndpoints extends AbstractReconSqlDBTest {
});
}
+ @Test
+ public void testGetFileCounts() throws Exception {
+ OmKeyInfo omKeyInfo1 = mock(OmKeyInfo.class);
+ given(omKeyInfo1.getKeyName()).willReturn("key1");
+ given(omKeyInfo1.getVolumeName()).willReturn("vol1");
+ given(omKeyInfo1.getBucketName()).willReturn("bucket1");
+ given(omKeyInfo1.getDataSize()).willReturn(1000L);
+
+ OmKeyInfo omKeyInfo2 = mock(OmKeyInfo.class);
+ given(omKeyInfo2.getKeyName()).willReturn("key2");
+ given(omKeyInfo2.getVolumeName()).willReturn("vol1");
+ given(omKeyInfo2.getBucketName()).willReturn("bucket1");
+ given(omKeyInfo2.getDataSize()).willReturn(100000L);
+
+ OmKeyInfo omKeyInfo3 = mock(OmKeyInfo.class);
+ given(omKeyInfo3.getKeyName()).willReturn("key1");
+ given(omKeyInfo3.getVolumeName()).willReturn("vol2");
+ given(omKeyInfo3.getBucketName()).willReturn("bucket1");
+ given(omKeyInfo3.getDataSize()).willReturn(1000L);
+
+ OMMetadataManager omMetadataManager = mock(OmMetadataManagerImpl.class);
+ TypedTable<String, OmKeyInfo> keyTable = mock(TypedTable.class);
+
+ TypedTable.TypedTableIterator mockKeyIter = mock(TypedTable
+ .TypedTableIterator.class);
+ TypedTable.TypedKeyValue mockKeyValue = mock(
+ TypedTable.TypedKeyValue.class);
+
+ when(keyTable.iterator()).thenReturn(mockKeyIter);
+ when(omMetadataManager.getKeyTable()).thenReturn(keyTable);
+ when(mockKeyIter.hasNext())
+ .thenReturn(true)
+ .thenReturn(true)
+ .thenReturn(true)
+ .thenReturn(false);
+ when(mockKeyIter.next()).thenReturn(mockKeyValue);
+ when(mockKeyValue.getValue())
+ .thenReturn(omKeyInfo1)
+ .thenReturn(omKeyInfo2)
+ .thenReturn(omKeyInfo3);
+
+ Pair<String, Boolean> result =
+ fileSizeCountTask.reprocess(omMetadataManager);
+ assertTrue(result.getRight());
+
+ assertEquals(3, fileCountBySizeDao.count());
+ Response response = utilizationEndpoint.getFileCounts(null, null, 0);
+ List<FileCountBySize> resultSet =
+ (List<FileCountBySize>) response.getEntity();
+ assertEquals(3, resultSet.size());
+ assertTrue(resultSet.stream().anyMatch(o -> o.getVolume().equals("vol1") &&
+ o.getBucket().equals("bucket1") && o.getFileSize() == 1024L &&
+ o.getCount() == 1L));
+ assertTrue(resultSet.stream().anyMatch(o -> o.getVolume().equals("vol1") &&
+ o.getBucket().equals("bucket1") && o.getFileSize() == 131072 &&
+ o.getCount() == 1L));
+ assertTrue(resultSet.stream().anyMatch(o -> o.getVolume().equals("vol2") &&
+ o.getBucket().equals("bucket1") && o.getFileSize() == 1024L &&
+ o.getCount() == 1L));
+
+ // Test for "volume" query param
+ response = utilizationEndpoint.getFileCounts("vol1", null, 0);
+ resultSet = (List<FileCountBySize>) response.getEntity();
+ assertEquals(2, resultSet.size());
+ assertTrue(resultSet.stream().allMatch(o -> o.getVolume().equals("vol1")));
+
+ // Test for non-existent volume
+ response = utilizationEndpoint.getFileCounts("vol", null, 0);
+ resultSet = (List<FileCountBySize>) response.getEntity();
+ assertEquals(0, resultSet.size());
+
+ // Test for "volume" + "bucket" query param
+ response = utilizationEndpoint.getFileCounts("vol1", "bucket1", 0);
+ resultSet = (List<FileCountBySize>) response.getEntity();
+ assertEquals(2, resultSet.size());
+ assertTrue(resultSet.stream().allMatch(o -> o.getVolume().equals("vol1") &&
+ o.getBucket().equals("bucket1")));
+
+ // Test for non-existent bucket
+ response = utilizationEndpoint.getFileCounts("vol1", "bucket", 0);
+ resultSet = (List<FileCountBySize>) response.getEntity();
+ assertEquals(0, resultSet.size());
+
+ // Test for "volume" + "bucket" + "fileSize" query params
+ response = utilizationEndpoint.getFileCounts("vol1", "bucket1", 131072);
+ resultSet = (List<FileCountBySize>) response.getEntity();
+ assertEquals(1, resultSet.size());
+ FileCountBySize o = resultSet.get(0);
+ assertTrue(o.getVolume().equals("vol1") && o.getBucket().equals(
+ "bucket1") && o.getFileSize() == 131072);
+
+ // Test for non-existent fileSize
+ response = utilizationEndpoint.getFileCounts("vol1", "bucket1", 1310725);
+ resultSet = (List<FileCountBySize>) response.getEntity();
+ assertEquals(0, resultSet.size());
+ }
+
+
private void waitAndCheckConditionAfterHeartbeat(Callable<Boolean> check)
throws Exception {
// if container report is processed first, and pipeline does not exist
diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/persistence/TestUtilizationSchemaDefinition.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/persistence/TestUtilizationSchemaDefinition.java
index 9e781da..ab21a35 100644
--- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/persistence/TestUtilizationSchemaDefinition.java
+++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/persistence/TestUtilizationSchemaDefinition.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -20,6 +20,7 @@ package org.apache.hadoop.ozone.recon.persistence;
import static org.hadoop.ozone.recon.schema.UtilizationSchemaDefinition.CLUSTER_GROWTH_DAILY_TABLE_NAME;
import static org.hadoop.ozone.recon.schema.UtilizationSchemaDefinition.FILE_COUNT_BY_SIZE_TABLE_NAME;
import static org.hadoop.ozone.recon.schema.tables.ClusterGrowthDailyTable.CLUSTER_GROWTH_DAILY;
+import static org.hadoop.ozone.recon.schema.tables.FileCountBySizeTable.FILE_COUNT_BY_SIZE;
import static org.junit.Assert.assertEquals;
import java.sql.Connection;
@@ -33,11 +34,13 @@ import java.util.List;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
+import org.hadoop.ozone.recon.schema.UtilizationSchemaDefinition;
import org.hadoop.ozone.recon.schema.tables.daos.ClusterGrowthDailyDao;
import org.hadoop.ozone.recon.schema.tables.daos.FileCountBySizeDao;
import org.hadoop.ozone.recon.schema.tables.pojos.ClusterGrowthDaily;
import org.hadoop.ozone.recon.schema.tables.pojos.FileCountBySize;
import org.hadoop.ozone.recon.schema.tables.records.FileCountBySizeRecord;
+import org.jooq.Record3;
import org.jooq.Table;
import org.jooq.UniqueKey;
import org.junit.Assert;
@@ -82,6 +85,10 @@ public class TestUtilizationSchemaDefinition extends AbstractReconSqlDBTest {
List<Pair<String, Integer>> expectedPairsFileCount = new ArrayList<>();
expectedPairsFileCount.add(
+ new ImmutablePair<>("volume", Types.VARCHAR));
+ expectedPairsFileCount.add(
+ new ImmutablePair<>("bucket", Types.VARCHAR));
+ expectedPairsFileCount.add(
new ImmutablePair<>("file_size", Types.BIGINT));
expectedPairsFileCount.add(
new ImmutablePair<>("count", Types.BIGINT));
@@ -93,7 +100,7 @@ public class TestUtilizationSchemaDefinition extends AbstractReconSqlDBTest {
"DATA_TYPE")));
}
assertEquals("Unexpected number of columns",
- 2, actualPairsFileCount.size());
+ 4, actualPairsFileCount.size());
assertEquals("Columns Do not Match ",
expectedPairsFileCount, actualPairsFileCount);
}
@@ -182,20 +189,31 @@ public class TestUtilizationSchemaDefinition extends AbstractReconSqlDBTest {
}
FileCountBySizeDao fileCountBySizeDao = getDao(FileCountBySizeDao.class);
+ UtilizationSchemaDefinition utilizationSchemaDefinition =
+ getSchemaDefinition(UtilizationSchemaDefinition.class);
FileCountBySize newRecord = new FileCountBySize();
+ newRecord.setVolume("vol1");
+ newRecord.setBucket("bucket1");
newRecord.setFileSize(1024L);
newRecord.setCount(1L);
fileCountBySizeDao.insert(newRecord);
- FileCountBySize dbRecord = fileCountBySizeDao.findById(1024L);
+ Record3<String, String, Long> recordToFind = utilizationSchemaDefinition
+ .getDSLContext().newRecord(FILE_COUNT_BY_SIZE.VOLUME,
+ FILE_COUNT_BY_SIZE.BUCKET,
+ FILE_COUNT_BY_SIZE.FILE_SIZE)
+ .value1("vol1")
+ .value2("bucket1")
+ .value3(1024L);
+ FileCountBySize dbRecord = fileCountBySizeDao.findById(recordToFind);
assertEquals(Long.valueOf(1), dbRecord.getCount());
dbRecord.setCount(2L);
fileCountBySizeDao.update(dbRecord);
- dbRecord = fileCountBySizeDao.findById(1024L);
+ dbRecord = fileCountBySizeDao.findById(recordToFind);
assertEquals(Long.valueOf(2), dbRecord.getCount());
Table<FileCountBySizeRecord> fileCountBySizeRecordTable =
diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestFileSizeCountTask.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestFileSizeCountTask.java
index e0e37b5..1cfc0ad 100644
--- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestFileSizeCountTask.java
+++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestFileSizeCountTask.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -25,20 +25,26 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.hdds.utils.db.TypedTable;
import org.apache.hadoop.ozone.recon.persistence.AbstractReconSqlDBTest;
import org.apache.hadoop.ozone.recon.tasks.OMDBUpdateEvent.OMUpdateEventBuilder;
+import org.hadoop.ozone.recon.schema.UtilizationSchemaDefinition;
import org.hadoop.ozone.recon.schema.tables.daos.FileCountBySizeDao;
+import org.jooq.DSLContext;
+import org.jooq.Record3;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.AdditionalAnswers;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.List;
import static org.apache.hadoop.ozone.recon.tasks.OMDBUpdateEvent.OMDBUpdateAction.DELETE;
import static org.apache.hadoop.ozone.recon.tasks.OMDBUpdateEvent.OMDBUpdateAction.PUT;
import static org.apache.hadoop.ozone.recon.tasks.OMDBUpdateEvent.OMDBUpdateAction.UPDATE;
+import static org.hadoop.ozone.recon.schema.tables.FileCountBySizeTable.FILE_COUNT_BY_SIZE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
-import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -49,71 +55,41 @@ import static org.mockito.Mockito.when;
public class TestFileSizeCountTask extends AbstractReconSqlDBTest {
private FileCountBySizeDao fileCountBySizeDao;
+ private FileSizeCountTask fileSizeCountTask;
+ private DSLContext dslContext;
@Before
public void setUp() {
fileCountBySizeDao = getDao(FileCountBySizeDao.class);
- }
-
- @Test
- public void testCalculateBinIndex() {
- FileSizeCountTask fileSizeCountTask = mock(FileSizeCountTask.class);
-
- when(fileSizeCountTask.getMaxFileSizeUpperBound()).
- thenReturn(1125899906842624L); // 1 PB
- when(fileSizeCountTask.getOneKB()).thenReturn(1024L);
- when(fileSizeCountTask.getMaxBinSize()).thenReturn(42);
- when(fileSizeCountTask.calculateBinIndex(anyLong())).thenCallRealMethod();
- when(fileSizeCountTask.nextClosestPowerIndexOfTwo(
- anyLong())).thenCallRealMethod();
-
- long fileSize = 1024L; // 1 KB
- int binIndex = fileSizeCountTask.calculateBinIndex(fileSize);
- assertEquals(1, binIndex);
-
- fileSize = 1023L; // 1KB - 1B
- binIndex = fileSizeCountTask.calculateBinIndex(fileSize);
- assertEquals(0, binIndex);
-
- fileSize = 562949953421312L; // 512 TB
- binIndex = fileSizeCountTask.calculateBinIndex(fileSize);
- assertEquals(40, binIndex);
-
- fileSize = 562949953421313L; // (512 TB + 1B)
- binIndex = fileSizeCountTask.calculateBinIndex(fileSize);
- assertEquals(40, binIndex);
-
- fileSize = 562949953421311L; // (512 TB - 1B)
- binIndex = fileSizeCountTask.calculateBinIndex(fileSize);
- assertEquals(39, binIndex);
-
- fileSize = 1125899906842624L; // 1 PB - last (extra) bin
- binIndex = fileSizeCountTask.calculateBinIndex(fileSize);
- assertEquals(41, binIndex);
-
- fileSize = 100000L;
- binIndex = fileSizeCountTask.calculateBinIndex(fileSize);
- assertEquals(7, binIndex);
-
- fileSize = 1125899906842623L; // (1 PB - 1B)
- binIndex = fileSizeCountTask.calculateBinIndex(fileSize);
- assertEquals(40, binIndex);
-
- fileSize = 1125899906842624L * 4; // 4 PB - last extra bin
- binIndex = fileSizeCountTask.calculateBinIndex(fileSize);
- assertEquals(41, binIndex);
-
- fileSize = Long.MAX_VALUE; // extra bin
- binIndex = fileSizeCountTask.calculateBinIndex(fileSize);
- assertEquals(41, binIndex);
+ UtilizationSchemaDefinition utilizationSchemaDefinition =
+ getSchemaDefinition(UtilizationSchemaDefinition.class);
+ fileSizeCountTask =
+ new FileSizeCountTask(fileCountBySizeDao, utilizationSchemaDefinition);
+ dslContext = utilizationSchemaDefinition.getDSLContext();
+ // Truncate table before running each test
+ dslContext.truncate(FILE_COUNT_BY_SIZE);
}
@Test
public void testReprocess() throws IOException {
OmKeyInfo omKeyInfo1 = mock(OmKeyInfo.class);
given(omKeyInfo1.getKeyName()).willReturn("key1");
+ given(omKeyInfo1.getVolumeName()).willReturn("vol1");
+ given(omKeyInfo1.getBucketName()).willReturn("bucket1");
given(omKeyInfo1.getDataSize()).willReturn(1000L);
+ OmKeyInfo omKeyInfo2 = mock(OmKeyInfo.class);
+ given(omKeyInfo2.getKeyName()).willReturn("key2");
+ given(omKeyInfo2.getVolumeName()).willReturn("vol1");
+ given(omKeyInfo2.getBucketName()).willReturn("bucket1");
+ given(omKeyInfo2.getDataSize()).willReturn(100000L);
+
+ OmKeyInfo omKeyInfo3 = mock(OmKeyInfo.class);
+ given(omKeyInfo3.getKeyName()).willReturn("key3");
+ given(omKeyInfo3.getVolumeName()).willReturn("vol1");
+ given(omKeyInfo3.getBucketName()).willReturn("bucket1");
+ given(omKeyInfo3.getDataSize()).willReturn(1125899906842624L * 4); // 4PB
+
OMMetadataManager omMetadataManager = mock(OmMetadataManagerImpl.class);
TypedTable<String, OmKeyInfo> keyTable = mock(TypedTable.class);
@@ -124,30 +100,47 @@ public class TestFileSizeCountTask extends AbstractReconSqlDBTest {
when(keyTable.iterator()).thenReturn(mockKeyIter);
when(omMetadataManager.getKeyTable()).thenReturn(keyTable);
- when(mockKeyIter.hasNext()).thenReturn(true).thenReturn(false);
+ when(mockKeyIter.hasNext())
+ .thenReturn(true)
+ .thenReturn(true)
+ .thenReturn(true)
+ .thenReturn(false);
when(mockKeyIter.next()).thenReturn(mockKeyValue);
- when(mockKeyValue.getValue()).thenReturn(omKeyInfo1);
+ when(mockKeyValue.getValue())
+ .thenReturn(omKeyInfo1)
+ .thenReturn(omKeyInfo2)
+ .thenReturn(omKeyInfo3);
- FileSizeCountTask fileSizeCountTask =
- new FileSizeCountTask(fileCountBySizeDao);
Pair<String, Boolean> result =
fileSizeCountTask.reprocess(omMetadataManager);
assertTrue(result.getRight());
- long[] upperBoundCount = fileSizeCountTask.getUpperBoundCount();
- assertEquals(1, upperBoundCount[0]);
- for (int i = 1; i < upperBoundCount.length; i++) {
- assertEquals(0, upperBoundCount[i]);
- }
+ assertEquals(3, fileCountBySizeDao.count());
+ Record3<String, String, Long> recordToFind = dslContext
+ .newRecord(FILE_COUNT_BY_SIZE.VOLUME,
+ FILE_COUNT_BY_SIZE.BUCKET,
+ FILE_COUNT_BY_SIZE.FILE_SIZE)
+ .value1("vol1")
+ .value2("bucket1")
+ .value3(1024L);
+ assertEquals(1L,
+ fileCountBySizeDao.findById(recordToFind).getCount().longValue());
+ // file size upper bound for 100000L is 131072L (next highest power of 2)
+ recordToFind.value3(131072L);
+ assertEquals(1L,
+ fileCountBySizeDao.findById(recordToFind).getCount().longValue());
+ // file size upper bound for 4PB is Long.MAX_VALUE
+ recordToFind.value3(Long.MAX_VALUE);
+ assertEquals(1L,
+ fileCountBySizeDao.findById(recordToFind).getCount().longValue());
}
@Test
public void testProcess() {
- FileSizeCountTask fileSizeCountTask =
- new FileSizeCountTask(fileCountBySizeDao);
-
// Write 2 keys.
OmKeyInfo toBeDeletedKey = mock(OmKeyInfo.class);
+ given(toBeDeletedKey.getVolumeName()).willReturn("vol1");
+ given(toBeDeletedKey.getBucketName()).willReturn("bucket1");
given(toBeDeletedKey.getKeyName()).willReturn("deletedKey");
given(toBeDeletedKey.getDataSize()).willReturn(2000L); // Bin 1
OMDBUpdateEvent event = new OMUpdateEventBuilder()
@@ -157,6 +150,8 @@ public class TestFileSizeCountTask extends AbstractReconSqlDBTest {
.build();
OmKeyInfo toBeUpdatedKey = mock(OmKeyInfo.class);
+ given(toBeUpdatedKey.getVolumeName()).willReturn("vol1");
+ given(toBeUpdatedKey.getBucketName()).willReturn("bucket1");
given(toBeUpdatedKey.getKeyName()).willReturn("updatedKey");
given(toBeUpdatedKey.getDataSize()).willReturn(10000L); // Bin 4
OMDBUpdateEvent event2 = new OMUpdateEventBuilder()
@@ -170,12 +165,25 @@ public class TestFileSizeCountTask extends AbstractReconSqlDBTest {
fileSizeCountTask.process(omUpdateEventBatch);
// Verify 2 keys are in correct bins.
- long[] upperBoundCount = fileSizeCountTask.getUpperBoundCount();
- assertEquals(1, upperBoundCount[4]); // updatedKey
- assertEquals(1, upperBoundCount[1]); // deletedKey
+ assertEquals(2, fileCountBySizeDao.count());
+ Record3<String, String, Long> recordToFind = dslContext
+ .newRecord(FILE_COUNT_BY_SIZE.VOLUME,
+ FILE_COUNT_BY_SIZE.BUCKET,
+ FILE_COUNT_BY_SIZE.FILE_SIZE)
+ .value1("vol1")
+ .value2("bucket1")
+ .value3(2048L);
+ assertEquals(1L,
+ fileCountBySizeDao.findById(recordToFind).getCount().longValue());
+ // file size upper bound for 10000L is 16384L (next highest power of 2)
+ recordToFind.value3(16384L);
+ assertEquals(1L,
+ fileCountBySizeDao.findById(recordToFind).getCount().longValue());
// Add new key.
OmKeyInfo newKey = mock(OmKeyInfo.class);
+ given(newKey.getVolumeName()).willReturn("vol1");
+ given(newKey.getBucketName()).willReturn("bucket1");
given(newKey.getKeyName()).willReturn("newKey");
given(newKey.getDataSize()).willReturn(1000L); // Bin 0
OMDBUpdateEvent putEvent = new OMUpdateEventBuilder()
@@ -186,6 +194,8 @@ public class TestFileSizeCountTask extends AbstractReconSqlDBTest {
// Update existing key.
OmKeyInfo updatedKey = mock(OmKeyInfo.class);
+ given(updatedKey.getVolumeName()).willReturn("vol1");
+ given(updatedKey.getBucketName()).willReturn("bucket1");
given(updatedKey.getKeyName()).willReturn("updatedKey");
given(updatedKey.getDataSize()).willReturn(50000L); // Bin 6
OMDBUpdateEvent updateEvent = new OMUpdateEventBuilder()
@@ -206,10 +216,189 @@ public class TestFileSizeCountTask extends AbstractReconSqlDBTest {
Arrays.asList(updateEvent, putEvent, deleteEvent));
fileSizeCountTask.process(omUpdateEventBatch);
- upperBoundCount = fileSizeCountTask.getUpperBoundCount();
- assertEquals(1, upperBoundCount[0]); // newKey
- assertEquals(0, upperBoundCount[1]); // deletedKey
- assertEquals(0, upperBoundCount[4]); // updatedKey old
- assertEquals(1, upperBoundCount[6]); // updatedKey new
+ assertEquals(4, fileCountBySizeDao.count());
+ recordToFind.value3(1024L);
+ assertEquals(1, fileCountBySizeDao.findById(recordToFind)
+ .getCount().longValue());
+ recordToFind.value3(2048L);
+ assertEquals(0, fileCountBySizeDao.findById(recordToFind)
+ .getCount().longValue());
+ recordToFind.value3(16384L);
+ assertEquals(0, fileCountBySizeDao.findById(recordToFind)
+ .getCount().longValue());
+ recordToFind.value3(65536L);
+ assertEquals(1, fileCountBySizeDao.findById(recordToFind)
+ .getCount().longValue());
+ }
+
+ @Test
+ public void testReprocessAtScale() throws IOException {
+ // generate mocks for 2 volumes, 500 buckets each volume
+ // and 42 keys in each bucket.
+ List<OmKeyInfo> omKeyInfoList = new ArrayList<>();
+ List<Boolean> hasNextAnswer = new ArrayList<>();
+ for (int volIndex = 1; volIndex <= 2; volIndex++) {
+ for (int bktIndex = 1; bktIndex <= 500; bktIndex++) {
+ for (int keyIndex = 1; keyIndex <= 42; keyIndex++) {
+ OmKeyInfo omKeyInfo = mock(OmKeyInfo.class);
+ given(omKeyInfo.getKeyName()).willReturn("key" + keyIndex);
+ given(omKeyInfo.getVolumeName()).willReturn("vol" + volIndex);
+ given(omKeyInfo.getBucketName()).willReturn("bucket" + bktIndex);
+ // Place keys in each bin
+ long fileSize = (long)Math.pow(2, keyIndex + 9) - 1L;
+ given(omKeyInfo.getDataSize()).willReturn(fileSize);
+ omKeyInfoList.add(omKeyInfo);
+ hasNextAnswer.add(true);
+ }
+ }
+ }
+ hasNextAnswer.add(false);
+
+ OMMetadataManager omMetadataManager = mock(OmMetadataManagerImpl.class);
+ TypedTable<String, OmKeyInfo> keyTable = mock(TypedTable.class);
+
+ TypedTable.TypedTableIterator mockKeyIter = mock(TypedTable
+ .TypedTableIterator.class);
+ TypedTable.TypedKeyValue mockKeyValue = mock(
+ TypedTable.TypedKeyValue.class);
+
+ when(keyTable.iterator()).thenReturn(mockKeyIter);
+ when(omMetadataManager.getKeyTable()).thenReturn(keyTable);
+ when(mockKeyIter.hasNext())
+ .thenAnswer(AdditionalAnswers.returnsElementsOf(hasNextAnswer));
+ when(mockKeyIter.next()).thenReturn(mockKeyValue);
+ when(mockKeyValue.getValue())
+ .thenAnswer(AdditionalAnswers.returnsElementsOf(omKeyInfoList));
+
+ Pair<String, Boolean> result =
+ fileSizeCountTask.reprocess(omMetadataManager);
+ assertTrue(result.getRight());
+
+ // 2 volumes * 500 buckets * 42 bins = 42000 rows
+ assertEquals(42000, fileCountBySizeDao.count());
+ Record3<String, String, Long> recordToFind = dslContext
+ .newRecord(FILE_COUNT_BY_SIZE.VOLUME,
+ FILE_COUNT_BY_SIZE.BUCKET,
+ FILE_COUNT_BY_SIZE.FILE_SIZE)
+ .value1("vol1")
+ .value2("bucket1")
+ .value3(1024L);
+ assertEquals(1L,
+ fileCountBySizeDao.findById(recordToFind).getCount().longValue());
+ // file size upper bound for 100000L is 131072L (next highest power of 2)
+ recordToFind.value1("vol1");
+ recordToFind.value3(131072L);
+ assertEquals(1L,
+ fileCountBySizeDao.findById(recordToFind).getCount().longValue());
+ recordToFind.value2("bucket500");
+ recordToFind.value3(Long.MAX_VALUE);
+ assertEquals(1L,
+ fileCountBySizeDao.findById(recordToFind).getCount().longValue());
+ }
+
+ @Test
+ public void testProcessAtScale() {
+ // Write 10000 keys.
+ List<OMDBUpdateEvent> omDbEventList = new ArrayList<>();
+ List<OmKeyInfo> omKeyInfoList = new ArrayList<>();
+ for (int volIndex = 1; volIndex <= 10; volIndex++) {
+ for (int bktIndex = 1; bktIndex <= 100; bktIndex++) {
+ for (int keyIndex = 1; keyIndex <= 10; keyIndex++) {
+ OmKeyInfo omKeyInfo = mock(OmKeyInfo.class);
+ given(omKeyInfo.getKeyName()).willReturn("key" + keyIndex);
+ given(omKeyInfo.getVolumeName()).willReturn("vol" + volIndex);
+ given(omKeyInfo.getBucketName()).willReturn("bucket" + bktIndex);
+ // Place keys in each bin
+ long fileSize = (long)Math.pow(2, keyIndex + 9) - 1L;
+ given(omKeyInfo.getDataSize()).willReturn(fileSize);
+ omKeyInfoList.add(omKeyInfo);
+ omDbEventList.add(new OMUpdateEventBuilder()
+ .setAction(PUT)
+ .setKey("key" + keyIndex)
+ .setValue(omKeyInfo)
+ .build());
+ }
+ }
+ }
+
+ OMUpdateEventBatch omUpdateEventBatch =
+ new OMUpdateEventBatch(omDbEventList);
+ fileSizeCountTask.process(omUpdateEventBatch);
+
+ // Verify 2 keys are in correct bins.
+ assertEquals(10000, fileCountBySizeDao.count());
+ Record3<String, String, Long> recordToFind = dslContext
+ .newRecord(FILE_COUNT_BY_SIZE.VOLUME,
+ FILE_COUNT_BY_SIZE.BUCKET,
+ FILE_COUNT_BY_SIZE.FILE_SIZE)
+ .value1("vol1")
+ .value2("bucket1")
+ .value3(2048L);
+ assertEquals(1L,
+ fileCountBySizeDao.findById(recordToFind).getCount().longValue());
+ recordToFind.value1("vol10");
+ recordToFind.value2("bucket100");
+ // file size upper bound for 10000L is 16384L (next highest power of 2)
+ recordToFind.value3(16384L);
+ assertEquals(1L,
+ fileCountBySizeDao.findById(recordToFind).getCount().longValue());
+
+ // Process 500 deletes and 500 updates
+ omDbEventList = new ArrayList<>();
+ for (int volIndex = 1; volIndex <= 1; volIndex++) {
+ for (int bktIndex = 1; bktIndex <= 100; bktIndex++) {
+ for (int keyIndex = 1; keyIndex <= 10; keyIndex++) {
+ OmKeyInfo omKeyInfo = mock(OmKeyInfo.class);
+ given(omKeyInfo.getKeyName()).willReturn("key" + keyIndex);
+ given(omKeyInfo.getVolumeName()).willReturn("vol" + volIndex);
+ given(omKeyInfo.getBucketName()).willReturn("bucket" + bktIndex);
+ if (keyIndex <= 5) {
+ long fileSize = (long)Math.pow(2, keyIndex + 9) - 1L;
+ given(omKeyInfo.getDataSize()).willReturn(fileSize);
+ omDbEventList.add(new OMUpdateEventBuilder()
+ .setAction(DELETE)
+ .setKey("key" + keyIndex)
+ .setValue(omKeyInfo)
+ .build());
+ } else {
+ // update all the files with keyIndex > 5 to filesize 1023L
+ // so that they get into first bin
+ given(omKeyInfo.getDataSize()).willReturn(1023L);
+ omDbEventList.add(new OMUpdateEventBuilder()
+ .setAction(UPDATE)
+ .setKey("key" + keyIndex)
+ .setValue(omKeyInfo)
+ .setOldValue(
+ omKeyInfoList.get((volIndex * bktIndex) + keyIndex))
+ .build());
+ }
+ }
+ }
+ }
+
+ omUpdateEventBatch = new OMUpdateEventBatch(omDbEventList);
+ fileSizeCountTask.process(omUpdateEventBatch);
+
+ assertEquals(10000, fileCountBySizeDao.count());
+ recordToFind = dslContext
+ .newRecord(FILE_COUNT_BY_SIZE.VOLUME,
+ FILE_COUNT_BY_SIZE.BUCKET,
+ FILE_COUNT_BY_SIZE.FILE_SIZE)
+ .value1("vol1")
+ .value2("bucket1")
+ .value3(1024L);
+ // The update events on keys 6-10 should now put them under first bin 1024L
+ assertEquals(5, fileCountBySizeDao.findById(recordToFind)
+ .getCount().longValue());
+ recordToFind.value2("bucket100");
+ assertEquals(5, fileCountBySizeDao.findById(recordToFind)
+ .getCount().longValue());
+ recordToFind.value3(2048L);
+ assertEquals(0, fileCountBySizeDao.findById(recordToFind)
+ .getCount().longValue());
+ // Volumes 2 - 10 should not be affected by this process
+ recordToFind.value1("vol2");
+ assertEquals(1, fileCountBySizeDao.findById(recordToFind)
+ .getCount().longValue());
}
}