You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by av...@apache.org on 2020/08/12 14:25:27 UTC
[hadoop-ozone] branch master updated: HDDS-4009. Recon Overview
page: The volume, bucket and key counts are not accurate (#1305)
This is an automated email from the ASF dual-hosted git repository.
avijayan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git
The following commit(s) were added to refs/heads/master by this push:
new eb70d9e HDDS-4009. Recon Overview page: The volume, bucket and key counts are not accurate (#1305)
eb70d9e is described below
commit eb70d9ee8482f5e2deb4dabef0d801b147dc980f
Author: Vivek Ratnavel Subramanian <vi...@gmail.com>
AuthorDate: Wed Aug 12 07:25:16 2020 -0700
HDDS-4009. Recon Overview page: The volume, bucket and key counts are not accurate (#1305)
---
.../apache/hadoop/hdds/utils/db/DBDefinition.java | 27 +++
.../org/apache/hadoop/hdds/utils/db/RDBTable.java | 13 ++
.../org/apache/hadoop/hdds/utils/db/Table.java | 13 ++
.../apache/hadoop/hdds/utils/db/TypedTable.java | 13 ++
.../ozone/recon/schema/StatsSchemaDefinition.java | 12 +-
.../recon/schema/UtilizationSchemaDefinition.java | 13 +-
.../apache/hadoop/ozone/recon/ReconConstants.java | 8 +-
.../hadoop/ozone/recon/ReconControllerModule.java | 2 +
.../org/apache/hadoop/ozone/recon/ReconUtils.java | 32 ++++
.../ozone/recon/api/ClusterStateEndpoint.java | 47 +++---
.../hadoop/ozone/recon/api/ContainerEndpoint.java | 2 +-
.../spi/impl/ContainerDBServiceProviderImpl.java | 33 +---
.../ozone/recon/tasks/FileSizeCountTask.java | 4 +-
.../hadoop/ozone/recon/tasks/OMDBUpdateEvent.java | 2 +-
.../ozone/recon/tasks/OMDBUpdatesHandler.java | 92 +++++-----
.../hadoop/ozone/recon/tasks/TableCountTask.java | 185 +++++++++++++++++++++
.../src/views/overview/overview.tsx | 2 +-
.../hadoop/ozone/recon/api/TestEndpoints.java | 36 +++-
.../ozone/recon/tasks/TestOMDBUpdatesHandler.java | 54 +++++-
.../ozone/recon/tasks/TestTableCountTask.java | 178 ++++++++++++++++++++
20 files changed, 633 insertions(+), 135 deletions(-)
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBDefinition.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBDefinition.java
index 3058261..6d7d09e 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBDefinition.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBDefinition.java
@@ -21,6 +21,10 @@ package org.apache.hadoop.hdds.utils.db;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Arrays;
+import java.util.Optional;
+import java.util.function.Function;
+
/**
* Simple interface to provide information to create a DBStore..
*/
@@ -43,4 +47,27 @@ public interface DBDefinition {
*/
DBColumnFamilyDefinition[] getColumnFamilies();
+ /**
+ * Get the key type class for the given table.
+ * @param table table name
+ * @return the class of key type of the given table wrapped in an
+ * {@link Optional}
+ */
+ default Optional<Class> getKeyType(String table) {
+ return Arrays.stream(getColumnFamilies()).filter(cf -> cf.getName().equals(
+ table)).map((Function<DBColumnFamilyDefinition, Class>)
+ DBColumnFamilyDefinition::getKeyType).findAny();
+ }
+
+ /**
+ * Get the value type class for the given table.
+ * @param table table name
+ * @return the class of value type of the given table wrapped in an
+ * {@link Optional}
+ */
+ default Optional<Class> getValueType(String table) {
+ return Arrays.stream(getColumnFamilies()).filter(cf -> cf.getName().equals(
+ table)).map((Function<DBColumnFamilyDefinition, Class>)
+ DBColumnFamilyDefinition::getValueType).findAny();
+ }
}
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java
index 4dbb59a..8f2a1f8 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java
@@ -157,6 +157,19 @@ class RDBTable implements Table<byte[], byte[]> {
}
}
+ /**
+ * Skips the cache and returns the value mapped to the given key as a
+ * byte array, or null if the key is not found.
+ *
+ * @param bytes metadata key
+ * @return value in byte array or null if the key is not found.
+ * @throws IOException on Failure
+ */
+ @Override
+ public byte[] getSkipCache(byte[] bytes) throws IOException {
+ return get(bytes);
+ }
+
@Override
public byte[] getIfExist(byte[] key) throws IOException {
try {
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java
index 72111f9..0aa852f 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java
@@ -83,6 +83,19 @@ public interface Table<KEY, VALUE> extends AutoCloseable {
/**
+ * Skips the cache and returns the value mapped to the given key, or
+ * null if the key is not found.
+ *
+ * @param key metadata key
+ * @return value mapped to the key, or null if the key is not found.
+ * @throws IOException on Failure
+ */
+ default VALUE getSkipCache(KEY key) throws IOException {
+ throw new NotImplementedException("getSkipCache is not implemented");
+ }
+
+
+ /**
* Returns the value mapped to the given key in byte array or returns null
* if the key is not found.
*
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java
index 1451946..0d22c3f 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java
@@ -178,6 +178,19 @@ public class TypedTable<KEY, VALUE> implements Table<KEY, VALUE> {
}
/**
+ * Skips the cache and returns the value mapped to the given key, or
+ * null if the key is not found.
+ *
+ * @param key metadata key
+ * @return value mapped to the key, or null if the key is not found.
+ * @throws IOException on Failure
+ */
+ @Override
+ public VALUE getSkipCache(KEY key) throws IOException {
+ return getFromTable(key);
+ }
+
+ /**
* This method returns the value if it exists in cache, if it
* does not, get the value from the underlying rockdb table. If it
* exists in cache, it returns the same reference of the cached value.
diff --git a/hadoop-ozone/recon-codegen/src/main/java/org/hadoop/ozone/recon/schema/StatsSchemaDefinition.java b/hadoop-ozone/recon-codegen/src/main/java/org/hadoop/ozone/recon/schema/StatsSchemaDefinition.java
index adfaca6..55f3b93 100644
--- a/hadoop-ozone/recon-codegen/src/main/java/org/hadoop/ozone/recon/schema/StatsSchemaDefinition.java
+++ b/hadoop-ozone/recon-codegen/src/main/java/org/hadoop/ozone/recon/schema/StatsSchemaDefinition.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -22,6 +22,7 @@ import static org.hadoop.ozone.recon.codegen.SqlDbUtils.TABLE_EXISTS_CHECK;
import com.google.inject.Inject;
import com.google.inject.Singleton;
+import org.jooq.DSLContext;
import org.jooq.impl.DSL;
import org.jooq.impl.SQLDataType;
@@ -36,6 +37,7 @@ import java.sql.SQLException;
public class StatsSchemaDefinition implements ReconSchemaDefinition {
public static final String GLOBAL_STATS_TABLE_NAME = "GLOBAL_STATS";
+ private DSLContext dslContext;
private final DataSource dataSource;
@Inject
@@ -46,17 +48,17 @@ public class StatsSchemaDefinition implements ReconSchemaDefinition {
@Override
public void initializeSchema() throws SQLException {
Connection conn = dataSource.getConnection();
+ dslContext = DSL.using(conn);
if (!TABLE_EXISTS_CHECK.test(conn, GLOBAL_STATS_TABLE_NAME)) {
- createGlobalStatsTable(conn);
+ createGlobalStatsTable();
}
}
/**
* Create the Ozone Global Stats table.
- * @param conn connection
*/
- private void createGlobalStatsTable(Connection conn) {
- DSL.using(conn).createTableIfNotExists(GLOBAL_STATS_TABLE_NAME)
+ private void createGlobalStatsTable() {
+ dslContext.createTableIfNotExists(GLOBAL_STATS_TABLE_NAME)
.column("key", SQLDataType.VARCHAR(255))
.column("value", SQLDataType.BIGINT)
.column("last_updated_timestamp", SQLDataType.TIMESTAMP)
diff --git a/hadoop-ozone/recon-codegen/src/main/java/org/hadoop/ozone/recon/schema/UtilizationSchemaDefinition.java b/hadoop-ozone/recon-codegen/src/main/java/org/hadoop/ozone/recon/schema/UtilizationSchemaDefinition.java
index 92de19e..b2b2881 100644
--- a/hadoop-ozone/recon-codegen/src/main/java/org/hadoop/ozone/recon/schema/UtilizationSchemaDefinition.java
+++ b/hadoop-ozone/recon-codegen/src/main/java/org/hadoop/ozone/recon/schema/UtilizationSchemaDefinition.java
@@ -63,14 +63,14 @@ public class UtilizationSchemaDefinition implements ReconSchemaDefinition {
Connection conn = dataSource.getConnection();
dslContext = DSL.using(conn);
if (!TABLE_EXISTS_CHECK.test(conn, FILE_COUNT_BY_SIZE_TABLE_NAME)) {
- createFileSizeCountTable(conn);
+ createFileSizeCountTable();
}
if (!TABLE_EXISTS_CHECK.test(conn, CLUSTER_GROWTH_DAILY_TABLE_NAME)) {
- createClusterGrowthTable(conn);
+ createClusterGrowthTable();
}
}
- private void createClusterGrowthTable(Connection conn) {
+ private void createClusterGrowthTable() {
dslContext.createTableIfNotExists(CLUSTER_GROWTH_DAILY_TABLE_NAME)
.column("timestamp", SQLDataType.TIMESTAMP)
.column("datanode_id", SQLDataType.INTEGER)
@@ -85,7 +85,7 @@ public class UtilizationSchemaDefinition implements ReconSchemaDefinition {
.execute();
}
- private void createFileSizeCountTable(Connection conn) {
+ private void createFileSizeCountTable() {
dslContext.createTableIfNotExists(FILE_COUNT_BY_SIZE_TABLE_NAME)
.column("volume", SQLDataType.VARCHAR(64))
.column("bucket", SQLDataType.VARCHAR(64))
@@ -96,6 +96,11 @@ public class UtilizationSchemaDefinition implements ReconSchemaDefinition {
.execute();
}
+ /**
+ * Returns the DSL context.
+ *
+ * @return dslContext
+ */
public DSLContext getDSLContext() {
return dslContext;
}
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconConstants.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconConstants.java
index 972399f..0cbc61f 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconConstants.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconConstants.java
@@ -32,13 +32,11 @@ public final class ReconConstants {
public static final String RECON_CONTAINER_KEY_DB = "recon-container-key.db";
- public static final String CONTAINER_COUNT_KEY = "totalCount";
+ public static final String CONTAINER_COUNT_KEY = "containerCount";
- public static final String RECON_OM_SNAPSHOT_DB =
- "om.snapshot.db";
+ public static final String RECON_OM_SNAPSHOT_DB = "om.snapshot.db";
- public static final String CONTAINER_KEY_TABLE =
- "containerKeyTable";
+ public static final String CONTAINER_KEY_TABLE = "containerKeyTable";
public static final String CONTAINER_KEY_COUNT_TABLE =
"containerKeyCountTable";
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java
index 1199630..8991767 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.ozone.recon.spi.impl.ReconContainerDBProvider;
import org.apache.hadoop.ozone.recon.spi.impl.StorageContainerServiceProviderImpl;
import org.apache.hadoop.ozone.recon.tasks.ContainerKeyMapperTask;
import org.apache.hadoop.ozone.recon.tasks.FileSizeCountTask;
+import org.apache.hadoop.ozone.recon.tasks.TableCountTask;
import org.apache.hadoop.ozone.recon.tasks.ReconOmTask;
import org.apache.hadoop.ozone.recon.tasks.ReconTaskController;
import org.apache.hadoop.ozone.recon.tasks.ReconTaskControllerImpl;
@@ -116,6 +117,7 @@ public class ReconControllerModule extends AbstractModule {
Multibinder.newSetBinder(binder(), ReconOmTask.class);
taskBinder.addBinding().to(ContainerKeyMapperTask.class);
taskBinder.addBinding().to(FileSizeCountTask.class);
+ taskBinder.addBinding().to(TableCountTask.class);
}
}
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconUtils.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconUtils.java
index cba7428..b6e4d7c 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconUtils.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconUtils.java
@@ -29,6 +29,7 @@ import java.net.URL;
import java.net.URLConnection;
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.sql.Timestamp;
import java.util.zip.GZIPOutputStream;
import org.apache.hadoop.hdds.HddsConfigKeys;
@@ -44,8 +45,14 @@ import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
import static org.apache.hadoop.hdds.server.ServerUtils.getDirectoryFromConfig;
import static org.apache.hadoop.hdds.server.ServerUtils.getOzoneMetaDirPath;
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_DB_DIR;
+import static org.jooq.impl.DSL.currentTimestamp;
+import static org.jooq.impl.DSL.select;
+import static org.jooq.impl.DSL.using;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
+import org.hadoop.ozone.recon.schema.tables.daos.GlobalStatsDao;
+import org.hadoop.ozone.recon.schema.tables.pojos.GlobalStats;
+import org.jooq.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -266,4 +273,29 @@ public class ReconUtils {
new File(reconDbDir.getPath(), lastKnownSnapshotFileName);
}
+ /**
+ * Upsert row in GlobalStats table.
+ *
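+ * @param sqlConfiguration configuration used to fetch the current timestamp
+ * @param globalStatsDao DAO used to look up and write the GlobalStats row
+ * @param key key of the row to insert or update
+ * @param count value to store for the given key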
+ */
+ public static void upsertGlobalStatsTable(Configuration sqlConfiguration,
+ GlobalStatsDao globalStatsDao,
+ String key,
+ Long count) {
+ // Get the current timestamp
+ Timestamp now =
+ using(sqlConfiguration).fetchValue(select(currentTimestamp()));
+ GlobalStats record = globalStatsDao.fetchOneByKey(key);
+ GlobalStats newRecord = new GlobalStats(key, count, now);
+
+ // Insert a new record for key if it does not exist
+ if (record == null) {
+ globalStatsDao.insert(newRecord);
+ } else {
+ globalStatsDao.update(newRecord);
+ }
+ }
}
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ClusterStateEndpoint.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ClusterStateEndpoint.java
index 918ee18..de0028c 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ClusterStateEndpoint.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ClusterStateEndpoint.java
@@ -24,10 +24,12 @@ import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
import org.apache.hadoop.hdds.scm.server.OzoneStorageContainerManager;
import org.apache.hadoop.ozone.recon.api.types.ClusterStateResponse;
import org.apache.hadoop.ozone.recon.api.types.DatanodeStorageReport;
-import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
import org.apache.hadoop.ozone.recon.scm.ReconContainerManager;
import org.apache.hadoop.ozone.recon.scm.ReconNodeManager;
import org.apache.hadoop.ozone.recon.scm.ReconPipelineManager;
+import org.apache.hadoop.ozone.recon.tasks.TableCountTask;
+import org.hadoop.ozone.recon.schema.tables.daos.GlobalStatsDao;
+import org.hadoop.ozone.recon.schema.tables.pojos.GlobalStats;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,6 +41,10 @@ import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.util.List;
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.BUCKET_TABLE;
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.KEY_TABLE;
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.VOLUME_TABLE;
+
/**
* Endpoint to fetch current state of ozone cluster.
*/
@@ -52,17 +58,17 @@ public class ClusterStateEndpoint {
private ReconNodeManager nodeManager;
private ReconPipelineManager pipelineManager;
private ReconContainerManager containerManager;
- private ReconOMMetadataManager omMetadataManager;
+ private GlobalStatsDao globalStatsDao;
@Inject
ClusterStateEndpoint(OzoneStorageContainerManager reconSCM,
- ReconOMMetadataManager omMetadataManager) {
+ GlobalStatsDao globalStatsDao) {
this.nodeManager =
(ReconNodeManager) reconSCM.getScmNodeManager();
this.pipelineManager = (ReconPipelineManager) reconSCM.getPipelineManager();
this.containerManager =
(ReconContainerManager) reconSCM.getContainerManager();
- this.omMetadataManager = omMetadataManager;
+ this.globalStatsDao = globalStatsDao;
}
/**
@@ -80,25 +86,20 @@ public class ClusterStateEndpoint {
new DatanodeStorageReport(stats.getCapacity().get(),
stats.getScmUsed().get(), stats.getRemaining().get());
ClusterStateResponse.Builder builder = ClusterStateResponse.newBuilder();
- if (omMetadataManager.isOmTablesInitialized()) {
- try {
- builder.setVolumes(
- omMetadataManager.getVolumeTable().getEstimatedKeyCount());
- } catch (Exception ex) {
- LOG.error("Unable to get Volumes count in ClusterStateResponse.", ex);
- }
- try {
- builder.setBuckets(
- omMetadataManager.getBucketTable().getEstimatedKeyCount());
- } catch (Exception ex) {
- LOG.error("Unable to get Buckets count in ClusterStateResponse.", ex);
- }
- try {
- builder.setKeys(
- omMetadataManager.getKeyTable().getEstimatedKeyCount());
- } catch (Exception ex) {
- LOG.error("Unable to get Keys count in ClusterStateResponse.", ex);
- }
+ GlobalStats volumeRecord = globalStatsDao.findById(
+ TableCountTask.getRowKeyFromTable(VOLUME_TABLE));
+ GlobalStats bucketRecord = globalStatsDao.findById(
+ TableCountTask.getRowKeyFromTable(BUCKET_TABLE));
+ GlobalStats keyRecord = globalStatsDao.findById(
+ TableCountTask.getRowKeyFromTable(KEY_TABLE));
+ if (volumeRecord != null) {
+ builder.setVolumes(volumeRecord.getValue());
+ }
+ if (bucketRecord != null) {
+ builder.setBuckets(bucketRecord.getValue());
+ }
+ if (keyRecord != null) {
+ builder.setKeys(keyRecord.getValue());
}
ClusterStateResponse response = builder
.setStorageReport(storageReport)
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ContainerEndpoint.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ContainerEndpoint.java
index c534062..1778b84 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ContainerEndpoint.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ContainerEndpoint.java
@@ -157,7 +157,7 @@ public class ContainerEndpoint {
// Directly calling get() on the Key table instead of iterating since
// only full keys are supported now. When we change to using a prefix
// of the key, this needs to change to prefix seek.
- OmKeyInfo omKeyInfo = omMetadataManager.getKeyTable().get(
+ OmKeyInfo omKeyInfo = omMetadataManager.getKeyTable().getSkipCache(
containerKeyPrefix.getKeyPrefix());
if (null != omKeyInfo) {
// Filter keys by version.
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/ContainerDBServiceProviderImpl.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/ContainerDBServiceProviderImpl.java
index aeefeef..6360cf2 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/ContainerDBServiceProviderImpl.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/ContainerDBServiceProviderImpl.java
@@ -22,13 +22,9 @@ import static org.apache.hadoop.ozone.recon.ReconConstants.CONTAINER_COUNT_KEY;
import static org.apache.hadoop.ozone.recon.spi.impl.ReconContainerDBProvider.getNewDBStore;
import static org.apache.hadoop.ozone.recon.spi.impl.ReconDBDefinition.CONTAINER_KEY;
import static org.apache.hadoop.ozone.recon.spi.impl.ReconDBDefinition.CONTAINER_KEY_COUNT;
-import static org.jooq.impl.DSL.currentTimestamp;
-import static org.jooq.impl.DSL.select;
-import static org.jooq.impl.DSL.using;
import java.io.File;
import java.io.IOException;
-import java.sql.Timestamp;
import java.util.LinkedHashMap;
import java.util.Map;
@@ -38,6 +34,7 @@ import javax.inject.Singleton;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.recon.ReconUtils;
import org.apache.hadoop.ozone.recon.api.types.ContainerKeyPrefix;
import org.apache.hadoop.ozone.recon.api.types.ContainerMetadata;
import org.apache.hadoop.ozone.recon.spi.ContainerDBServiceProvider;
@@ -152,7 +149,7 @@ public class ContainerDBServiceProviderImpl
*
* @param containerKeyPrefix the containerID, key-prefix tuple.
* @param count Count of the keys matching that prefix.
- * @throws IOException
+ * @throws IOException on failure.
*/
@Override
public void storeContainerKeyMapping(ContainerKeyPrefix containerKeyPrefix,
@@ -166,7 +163,7 @@ public class ContainerDBServiceProviderImpl
*
* @param containerID the containerID.
* @param count count of the keys within the given containerID.
- * @throws IOException
+ * @throws IOException on failure.
*/
@Override
public void storeContainerKeyCount(Long containerID, Long count)
@@ -179,7 +176,7 @@ public class ContainerDBServiceProviderImpl
*
* @param containerID the given containerID.
* @return count of keys within the given containerID.
- * @throws IOException
+ * @throws IOException on failure.
*/
@Override
public long getKeyCountForContainer(Long containerID) throws IOException {
@@ -192,7 +189,7 @@ public class ContainerDBServiceProviderImpl
*
* @param containerID the given containerID.
* @return if the given ContainerID exists or not.
- * @throws IOException
+ * @throws IOException on failure.
*/
@Override
public boolean doesContainerExists(Long containerID) throws IOException {
@@ -205,7 +202,7 @@ public class ContainerDBServiceProviderImpl
*
* @param containerKeyPrefix the containerID, key-prefix tuple.
* @return count of keys matching the containerID, key-prefix.
- * @throws IOException
+ * @throws IOException on failure.
*/
@Override
public Integer getCountForContainerKeyPrefix(
@@ -305,7 +302,7 @@ public class ContainerDBServiceProviderImpl
* @param prevContainer containerID after which the
* list of containers are scanned.
* @return Map of containerID -> containerMetadata.
- * @throws IOException
+ * @throws IOException on failure.
*/
@Override
public Map<Long, ContainerMetadata> getContainers(int limit,
@@ -385,20 +382,8 @@ public class ContainerDBServiceProviderImpl
*/
@Override
public void storeContainerCount(Long count) {
- // Get the current timestamp
- Timestamp now =
- using(sqlConfiguration).fetchValue(select(currentTimestamp()));
- GlobalStats containerCountRecord =
- globalStatsDao.fetchOneByKey(CONTAINER_COUNT_KEY);
- GlobalStats globalStatsRecord =
- new GlobalStats(CONTAINER_COUNT_KEY, count, now);
-
- // Insert a new record for CONTAINER_COUNT_KEY if it does not exist
- if (containerCountRecord == null) {
- globalStatsDao.insert(globalStatsRecord);
- } else {
- globalStatsDao.update(globalStatsRecord);
- }
+ ReconUtils.upsertGlobalStatsTable(sqlConfiguration, globalStatsDao,
+ CONTAINER_COUNT_KEY, count);
}
/**
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTask.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTask.java
index 80b2526..7092c54 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTask.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTask.java
@@ -243,8 +243,8 @@ public class FileSizeCountTask implements ReconOmTask {
Map<FileSizeCountKey, Long>
fileSizeCountMap) {
if (omKeyInfo == null) {
- LOG.warn("Unexpected error while handling DELETE key event. Key not " +
- "found in Recon OM DB : {}", key);
+ LOG.warn("Deleting a key not found while handling DELETE key event. Key" +
+ " not found in Recon OM DB : {}", key);
} else {
FileSizeCountKey countKey = getFileSizeCountKey(omKeyInfo);
Long count = fileSizeCountMap.containsKey(countKey) ?
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OMDBUpdateEvent.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OMDBUpdateEvent.java
index 949439c..f32b04e 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OMDBUpdateEvent.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OMDBUpdateEvent.java
@@ -22,7 +22,7 @@ import java.util.Objects;
/**
* A class used to encapsulate a single OM DB update event.
- * Currently only PUT and DELETE are supported.
+ * Currently PUT, DELETE and UPDATE are supported.
* @param <KEY> Type of Key.
* @param <VALUE> Type of Value.
*/
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OMDBUpdatesHandler.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OMDBUpdatesHandler.java
index 82e1ae8..34c4c33 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OMDBUpdatesHandler.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OMDBUpdatesHandler.java
@@ -18,9 +18,6 @@
package org.apache.hadoop.ozone.recon.tasks;
-import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.BUCKET_TABLE;
-import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.KEY_TABLE;
-import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.VOLUME_TABLE;
import static org.apache.hadoop.ozone.recon.tasks.OMDBUpdateEvent.OMDBUpdateAction.DELETE;
import static org.apache.hadoop.ozone.recon.tasks.OMDBUpdateEvent.OMDBUpdateAction.PUT;
import static org.apache.hadoop.ozone.recon.tasks.OMDBUpdateEvent.OMDBUpdateAction.UPDATE;
@@ -29,13 +26,12 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
+import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.ozone.om.OMMetadataManager;
-import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
-import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
-import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
+import org.apache.hadoop.ozone.om.codec.OMDBDefinition;
import org.apache.hadoop.hdds.utils.db.CodecRegistry;
-import org.apache.ratis.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;
import org.slf4j.Logger;
@@ -53,16 +49,17 @@ public class OMDBUpdatesHandler extends WriteBatch.Handler {
private CodecRegistry codecRegistry;
private OMMetadataManager omMetadataManager;
private List<OMDBUpdateEvent> omdbUpdateEvents = new ArrayList<>();
+ private OMDBDefinition omdbDefinition;
public OMDBUpdatesHandler(OMMetadataManager metadataManager) {
omMetadataManager = metadataManager;
tablesNames = metadataManager.getStore().getTableNames();
codecRegistry = metadataManager.getStore().getCodecRegistry();
+ omdbDefinition = new OMDBDefinition();
}
@Override
- public void put(int cfIndex, byte[] keyBytes, byte[] valueBytes) throws
- RocksDBException {
+ public void put(int cfIndex, byte[] keyBytes, byte[] valueBytes) {
try {
processEvent(cfIndex, keyBytes, valueBytes,
OMDBUpdateEvent.OMDBUpdateAction.PUT);
@@ -72,7 +69,7 @@ public class OMDBUpdatesHandler extends WriteBatch.Handler {
}
@Override
- public void delete(int cfIndex, byte[] keyBytes) throws RocksDBException {
+ public void delete(int cfIndex, byte[] keyBytes) {
try {
processEvent(cfIndex, keyBytes, null,
OMDBUpdateEvent.OMDBUpdateAction.DELETE);
@@ -93,41 +90,44 @@ public class OMDBUpdatesHandler extends WriteBatch.Handler {
valueBytes, OMDBUpdateEvent.OMDBUpdateAction action)
throws IOException {
String tableName = tablesNames.get(cfIndex);
- Class<String> keyType = getKeyType();
- Class valueType = getValueType(tableName);
- if (valueType != null) {
+ Optional<Class> keyType = omdbDefinition.getKeyType(tableName);
+ Optional<Class> valueType = omdbDefinition.getValueType(tableName);
+ if (keyType.isPresent() && valueType.isPresent()) {
OMDBUpdateEvent.OMUpdateEventBuilder builder =
new OMDBUpdateEvent.OMUpdateEventBuilder<>();
builder.setTable(tableName);
builder.setAction(action);
-
- String key = codecRegistry.asObject(keyBytes, keyType);
+ String key = (String) codecRegistry.asObject(keyBytes, keyType.get());
builder.setKey(key);
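+ // Cases: a PUT of a new key stays a PUT; a PUT of an existing key is
+ // tagged as an UPDATE; a DELETE of an existing key carries the old
+ // value; a DELETE of a non-existing key carries a null value since
+ // nothing was found for it.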
+ Table table = omMetadataManager.getTable(tableName);
+ // Recon does not add entries to cache and it is safer to always use
+ // getSkipCache in Recon.
+ Object oldValue = table.getSkipCache(key);
if (action == PUT) {
- Object value = codecRegistry.asObject(valueBytes, valueType);
+ Object value = codecRegistry.asObject(valueBytes, valueType.get());
builder.setValue(value);
- // If a PUT key operation happens on an existing Key, it is tagged
+ // If a PUT operation happens on an existing Key, it is tagged
// as an "UPDATE" event.
- if (tableName.equalsIgnoreCase(KEY_TABLE)) {
- if (omMetadataManager.getKeyTable().isExist(key)) {
- OmKeyInfo omKeyInfo = omMetadataManager.getKeyTable().get(key);
- builder.setOldValue(omKeyInfo);
- builder.setAction(UPDATE);
- }
+ if (oldValue != null) {
+ builder.setOldValue(oldValue);
+ builder.setAction(UPDATE);
}
} else if (action.equals(DELETE)) {
- // When you delete a Key, we add the old OmKeyInfo to the event so that
+ // When you delete a Key, we add the old value to the event so that
// a downstream task can use it.
- if (tableName.equalsIgnoreCase(KEY_TABLE)) {
- OmKeyInfo omKeyInfo = omMetadataManager.getKeyTable().get(key);
- builder.setValue(omKeyInfo);
- }
+ builder.setValue(oldValue);
}
OMDBUpdateEvent event = builder.build();
- LOG.debug("Generated OM update Event for table : " + event.getTable()
- + ", Key = " + event.getKey() + ", action = " + event.getAction());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(String.format("Generated OM update Event for table : %s, " +
+ "action = %s", tableName, action));
+ }
if (omdbUpdateEvents.contains(event)) {
// If the same event is part of this batch, the last one only holds.
// For example, if there are 2 PUT key1 events, then the first one
@@ -135,6 +135,13 @@ public class OMDBUpdatesHandler extends WriteBatch.Handler {
omdbUpdateEvents.remove(event);
}
omdbUpdateEvents.add(event);
+ } else {
+ // key type or value type cannot be determined for this table.
+ // log a warn message and ignore the update.
+ if (LOG.isWarnEnabled()) {
+ LOG.warn(String.format("KeyType or ValueType could not be determined" +
+ " for table %s. Ignoring the event.", tableName));
+ }
}
}
@@ -262,35 +269,10 @@ public class OMDBUpdatesHandler extends WriteBatch.Handler {
}
/**
- * Return Key type class for a given table name.
- * @param name table name.
- * @return String.class by default.
- */
- private Class<String> getKeyType() {
- return String.class;
- }
-
- /**
- * Return Value type class for a given table.
- * @param name table name
- * @return Value type based on table name.
- */
- @VisibleForTesting
- protected Class getValueType(String name) {
- switch (name) {
- case KEY_TABLE : return OmKeyInfo.class;
- case VOLUME_TABLE : return OmVolumeArgs.class;
- case BUCKET_TABLE : return OmBucketInfo.class;
- default: return null;
- }
- }
-
- /**
* Get List of events.
* @return List of events.
*/
public List<OMDBUpdateEvent> getEvents() {
return omdbUpdateEvents;
}
-
}
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/TableCountTask.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/TableCountTask.java
new file mode 100644
index 0000000..2621529
--- /dev/null
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/TableCountTask.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.tasks;
+
+import com.google.inject.Inject;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.recon.ReconUtils;
+import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
+import org.hadoop.ozone.recon.schema.tables.daos.GlobalStatsDao;
+import org.hadoop.ozone.recon.schema.tables.pojos.GlobalStats;
+import org.jooq.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map.Entry;
+
+/**
+ * Recon task that keeps the row count of every OM DB table (volumes,
+ * buckets, keys, open keys, deleted keys, etc.) in the Recon global stats
+ * table, one row per table keyed by the table name suffixed with "Count".
+ */
+public class TableCountTask implements ReconOmTask {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TableCountTask.class);
+
+  private final GlobalStatsDao globalStatsDao;
+  private final Configuration sqlConfiguration;
+  private final ReconOMMetadataManager reconOMMetadataManager;
+
+  @Inject
+  public TableCountTask(GlobalStatsDao globalStatsDao,
+                        Configuration sqlConfiguration,
+                        ReconOMMetadataManager reconOMMetadataManager) {
+    this.globalStatsDao = globalStatsDao;
+    this.sqlConfiguration = sqlConfiguration;
+    this.reconOMMetadataManager = reconOMMetadataManager;
+  }
+
+  /**
+   * Iterate the rows of each table in OM snapshot DB and store the row
+   * count of each table in the global stats table.
+   *
+   * @param omMetadataManager OM Metadata instance.
+   * @return Pair of the task name and a flag that is false when counting
+   * a table failed with an IOException, true otherwise.
+   */
+  @Override
+  public Pair<String, Boolean> reprocess(OMMetadataManager omMetadataManager) {
+    for (String tableName : getTaskTables()) {
+      Table<?, ?> table = omMetadataManager.getTable(tableName);
+      try (TableIterator<?, ?> keyIter = table.iterator()) {
+        ReconUtils.upsertGlobalStatsTable(sqlConfiguration, globalStatsDao,
+            getRowKeyFromTable(tableName), getCount(keyIter));
+      } catch (IOException ioEx) {
+        LOG.error("Unable to populate Table Count in Recon DB.", ioEx);
+        return new ImmutablePair<>(getTaskName(), false);
+      }
+    }
+    LOG.info("Completed a 'reprocess' run of TableCountTask.");
+    return new ImmutablePair<>(getTaskName(), true);
+  }
+
+  /** Counts the elements left in the iterator by consuming all of them. */
+  private long getCount(Iterator<?> iterator) {
+    long count = 0L;
+    while (iterator.hasNext()) {
+      count++;
+      iterator.next();
+    }
+    return count;
+  }
+
+  @Override
+  public String getTaskName() {
+    return "TableCountTask";
+  }
+
+  @Override
+  public Collection<String> getTaskTables() {
+    return new ArrayList<>(reconOMMetadataManager.listTableNames());
+  }
+
+  /**
+   * Read the update events and update the count of respective object
+   * (volume, bucket, key etc.) based on the action (put or delete).
+   *
+   * @param events Update events - PUT, DELETE and UPDATE.
+   * @return Pair of the task name and a flag that is false when handling an
+   * event threw an exception (no counts are written then), true otherwise.
+   */
+  @Override
+  public Pair<String, Boolean> process(OMUpdateEventBatch events) {
+    Iterator<OMDBUpdateEvent> eventIterator = events.getIterator();
+    HashMap<String, Long> objectCountMap = initializeCountMap();
+
+    while (eventIterator.hasNext()) {
+      OMDBUpdateEvent<String, Object> omdbUpdateEvent = eventIterator.next();
+      String rowKey = getRowKeyFromTable(omdbUpdateEvent.getTable());
+      try {
+        switch (omdbUpdateEvent.getAction()) {
+        case PUT:
+          objectCountMap.computeIfPresent(rowKey, (k, count) -> count + 1L);
+          break;
+
+        case DELETE:
+          // if value is null, it means that the volume / bucket / key
+          // is already deleted and does not exist in the OM database anymore.
+          if (omdbUpdateEvent.getValue() != null) {
+            objectCountMap.computeIfPresent(rowKey,
+                (k, count) -> count > 0 ? count - 1L : 0L);
+          }
+          break;
+
+        default: LOG.trace("Skipping DB update event : Table: {}, Action: {}",
+            omdbUpdateEvent.getTable(), omdbUpdateEvent.getAction());
+        }
+      } catch (Exception e) {
+        LOG.error("Unexpected exception while processing the table {}, " +
+            "Action: {}", omdbUpdateEvent.getTable(),
+            omdbUpdateEvent.getAction(), e);
+        return new ImmutablePair<>(getTaskName(), false);
+      }
+    }
+    for (Entry<String, Long> entry: objectCountMap.entrySet()) {
+      ReconUtils.upsertGlobalStatsTable(sqlConfiguration, globalStatsDao,
+          entry.getKey(), entry.getValue());
+    }
+
+    LOG.info("Completed a 'process' run of TableCountTask.");
+    return new ImmutablePair<>(getTaskName(), true);
+  }
+
+  /** Seeds a row key to count map with the stored count of each table. */
+  private HashMap<String, Long> initializeCountMap() {
+    Collection<String> tables = getTaskTables();
+    HashMap<String, Long> objectCountMap = new HashMap<>(tables.size());
+    for (String tableName: tables) {
+      String key = getRowKeyFromTable(tableName);
+      objectCountMap.put(key, getCountForKey(key));
+    }
+    return objectCountMap;
+  }
+
+  /** Returns the global stats row key of a table: tableName + "Count". */
+  public static String getRowKeyFromTable(String tableName) {
+    return tableName + "Count";
+  }
+
+  /**
+   * Get the count stored for the given key from Global Stats table.
+   * Return 0 if record not found.
+   *
+   * @param key Key in the global stats table
+   * @return count
+   */
+  private long getCountForKey(String key) {
+    GlobalStats record = globalStatsDao.fetchOneByKey(key);
+    return (record == null) ? 0L : record.getValue();
+  }
+}
diff --git a/hadoop-ozone/recon/src/main/resources/webapps/recon/ozone-recon-web/src/views/overview/overview.tsx b/hadoop-ozone/recon/src/main/resources/webapps/recon/ozone-recon-web/src/views/overview/overview.tsx
index 4497d71..bd92f5f 100644
--- a/hadoop-ozone/recon/src/main/resources/webapps/recon/ozone-recon-web/src/views/overview/overview.tsx
+++ b/hadoop-ozone/recon/src/main/resources/webapps/recon/ozone-recon-web/src/views/overview/overview.tsx
@@ -180,7 +180,7 @@ export class Overview extends React.Component<Record<string, object>, IOverviewS
<OverviewCard loading={loading} title='Buckets' data={buckets.toString()} icon='folder-open'/>
</Col>
<Col xs={24} sm={18} md={12} lg={12} xl={6}>
- <OverviewCard loading={loading} title='Keys (Estimated)' data={keys.toString()} icon='file-text'/>
+ <OverviewCard loading={loading} title='Keys' data={keys.toString()} icon='file-text'/>
</Col>
</Row>
</div>
diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestEndpoints.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestEndpoints.java
index a592119..f1350a9 100644
--- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestEndpoints.java
+++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestEndpoints.java
@@ -59,10 +59,14 @@ import org.apache.hadoop.ozone.recon.spi.StorageContainerServiceProvider;
import org.apache.hadoop.ozone.recon.spi.impl.OzoneManagerServiceProviderImpl;
import org.apache.hadoop.ozone.recon.spi.impl.StorageContainerServiceProviderImpl;
import org.apache.hadoop.ozone.recon.tasks.FileSizeCountTask;
+import org.apache.hadoop.ozone.recon.tasks.TableCountTask;
import org.apache.hadoop.test.LambdaTestUtils;
import org.hadoop.ozone.recon.schema.UtilizationSchemaDefinition;
import org.hadoop.ozone.recon.schema.tables.daos.FileCountBySizeDao;
+import org.hadoop.ozone.recon.schema.tables.daos.GlobalStatsDao;
import org.hadoop.ozone.recon.schema.tables.pojos.FileCountBySize;
+import org.jooq.Configuration;
+import org.jooq.DSLContext;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
@@ -74,6 +78,7 @@ import static org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.getRandom
import static org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.getTestReconOmMetadataManager;
import static org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.initializeNewOmMetadataManager;
import static org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.writeDataToOm;
+import static org.hadoop.ozone.recon.schema.tables.GlobalStatsTable.GLOBAL_STATS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.BDDMockito.given;
@@ -97,6 +102,7 @@ public class TestEndpoints extends AbstractReconSqlDBTest {
private UtilizationEndpoint utilizationEndpoint;
private ReconOMMetadataManager reconOMMetadataManager;
private FileSizeCountTask fileSizeCountTask;
+ private TableCountTask tableCountTask;
private ReconStorageContainerManagerFacade reconScm;
private boolean isSetupDone = false;
private String pipelineId;
@@ -107,6 +113,7 @@ public class TestEndpoints extends AbstractReconSqlDBTest {
private DatanodeDetailsProto datanodeDetailsProto;
private Pipeline pipeline;
private FileCountBySizeDao fileCountBySizeDao;
+ private DSLContext dslContext;
private final String host1 = "host1.datanode";
private final String host2 = "host2.datanode";
private final String ip1 = "1.1.1.1";
@@ -166,17 +173,23 @@ public class TestEndpoints extends AbstractReconSqlDBTest {
nodeEndpoint = reconTestInjector.getInstance(NodeEndpoint.class);
pipelineEndpoint = reconTestInjector.getInstance(PipelineEndpoint.class);
- clusterStateEndpoint =
- reconTestInjector.getInstance(ClusterStateEndpoint.class);
fileCountBySizeDao = getDao(FileCountBySizeDao.class);
+ GlobalStatsDao globalStatsDao = getDao(GlobalStatsDao.class);
UtilizationSchemaDefinition utilizationSchemaDefinition =
getSchemaDefinition(UtilizationSchemaDefinition.class);
+ Configuration sqlConfiguration =
+ reconTestInjector.getInstance(Configuration.class);
utilizationEndpoint = new UtilizationEndpoint(
fileCountBySizeDao, utilizationSchemaDefinition);
fileSizeCountTask =
new FileSizeCountTask(fileCountBySizeDao, utilizationSchemaDefinition);
+ tableCountTask = new TableCountTask(
+ globalStatsDao, sqlConfiguration, reconOMMetadataManager);
reconScm = (ReconStorageContainerManagerFacade)
reconTestInjector.getInstance(OzoneStorageContainerManager.class);
+ clusterStateEndpoint =
+ new ClusterStateEndpoint(reconScm, globalStatsDao);
+ dslContext = getDslContext();
}
@Before
@@ -305,6 +318,9 @@ public class TestEndpoints extends AbstractReconSqlDBTest {
// key = key_three
writeDataToOm(reconOMMetadataManager, "key_three");
+
+ // Truncate global stats table before running each test
+ dslContext.truncate(GLOBAL_STATS);
}
private void testDatanodeResponse(DatanodeMetadata datanodeMetadata)
@@ -415,9 +431,9 @@ public class TestEndpoints extends AbstractReconSqlDBTest {
(ClusterStateResponse) response.getEntity();
Assert.assertEquals(1, clusterStateResponse.getPipelines());
- Assert.assertEquals(2, clusterStateResponse.getVolumes());
- Assert.assertEquals(2, clusterStateResponse.getBuckets());
- Assert.assertEquals(3, clusterStateResponse.getKeys());
+ Assert.assertEquals(0, clusterStateResponse.getVolumes());
+ Assert.assertEquals(0, clusterStateResponse.getBuckets());
+ Assert.assertEquals(0, clusterStateResponse.getKeys());
Assert.assertEquals(2, clusterStateResponse.getTotalDatanodes());
Assert.assertEquals(2, clusterStateResponse.getHealthyDatanodes());
@@ -427,6 +443,16 @@ public class TestEndpoints extends AbstractReconSqlDBTest {
(ClusterStateResponse) response1.getEntity();
return (clusterStateResponse1.getContainers() == 1);
});
+
+ // check volume, bucket and key count after running table count task
+ Pair<String, Boolean> result =
+ tableCountTask.reprocess(reconOMMetadataManager);
+ assertTrue(result.getRight());
+ response = clusterStateEndpoint.getClusterState();
+ clusterStateResponse = (ClusterStateResponse) response.getEntity();
+ Assert.assertEquals(2, clusterStateResponse.getVolumes());
+ Assert.assertEquals(2, clusterStateResponse.getBuckets());
+ Assert.assertEquals(3, clusterStateResponse.getKeys());
}
@Test
diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestOMDBUpdatesHandler.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestOMDBUpdatesHandler.java
index d1cb1e9..92c797b 100644
--- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestOMDBUpdatesHandler.java
+++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestOMDBUpdatesHandler.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -35,10 +35,12 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.server.ServerUtils;
import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.codec.OMDBDefinition;
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
import org.apache.hadoop.hdds.utils.db.RDBStore;
+import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@@ -54,6 +56,8 @@ public class TestOMDBUpdatesHandler {
@Rule
public TemporaryFolder folder = new TemporaryFolder();
+ private OMDBDefinition omdbDefinition = new OMDBDefinition();
+
private OzoneConfiguration createNewTestPath() throws IOException {
OzoneConfiguration configuration = new OzoneConfiguration();
File newFolder = folder.newFolder();
@@ -149,6 +153,7 @@ public class TestOMDBUpdatesHandler {
// Write 1 volume, 1 key into source and target OM DBs.
String volumeKey = metaMgr.getVolumeKey("sampleVol");
+ String nonExistVolumeKey = metaMgr.getVolumeKey("nonExistingVolume");
OmVolumeArgs args =
OmVolumeArgs.newBuilder()
.setVolume("sampleVol")
@@ -165,6 +170,9 @@ public class TestOMDBUpdatesHandler {
// Delete the volume and key from target DB.
metaMgr.getKeyTable().delete("/sampleVol/bucketOne/key_one");
metaMgr.getVolumeTable().delete(volumeKey);
+ // Delete a non-existing volume and key
+ metaMgr.getKeyTable().delete("/sampleVol/bucketOne/key_two");
+ metaMgr.getVolumeTable().delete(metaMgr.getVolumeKey("nonExistingVolume"));
RDBStore rdbStore = (RDBStore) metaMgr.getStore();
RocksDB rocksDB = rdbStore.getDb();
@@ -191,7 +199,7 @@ public class TestOMDBUpdatesHandler {
}
List<OMDBUpdateEvent> events = omdbUpdatesHandler.getEvents();
- assertTrue(events.size() == 2);
+ assertEquals(4, events.size());
OMDBUpdateEvent keyEvent = events.get(0);
assertEquals(OMDBUpdateEvent.OMDBUpdateAction.DELETE, keyEvent.getAction());
@@ -201,7 +209,35 @@ public class TestOMDBUpdatesHandler {
OMDBUpdateEvent volEvent = events.get(1);
assertEquals(OMDBUpdateEvent.OMDBUpdateAction.DELETE, volEvent.getAction());
assertEquals(volumeKey, volEvent.getKey());
- assertNull(volEvent.getValue());
+ assertNotNull(volEvent.getValue());
+ OmVolumeArgs volumeInfo = (OmVolumeArgs) volEvent.getValue();
+ assertEquals("sampleVol", volumeInfo.getVolume());
+
+ // Assert the values of non existent keys are set to null.
+ OMDBUpdateEvent nonExistKey = events.get(2);
+ assertEquals(OMDBUpdateEvent.OMDBUpdateAction.DELETE,
+ nonExistKey.getAction());
+ assertEquals("/sampleVol/bucketOne/key_two", nonExistKey.getKey());
+ assertNull(nonExistKey.getValue());
+
+ OMDBUpdateEvent nonExistVolume = events.get(3);
+ assertEquals(OMDBUpdateEvent.OMDBUpdateAction.DELETE,
+ nonExistVolume.getAction());
+ assertEquals(nonExistVolumeKey, nonExistVolume.getKey());
+ assertNull(nonExistVolume.getValue());
+ }
+
+ @Test
+ public void testGetKeyType() throws IOException {
+ OzoneConfiguration configuration = createNewTestPath();
+ OmMetadataManagerImpl metaMgr = new OmMetadataManagerImpl(configuration);
+ OMDBUpdatesHandler omdbUpdatesHandler =
+ new OMDBUpdatesHandler(metaMgr);
+
+ assertEquals(String.class, omdbDefinition.getKeyType(
+ metaMgr.getKeyTable().getName()).get());
+ assertEquals(OzoneTokenIdentifier.class, omdbDefinition.getKeyType(
+ metaMgr.getDelegationTokenTable().getName()).get());
}
@Test
@@ -211,12 +247,12 @@ public class TestOMDBUpdatesHandler {
OMDBUpdatesHandler omdbUpdatesHandler =
new OMDBUpdatesHandler(metaMgr);
- assertEquals(OmKeyInfo.class, omdbUpdatesHandler.getValueType(
- metaMgr.getKeyTable().getName()));
- assertEquals(OmVolumeArgs.class, omdbUpdatesHandler.getValueType(
- metaMgr.getVolumeTable().getName()));
- assertEquals(OmBucketInfo.class, omdbUpdatesHandler.getValueType(
- metaMgr.getBucketTable().getName()));
+ assertEquals(OmKeyInfo.class, omdbDefinition.getValueType(
+ metaMgr.getKeyTable().getName()).get());
+ assertEquals(OmVolumeArgs.class, omdbDefinition.getValueType(
+ metaMgr.getVolumeTable().getName()).get());
+ assertEquals(OmBucketInfo.class, omdbDefinition.getValueType(
+ metaMgr.getBucketTable().getName()).get());
}
private OmKeyInfo getOmKeyInfo(String volumeName, String bucketName,
diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestTableCountTask.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestTableCountTask.java
new file mode 100644
index 0000000..94d7673
--- /dev/null
+++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestTableCountTask.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.tasks;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.utils.db.TypedTable;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.recon.persistence.AbstractReconSqlDBTest;
+import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
+import org.apache.hadoop.ozone.recon.tasks.OMDBUpdateEvent.OMUpdateEventBuilder;
+
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.BUCKET_TABLE;
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.DELETED_TABLE;
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.KEY_TABLE;
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.OPEN_KEY_TABLE;
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.VOLUME_TABLE;
+import org.hadoop.ozone.recon.schema.tables.daos.GlobalStatsDao;
+import org.jooq.DSLContext;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import static org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.getTestReconOmMetadataManager;
+import static org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.initializeNewOmMetadataManager;
+import static org.apache.hadoop.ozone.recon.tasks.OMDBUpdateEvent.OMDBUpdateAction.DELETE;
+import static org.apache.hadoop.ozone.recon.tasks.OMDBUpdateEvent.OMDBUpdateAction.PUT;
+import static org.apache.hadoop.ozone.recon.tasks.OMDBUpdateEvent.OMDBUpdateAction.UPDATE;
+import static org.hadoop.ozone.recon.schema.tables.GlobalStatsTable.GLOBAL_STATS;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit test for TableCountTask.
+ */
+public class TestTableCountTask extends AbstractReconSqlDBTest {
+
+  private GlobalStatsDao globalStatsDao;
+  private TableCountTask tableCountTask;
+  private DSLContext dslContext;
+  private boolean isSetupDone = false;
+
+  @Rule
+  public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+  private void initializeInjector() throws IOException {
+    ReconOMMetadataManager omMetadataManager = getTestReconOmMetadataManager(
+        initializeNewOmMetadataManager(temporaryFolder.newFolder()),
+        temporaryFolder.newFolder());
+    globalStatsDao = getDao(GlobalStatsDao.class);
+    tableCountTask = new TableCountTask(globalStatsDao, getConfiguration(),
+        omMetadataManager);
+    dslContext = getDslContext();
+  }
+
+  @Before
+  public void setUp() throws IOException {
+    // Guarded so the injector setup runs at most once per test instance
+    if (!isSetupDone) {
+      initializeInjector();
+      isSetupDone = true;
+    }
+    // Truncate table before each test; jOOQ runs it only on execute()
+    dslContext.truncate(GLOBAL_STATS).execute();
+  }
+
+  @Test
+  public void testReprocess() {
+    OMMetadataManager omMetadataManager = mock(OmMetadataManagerImpl.class);
+    // Mock 5 rows in each table and test the count
+    for (String tableName: tableCountTask.getTaskTables()) {
+      TypedTable<String, Object> table = mock(TypedTable.class);
+      TypedTable.TypedTableIterator mockIter = mock(TypedTable
+          .TypedTableIterator.class);
+      when(table.iterator()).thenReturn(mockIter);
+      when(omMetadataManager.getTable(tableName)).thenReturn(table);
+      when(mockIter.hasNext())
+          .thenReturn(true)
+          .thenReturn(true)
+          .thenReturn(true)
+          .thenReturn(true)
+          .thenReturn(true)
+          .thenReturn(false);
+    }
+
+    Pair<String, Boolean> result = tableCountTask.reprocess(omMetadataManager);
+    assertTrue(result.getRight());
+
+    assertEquals(5L, getCountForTable(KEY_TABLE));
+    assertEquals(5L, getCountForTable(VOLUME_TABLE));
+    assertEquals(5L, getCountForTable(BUCKET_TABLE));
+    assertEquals(5L, getCountForTable(OPEN_KEY_TABLE));
+    assertEquals(5L, getCountForTable(DELETED_TABLE));
+  }
+
+  @Test
+  public void testProcess() {
+    ArrayList<OMDBUpdateEvent> events = new ArrayList<>();
+    // Create 5 put, 1 delete and 1 update event for each table
+    for (String tableName: tableCountTask.getTaskTables()) {
+      for (int i = 0; i < 5; i++) {
+        events.add(getOMUpdateEvent("item" + i, null, tableName, PUT));
+      }
+      // for delete event, if value is set to null, the counter will not be
+      // decremented. This is because the value will be null if item does not
+      // exist in the database and there is no need to delete.
+      events.add(getOMUpdateEvent("item0", mock(OmKeyInfo.class), tableName,
+          DELETE));
+      events.add(getOMUpdateEvent("item1", null, tableName, UPDATE));
+    }
+    OMUpdateEventBatch omUpdateEventBatch = new OMUpdateEventBatch(events);
+    assertTrue(tableCountTask.process(omUpdateEventBatch).getRight());
+
+    // Verify 4 items in each table. (5 puts - 1 delete + 0 update)
+    assertEquals(4L, getCountForTable(KEY_TABLE));
+    assertEquals(4L, getCountForTable(VOLUME_TABLE));
+    assertEquals(4L, getCountForTable(BUCKET_TABLE));
+    assertEquals(4L, getCountForTable(OPEN_KEY_TABLE));
+    assertEquals(4L, getCountForTable(DELETED_TABLE));
+
+    // add a new key and simulate delete on non-existing item (value: null)
+    ArrayList<OMDBUpdateEvent> newEvents = new ArrayList<>();
+    for (String tableName: tableCountTask.getTaskTables()) {
+      newEvents.add(getOMUpdateEvent("item5", null, tableName, PUT));
+      // This delete event should be a noop since value is null
+      newEvents.add(getOMUpdateEvent("item0", null, tableName, DELETE));
+    }
+
+    omUpdateEventBatch = new OMUpdateEventBatch(newEvents);
+    assertTrue(tableCountTask.process(omUpdateEventBatch).getRight());
+
+    // Verify 5 items in each table. (1 new put + 0 delete)
+    assertEquals(5L, getCountForTable(KEY_TABLE));
+    assertEquals(5L, getCountForTable(VOLUME_TABLE));
+    assertEquals(5L, getCountForTable(BUCKET_TABLE));
+    assertEquals(5L, getCountForTable(OPEN_KEY_TABLE));
+    assertEquals(5L, getCountForTable(DELETED_TABLE));
+  }
+
+  private OMDBUpdateEvent getOMUpdateEvent(String name, Object value,
+                                           String table,
+                                           OMDBUpdateEvent.OMDBUpdateAction action) {
+    return new OMUpdateEventBuilder()
+        .setAction(action)
+        .setKey(name)
+        .setValue(value)
+        .setTable(table)
+        .build();
+  }
+
+  private long getCountForTable(String tableName) {
+    String key = TableCountTask.getRowKeyFromTable(tableName);
+    return globalStatsDao.findById(key).getValue();
+  }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: ozone-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: ozone-commits-help@hadoop.apache.org