You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ha...@apache.org on 2019/08/19 17:47:58 UTC
[hadoop] branch trunk updated: HDDS-1105 : Add mechanism in Recon
to obtain DB snapshot 'delta' updates from Ozone Manager (#1259)
This is an automated email from the ASF dual-hosted git repository.
hanishakoneru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new d69a1a0 HDDS-1105 : Add mechanism in Recon to obtain DB snapshot 'delta' updates from Ozone Manager (#1259)
d69a1a0 is described below
commit d69a1a0aa49614c084fa4b9546aceeeee65aebe4
Author: avijayanhwx <14...@users.noreply.github.com>
AuthorDate: Mon Aug 19 10:47:49 2019 -0700
HDDS-1105 : Add mechanism in Recon to obtain DB snapshot 'delta' updates from Ozone Manager (#1259)
---
.../apache/hadoop/utils/db/DBUpdatesWrapper.java | 4 +
.../apache/hadoop/utils/db/RDBBatchOperation.java | 4 +
.../ozone/om/protocol/OzoneManagerProtocol.java | 13 +
...OzoneManagerProtocolClientSideTranslatorPB.java | 28 ++-
.../org/apache/hadoop/ozone/om/OzoneManager.java | 1 +
.../protocolPB/OzoneManagerRequestHandler.java | 5 +-
hadoop-ozone/ozone-recon/pom.xml | 24 --
.../hadoop/ozone/recon/ReconControllerModule.java | 36 ++-
.../org/apache/hadoop/ozone/recon/ReconServer.java | 77 ++----
.../hadoop/ozone/recon/ReconServerConfigKeys.java | 2 +-
...ceProvider.java => ReconTaskBindingModule.java} | 37 ++-
.../org/apache/hadoop/ozone/recon/ReconUtils.java | 10 +-
.../recon/spi/OzoneManagerServiceProvider.java | 10 +-
.../spi/impl/ContainerDBServiceProviderImpl.java | 7 +-
.../spi/impl/OzoneManagerServiceProviderImpl.java | 216 +++++++++++++---
.../recon/spi/impl/ReconContainerDBProvider.java | 13 +-
.../ozone/recon/tasks/ContainerKeyMapperTask.java | 34 +--
.../ozone/recon/tasks/FileSizeCountTask.java | 36 ++-
.../hadoop/ozone/recon/tasks/OMDBUpdateEvent.java | 43 +---
.../ozone/recon/tasks/OMDBUpdatesHandler.java | 9 +-
.../ozone/recon/tasks/OMUpdateEventBatch.java | 16 +-
.../ozone/recon/tasks/ReconDBUpdateTask.java | 20 +-
.../ozone/recon/tasks/ReconTaskController.java | 25 +-
.../ozone/recon/tasks/ReconTaskControllerImpl.java | 137 +++++++----
.../ozone/recon/AbstractOMMetadataManagerTest.java | 15 +-
.../apache/hadoop/ozone/recon/TestReconUtils.java | 7 +-
.../ozone/recon/api/TestContainerKeyService.java | 73 ++----
.../ozone/recon/api/TestUtilizationService.java | 15 +-
.../impl/TestOzoneManagerServiceProviderImpl.java | 274 ++++++++++++++++-----
.../hadoop/ozone/recon/tasks/DummyReconDBTask.java | 18 +-
.../recon/tasks/TestContainerKeyMapperTask.java | 48 ++--
.../ozone/recon/tasks/TestFileSizeCountTask.java | 16 +-
.../recon/tasks/TestReconTaskControllerImpl.java | 94 ++++---
33 files changed, 865 insertions(+), 502 deletions(-)
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/DBUpdatesWrapper.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/DBUpdatesWrapper.java
index 54ebc7c..c4dca6a 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/DBUpdatesWrapper.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/DBUpdatesWrapper.java
@@ -41,6 +41,10 @@ public class DBUpdatesWrapper {
return dataList;
}
+ public void setCurrentSequenceNumber(long sequenceNumber) {
+ this.currentSequenceNumber = sequenceNumber;
+ }
+
public long getCurrentSequenceNumber() {
return currentSequenceNumber;
}
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RDBBatchOperation.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RDBBatchOperation.java
index a8b78ed..19699f5 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RDBBatchOperation.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RDBBatchOperation.java
@@ -37,6 +37,10 @@ public class RDBBatchOperation implements BatchOperation {
writeBatch = new WriteBatch();
}
+ public RDBBatchOperation(WriteBatch writeBatch) {
+ this.writeBatch = writeBatch;
+ }
+
public void commit(RocksDB db, WriteOptions writeOptions) throws IOException {
try {
db.write(writeOptions, writeBatch);
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
index 3d4dd93..f6a4e64 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
import org.apache.hadoop.ozone.om.helpers.ServiceInfo;
import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneAclInfo;
import java.io.Closeable;
@@ -49,6 +50,8 @@ import org.apache.hadoop.ozone.security.OzoneDelegationTokenSelector;
import org.apache.hadoop.ozone.security.acl.OzoneObj;
import org.apache.hadoop.security.KerberosInfo;
import org.apache.hadoop.security.token.TokenInfo;
+import org.apache.hadoop.utils.db.DBUpdatesWrapper;
+import org.apache.hadoop.utils.db.SequenceNumberNotFoundException;
/**
* Protocol to talk to OM.
@@ -505,4 +508,14 @@ public interface OzoneManagerProtocol
* */
List<OzoneAcl> getAcl(OzoneObj obj) throws IOException;
+ /**
+ * Get DB updates since a specific sequence number.
+ * @param dbUpdatesRequest request that encapsulates a sequence number.
+ * @return Wrapper containing the updates.
+ * @throws SequenceNumberNotFoundException if db is unable to read the data.
+ */
+ DBUpdatesWrapper getDBUpdates(
+ OzoneManagerProtocolProtos.DBUpdatesRequest dbUpdatesRequest)
+ throws IOException;
+
}
\ No newline at end of file
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
index f761548..83c5316 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
@@ -66,7 +66,9 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateF
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateFileResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ListStatusRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ListStatusResponse;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SetAclResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DBUpdatesRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DBUpdatesResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.AddAclRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateDirectoryRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.GetFileStatusResponse;
@@ -138,11 +140,14 @@ import org.apache.hadoop.security.proto.SecurityProtos.CancelDelegationTokenRequ
import org.apache.hadoop.security.proto.SecurityProtos.GetDelegationTokenRequestProto;
import org.apache.hadoop.security.proto.SecurityProtos.RenewDelegationTokenRequestProto;
import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.utils.db.DBUpdatesWrapper;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
+import com.google.protobuf.ByteString;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -1396,7 +1401,7 @@ public final class OzoneManagerProtocolClientSideTranslatorPB
OMRequest omRequest = createOMRequest(Type.SetAcl)
.setSetAclRequest(builder.build())
.build();
- OzoneManagerProtocolProtos.SetAclResponse response =
+ SetAclResponse response =
handleError(submitRequest(omRequest)).getSetAclResponse();
return response.getResponse();
@@ -1426,6 +1431,25 @@ public final class OzoneManagerProtocolClientSideTranslatorPB
}
@Override
+ public DBUpdatesWrapper getDBUpdates(DBUpdatesRequest dbUpdatesRequest)
+ throws IOException {
+ OMRequest omRequest = createOMRequest(Type.DBUpdates)
+ .setDbUpdatesRequest(dbUpdatesRequest)
+ .build();
+
+ DBUpdatesResponse dbUpdatesResponse =
+ handleError(submitRequest(omRequest)).getDbUpdatesResponse();
+
+ DBUpdatesWrapper dbUpdatesWrapper = new DBUpdatesWrapper();
+ for (ByteString byteString : dbUpdatesResponse.getDataList()) {
+ dbUpdatesWrapper.addWriteBatch(byteString.toByteArray(), 0L);
+ }
+ dbUpdatesWrapper.setCurrentSequenceNumber(
+ dbUpdatesResponse.getSequenceNumber());
+ return dbUpdatesWrapper;
+ }
+
+ @Override
public OpenKeySession createFile(OmKeyArgs args,
boolean overWrite, boolean recursive) throws IOException {
KeyArgs keyArgs = KeyArgs.newBuilder()
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index a81b0de..dbd5d39 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -3422,6 +3422,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
* @return Wrapper containing the updates.
* @throws SequenceNumberNotFoundException if db is unable to read the data.
*/
+ @Override
public DBUpdatesWrapper getDBUpdates(
DBUpdatesRequest dbUpdatesRequest)
throws SequenceNumberNotFoundException {
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
index 7c9f126..c2d6227 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
@@ -394,9 +394,10 @@ public class OzoneManagerRequestHandler implements RequestHandler {
DBUpdatesWrapper dbUpdatesWrapper =
impl.getDBUpdates(dbUpdatesRequest);
for (int i = 0; i < dbUpdatesWrapper.getData().size(); i++) {
- builder.setData(i,
- OMPBHelper.getByteString(dbUpdatesWrapper.getData().get(i)));
+ builder.addData(OMPBHelper.getByteString(
+ dbUpdatesWrapper.getData().get(i)));
}
+ builder.setSequenceNumber(dbUpdatesWrapper.getCurrentSequenceNumber());
return builder.build();
}
diff --git a/hadoop-ozone/ozone-recon/pom.xml b/hadoop-ozone/ozone-recon/pom.xml
index 9672796..130ad35 100644
--- a/hadoop-ozone/ozone-recon/pom.xml
+++ b/hadoop-ozone/ozone-recon/pom.xml
@@ -273,30 +273,6 @@
<scope>test</scope>
</dependency>
<dependency>
- <groupId>org.powermock</groupId>
- <artifactId>powermock-module-junit4</artifactId>
- <version>1.7.4</version>
- <scope>test</scope>
- <exclusions>
- <exclusion>
- <groupId>org.javassist</groupId>
- <artifactId>javassist</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.powermock</groupId>
- <artifactId>powermock-api-mockito2</artifactId>
- <version>1.7.4</version>
- <scope>test</scope>
- <exclusions>
- <exclusion>
- <groupId>org.mockito</groupId>
- <artifactId>mockito-core</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
<groupId>org.jooq</groupId>
<artifactId>jooq</artifactId>
<version>${jooq.version}</version>
diff --git a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java
index e7c20f0..21bc5be 100644
--- a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java
+++ b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java
@@ -29,8 +29,13 @@ import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SQ
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SQL_MAX_IDLE_CONNECTION_AGE;
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SQL_MAX_IDLE_CONNECTION_TEST_STMT;
+import java.io.IOException;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
+import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolClientSideTranslatorPB;
import org.apache.hadoop.ozone.recon.persistence.DataSourceConfiguration;
import org.apache.hadoop.ozone.recon.persistence.JooqPersistenceModule;
import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
@@ -40,9 +45,15 @@ import org.apache.hadoop.ozone.recon.spi.OzoneManagerServiceProvider;
import org.apache.hadoop.ozone.recon.spi.impl.ReconContainerDBProvider;
import org.apache.hadoop.ozone.recon.spi.impl.ContainerDBServiceProviderImpl;
import org.apache.hadoop.ozone.recon.spi.impl.OzoneManagerServiceProviderImpl;
+import org.apache.hadoop.ozone.recon.tasks.ContainerKeyMapperTask;
+import org.apache.hadoop.ozone.recon.tasks.FileSizeCountTask;
import org.apache.hadoop.ozone.recon.tasks.ReconTaskController;
import org.apache.hadoop.ozone.recon.tasks.ReconTaskControllerImpl;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.utils.db.DBStore;
+import org.apache.ratis.protocol.ClientId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.google.inject.AbstractModule;
import com.google.inject.Provides;
@@ -52,6 +63,9 @@ import com.google.inject.Singleton;
* Guice controller that defines concrete bindings.
*/
public class ReconControllerModule extends AbstractModule {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ReconControllerModule.class);
+
@Override
protected void configure() {
bind(Configuration.class).toProvider(ConfigurationProvider.class);
@@ -60,17 +74,37 @@ public class ReconControllerModule extends AbstractModule {
.toProvider(ReconContainerDBProvider.class).in(Singleton.class);
bind(ReconOMMetadataManager.class)
.to(ReconOmMetadataManagerImpl.class).in(Singleton.class);
+ bind(OMMetadataManager.class).to(ReconOmMetadataManagerImpl.class)
+ .in(Singleton.class);
bind(ContainerDBServiceProvider.class)
.to(ContainerDBServiceProviderImpl.class).in(Singleton.class);
bind(OzoneManagerServiceProvider.class)
.to(OzoneManagerServiceProviderImpl.class).in(Singleton.class);
-
+ bind(ReconUtils.class).in(Singleton.class);
// Persistence - inject configuration provider
install(new JooqPersistenceModule(
getProvider(DataSourceConfiguration.class)));
bind(ReconTaskController.class)
.to(ReconTaskControllerImpl.class).in(Singleton.class);
+ bind(ContainerKeyMapperTask.class);
+ bind(FileSizeCountTask.class);
+ }
+
+ @Provides
+ OzoneManagerProtocol getOzoneManagerProtocol(
+ final OzoneConfiguration ozoneConfiguration) {
+ OzoneManagerProtocol ozoneManagerClient = null;
+ try {
+ ClientId clientId = ClientId.randomId();
+ UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+ ozoneManagerClient = new
+ OzoneManagerProtocolClientSideTranslatorPB(
+ ozoneConfiguration, clientId.toString(), ugi);
+ } catch (IOException ioEx) {
+ LOG.error("Error in provisioning OzoneManagerProtocol ", ioEx);
+ }
+ return ozoneManagerClient;
}
@Provides
diff --git a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServer.java b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServer.java
index a11cb5f..1aaf887 100644
--- a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServer.java
+++ b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServer.java
@@ -18,26 +18,15 @@
package org.apache.hadoop.ozone.recon;
-import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_SNAPSHOT_TASK_INITIAL_DELAY;
-import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_SNAPSHOT_TASK_INITIAL_DELAY_DEFAULT;
-import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_SNAPSHOT_TASK_INTERVAL;
-import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_SNAPSHOT_TASK_INTERVAL_DEFAULT;
-
-import java.io.IOException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hdds.cli.GenericCli;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.ozone.recon.spi.ContainerDBServiceProvider;
import org.apache.hadoop.ozone.recon.spi.OzoneManagerServiceProvider;
-import org.apache.hadoop.ozone.recon.tasks.ContainerKeyMapperTask;
-import org.apache.hadoop.ozone.recon.tasks.FileSizeCountTask;
import org.hadoop.ozone.recon.schema.ReconInternalSchemaDefinition;
import org.hadoop.ozone.recon.schema.StatsSchemaDefinition;
import org.hadoop.ozone.recon.schema.UtilizationSchemaDefinition;
-import org.jooq.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -69,13 +58,15 @@ public class ReconServer extends GenericCli {
ConfigurationProvider.setConfiguration(ozoneConfiguration);
injector = Guice.createInjector(new
- ReconControllerModule(), new ReconRestServletModule() {
+ ReconControllerModule(),
+ new ReconRestServletModule() {
@Override
protected void configureServlets() {
rest("/api/*")
.packages("org.apache.hadoop.ozone.recon.api");
}
- });
+ },
+ new ReconTaskBindingModule());
//Pass on injector to listener that does the Guice - Jersey HK2 bridging.
ReconGuiceServletContextListener.setInjector(injector);
@@ -95,14 +86,20 @@ public class ReconServer extends GenericCli {
reconInternalSchemaDefinition.initializeSchema();
LOG.info("Recon server initialized successfully!");
+
+ httpServer = injector.getInstance(ReconHttpServer.class);
+ LOG.info("Starting Recon server");
+ httpServer.start();
+
+ //Start Ozone Manager Service that pulls data from OM.
+ OzoneManagerServiceProvider ozoneManagerServiceProvider = injector
+ .getInstance(OzoneManagerServiceProvider.class);
+ ozoneManagerServiceProvider.start();
} catch (Exception e) {
LOG.error("Error during initializing Recon server.", e);
+ stop();
}
- httpServer = injector.getInstance(ReconHttpServer.class);
- LOG.info("Starting Recon server");
- httpServer.start();
- scheduleReconTasks();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try {
stop();
@@ -113,52 +110,6 @@ public class ReconServer extends GenericCli {
return null;
}
- /**
- * Schedule the tasks that is required by Recon to keep its metadata up to
- * date.
- */
- private void scheduleReconTasks() {
- OzoneConfiguration configuration = injector.getInstance(
- OzoneConfiguration.class);
- ContainerDBServiceProvider containerDBServiceProvider = injector
- .getInstance(ContainerDBServiceProvider.class);
- OzoneManagerServiceProvider ozoneManagerServiceProvider = injector
- .getInstance(OzoneManagerServiceProvider.class);
- Configuration sqlConfiguration = injector.getInstance(Configuration.class);
- long initialDelay = configuration.getTimeDuration(
- RECON_OM_SNAPSHOT_TASK_INITIAL_DELAY,
- RECON_OM_SNAPSHOT_TASK_INITIAL_DELAY_DEFAULT,
- TimeUnit.MILLISECONDS);
- long interval = configuration.getTimeDuration(
- RECON_OM_SNAPSHOT_TASK_INTERVAL,
- RECON_OM_SNAPSHOT_TASK_INTERVAL_DEFAULT,
- TimeUnit.MILLISECONDS);
-
-
- scheduler.scheduleWithFixedDelay(() -> {
- try {
- ozoneManagerServiceProvider.updateReconOmDBWithNewSnapshot();
- // Schedule the task to read OM DB and write the reverse mapping to
- // Recon container DB.
- ContainerKeyMapperTask containerKeyMapperTask =
- new ContainerKeyMapperTask(containerDBServiceProvider,
- ozoneManagerServiceProvider.getOMMetadataManagerInstance());
- containerKeyMapperTask.reprocess(
- ozoneManagerServiceProvider.getOMMetadataManagerInstance());
- FileSizeCountTask fileSizeCountTask = new
- FileSizeCountTask(
- ozoneManagerServiceProvider.getOMMetadataManagerInstance(),
- sqlConfiguration);
- fileSizeCountTask.reprocess(
- ozoneManagerServiceProvider.getOMMetadataManagerInstance());
-
- } catch (IOException e) {
- LOG.error("Unable to get OM " +
- "Snapshot", e);
- }
- }, initialDelay, interval, TimeUnit.MILLISECONDS);
- }
-
void stop() throws Exception {
LOG.info("Stopping Recon server");
httpServer.stop();
diff --git a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java
index 0501093..034af4a 100644
--- a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java
+++ b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java
@@ -114,7 +114,7 @@ public final class ReconServerConfigKeys {
public static final String OZONE_RECON_TASK_THREAD_COUNT_KEY =
"ozone.recon.task.thread.count";
- public static final int OZONE_RECON_TASK_THREAD_COUNT_DEFAULT = 1;
+ public static final int OZONE_RECON_TASK_THREAD_COUNT_DEFAULT = 5;
/**
* Private constructor for utility class.
diff --git a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/spi/OzoneManagerServiceProvider.java b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/ReconTaskBindingModule.java
similarity index 54%
copy from hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/spi/OzoneManagerServiceProvider.java
copy to hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/ReconTaskBindingModule.java
index 420f333..19cc0da 100644
--- a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/spi/OzoneManagerServiceProvider.java
+++ b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/ReconTaskBindingModule.java
@@ -1,5 +1,3 @@
-package org.apache.hadoop.ozone.recon.spi;
-
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -18,28 +16,25 @@ package org.apache.hadoop.ozone.recon.spi;
* limitations under the License.
*/
-import java.io.IOException;
+package org.apache.hadoop.ozone.recon;
+
+import org.apache.hadoop.ozone.recon.tasks.ContainerKeyMapperTask;
+import org.apache.hadoop.ozone.recon.tasks.FileSizeCountTask;
+import org.apache.hadoop.ozone.recon.tasks.ReconDBUpdateTask;
-import org.apache.hadoop.ozone.om.OMMetadataManager;
+import com.google.inject.AbstractModule;
+import com.google.inject.multibindings.Multibinder;
/**
- * Interface to access OM endpoints.
+ * Binds the various Recon Tasks.
*/
-public interface OzoneManagerServiceProvider {
-
- /**
- * Initialize Ozone Manager Service Provider Impl.
- */
- void init() throws IOException;
-
- /**
- * Update Recon OM DB with new snapshot from OM.
- */
- void updateReconOmDBWithNewSnapshot() throws IOException;
+public class ReconTaskBindingModule extends AbstractModule {
- /**
- * Return instance of OM Metadata manager.
- * @return OM metadata manager instance.
- */
- OMMetadataManager getOMMetadataManagerInstance();
+ @Override
+ protected void configure() {
+ Multibinder<ReconDBUpdateTask> taskBinder =
+ Multibinder.newSetBinder(binder(), ReconDBUpdateTask.class);
+ taskBinder.addBinding().to(ContainerKeyMapperTask.class);
+ taskBinder.addBinding().to(FileSizeCountTask.class);
+ }
}
diff --git a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/ReconUtils.java b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/ReconUtils.java
index 324a369..95e6f9b 100644
--- a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/ReconUtils.java
+++ b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/ReconUtils.java
@@ -51,11 +51,11 @@ import org.slf4j.LoggerFactory;
/**
* Recon Utility class.
*/
-public final class ReconUtils {
+public class ReconUtils {
private final static int WRITE_BUFFER = 1048576; //1MB
- private ReconUtils() {
+ public ReconUtils() {
}
private static final Logger LOG = LoggerFactory.getLogger(
@@ -69,7 +69,7 @@ public final class ReconUtils {
* @param dirConfigKey key to check
* @return Return File based on configured or fallback value.
*/
- public static File getReconDbDir(Configuration conf, String dirConfigKey) {
+ public File getReconDbDir(Configuration conf, String dirConfigKey) {
File metadataDir = getDirectoryFromConfig(conf, dirConfigKey,
"Recon");
@@ -90,7 +90,7 @@ public final class ReconUtils {
* @param destPath destination path to untar to.
* @throws IOException ioException
*/
- public static void untarCheckpointFile(File tarFile, Path destPath)
+ public void untarCheckpointFile(File tarFile, Path destPath)
throws IOException {
FileInputStream fileInputStream = null;
@@ -153,7 +153,7 @@ public final class ReconUtils {
* @return Inputstream to the response of the HTTP call.
* @throws IOException While reading the response.
*/
- public static InputStream makeHttpCall(CloseableHttpClient httpClient,
+ public InputStream makeHttpCall(CloseableHttpClient httpClient,
String url)
throws IOException {
diff --git a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/spi/OzoneManagerServiceProvider.java b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/spi/OzoneManagerServiceProvider.java
index 420f333..3f57af6 100644
--- a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/spi/OzoneManagerServiceProvider.java
+++ b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/spi/OzoneManagerServiceProvider.java
@@ -18,8 +18,6 @@ package org.apache.hadoop.ozone.recon.spi;
* limitations under the License.
*/
-import java.io.IOException;
-
import org.apache.hadoop.ozone.om.OMMetadataManager;
/**
@@ -28,14 +26,14 @@ import org.apache.hadoop.ozone.om.OMMetadataManager;
public interface OzoneManagerServiceProvider {
/**
- * Initialize Ozone Manager Service Provider Impl.
+ * Start a task to sync data from OM.
*/
- void init() throws IOException;
+ void start();
/**
- * Update Recon OM DB with new snapshot from OM.
+ * Stop the OM sync data.
*/
- void updateReconOmDBWithNewSnapshot() throws IOException;
+ void stop();
/**
* Return instance of OM Metadata manager.
diff --git a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/ContainerDBServiceProviderImpl.java b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/ContainerDBServiceProviderImpl.java
index 42aab2e..b1532fa 100644
--- a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/ContainerDBServiceProviderImpl.java
+++ b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/ContainerDBServiceProviderImpl.java
@@ -37,6 +37,7 @@ import javax.inject.Singleton;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.recon.ReconUtils;
import org.apache.hadoop.ozone.recon.api.types.ContainerKeyPrefix;
import org.apache.hadoop.ozone.recon.api.types.ContainerMetadata;
import org.apache.hadoop.ozone.recon.spi.ContainerDBServiceProvider;
@@ -74,6 +75,9 @@ public class ContainerDBServiceProviderImpl
private Configuration sqlConfiguration;
@Inject
+ private ReconUtils reconUtils;
+
+ @Inject
public ContainerDBServiceProviderImpl(DBStore dbStore,
Configuration sqlConfiguration) {
globalStatsDao = new GlobalStatsDao(sqlConfiguration);
@@ -101,7 +105,8 @@ public class ContainerDBServiceProviderImpl
throws IOException {
File oldDBLocation = containerDbStore.getDbLocation();
- containerDbStore = ReconContainerDBProvider.getNewDBStore(configuration);
+ containerDbStore = ReconContainerDBProvider
+ .getNewDBStore(configuration, reconUtils);
containerKeyTable = containerDbStore.getTable(CONTAINER_KEY_TABLE,
ContainerKeyPrefix.class, Integer.class);
diff --git a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java
index 389be1b..e7da3a0 100644
--- a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java
+++ b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java
@@ -27,35 +27,54 @@ import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_CONNE
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_CONNECTION_TIMEOUT;
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_CONNECTION_TIMEOUT_DEFAULT;
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_SNAPSHOT_TASK_FLUSH_PARAM;
+import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_SNAPSHOT_TASK_INITIAL_DELAY;
+import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_SNAPSHOT_TASK_INITIAL_DELAY_DEFAULT;
+import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_SNAPSHOT_TASK_INTERVAL;
+import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_SNAPSHOT_TASK_INTERVAL_DEFAULT;
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_SOCKET_TIMEOUT;
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_SOCKET_TIMEOUT_DEFAULT;
-import static org.apache.hadoop.ozone.recon.ReconUtils.getReconDbDir;
-import static org.apache.hadoop.ozone.recon.ReconUtils.makeHttpCall;
-import static org.apache.hadoop.ozone.recon.ReconUtils.untarCheckpointFile;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.ozone.om.OMConfigKeys;
import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DBUpdatesRequest;
+import org.apache.hadoop.ozone.recon.ReconUtils;
import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
import org.apache.hadoop.ozone.recon.spi.OzoneManagerServiceProvider;
+import org.apache.hadoop.ozone.recon.tasks.OMDBUpdatesHandler;
+import org.apache.hadoop.ozone.recon.tasks.OMUpdateEventBatch;
+import org.apache.hadoop.ozone.recon.tasks.ReconTaskController;
import org.apache.hadoop.utils.db.DBCheckpoint;
+import org.apache.hadoop.utils.db.DBUpdatesWrapper;
+import org.apache.hadoop.utils.db.RDBBatchOperation;
+import org.apache.hadoop.utils.db.RDBStore;
import org.apache.hadoop.utils.db.RocksDBCheckpoint;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.ratis.protocol.ClientId;
+import org.hadoop.ozone.recon.schema.tables.daos.ReconTaskStatusDao;
+import org.hadoop.ozone.recon.schema.tables.pojos.ReconTaskStatus;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -75,11 +94,28 @@ public class OzoneManagerServiceProviderImpl
private File omSnapshotDBParentDir = null;
private String omDBSnapshotUrl;
- @Inject
+ private OzoneManagerProtocol ozoneManagerClient;
+ private final ClientId clientId = ClientId.randomId();
+ private final OzoneConfiguration configuration;
+ private final ScheduledExecutorService scheduler =
+ Executors.newScheduledThreadPool(1);
+
private ReconOMMetadataManager omMetadataManager;
+ private ReconTaskController reconTaskController;
+ private ReconTaskStatusDao reconTaskStatusDao;
+ private ReconUtils reconUtils;
+ private enum OmSnapshotTaskName {
+ OM_DB_FULL_SNAPSHOT,
+ OM_DB_DELTA_UPDATES
+ }
@Inject
- public OzoneManagerServiceProviderImpl(Configuration configuration) {
+ public OzoneManagerServiceProviderImpl(
+ OzoneConfiguration configuration,
+ ReconOMMetadataManager omMetadataManager,
+ ReconTaskController reconTaskController,
+ ReconUtils reconUtils,
+ OzoneManagerProtocol ozoneManagerClient) throws IOException {
String ozoneManagerHttpAddress = configuration.get(OMConfigKeys
.OZONE_OM_HTTP_ADDRESS_KEY);
@@ -87,7 +123,7 @@ public class OzoneManagerServiceProviderImpl
String ozoneManagerHttpsAddress = configuration.get(OMConfigKeys
.OZONE_OM_HTTPS_ADDRESS_KEY);
- omSnapshotDBParentDir = getReconDbDir(configuration,
+ omSnapshotDBParentDir = reconUtils.getReconDbDir(configuration,
OZONE_RECON_OM_SNAPSHOT_DB_DIR);
HttpConfig.Policy policy = DFSUtil.getHttpPolicy(configuration);
@@ -127,33 +163,39 @@ public class OzoneManagerServiceProviderImpl
omDBSnapshotUrl += "?" + OZONE_DB_CHECKPOINT_REQUEST_FLUSH + "=true";
}
+ this.reconUtils = reconUtils;
+ this.omMetadataManager = omMetadataManager;
+ this.reconTaskController = reconTaskController;
+ this.reconTaskStatusDao = reconTaskController.getReconTaskStatusDao();
+ this.ozoneManagerClient = ozoneManagerClient;
+ this.configuration = configuration;
}
@Override
- public void init() throws IOException {
- updateReconOmDBWithNewSnapshot();
+ public OMMetadataManager getOMMetadataManagerInstance() {
+ return omMetadataManager;
}
@Override
- public void updateReconOmDBWithNewSnapshot() throws IOException {
- //Obtain the current DB snapshot from OM and
- //update the in house OM metadata managed DB instance.
- DBCheckpoint dbSnapshot = getOzoneManagerDBSnapshot();
- if (dbSnapshot != null && dbSnapshot.getCheckpointLocation() != null) {
- try {
- omMetadataManager.updateOmDB(dbSnapshot.getCheckpointLocation()
- .toFile());
- } catch (IOException e) {
- LOG.error("Unable to refresh Recon OM DB Snapshot. ", e);
- }
- } else {
- LOG.error("Null snapshot location got from OM.");
- }
+ public void start() {
+ long initialDelay = configuration.getTimeDuration(
+ RECON_OM_SNAPSHOT_TASK_INITIAL_DELAY,
+ RECON_OM_SNAPSHOT_TASK_INITIAL_DELAY_DEFAULT,
+ TimeUnit.MILLISECONDS);
+ long interval = configuration.getTimeDuration(
+ RECON_OM_SNAPSHOT_TASK_INTERVAL,
+ RECON_OM_SNAPSHOT_TASK_INTERVAL_DEFAULT,
+ TimeUnit.MILLISECONDS);
+ scheduler.scheduleWithFixedDelay(this::syncDataFromOM,
+ initialDelay,
+ interval,
+ TimeUnit.MILLISECONDS);
}
@Override
- public OMMetadataManager getOMMetadataManagerInstance() {
- return omMetadataManager;
+ public void stop() {
+ reconTaskController.stop();
+ scheduler.shutdownNow();
}
/**
@@ -161,24 +203,24 @@ public class OzoneManagerServiceProviderImpl
* @return DBCheckpoint instance.
*/
@VisibleForTesting
- protected DBCheckpoint getOzoneManagerDBSnapshot() {
+ DBCheckpoint getOzoneManagerDBSnapshot() {
String snapshotFileName = RECON_OM_SNAPSHOT_DB + "_" + System
.currentTimeMillis();
File targetFile = new File(omSnapshotDBParentDir, snapshotFileName +
".tar.gz");
try {
- try (InputStream inputStream = makeHttpCall(httpClient,
+ try (InputStream inputStream = reconUtils.makeHttpCall(httpClient,
omDBSnapshotUrl)) {
FileUtils.copyInputStreamToFile(inputStream, targetFile);
}
- //Untar the checkpoint file.
+ // Untar the checkpoint file.
Path untarredDbDir = Paths.get(omSnapshotDBParentDir.getAbsolutePath(),
snapshotFileName);
- untarCheckpointFile(targetFile, untarredDbDir);
+ reconUtils.untarCheckpointFile(targetFile, untarredDbDir);
FileUtils.deleteQuietly(targetFile);
- //TODO Create Checkpoint based on OM DB type.
+ // TODO Create Checkpoint based on OM DB type.
// Currently, OM DB type is not configurable. Hence, defaulting to
// RocksDB.
return new RocksDBCheckpoint(untarredDbDir);
@@ -187,5 +229,119 @@ public class OzoneManagerServiceProviderImpl
}
return null;
}
+
+ /**
+ * Update Local OM DB with new OM DB snapshot.
+ * @throws IOException
+ */
+ @VisibleForTesting
+ void updateReconOmDBWithNewSnapshot() throws IOException {
+ // Obtain the current DB snapshot from OM and
+ // update the in house OM metadata managed DB instance.
+ DBCheckpoint dbSnapshot = getOzoneManagerDBSnapshot();
+ if (dbSnapshot != null && dbSnapshot.getCheckpointLocation() != null) {
+ try {
+ omMetadataManager.updateOmDB(dbSnapshot.getCheckpointLocation()
+ .toFile());
+ } catch (IOException e) {
+ LOG.error("Unable to refresh Recon OM DB Snapshot. ", e);
+ }
+ } else {
+ LOG.error("Null snapshot location got from OM.");
+ }
+ }
+
+ /**
+ * Get Delta updates from OM through RPC call and apply to local OM DB as
+ * well as accumulate in a buffer.
+ * @param fromSequenceNumber from sequence number to request from.
+ * @param omdbUpdatesHandler OM DB updates handler to buffer updates.
+ * @throws IOException when OM RPC request fails.
+ * @throws RocksDBException when writing to RocksDB fails.
+ */
+ @VisibleForTesting
+ void getAndApplyDeltaUpdatesFromOM(
+ long fromSequenceNumber, OMDBUpdatesHandler omdbUpdatesHandler)
+ throws IOException, RocksDBException {
+ DBUpdatesRequest dbUpdatesRequest = DBUpdatesRequest.newBuilder()
+ .setSequenceNumber(fromSequenceNumber).build();
+ DBUpdatesWrapper dbUpdates = ozoneManagerClient.getDBUpdates(
+ dbUpdatesRequest);
+ if (null != dbUpdates) {
+ RDBStore rocksDBStore = (RDBStore)omMetadataManager.getStore();
+ RocksDB rocksDB = rocksDBStore.getDb();
+ LOG.debug("Number of updates received from OM : " +
+ dbUpdates.getData().size());
+ for (byte[] data : dbUpdates.getData()) {
+ WriteBatch writeBatch = new WriteBatch(data);
+ writeBatch.iterate(omdbUpdatesHandler);
+ RDBBatchOperation rdbBatchOperation = new RDBBatchOperation(writeBatch);
+ rdbBatchOperation.commit(rocksDB, new WriteOptions());
+ }
+ }
+ }
+
+ /**
+ * Based on current state of Recon's OM DB, we either get delta updates or
+ * full snapshot from Ozone Manager.
+ */
+ @VisibleForTesting
+ void syncDataFromOM() {
+ long currentSequenceNumber = getCurrentOMDBSequenceNumber();
+ boolean fullSnapshot = false;
+
+ if (currentSequenceNumber <= 0) {
+ fullSnapshot = true;
+ } else {
+ OMDBUpdatesHandler omdbUpdatesHandler =
+ new OMDBUpdatesHandler(omMetadataManager);
+ try {
+ // Get updates from OM and apply to local Recon OM DB.
+ getAndApplyDeltaUpdatesFromOM(currentSequenceNumber,
+ omdbUpdatesHandler);
+ // Update timestamp of successful delta updates query.
+ ReconTaskStatus reconTaskStatusRecord = new ReconTaskStatus(
+ OmSnapshotTaskName.OM_DB_DELTA_UPDATES.name(),
+ System.currentTimeMillis(), getCurrentOMDBSequenceNumber());
+ reconTaskStatusDao.update(reconTaskStatusRecord);
+ // Pass on DB update events to tasks that are listening.
+ reconTaskController.consumeOMEvents(new OMUpdateEventBatch(
+ omdbUpdatesHandler.getEvents()), omMetadataManager);
+ } catch (IOException | InterruptedException | RocksDBException e) {
+ LOG.warn("Unable to get and apply delta updates from OM.", e);
+ fullSnapshot = true;
+ }
+ }
+
+ if (fullSnapshot) {
+ try {
+ // Update local Recon OM DB to new snapshot.
+ updateReconOmDBWithNewSnapshot();
+ // Update timestamp of successful delta updates query.
+ ReconTaskStatus reconTaskStatusRecord =
+ new ReconTaskStatus(
+ OmSnapshotTaskName.OM_DB_FULL_SNAPSHOT.name(),
+ System.currentTimeMillis(), getCurrentOMDBSequenceNumber());
+ reconTaskStatusDao.update(reconTaskStatusRecord);
+ // Reinitialize tasks that are listening.
+ reconTaskController.reInitializeTasks(omMetadataManager);
+ } catch (IOException | InterruptedException e) {
+ LOG.error("Unable to update Recon's OM DB with new snapshot ", e);
+ }
+ }
+ }
+
+ /**
+ * Get OM RocksDB's latest sequence number.
+ * @return latest sequence number.
+ */
+ private long getCurrentOMDBSequenceNumber() {
+ RDBStore rocksDBStore = (RDBStore)omMetadataManager.getStore();
+ if (null == rocksDBStore) {
+ return 0;
+ } else {
+ return rocksDBStore.getDb().getLatestSequenceNumber();
+ }
+ }
}
diff --git a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/ReconContainerDBProvider.java b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/ReconContainerDBProvider.java
index de5c030..9b99f70 100644
--- a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/ReconContainerDBProvider.java
+++ b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/ReconContainerDBProvider.java
@@ -22,11 +22,11 @@ import static org.apache.hadoop.ozone.recon.ReconConstants.CONTAINER_KEY_COUNT_T
import static org.apache.hadoop.ozone.recon.ReconConstants.RECON_CONTAINER_DB;
import static org.apache.hadoop.ozone.recon.ReconConstants.CONTAINER_KEY_TABLE;
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_DB_DIR;
-import static org.apache.hadoop.ozone.recon.ReconUtils.getReconDbDir;
import java.nio.file.Path;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.recon.ReconUtils;
import org.apache.hadoop.ozone.recon.api.types.ContainerKeyPrefix;
import org.apache.hadoop.utils.db.DBStore;
import org.apache.hadoop.utils.db.DBStoreBuilder;
@@ -52,9 +52,12 @@ public class ReconContainerDBProvider implements Provider<DBStore> {
@Inject
private OzoneConfiguration configuration;
+ @Inject
+ private ReconUtils reconUtils;
+
@Override
public DBStore get() {
- DBStore dbStore = getNewDBStore(configuration);
+ DBStore dbStore = getNewDBStore(configuration, reconUtils);
if (dbStore == null) {
throw new ProvisionException("Unable to provide instance of DBStore " +
"store.");
@@ -62,11 +65,13 @@ public class ReconContainerDBProvider implements Provider<DBStore> {
return dbStore;
}
- public static DBStore getNewDBStore(OzoneConfiguration configuration) {
+ public static DBStore getNewDBStore(OzoneConfiguration configuration,
+ ReconUtils reconUtils) {
DBStore dbStore = null;
String dbName = RECON_CONTAINER_DB + "_" + System.currentTimeMillis();
try {
- Path metaDir = getReconDbDir(configuration, OZONE_RECON_DB_DIR).toPath();
+ Path metaDir = reconUtils.getReconDbDir(
+ configuration, OZONE_RECON_DB_DIR).toPath();
dbStore = DBStoreBuilder.newBuilder(configuration)
.setPath(metaDir)
.setName(dbName)
diff --git a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperTask.java b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperTask.java
index ec8695c..18a1e38 100644
--- a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperTask.java
+++ b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperTask.java
@@ -18,11 +18,13 @@
package org.apache.hadoop.ozone.recon.tasks;
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.KEY_TABLE;
+
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
-import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -41,28 +43,23 @@ import org.apache.hadoop.utils.db.TableIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.inject.Inject;
+
/**
* Class to iterate over the OM DB and populate the Recon container DB with
* the container -> Key reverse mapping.
*/
-public class ContainerKeyMapperTask extends ReconDBUpdateTask {
+public class ContainerKeyMapperTask implements ReconDBUpdateTask {
private static final Logger LOG =
LoggerFactory.getLogger(ContainerKeyMapperTask.class);
private ContainerDBServiceProvider containerDBServiceProvider;
- private Collection<String> tables = new ArrayList<>();
+ @Inject
public ContainerKeyMapperTask(ContainerDBServiceProvider
- containerDBServiceProvider,
- OMMetadataManager omMetadataManager) {
- super("ContainerKeyMapperTask");
+ containerDBServiceProvider) {
this.containerDBServiceProvider = containerDBServiceProvider;
- try {
- tables.add(omMetadataManager.getKeyTable().getName());
- } catch (IOException ioEx) {
- LOG.error("Unable to listen on Key Table updates ", ioEx);
- }
}
/**
@@ -103,13 +100,19 @@ public class ContainerKeyMapperTask extends ReconDBUpdateTask {
}
@Override
- protected Collection<String> getTaskTables() {
- return tables;
+ public String getTaskName() {
+ return "ContainerKeyMapperTask";
+ }
+
+ @Override
+ public Collection<String> getTaskTables() {
+ return Collections.singletonList(KEY_TABLE);
}
@Override
- Pair<String, Boolean> process(OMUpdateEventBatch events) {
+ public Pair<String, Boolean> process(OMUpdateEventBatch events) {
Iterator<OMDBUpdateEvent> eventIterator = events.getIterator();
+ int eventCount = 0;
while (eventIterator.hasNext()) {
OMDBUpdateEvent<String, OmKeyInfo> omdbUpdateEvent = eventIterator.next();
String updatedKey = omdbUpdateEvent.getKey();
@@ -127,12 +130,15 @@ public class ContainerKeyMapperTask extends ReconDBUpdateTask {
default: LOG.debug("Skipping DB update event : " + omdbUpdateEvent
.getAction());
}
+ eventCount++;
} catch (IOException e) {
LOG.error("Unexpected exception while updating key data : {} ",
updatedKey, e);
return new ImmutablePair<>(getTaskName(), false);
}
}
+ LOG.info("{} successfully processed {} OM DB update event(s).",
+ getTaskName(), eventCount);
return new ImmutablePair<>(getTaskName(), true);
}
diff --git a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTask.java b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTask.java
index a09eaff..7432392 100644
--- a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTask.java
+++ b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTask.java
@@ -33,11 +33,12 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.Iterator;
import java.util.List;
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.KEY_TABLE;
import static org.apache.hadoop.ozone.recon.tasks.
OMDBUpdateEvent.OMDBUpdateAction.DELETE;
import static org.apache.hadoop.ozone.recon.tasks.
@@ -48,7 +49,7 @@ import static org.apache.hadoop.ozone.recon.tasks.
* files binned into ranges (1KB, 2Kb..,4MB,.., 1TB,..1PB) to the Recon
* fileSize DB.
*/
-public class FileSizeCountTask extends ReconDBUpdateTask {
+public class FileSizeCountTask implements ReconDBUpdateTask {
private static final Logger LOG =
LoggerFactory.getLogger(FileSizeCountTask.class);
@@ -56,19 +57,11 @@ public class FileSizeCountTask extends ReconDBUpdateTask {
private long maxFileSizeUpperBound = 1125899906842624L; // 1 PB
private long[] upperBoundCount;
private long oneKb = 1024L;
- private Collection<String> tables = new ArrayList<>();
private FileCountBySizeDao fileCountBySizeDao;
@Inject
- public FileSizeCountTask(OMMetadataManager omMetadataManager,
- Configuration sqlConfiguration) {
- super("FileSizeCountTask");
- try {
- tables.add(omMetadataManager.getKeyTable().getName());
- fileCountBySizeDao = new FileCountBySizeDao(sqlConfiguration);
- } catch (Exception e) {
- LOG.error("Unable to fetch Key Table updates ", e);
- }
+ public FileSizeCountTask(Configuration sqlConfiguration) {
+ fileCountBySizeDao = new FileCountBySizeDao(sqlConfiguration);
upperBoundCount = new long[getMaxBinSize()];
}
@@ -98,7 +91,6 @@ public class FileSizeCountTask extends ReconDBUpdateTask {
*/
@Override
public Pair<String, Boolean> reprocess(OMMetadataManager omMetadataManager) {
- LOG.info("Starting a 'reprocess' run of FileSizeCountTask.");
Table<String, OmKeyInfo> omKeyInfoTable = omMetadataManager.getKeyTable();
try (TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>>
keyIter = omKeyInfoTable.iterator()) {
@@ -119,8 +111,13 @@ public class FileSizeCountTask extends ReconDBUpdateTask {
}
@Override
- protected Collection<String> getTaskTables() {
- return tables;
+ public String getTaskName() {
+ return "FileSizeCountTask";
+ }
+
+ @Override
+ public Collection<String> getTaskTables() {
+ return Collections.singletonList(KEY_TABLE);
}
private void updateCountFromDB() {
@@ -144,8 +141,7 @@ public class FileSizeCountTask extends ReconDBUpdateTask {
* @return Pair
*/
@Override
- Pair<String, Boolean> process(OMUpdateEventBatch events) {
- LOG.info("Starting a 'process' run of FileSizeCountTask.");
+ public Pair<String, Boolean> process(OMUpdateEventBatch events) {
Iterator<OMDBUpdateEvent> eventIterator = events.getIterator();
//update array with file size count from DB
@@ -246,9 +242,9 @@ public class FileSizeCountTask extends ReconDBUpdateTask {
//decrement only if it had files before, default DB value is 0
upperBoundCount[binIndex]--;
} else {
- LOG.debug("Cannot decrement count. Default value is 0 (zero).");
- throw new IOException("Cannot decrement count. "
- + "Default value is 0 (zero).");
+ LOG.warn("Unexpected error while updating bin count. Found 0 count " +
+ "for index : " + binIndex + " while processing DELETE event for "
+ + omKeyInfo.getKeyName());
}
}
}
diff --git a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OMDBUpdateEvent.java b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OMDBUpdateEvent.java
index 82b7a35..0fcabcc 100644
--- a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OMDBUpdateEvent.java
+++ b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OMDBUpdateEvent.java
@@ -30,18 +30,18 @@ public final class OMDBUpdateEvent<KEY, VALUE> {
private final String table;
private final KEY updatedKey;
private final VALUE updatedValue;
- private final EventInfo eventInfo;
+ private final long sequenceNumber;
private OMDBUpdateEvent(OMDBUpdateAction action,
String table,
KEY updatedKey,
VALUE updatedValue,
- EventInfo eventInfo) {
+ long sequenceNumber) {
this.action = action;
this.table = table;
this.updatedKey = updatedKey;
this.updatedValue = updatedValue;
- this.eventInfo = eventInfo;
+ this.sequenceNumber = sequenceNumber;
}
public OMDBUpdateAction getAction() {
@@ -60,8 +60,8 @@ public final class OMDBUpdateEvent<KEY, VALUE> {
return updatedValue;
}
- public EventInfo getEventInfo() {
- return eventInfo;
+ public long getSequenceNumber() {
+ return sequenceNumber;
}
/**
@@ -75,7 +75,7 @@ public final class OMDBUpdateEvent<KEY, VALUE> {
private String table;
private KEY updatedKey;
private VALUE updatedValue;
- private EventInfo eventInfo;
+ private long lastSequenceNumber;
OMUpdateEventBuilder setAction(OMDBUpdateAction omdbUpdateAction) {
this.action = omdbUpdateAction;
@@ -97,10 +97,8 @@ public final class OMDBUpdateEvent<KEY, VALUE> {
return this;
}
- OMUpdateEventBuilder setEventInfo(long sequenceNumber,
- long eventTimestampMillis) {
- this.eventInfo = new EventInfo(sequenceNumber,
- eventTimestampMillis);
+ OMUpdateEventBuilder setSequenceNumber(long sequenceNumber) {
+ this.lastSequenceNumber = sequenceNumber;
return this;
}
@@ -114,30 +112,7 @@ public final class OMDBUpdateEvent<KEY, VALUE> {
table,
updatedKey,
updatedValue,
- eventInfo);
- }
- }
-
- /**
- * Class used to hold timing information for an event. (Seq number and
- * timestamp)
- */
- public static class EventInfo {
- private long sequenceNumber;
- private long eventTimestampMillis;
-
- public EventInfo(long sequenceNumber,
- long eventTimestampMillis) {
- this.sequenceNumber = sequenceNumber;
- this.eventTimestampMillis = eventTimestampMillis;
- }
-
- public long getSequenceNumber() {
- return sequenceNumber;
- }
-
- public long getEventTimestampMillis() {
- return eventTimestampMillis;
+ lastSequenceNumber);
}
}
diff --git a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OMDBUpdatesHandler.java b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OMDBUpdatesHandler.java
index d2d11b2..00bbade 100644
--- a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OMDBUpdatesHandler.java
+++ b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OMDBUpdatesHandler.java
@@ -78,6 +78,11 @@ public class OMDBUpdatesHandler extends WriteBatch.Handler {
/**
*
+ * @param cfIndex
+ * @param keyBytes
+ * @param valueBytes
+ * @param action
+ * @throws IOException
*/
private void processEvent(int cfIndex, byte[] keyBytes, byte[]
valueBytes, OMDBUpdateEvent.OMDBUpdateAction action)
@@ -100,8 +105,8 @@ public class OMDBUpdatesHandler extends WriteBatch.Handler {
builder.setAction(action);
OMDBUpdateEvent event = builder.build();
- LOG.info("Generated OM update Event for table : " + event.getTable()
- + ", Key = " + event.getKey());
+ LOG.debug("Generated OM update Event for table : " + event.getTable()
+ + ", Key = " + event.getKey() + ", action = " + event.getAction());
// Temporarily adding to an event buffer for testing. In subsequent JIRAs,
// a Recon side class will be implemented that requests delta updates
// from OM and calls on this handler. In that case, we will fill up
diff --git a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OMUpdateEventBatch.java b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OMUpdateEventBatch.java
index 3b7cc5b..f137418 100644
--- a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OMUpdateEventBatch.java
+++ b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OMUpdateEventBatch.java
@@ -31,7 +31,7 @@ public class OMUpdateEventBatch {
private List<OMDBUpdateEvent> events;
- OMUpdateEventBatch(Collection<OMDBUpdateEvent> e) {
+ public OMUpdateEventBatch(Collection<OMDBUpdateEvent> e) {
events = new ArrayList<>(e);
}
@@ -39,11 +39,11 @@ public class OMUpdateEventBatch {
* Get Sequence Number and timestamp of last event in this batch.
* @return Event Info instance.
*/
- OMDBUpdateEvent.EventInfo getLastEventInfo() {
+ long getLastSequenceNumber() {
if (events.isEmpty()) {
- return new OMDBUpdateEvent.EventInfo(-1, -1);
+ return -1;
} else {
- return events.get(events.size() - 1).getEventInfo();
+ return events.get(events.size() - 1).getSequenceNumber();
}
}
@@ -66,4 +66,12 @@ public class OMUpdateEventBatch {
.filter(e -> tables.contains(e.getTable()))
.collect(Collectors.toList()));
}
+
+ /**
+ * Return if empty.
+ * @return true if empty, else false.
+ */
+ public boolean isEmpty() {
+ return !getIterator().hasNext();
+ }
}
diff --git a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconDBUpdateTask.java b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconDBUpdateTask.java
index d828577..426e0ae 100644
--- a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconDBUpdateTask.java
+++ b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconDBUpdateTask.java
@@ -24,43 +24,35 @@ import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.ozone.om.OMMetadataManager;
/**
- * Abstract class used to denote a Recon task that needs to act on OM DB events.
+ * Interface used to denote a Recon task that needs to act on OM DB events.
*/
-public abstract class ReconDBUpdateTask {
-
- private String taskName;
-
- protected ReconDBUpdateTask(String taskName) {
- this.taskName = taskName;
- }
+public interface ReconDBUpdateTask {
/**
* Return task name.
* @return task name
*/
- public String getTaskName() {
- return taskName;
- }
+ String getTaskName();
/**
* Return the list of tables that the task is listening on.
* Empty list means the task is NOT listening on any tables.
* @return Collection of Tables.
*/
- protected abstract Collection<String> getTaskTables();
+ Collection<String> getTaskTables();
/**
* Process a set of OM events on tables that the task is listening on.
* @param events Set of events to be processed by the task.
* @return Pair of task name -> task success.
*/
- abstract Pair<String, Boolean> process(OMUpdateEventBatch events);
+ Pair<String, Boolean> process(OMUpdateEventBatch events);
/**
* Process a on tables that the task is listening on.
* @param omMetadataManager OM Metadata manager instance.
* @return Pair of task name -> task success.
*/
- abstract Pair<String, Boolean> reprocess(OMMetadataManager omMetadataManager);
+ Pair<String, Boolean> reprocess(OMMetadataManager omMetadataManager);
}
diff --git a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskController.java b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskController.java
index 7548cc9..728a199 100644
--- a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskController.java
+++ b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskController.java
@@ -20,6 +20,9 @@ package org.apache.hadoop.ozone.recon.tasks;
import java.util.Map;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.hadoop.ozone.recon.schema.tables.daos.ReconTaskStatusDao;
+
/**
* Controller used by Recon to manage Tasks that are waiting on Recon events.
*/
@@ -36,11 +39,31 @@ public interface ReconTaskController {
* @param events set of events
* @throws InterruptedException InterruptedException
*/
- void consumeOMEvents(OMUpdateEventBatch events) throws InterruptedException;
+ void consumeOMEvents(OMUpdateEventBatch events,
+ OMMetadataManager omMetadataManager)
+ throws InterruptedException;
+
+ /**
+ * Pass on the handle to a new OM DB instance to the registered tasks.
+ * @param omMetadataManager OM Metadata Manager instance
+ */
+ void reInitializeTasks(OMMetadataManager omMetadataManager)
+ throws InterruptedException;
/**
* Get set of registered tasks.
* @return Map of Task name -> Task.
*/
Map<String, ReconDBUpdateTask> getRegisteredTasks();
+
+ /**
+ * Get instance of ReconTaskStatusDao.
+ * @return instance of ReconTaskStatusDao
+ */
+ ReconTaskStatusDao getReconTaskStatusDao();
+
+ /**
+ * Stop the tasks. Start API is not needed since it is implicit.
+ */
+ void stop();
}
diff --git a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskControllerImpl.java b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskControllerImpl.java
index 3fd7d96..9135705 100644
--- a/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskControllerImpl.java
+++ b/hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskControllerImpl.java
@@ -26,6 +26,7 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -36,7 +37,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.hadoop.ozone.recon.schema.tables.daos.ReconTaskStatusDao;
import org.hadoop.ozone.recon.schema.tables.pojos.ReconTaskStatus;
import org.jooq.Configuration;
@@ -57,21 +58,22 @@ public class ReconTaskControllerImpl implements ReconTaskController {
private ExecutorService executorService;
private int threadCount = 1;
private final Semaphore taskSemaphore = new Semaphore(1);
- private final ReconOMMetadataManager omMetadataManager;
private Map<String, AtomicInteger> taskFailureCounter = new HashMap<>();
private static final int TASK_FAILURE_THRESHOLD = 2;
private ReconTaskStatusDao reconTaskStatusDao;
@Inject
public ReconTaskControllerImpl(OzoneConfiguration configuration,
- ReconOMMetadataManager omMetadataManager,
- Configuration sqlConfiguration) {
- this.omMetadataManager = omMetadataManager;
+ Configuration sqlConfiguration,
+ Set<ReconDBUpdateTask> tasks) {
reconDBUpdateTasks = new HashMap<>();
threadCount = configuration.getInt(OZONE_RECON_TASK_THREAD_COUNT_KEY,
OZONE_RECON_TASK_THREAD_COUNT_DEFAULT);
executorService = Executors.newFixedThreadPool(threadCount);
reconTaskStatusDao = new ReconTaskStatusDao(sqlConfiguration);
+ for (ReconDBUpdateTask task : tasks) {
+ registerTask(task);
+ }
}
@Override
@@ -86,7 +88,9 @@ public class ReconTaskControllerImpl implements ReconTaskController {
// Create DB record for the task.
ReconTaskStatus reconTaskStatusRecord = new ReconTaskStatus(taskName,
0L, 0L);
- reconTaskStatusDao.insert(reconTaskStatusRecord);
+ if (!reconTaskStatusDao.existsById(taskName)) {
+ reconTaskStatusDao.insert(reconTaskStatusRecord);
+ }
}
/**
@@ -98,7 +102,69 @@ public class ReconTaskControllerImpl implements ReconTaskController {
* @throws InterruptedException
*/
@Override
- public void consumeOMEvents(OMUpdateEventBatch events)
+ public void consumeOMEvents(OMUpdateEventBatch events,
+ OMMetadataManager omMetadataManager)
+ throws InterruptedException {
+ taskSemaphore.acquire();
+
+ try {
+ if (!events.isEmpty()) {
+ Collection<Callable<Pair>> tasks = new ArrayList<>();
+ for (Map.Entry<String, ReconDBUpdateTask> taskEntry :
+ reconDBUpdateTasks.entrySet()) {
+ ReconDBUpdateTask task = taskEntry.getValue();
+ Collection<String> tables = task.getTaskTables();
+ tasks.add(() -> task.process(events.filter(tables)));
+ }
+
+ List<Future<Pair>> results = executorService.invokeAll(tasks);
+ List<String> failedTasks = processTaskResults(results, events);
+
+ // Retry
+ List<String> retryFailedTasks = new ArrayList<>();
+ if (!failedTasks.isEmpty()) {
+ tasks.clear();
+ for (String taskName : failedTasks) {
+ ReconDBUpdateTask task = reconDBUpdateTasks.get(taskName);
+ Collection<String> tables = task.getTaskTables();
+ tasks.add(() -> task.process(events.filter(tables)));
+ }
+ results = executorService.invokeAll(tasks);
+ retryFailedTasks = processTaskResults(results, events);
+ }
+
+ // Reprocess the failed tasks.
+ // TODO Move to a separate task queue since reprocess may be a heavy
+ // operation for large OM DB instances
+ if (!retryFailedTasks.isEmpty()) {
+ tasks.clear();
+ for (String taskName : failedTasks) {
+ ReconDBUpdateTask task = reconDBUpdateTasks.get(taskName);
+ tasks.add(() -> task.reprocess(omMetadataManager));
+ }
+ results = executorService.invokeAll(tasks);
+ List<String> reprocessFailedTasks =
+ processTaskResults(results, events);
+ for (String taskName : reprocessFailedTasks) {
+ LOG.info("Reprocess step failed for task : " + taskName);
+ if (taskFailureCounter.get(taskName).incrementAndGet() >
+ TASK_FAILURE_THRESHOLD) {
+ LOG.info("Blacklisting Task since it failed retry and " +
+ "reprocess more than " + TASK_FAILURE_THRESHOLD + " times.");
+ reconDBUpdateTasks.remove(taskName);
+ }
+ }
+ }
+ }
+ } catch (ExecutionException e) {
+ LOG.error("Unexpected error : ", e);
+ } finally {
+ taskSemaphore.release();
+ }
+ }
+
+ @Override
+ public void reInitializeTasks(OMMetadataManager omMetadataManager)
throws InterruptedException {
taskSemaphore.acquire();
@@ -107,43 +173,14 @@ public class ReconTaskControllerImpl implements ReconTaskController {
for (Map.Entry<String, ReconDBUpdateTask> taskEntry :
reconDBUpdateTasks.entrySet()) {
ReconDBUpdateTask task = taskEntry.getValue();
- tasks.add(() -> task.process(events));
+ tasks.add(() -> task.reprocess(omMetadataManager));
}
List<Future<Pair>> results = executorService.invokeAll(tasks);
- List<String> failedTasks = processTaskResults(results, events);
-
- //Retry
- List<String> retryFailedTasks = new ArrayList<>();
- if (!failedTasks.isEmpty()) {
- tasks.clear();
- for (String taskName : failedTasks) {
- ReconDBUpdateTask task = reconDBUpdateTasks.get(taskName);
- tasks.add(() -> task.process(events));
- }
- results = executorService.invokeAll(tasks);
- retryFailedTasks = processTaskResults(results, events);
- }
-
- //Reprocess
- //TODO Move to a separate task queue since reprocess may be a heavy
- //operation for large OM DB instances
- if (!retryFailedTasks.isEmpty()) {
- tasks.clear();
- for (String taskName : failedTasks) {
- ReconDBUpdateTask task = reconDBUpdateTasks.get(taskName);
- tasks.add(() -> task.reprocess(omMetadataManager));
- }
- results = executorService.invokeAll(tasks);
- List<String> reprocessFailedTasks = processTaskResults(results, events);
- for (String taskName : reprocessFailedTasks) {
- LOG.info("Reprocess step failed for task : " + taskName);
- if (taskFailureCounter.get(taskName).incrementAndGet() >
- TASK_FAILURE_THRESHOLD) {
- LOG.info("Blacklisting Task since it failed retry and " +
- "reprocess more than " + TASK_FAILURE_THRESHOLD + " times.");
- reconDBUpdateTasks.remove(taskName);
- }
+ for (Future<Pair> f : results) {
+ String taskName = f.get().getLeft().toString();
+ if (!(Boolean)f.get().getRight()) {
+ LOG.info("Init failed for task : " + taskName);
}
}
} catch (ExecutionException e) {
@@ -157,12 +194,12 @@ public class ReconTaskControllerImpl implements ReconTaskController {
* Store the last completed event sequence number and timestamp to the DB
* for that task.
* @param taskName taskname to be updated.
- * @param eventInfo contains the new sequence number and timestamp.
+ * @param lastSequenceNumber contains the new sequence number.
*/
private void storeLastCompletedTransaction(
- String taskName, OMDBUpdateEvent.EventInfo eventInfo) {
+ String taskName, long lastSequenceNumber) {
ReconTaskStatus reconTaskStatusRecord = new ReconTaskStatus(taskName,
- eventInfo.getEventTimestampMillis(), eventInfo.getSequenceNumber());
+ System.currentTimeMillis(), lastSequenceNumber);
reconTaskStatusDao.update(reconTaskStatusRecord);
}
@@ -171,6 +208,16 @@ public class ReconTaskControllerImpl implements ReconTaskController {
return reconDBUpdateTasks;
}
+ @Override
+ public ReconTaskStatusDao getReconTaskStatusDao() {
+ return reconTaskStatusDao;
+ }
+
+ @Override
+ public void stop() {
+ this.executorService.shutdownNow();
+ }
+
/**
* Wait on results of all tasks.
* @param results Set of Futures.
@@ -190,7 +237,7 @@ public class ReconTaskControllerImpl implements ReconTaskController {
failedTasks.add(f.get().getLeft().toString());
} else {
taskFailureCounter.get(taskName).set(0);
- storeLastCompletedTransaction(taskName, events.getLastEventInfo());
+ storeLastCompletedTransaction(taskName, events.getLastSequenceNumber());
}
}
return failedTasks;
diff --git a/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/AbstractOMMetadataManagerTest.java b/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/AbstractOMMetadataManagerTest.java
index 7dc987d..fe2cf49 100644
--- a/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/AbstractOMMetadataManagerTest.java
+++ b/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/AbstractOMMetadataManagerTest.java
@@ -56,7 +56,7 @@ public abstract class AbstractOMMetadataManagerTest {
public TemporaryFolder temporaryFolder = new TemporaryFolder();
/**
- * Create a new OM Metadata manager instance.
+ * Create a new OM Metadata manager instance with default volume and bucket.
* @throws IOException ioEx
*/
protected OMMetadataManager initializeNewOmMetadataManager()
@@ -88,6 +88,19 @@ public abstract class AbstractOMMetadataManagerTest {
}
/**
+ * Create an empty OM Metadata manager instance.
+ * @throws IOException ioEx
+ */
+ protected OMMetadataManager initializeEmptyOmMetadataManager()
+ throws IOException {
+ File omDbDir = temporaryFolder.newFolder();
+ OzoneConfiguration omConfiguration = new OzoneConfiguration();
+ omConfiguration.set(OZONE_OM_DB_DIRS,
+ omDbDir.getAbsolutePath());
+ return new OmMetadataManagerImpl(omConfiguration);
+ }
+
+ /**
* Get an instance of Recon OM Metadata manager.
* @return ReconOMMetadataManager
* @throws IOException when creating the RocksDB instance.
diff --git a/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/TestReconUtils.java b/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/TestReconUtils.java
index f531bb2..ad04837 100644
--- a/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/TestReconUtils.java
+++ b/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/TestReconUtils.java
@@ -61,7 +61,7 @@ public class TestReconUtils {
OzoneConfiguration configuration = new OzoneConfiguration();
configuration.set("TEST_DB_DIR", filePath);
- File file = ReconUtils.getReconDbDir(configuration,
+ File file = new ReconUtils().getReconDbDir(configuration,
"TEST_DB_DIR");
Assert.assertEquals(filePath, file.getAbsolutePath());
}
@@ -89,7 +89,7 @@ public class TestReconUtils {
//Create test tar file.
File tarFile = OmUtils.createTarFile(newDir.toPath());
File outputDir = folder.newFolder();
- ReconUtils.untarCheckpointFile(tarFile, outputDir.toPath());
+ new ReconUtils().untarCheckpointFile(tarFile, outputDir.toPath());
assertTrue(outputDir.isDirectory());
assertTrue(outputDir.listFiles().length == 2);
@@ -126,7 +126,8 @@ public class TestReconUtils {
}
});
- InputStream inputStream = ReconUtils.makeHttpCall(httpClientMock, url);
+ InputStream inputStream = new ReconUtils()
+ .makeHttpCall(httpClientMock, url);
String contents = IOUtils.toString(inputStream, Charset.defaultCharset());
assertEquals("File 1 Contents", contents);
diff --git a/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestContainerKeyService.java b/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestContainerKeyService.java
index 3ae39a6..3bef4a0 100644
--- a/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestContainerKeyService.java
+++ b/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestContainerKeyService.java
@@ -20,10 +20,9 @@ package org.apache.hadoop.ozone.recon.api;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -36,15 +35,12 @@ import javax.ws.rs.core.Response;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hdds.client.BlockID;
-import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
-import org.apache.hadoop.ozone.OmUtils;
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
import org.apache.hadoop.ozone.recon.AbstractOMMetadataManagerTest;
import org.apache.hadoop.ozone.recon.GuiceInjectorUtilsForTestsImpl;
-import org.apache.hadoop.ozone.recon.ReconUtils;
import org.apache.hadoop.ozone.recon.api.types.ContainerMetadata;
import org.apache.hadoop.ozone.recon.api.types.ContainersResponse;
import org.apache.hadoop.ozone.recon.api.types.KeyMetadata;
@@ -53,53 +49,33 @@ import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
import org.apache.hadoop.ozone.recon.spi.ContainerDBServiceProvider;
import org.apache.hadoop.ozone.recon.spi.impl.OzoneManagerServiceProviderImpl;
import org.apache.hadoop.ozone.recon.tasks.ContainerKeyMapperTask;
-import org.apache.hadoop.utils.db.DBCheckpoint;
-import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.hadoop.utils.db.Table;
import org.hadoop.ozone.recon.schema.StatsSchemaDefinition;
import org.jooq.impl.DSL;
import org.jooq.impl.DefaultConfiguration;
import org.junit.Before;
-import org.junit.Rule;
import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
import com.google.inject.AbstractModule;
import com.google.inject.Injector;
-import org.junit.rules.TemporaryFolder;
-
/**
* Test for container key service.
*/
-@RunWith(PowerMockRunner.class)
-@PowerMockIgnore({"javax.management.*", "javax.net.ssl.*"})
-@PrepareForTest(ReconUtils.class)
public class TestContainerKeyService extends AbstractOMMetadataManagerTest {
- @Rule
- public TemporaryFolder temporaryFolder = new TemporaryFolder();
private ContainerDBServiceProvider containerDbServiceProvider;
- private OMMetadataManager omMetadataManager;
private Injector injector;
private OzoneManagerServiceProviderImpl ozoneManagerServiceProvider;
private ContainerKeyService containerKeyService;
private GuiceInjectorUtilsForTestsImpl guiceInjectorTest =
new GuiceInjectorUtilsForTestsImpl();
private boolean isSetupDone = false;
-
+ private ReconOMMetadataManager reconOMMetadataManager;
private void initializeInjector() throws Exception {
- omMetadataManager = initializeNewOmMetadataManager();
- OzoneConfiguration configuration =
- guiceInjectorTest.getTestOzoneConfiguration(temporaryFolder);
-
- ozoneManagerServiceProvider = new OzoneManagerServiceProviderImpl(
- configuration);
- ReconOMMetadataManager reconOMMetadataManager =
- getTestMetadataManager(omMetadataManager);
+ reconOMMetadataManager = getTestMetadataManager(
+ initializeNewOmMetadataManager());
+ ozoneManagerServiceProvider = getMockOzoneManagerServiceProvider();
Injector parentInjector = guiceInjectorTest.getInjector(
ozoneManagerServiceProvider, reconOMMetadataManager, temporaryFolder);
@@ -150,7 +126,7 @@ public class TestContainerKeyService extends AbstractOMMetadataManagerTest {
OmKeyLocationInfoGroup(0, omKeyLocationInfoList);
//key = key_one, Blocks = [ {CID = 1, LID = 101}, {CID = 2, LID = 102} ]
- writeDataToOm(omMetadataManager,
+ writeDataToOm(reconOMMetadataManager,
"key_one", "bucketOne", "sampleVol",
Collections.singletonList(omKeyLocationInfoGroup));
@@ -174,7 +150,7 @@ public class TestContainerKeyService extends AbstractOMMetadataManagerTest {
omKeyLocationInfoListNew));
//key = key_two, Blocks = [ {CID = 1, LID = 103}, {CID = 1, LID = 104} ]
- writeDataToOm(omMetadataManager,
+ writeDataToOm(reconOMMetadataManager,
"key_two", "bucketOne", "sampleVol", infoGroups);
List<OmKeyLocationInfo> omKeyLocationInfoList2 = new ArrayList<>();
@@ -192,27 +168,18 @@ public class TestContainerKeyService extends AbstractOMMetadataManagerTest {
OmKeyLocationInfoGroup(0, omKeyLocationInfoList2);
//key = key_three, Blocks = [ {CID = 2, LID = 2}, {CID = 2, LID = 3} ]
- writeDataToOm(omMetadataManager,
+ writeDataToOm(reconOMMetadataManager,
"key_three", "bucketOne", "sampleVol",
Collections.singletonList(omKeyLocationInfoGroup2));
- //Take snapshot of OM DB and copy over to Recon OM DB.
- DBCheckpoint checkpoint = omMetadataManager.getStore()
- .getCheckpoint(true);
- File tarFile = OmUtils.createTarFile(checkpoint.getCheckpointLocation());
- InputStream inputStream = new FileInputStream(tarFile);
- PowerMockito.stub(PowerMockito.method(ReconUtils.class,
- "makeHttpCall",
- CloseableHttpClient.class, String.class))
- .toReturn(inputStream);
-
//Generate Recon container DB data.
- ContainerKeyMapperTask containerKeyMapperTask = new ContainerKeyMapperTask(
- containerDbServiceProvider,
- ozoneManagerServiceProvider.getOMMetadataManagerInstance());
- ozoneManagerServiceProvider.updateReconOmDBWithNewSnapshot();
- containerKeyMapperTask.reprocess(ozoneManagerServiceProvider
- .getOMMetadataManagerInstance());
+ OMMetadataManager omMetadataManagerMock = mock(OMMetadataManager.class);
+ Table tableMock = mock(Table.class);
+ when(tableMock.getName()).thenReturn("KeyTable");
+ when(omMetadataManagerMock.getKeyTable()).thenReturn(tableMock);
+ ContainerKeyMapperTask containerKeyMapperTask =
+ new ContainerKeyMapperTask(containerDbServiceProvider);
+ containerKeyMapperTask.reprocess(reconOMMetadataManager);
}
@Test
@@ -397,4 +364,10 @@ public class TestContainerKeyService extends AbstractOMMetadataManagerTest {
assertEquals(2, containers.size());
assertEquals(2, data.getTotalCount());
}
+
+ private OzoneManagerServiceProviderImpl getMockOzoneManagerServiceProvider() {
+ OzoneManagerServiceProviderImpl omServiceProviderMock =
+ mock(OzoneManagerServiceProviderImpl.class);
+ return omServiceProviderMock;
+ }
}
\ No newline at end of file
diff --git a/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestUtilizationService.java b/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestUtilizationService.java
index a5c7263..a3265b8 100644
--- a/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestUtilizationService.java
+++ b/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestUtilizationService.java
@@ -18,35 +18,25 @@
package org.apache.hadoop.ozone.recon.api;
-import org.apache.hadoop.ozone.recon.ReconUtils;
import org.hadoop.ozone.recon.schema.tables.daos.FileCountBySizeDao;
import org.hadoop.ozone.recon.schema.tables.pojos.FileCountBySize;
import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mock;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
import javax.ws.rs.core.Response;
import java.util.ArrayList;
import java.util.List;
import static org.junit.Assert.assertEquals;
-import static org.powermock.api.mockito.PowerMockito.mock;
-import static org.powermock.api.mockito.PowerMockito.when;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
/**
* Test for File size count service.
*/
-@RunWith(PowerMockRunner.class)
-@PowerMockIgnore({"javax.management.*", "javax.net.ssl.*"})
-@PrepareForTest(ReconUtils.class)
public class TestUtilizationService {
private UtilizationService utilizationService;
- @Mock private FileCountBySizeDao fileCountBySizeDao;
private int maxBinSize = 42;
private List<FileCountBySize> setUpResultList() {
@@ -68,6 +58,7 @@ public class TestUtilizationService {
public void testGetFileCounts() {
List<FileCountBySize> resultList = setUpResultList();
+ FileCountBySizeDao fileCountBySizeDao = mock(FileCountBySizeDao.class);
utilizationService = mock(UtilizationService.class);
when(utilizationService.getFileCounts()).thenCallRealMethod();
when(utilizationService.getDao()).thenReturn(fileCountBySizeDao);
diff --git a/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestOzoneManagerServiceProviderImpl.java b/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestOzoneManagerServiceProviderImpl.java
index fde8142..c2a6dd8 100644
--- a/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestOzoneManagerServiceProviderImpl.java
+++ b/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestOzoneManagerServiceProviderImpl.java
@@ -18,104 +18,108 @@
package org.apache.hadoop.ozone.recon.spi.impl;
+import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_DB_DIR;
+import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_OM_SNAPSHOT_DB_DIR;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doCallRealMethod;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileWriter;
+import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Paths;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.OmUtils;
import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.apache.hadoop.ozone.recon.AbstractOMMetadataManagerTest;
-import org.apache.hadoop.ozone.recon.GuiceInjectorUtilsForTestsImpl;
import org.apache.hadoop.ozone.recon.ReconUtils;
import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
+import org.apache.hadoop.ozone.recon.tasks.OMDBUpdatesHandler;
+import org.apache.hadoop.ozone.recon.tasks.OMUpdateEventBatch;
+import org.apache.hadoop.ozone.recon.tasks.ReconTaskController;
import org.apache.hadoop.utils.db.DBCheckpoint;
-import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.hadoop.utils.db.DBUpdatesWrapper;
+import org.apache.hadoop.utils.db.RDBStore;
+import org.hadoop.ozone.recon.schema.tables.daos.ReconTaskStatusDao;
+import org.hadoop.ozone.recon.schema.tables.pojos.ReconTaskStatus;
import org.junit.Assert;
import org.junit.Before;
-import org.junit.Rule;
import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import com.google.inject.Injector;
+import org.mockito.ArgumentCaptor;
+import org.rocksdb.RocksDB;
+import org.rocksdb.TransactionLogIterator;
+import org.rocksdb.WriteBatch;
/**
* Class to test Ozone Manager Service Provider Implementation.
*/
-@RunWith(PowerMockRunner.class)
-@PowerMockIgnore({"javax.management.*", "javax.net.ssl.*"})
-@PrepareForTest(ReconUtils.class)
public class TestOzoneManagerServiceProviderImpl extends
AbstractOMMetadataManagerTest {
- private OMMetadataManager omMetadataManager;
- private ReconOMMetadataManager reconOMMetadataManager;
- private Injector injector;
- private GuiceInjectorUtilsForTestsImpl guiceInjectorTest =
- new GuiceInjectorUtilsForTestsImpl();
- private OzoneManagerServiceProviderImpl ozoneManagerServiceProvider;
- private boolean isSetupDone = false;
-
- @Rule
- public TemporaryFolder temporaryFolder = new TemporaryFolder();
+ private OzoneConfiguration configuration;
+ private OzoneManagerProtocol ozoneManagerProtocol;
@Before
public void setUp() throws Exception {
- omMetadataManager = initializeNewOmMetadataManager();
- writeDataToOm(omMetadataManager, "key_one");
- reconOMMetadataManager = getTestMetadataManager(omMetadataManager);
- ozoneManagerServiceProvider =
- new OzoneManagerServiceProviderImpl(
- guiceInjectorTest.getTestOzoneConfiguration(temporaryFolder));
- if (!isSetupDone) {
- injector = guiceInjectorTest.getInjector(ozoneManagerServiceProvider,
- reconOMMetadataManager, temporaryFolder);
-
- isSetupDone = true;
- }
+ configuration = new OzoneConfiguration();
+ configuration.set(OZONE_RECON_OM_SNAPSHOT_DB_DIR,
+ temporaryFolder.newFolder().getAbsolutePath());
+ configuration.set(OZONE_RECON_DB_DIR,
+ temporaryFolder.newFolder().getAbsolutePath());
+ configuration.set("ozone.om.address", "localhost:9862");
+ ozoneManagerProtocol = getMockOzoneManagerClient(new DBUpdatesWrapper());
}
@Test
- public void testInit() throws Exception {
+ public void testUpdateReconOmDBWithNewSnapshot() throws Exception {
- Assert.assertNotNull(reconOMMetadataManager.getKeyTable()
- .get("/sampleVol/bucketOne/key_one"));
- Assert.assertNull(reconOMMetadataManager.getKeyTable()
- .get("/sampleVol/bucketOne/key_two"));
+ OMMetadataManager omMetadataManager = initializeNewOmMetadataManager();
+ ReconOMMetadataManager reconOMMetadataManager =
+ getTestMetadataManager(omMetadataManager);
+ writeDataToOm(omMetadataManager, "key_one");
writeDataToOm(omMetadataManager, "key_two");
+
DBCheckpoint checkpoint = omMetadataManager.getStore()
.getCheckpoint(true);
File tarFile = OmUtils.createTarFile(checkpoint.getCheckpointLocation());
InputStream inputStream = new FileInputStream(tarFile);
- PowerMockito.stub(PowerMockito.method(ReconUtils.class,
- "makeHttpCall",
- CloseableHttpClient.class, String.class))
- .toReturn(inputStream);
+ ReconUtils reconUtilsMock = getMockReconUtils();
+ when(reconUtilsMock.makeHttpCall(any(), anyString()))
+ .thenReturn(inputStream);
- ozoneManagerServiceProvider.init();
+ ReconTaskController reconTaskController = getMockTaskController();
- Assert.assertNotNull(reconOMMetadataManager.getKeyTable()
+ OzoneManagerServiceProviderImpl ozoneManagerServiceProvider =
+ new OzoneManagerServiceProviderImpl(configuration,
+ reconOMMetadataManager, reconTaskController, reconUtilsMock,
+ ozoneManagerProtocol);
+
+ Assert.assertNull(reconOMMetadataManager.getKeyTable()
.get("/sampleVol/bucketOne/key_one"));
- Assert.assertNotNull(reconOMMetadataManager.getKeyTable()
+ Assert.assertNull(reconOMMetadataManager.getKeyTable()
.get("/sampleVol/bucketOne/key_two"));
- }
- @Test
- public void testGetOMMetadataManagerInstance() throws Exception {
- OMMetadataManager omMetaMgr = ozoneManagerServiceProvider
- .getOMMetadataManagerInstance();
- assertNotNull(omMetaMgr);
+ ozoneManagerServiceProvider.updateReconOmDBWithNewSnapshot();
+
+ assertNotNull(reconOMMetadataManager.getKeyTable()
+ .get("/sampleVol/bucketOne/key_one"));
+ assertNotNull(reconOMMetadataManager.getKeyTable()
+ .get("/sampleVol/bucketOne/key_two"));
}
@Test
@@ -144,12 +148,18 @@ public class TestOzoneManagerServiceProviderImpl extends
//Create test tar file.
File tarFile = OmUtils.createTarFile(checkpointDir.toPath());
-
InputStream fileInputStream = new FileInputStream(tarFile);
- PowerMockito.stub(PowerMockito.method(ReconUtils.class,
- "makeHttpCall",
- CloseableHttpClient.class, String.class))
- .toReturn(fileInputStream);
+ ReconUtils reconUtilsMock = getMockReconUtils();
+ when(reconUtilsMock.makeHttpCall(any(), anyString()))
+ .thenReturn(fileInputStream);
+
+ ReconOMMetadataManager reconOMMetadataManager =
+ mock(ReconOMMetadataManager.class);
+ ReconTaskController reconTaskController = getMockTaskController();
+ OzoneManagerServiceProviderImpl ozoneManagerServiceProvider =
+ new OzoneManagerServiceProviderImpl(configuration,
+ reconOMMetadataManager, reconTaskController, reconUtilsMock,
+ ozoneManagerProtocol);
DBCheckpoint checkpoint = ozoneManagerServiceProvider
.getOzoneManagerDBSnapshot();
@@ -158,4 +168,150 @@ public class TestOzoneManagerServiceProviderImpl extends
assertTrue(checkpoint.getCheckpointLocation().toFile()
.listFiles().length == 2);
}
+
+ @Test
+ public void testGetAndApplyDeltaUpdatesFromOM() throws Exception {
+
+ // Writing 2 Keys into a source OM DB and collecting it in a
+ // DBUpdatesWrapper.
+ OMMetadataManager sourceOMMetadataMgr = initializeNewOmMetadataManager();
+ writeDataToOm(sourceOMMetadataMgr, "key_one");
+ writeDataToOm(sourceOMMetadataMgr, "key_two");
+
+ RocksDB rocksDB = ((RDBStore)sourceOMMetadataMgr.getStore()).getDb();
+ TransactionLogIterator transactionLogIterator = rocksDB.getUpdatesSince(0L);
+ DBUpdatesWrapper dbUpdatesWrapper = new DBUpdatesWrapper();
+ while(transactionLogIterator.isValid()) {
+ TransactionLogIterator.BatchResult result =
+ transactionLogIterator.getBatch();
+ result.writeBatch().markWalTerminationPoint();
+ WriteBatch writeBatch = result.writeBatch();
+ dbUpdatesWrapper.addWriteBatch(writeBatch.data(),
+ result.sequenceNumber());
+ transactionLogIterator.next();
+ }
+
+ // OM Service Provider's Metadata Manager.
+ OMMetadataManager omMetadataManager = initializeNewOmMetadataManager();
+
+ OzoneManagerServiceProviderImpl ozoneManagerServiceProvider =
+ new OzoneManagerServiceProviderImpl(configuration,
+ getTestMetadataManager(omMetadataManager),
+ getMockTaskController(), new ReconUtils(),
+ getMockOzoneManagerClient(dbUpdatesWrapper));
+
+ OMDBUpdatesHandler updatesHandler =
+ new OMDBUpdatesHandler(omMetadataManager);
+ ozoneManagerServiceProvider.getAndApplyDeltaUpdatesFromOM(
+ 0L, updatesHandler);
+
+ // In this method, we have to assert the "GET" part and the "APPLY" path.
+
+ // Assert GET path --> verify if the OMDBUpdatesHandler picked up the 4
+ // events ( 1 Vol PUT + 1 Bucket PUT + 2 Key PUTs).
+ assertEquals(4, updatesHandler.getEvents().size());
+
+ // Assert APPLY path --> Verify if the OM service provider's RocksDB got
+ // the changes.
+ String fullKey = omMetadataManager.getOzoneKey("sampleVol",
+ "bucketOne", "key_one");
+ assertTrue(ozoneManagerServiceProvider.getOMMetadataManagerInstance()
+ .getKeyTable().isExist(fullKey));
+ fullKey = omMetadataManager.getOzoneKey("sampleVol",
+ "bucketOne", "key_two");
+ assertTrue(ozoneManagerServiceProvider.getOMMetadataManagerInstance()
+ .getKeyTable().isExist(fullKey));
+ }
+
+ @Test
+ public void testSyncDataFromOMFullSnapshot() throws Exception {
+
+ // Empty OM DB to start with.
+ ReconOMMetadataManager omMetadataManager = getTestMetadataManager(
+ initializeEmptyOmMetadataManager());
+ ReconTaskStatusDao reconTaskStatusDaoMock =
+ mock(ReconTaskStatusDao.class);
+ doNothing().when(reconTaskStatusDaoMock)
+ .update(any(ReconTaskStatus.class));
+
+ ReconTaskController reconTaskControllerMock = getMockTaskController();
+ when(reconTaskControllerMock.getReconTaskStatusDao())
+ .thenReturn(reconTaskStatusDaoMock);
+ doNothing().when(reconTaskControllerMock)
+ .reInitializeTasks(omMetadataManager);
+
+ OzoneManagerServiceProviderImpl ozoneManagerServiceProvider =
+ new OzoneManagerServiceProviderImpl(configuration, omMetadataManager,
+ reconTaskControllerMock, new ReconUtils(), ozoneManagerProtocol);
+
+ //Should trigger full snapshot request.
+ ozoneManagerServiceProvider.syncDataFromOM();
+
+ ArgumentCaptor<ReconTaskStatus> captor =
+ ArgumentCaptor.forClass(ReconTaskStatus.class);
+ verify(reconTaskStatusDaoMock, times(1))
+ .update(captor.capture());
+ assertTrue(captor.getValue().getTaskName().equals("OM_DB_FULL_SNAPSHOT"));
+ verify(reconTaskControllerMock, times(1))
+ .reInitializeTasks(omMetadataManager);
+ }
+
+ @Test
+ public void testSyncDataFromOMDeltaUpdates() throws Exception {
+
+ // Non-Empty OM DB to start with.
+ ReconOMMetadataManager omMetadataManager = getTestMetadataManager(
+ initializeNewOmMetadataManager());
+ ReconTaskStatusDao reconTaskStatusDaoMock =
+ mock(ReconTaskStatusDao.class);
+ doNothing().when(reconTaskStatusDaoMock)
+ .update(any(ReconTaskStatus.class));
+
+ ReconTaskController reconTaskControllerMock = getMockTaskController();
+ when(reconTaskControllerMock.getReconTaskStatusDao())
+ .thenReturn(reconTaskStatusDaoMock);
+ doNothing().when(reconTaskControllerMock)
+ .consumeOMEvents(any(OMUpdateEventBatch.class),
+ any(OMMetadataManager.class));
+
+ OzoneManagerServiceProviderImpl ozoneManagerServiceProvider =
+ new OzoneManagerServiceProviderImpl(configuration, omMetadataManager,
+ reconTaskControllerMock, new ReconUtils(), ozoneManagerProtocol);
+
+ // Should trigger delta updates.
+ ozoneManagerServiceProvider.syncDataFromOM();
+
+ ArgumentCaptor<ReconTaskStatus> captor =
+ ArgumentCaptor.forClass(ReconTaskStatus.class);
+ verify(reconTaskStatusDaoMock, times(1))
+ .update(captor.capture());
+ assertTrue(captor.getValue().getTaskName().equals("OM_DB_DELTA_UPDATES"));
+
+ verify(reconTaskControllerMock, times(1))
+ .consumeOMEvents(any(OMUpdateEventBatch.class),
+ any(OMMetadataManager.class));
+ }
+
+ private ReconTaskController getMockTaskController() {
+ ReconTaskController reconTaskControllerMock =
+ mock(ReconTaskController.class);
+ return reconTaskControllerMock;
+ }
+
+ private ReconUtils getMockReconUtils() throws IOException {
+ ReconUtils reconUtilsMock = mock(ReconUtils.class);
+ when(reconUtilsMock.getReconDbDir(any(), anyString())).thenCallRealMethod();
+ doCallRealMethod().when(reconUtilsMock).untarCheckpointFile(any(), any());
+ return reconUtilsMock;
+ }
+
+ private OzoneManagerProtocol getMockOzoneManagerClient(
+ DBUpdatesWrapper dbUpdatesWrapper) throws IOException {
+ OzoneManagerProtocol ozoneManagerProtocolMock =
+ mock(OzoneManagerProtocol.class);
+ when(ozoneManagerProtocolMock.getDBUpdates(any(OzoneManagerProtocolProtos
+ .DBUpdatesRequest.class))).thenReturn(dbUpdatesWrapper);
+ return ozoneManagerProtocolMock;
+ }
+
}
\ No newline at end of file
diff --git a/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/DummyReconDBTask.java b/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/DummyReconDBTask.java
index 3073907..66be41e 100644
--- a/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/DummyReconDBTask.java
+++ b/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/DummyReconDBTask.java
@@ -29,13 +29,14 @@ import org.apache.hadoop.ozone.om.OMMetadataManager;
* Dummy Recon task that has 3 modes of operations.
* ALWAYS_FAIL / FAIL_ONCE / ALWAYS_PASS
*/
-public class DummyReconDBTask extends ReconDBUpdateTask {
+public class DummyReconDBTask implements ReconDBUpdateTask {
private int numFailuresAllowed = Integer.MIN_VALUE;
private int callCtr = 0;
+ private String taskName;
- public DummyReconDBTask(String taskName, TaskType taskType) {
- super(taskName);
+ DummyReconDBTask(String taskName, TaskType taskType) {
+ this.taskName = taskName;
if (taskType.equals(TaskType.FAIL_ONCE)) {
numFailuresAllowed = 1;
} else if (taskType.equals(TaskType.ALWAYS_FAIL)) {
@@ -44,12 +45,17 @@ public class DummyReconDBTask extends ReconDBUpdateTask {
}
@Override
- protected Collection<String> getTaskTables() {
+ public String getTaskName() {
+ return taskName;
+ }
+
+ @Override
+ public Collection<String> getTaskTables() {
return Collections.singletonList("volumeTable");
}
@Override
- Pair<String, Boolean> process(OMUpdateEventBatch events) {
+ public Pair<String, Boolean> process(OMUpdateEventBatch events) {
if (++callCtr <= numFailuresAllowed) {
return new ImmutablePair<>(getTaskName(), false);
} else {
@@ -58,7 +64,7 @@ public class DummyReconDBTask extends ReconDBUpdateTask {
}
@Override
- Pair<String, Boolean> reprocess(OMMetadataManager omMetadataManager) {
+ public Pair<String, Boolean> reprocess(OMMetadataManager omMetadataManager) {
if (++callCtr <= numFailuresAllowed) {
return new ImmutablePair<>(getTaskName(), false);
} else {
diff --git a/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestContainerKeyMapperTask.java b/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestContainerKeyMapperTask.java
index 5cc9a48..383797e 100644
--- a/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestContainerKeyMapperTask.java
+++ b/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestContainerKeyMapperTask.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.ozone.recon.tasks;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.ArrayList;
@@ -28,7 +30,6 @@ import java.util.List;
import java.util.Map;
import org.apache.hadoop.hdds.client.BlockID;
-import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.ozone.om.OMMetadataManager;
@@ -37,31 +38,22 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
import org.apache.hadoop.ozone.recon.AbstractOMMetadataManagerTest;
import org.apache.hadoop.ozone.recon.GuiceInjectorUtilsForTestsImpl;
-import org.apache.hadoop.ozone.recon.ReconUtils;
import org.apache.hadoop.ozone.recon.api.types.ContainerKeyPrefix;
import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
import org.apache.hadoop.ozone.recon.spi.ContainerDBServiceProvider;
import org.apache.hadoop.ozone.recon.spi.impl.OzoneManagerServiceProviderImpl;
+import org.apache.hadoop.utils.db.Table;
import org.hadoop.ozone.recon.schema.StatsSchemaDefinition;
import org.jooq.impl.DSL;
import org.jooq.impl.DefaultConfiguration;
import org.junit.Before;
-import org.junit.Rule;
import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
import com.google.inject.Injector;
import javax.sql.DataSource;
/**
* Unit test for Container Key mapper task.
*/
-@RunWith(PowerMockRunner.class)
-@PowerMockIgnore({"javax.management.*", "javax.net.ssl.*"})
-@PrepareForTest(ReconUtils.class)
public class TestContainerKeyMapperTask extends AbstractOMMetadataManagerTest {
private ContainerDBServiceProvider containerDbServiceProvider;
@@ -77,16 +69,9 @@ public class TestContainerKeyMapperTask extends AbstractOMMetadataManagerTest {
return injector;
}
- @Rule
- TemporaryFolder temporaryFolder = new TemporaryFolder();
-
private void initializeInjector() throws Exception {
omMetadataManager = initializeNewOmMetadataManager();
- OzoneConfiguration configuration =
- guiceInjectorTest.getTestOzoneConfiguration(temporaryFolder);
-
- ozoneManagerServiceProvider = new OzoneManagerServiceProviderImpl(
- configuration);
+ ozoneManagerServiceProvider = getMockOzoneManagerServiceProvider();
reconOMMetadataManager = getTestMetadataManager(omMetadataManager);
injector = guiceInjectorTest.getInjector(
@@ -151,10 +136,8 @@ public class TestContainerKeyMapperTask extends AbstractOMMetadataManagerTest {
Collections.singletonList(omKeyLocationInfoGroup));
ContainerKeyMapperTask containerKeyMapperTask =
- new ContainerKeyMapperTask(containerDbServiceProvider,
- ozoneManagerServiceProvider.getOMMetadataManagerInstance());
- containerKeyMapperTask.reprocess(ozoneManagerServiceProvider
- .getOMMetadataManagerInstance());
+ new ContainerKeyMapperTask(containerDbServiceProvider);
+ containerKeyMapperTask.reprocess(reconOMMetadataManager);
keyPrefixesForContainer =
containerDbServiceProvider.getKeyPrefixesForContainer(1);
@@ -258,10 +241,8 @@ public class TestContainerKeyMapperTask extends AbstractOMMetadataManagerTest {
}});
ContainerKeyMapperTask containerKeyMapperTask =
- new ContainerKeyMapperTask(containerDbServiceProvider,
- ozoneManagerServiceProvider.getOMMetadataManagerInstance());
- containerKeyMapperTask.reprocess(ozoneManagerServiceProvider
- .getOMMetadataManagerInstance());
+ new ContainerKeyMapperTask(containerDbServiceProvider);
+ containerKeyMapperTask.reprocess(reconOMMetadataManager);
keyPrefixesForContainer = containerDbServiceProvider
.getKeyPrefixesForContainer(1);
@@ -317,4 +298,17 @@ public class TestContainerKeyMapperTask extends AbstractOMMetadataManagerTest {
omKeyLocationInfoGroup))
.build();
}
+
+ private OzoneManagerServiceProviderImpl getMockOzoneManagerServiceProvider()
+ throws IOException {
+ OzoneManagerServiceProviderImpl omServiceProviderMock =
+ mock(OzoneManagerServiceProviderImpl.class);
+ OMMetadataManager omMetadataManagerMock = mock(OMMetadataManager.class);
+ Table tableMock = mock(Table.class);
+ when(tableMock.getName()).thenReturn("keyTable");
+ when(omMetadataManagerMock.getKeyTable()).thenReturn(tableMock);
+ when(omServiceProviderMock.getOMMetadataManagerInstance())
+ .thenReturn(omMetadataManagerMock);
+ return omServiceProviderMock;
+ }
}
\ No newline at end of file
diff --git a/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestFileSizeCountTask.java b/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestFileSizeCountTask.java
index 47a5d6f..4d21e4b 100644
--- a/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestFileSizeCountTask.java
+++ b/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestFileSizeCountTask.java
@@ -24,31 +24,21 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.utils.db.TypedTable;
import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
import java.io.IOException;
-import static org.apache.hadoop.ozone.recon.tasks.
- OMDBUpdateEvent.OMDBUpdateAction.PUT;
+import static org.apache.hadoop.ozone.recon.tasks.OMDBUpdateEvent.OMDBUpdateAction.PUT;
import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.BDDMockito.given;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.times;
-import static org.powermock.api.mockito.PowerMockito.mock;
-import static org.powermock.api.mockito.PowerMockito.when;
+import static org.mockito.Mockito.when;
/**
* Unit test for File Size Count Task.
*/
-@RunWith(PowerMockRunner.class)
-@PowerMockIgnore({"javax.management.*", "javax.net.ssl.*"})
-@PrepareForTest(OmKeyInfo.class)
-
public class TestFileSizeCountTask {
@Test
public void testCalculateBinIndex() {
diff --git a/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestReconTaskControllerImpl.java b/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestReconTaskControllerImpl.java
index b587c89..6760869 100644
--- a/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestReconTaskControllerImpl.java
+++ b/hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestReconTaskControllerImpl.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.ozone.recon.tasks;
-import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_DB_DIRS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -28,14 +27,13 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-import java.io.File;
import java.util.Collections;
+import java.util.HashSet;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.recon.persistence.AbstractSqlDatabaseTest;
-import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
-import org.apache.hadoop.ozone.recon.recovery.ReconOmMetadataManagerImpl;
import org.hadoop.ozone.recon.schema.ReconInternalSchemaDefinition;
import org.hadoop.ozone.recon.schema.tables.daos.ReconTaskStatusDao;
import org.hadoop.ozone.recon.schema.tables.pojos.ReconTaskStatus;
@@ -50,16 +48,12 @@ import org.junit.Test;
public class TestReconTaskControllerImpl extends AbstractSqlDatabaseTest {
private ReconTaskController reconTaskController;
-
private Configuration sqlConfiguration;
+
@Before
public void setUp() throws Exception {
- File omDbDir = temporaryFolder.newFolder();
OzoneConfiguration ozoneConfiguration = new OzoneConfiguration();
- ozoneConfiguration.set(OZONE_OM_DB_DIRS, omDbDir.getAbsolutePath());
- ReconOMMetadataManager omMetadataManager = new ReconOmMetadataManagerImpl(
- ozoneConfiguration);
sqlConfiguration = getInjector()
.getInstance(Configuration.class);
@@ -69,7 +63,7 @@ public class TestReconTaskControllerImpl extends AbstractSqlDatabaseTest {
schemaDefinition.initializeSchema();
reconTaskController = new ReconTaskControllerImpl(ozoneConfiguration,
- omMetadataManager, sqlConfiguration);
+ sqlConfiguration, new HashSet<>());
}
@Test
@@ -86,15 +80,17 @@ public class TestReconTaskControllerImpl extends AbstractSqlDatabaseTest {
@Test
public void testConsumeOMEvents() throws Exception {
- ReconDBUpdateTask reconDBUpdateTaskMock = mock(ReconDBUpdateTask.class);
- when(reconDBUpdateTaskMock.getTaskTables()).thenReturn(Collections
- .EMPTY_LIST);
- when(reconDBUpdateTaskMock.getTaskName()).thenReturn("MockTask");
+ ReconDBUpdateTask reconDBUpdateTaskMock = getMockTask("MockTask");
when(reconDBUpdateTaskMock.process(any(OMUpdateEventBatch.class)))
.thenReturn(new ImmutablePair<>("MockTask", true));
reconTaskController.registerTask(reconDBUpdateTaskMock);
+ OMUpdateEventBatch omUpdateEventBatchMock = mock(OMUpdateEventBatch.class);
+ when(omUpdateEventBatchMock.isEmpty()).thenReturn(false);
+ when(omUpdateEventBatchMock.filter(Collections.singleton("MockTable")))
+ .thenReturn(omUpdateEventBatchMock);
reconTaskController.consumeOMEvents(
- new OMUpdateEventBatch(Collections.emptyList()));
+ omUpdateEventBatchMock,
+ mock(OMMetadataManager.class));
verify(reconDBUpdateTaskMock, times(1))
.process(any());
@@ -107,17 +103,13 @@ public class TestReconTaskControllerImpl extends AbstractSqlDatabaseTest {
new DummyReconDBTask(taskName, DummyReconDBTask.TaskType.FAIL_ONCE);
reconTaskController.registerTask(dummyReconDBTask);
-
- long currentTime = System.nanoTime();
- OMDBUpdateEvent.EventInfo eventInfoMock = mock(
- OMDBUpdateEvent.EventInfo.class);
- when(eventInfoMock.getSequenceNumber()).thenReturn(100L);
- when(eventInfoMock.getEventTimestampMillis()).thenReturn(currentTime);
-
+ long currentTime = System.currentTimeMillis();
OMUpdateEventBatch omUpdateEventBatchMock = mock(OMUpdateEventBatch.class);
- when(omUpdateEventBatchMock.getLastEventInfo()).thenReturn(eventInfoMock);
+ when(omUpdateEventBatchMock.isEmpty()).thenReturn(false);
+ when(omUpdateEventBatchMock.getLastSequenceNumber()).thenReturn(100L);
- reconTaskController.consumeOMEvents(omUpdateEventBatchMock);
+ reconTaskController.consumeOMEvents(omUpdateEventBatchMock,
+ mock(OMMetadataManager.class));
assertFalse(reconTaskController.getRegisteredTasks().isEmpty());
assertEquals(dummyReconDBTask, reconTaskController.getRegisteredTasks()
.get(dummyReconDBTask.getTaskName()));
@@ -126,8 +118,8 @@ public class TestReconTaskControllerImpl extends AbstractSqlDatabaseTest {
ReconTaskStatus dbRecord = dao.findById(taskName);
Assert.assertEquals(taskName, dbRecord.getTaskName());
- Assert.assertEquals(Long.valueOf(currentTime),
- dbRecord.getLastUpdatedTimestamp());
+ Assert.assertTrue(
+ dbRecord.getLastUpdatedTimestamp() > currentTime);
Assert.assertEquals(Long.valueOf(100L), dbRecord.getLastUpdatedSeqNumber());
}
@@ -138,18 +130,14 @@ public class TestReconTaskControllerImpl extends AbstractSqlDatabaseTest {
new DummyReconDBTask(taskName, DummyReconDBTask.TaskType.ALWAYS_FAIL);
reconTaskController.registerTask(dummyReconDBTask);
-
- long currentTime = System.nanoTime();
- OMDBUpdateEvent.EventInfo eventInfoMock =
- mock(OMDBUpdateEvent.EventInfo.class);
- when(eventInfoMock.getSequenceNumber()).thenReturn(100L);
- when(eventInfoMock.getEventTimestampMillis()).thenReturn(currentTime);
-
OMUpdateEventBatch omUpdateEventBatchMock = mock(OMUpdateEventBatch.class);
- when(omUpdateEventBatchMock.getLastEventInfo()).thenReturn(eventInfoMock);
+ when(omUpdateEventBatchMock.isEmpty()).thenReturn(false);
+ when(omUpdateEventBatchMock.getLastSequenceNumber()).thenReturn(100L);
+ OMMetadataManager omMetadataManagerMock = mock(OMMetadataManager.class);
for (int i = 0; i < 2; i++) {
- reconTaskController.consumeOMEvents(omUpdateEventBatchMock);
+ reconTaskController.consumeOMEvents(omUpdateEventBatchMock,
+ omMetadataManagerMock);
assertFalse(reconTaskController.getRegisteredTasks().isEmpty());
assertEquals(dummyReconDBTask, reconTaskController.getRegisteredTasks()
@@ -157,8 +145,8 @@ public class TestReconTaskControllerImpl extends AbstractSqlDatabaseTest {
}
//Should be blacklisted now.
- reconTaskController.consumeOMEvents(
- new OMUpdateEventBatch(Collections.emptyList()));
+ reconTaskController.consumeOMEvents(omUpdateEventBatchMock,
+ omMetadataManagerMock);
assertTrue(reconTaskController.getRegisteredTasks().isEmpty());
ReconTaskStatusDao dao = new ReconTaskStatusDao(sqlConfiguration);
@@ -168,4 +156,36 @@ public class TestReconTaskControllerImpl extends AbstractSqlDatabaseTest {
Assert.assertEquals(Long.valueOf(0L), dbRecord.getLastUpdatedTimestamp());
Assert.assertEquals(Long.valueOf(0L), dbRecord.getLastUpdatedSeqNumber());
}
+
+
+ @Test
+ public void testReInitializeTasks() throws Exception {
+
+ OMMetadataManager omMetadataManagerMock = mock(OMMetadataManager.class);
+ ReconDBUpdateTask reconDBUpdateTaskMock =
+ getMockTask("MockTask2");
+ when(reconDBUpdateTaskMock.reprocess(omMetadataManagerMock))
+ .thenReturn(new ImmutablePair<>("MockTask2", true));
+
+ reconTaskController.registerTask(reconDBUpdateTaskMock);
+ reconTaskController.reInitializeTasks(omMetadataManagerMock);
+
+ verify(reconDBUpdateTaskMock, times(1))
+ .reprocess(omMetadataManagerMock);
+ }
+
+ /**
+ * Helper method for getting a mocked Task.
+ * @param taskName name of the task.
+ * @return instance of ReconDBUpdateTask.
+ */
+ private ReconDBUpdateTask getMockTask(String taskName) {
+ ReconDBUpdateTask reconDBUpdateTaskMock = mock(ReconDBUpdateTask.class);
+ when(reconDBUpdateTaskMock.getTaskTables()).thenReturn(Collections
+ .EMPTY_LIST);
+ when(reconDBUpdateTaskMock.getTaskName()).thenReturn(taskName);
+ when(reconDBUpdateTaskMock.getTaskTables())
+ .thenReturn(Collections.singleton("MockTable"));
+ return reconDBUpdateTaskMock;
+ }
}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org