You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2019/08/13 22:25:29 UTC

[hadoop] branch ozone-0.4.1 updated: HDDS-1488. Scm cli command to start/stop replication manager.

This is an automated email from the ASF dual-hosted git repository.

aengineer pushed a commit to branch ozone-0.4.1
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/ozone-0.4.1 by this push:
     new 0299f8c  HDDS-1488. Scm cli command to start/stop replication manager.
0299f8c is described below

commit 0299f8c7c5d06d8ea36fd960e96d0606092dc4a6
Author: Nanda kumar <na...@apache.org>
AuthorDate: Sat Aug 3 19:01:29 2019 +0530

    HDDS-1488. Scm cli command to start/stop replication manager.
    
    Signed-off-by: Anu Engineer <ae...@apache.org>
    (cherry picked from commit 69b74e90167041f561bfcccf5a4e46ea208c467e)
---
 .../hdds/scm/client/ContainerOperationClient.java  | 17 ++++++
 .../apache/hadoop/hdds/scm/client/ScmClient.java   | 19 +++++++
 .../protocol/StorageContainerLocationProtocol.java | 18 +++++++
 ...inerLocationProtocolClientSideTranslatorPB.java | 39 ++++++++++++++
 .../org/apache/hadoop/ozone/audit/SCMAction.java   |  5 +-
 ...inerLocationProtocolServerSideTranslatorPB.java | 46 +++++++++++++++++
 .../proto/StorageContainerLocationProtocol.proto   | 31 +++++++++++
 .../hdds/scm/container/ReplicationManager.java     | 36 ++++++++-----
 .../hdds/scm/server/SCMClientProtocolServer.java   | 21 ++++++++
 .../hdds/scm/container/TestReplicationManager.java | 16 ++++++
 .../hdds/scm/cli/ReplicationManagerCommands.java   | 54 +++++++++++++++++++
 .../scm/cli/ReplicationManagerStartSubcommand.java | 53 +++++++++++++++++++
 .../cli/ReplicationManagerStatusSubcommand.java    | 60 ++++++++++++++++++++++
 .../scm/cli/ReplicationManagerStopSubcommand.java  | 55 ++++++++++++++++++++
 .../org/apache/hadoop/hdds/scm/cli/SCMCLI.java     |  3 +-
 15 files changed, 457 insertions(+), 16 deletions(-)

diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java
index 3077f9f..e2856d7 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java
@@ -459,4 +459,21 @@ public class ContainerOperationClient implements ScmClient {
   public boolean forceExitSafeMode() throws IOException {
     return storageContainerLocationClient.forceExitSafeMode();
   }
+
+  @Override
+  public void startReplicationManager() throws IOException {
+    storageContainerLocationClient.startReplicationManager();
+  }
+
+  @Override
+  public void stopReplicationManager() throws IOException {
+    storageContainerLocationClient.stopReplicationManager();
+  }
+
+  @Override
+  public boolean getReplicationManagerStatus() throws IOException {
+    return storageContainerLocationClient.getReplicationManagerStatus();
+  }
+
+
 }
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java
index 85821ac..c2dd5f9 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java
@@ -203,4 +203,23 @@ public interface ScmClient extends Closeable {
    * @throws IOException
    */
   boolean forceExitSafeMode() throws IOException;
+
+  /**
+   * Start ReplicationManager.
+   */
+  void startReplicationManager() throws IOException;
+
+  /**
+   * Stop ReplicationManager.
+   */
+  void stopReplicationManager() throws IOException;
+
+  /**
+   * Returns ReplicationManager status.
+   *
+   * @return True if ReplicationManager is running, false otherwise.
+   */
+  boolean getReplicationManagerStatus() throws IOException;
+
+
 }
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
index cc220a5..565ce47 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
@@ -177,4 +177,22 @@ public interface StorageContainerLocationProtocol extends Closeable {
    * @throws IOException
    */
   boolean forceExitSafeMode() throws IOException;
+
+  /**
+   * Start ReplicationManager.
+   */
+  void startReplicationManager() throws IOException;
+
+  /**
+   * Stop ReplicationManager.
+   */
+  void stopReplicationManager() throws IOException;
+
+  /**
+   * Returns ReplicationManager status.
+   *
+   * @return True if ReplicationManager is running, false otherwise.
+   */
+  boolean getReplicationManagerStatus() throws IOException;
+
 }
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
index ffb4686..9e316f7 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
@@ -29,6 +29,10 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolPro
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerWithPipelineResponseProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.InSafeModeRequestProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.InSafeModeResponseProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StartReplicationManagerRequestProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StopReplicationManagerRequestProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ReplicationManagerStatusRequestProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ReplicationManagerStatusResponseProto;
 import org.apache.hadoop.hdds.scm.ScmInfo;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
@@ -408,6 +412,41 @@ public final class StorageContainerLocationProtocolClientSideTranslatorPB
   }
 
   @Override
+  public void startReplicationManager() throws IOException {
+    try {
+      StartReplicationManagerRequestProto request =
+          StartReplicationManagerRequestProto.getDefaultInstance();
+      rpcProxy.startReplicationManager(NULL_RPC_CONTROLLER, request);
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
+
+  @Override
+  public void stopReplicationManager() throws IOException {
+    try {
+      StopReplicationManagerRequestProto request =
+          StopReplicationManagerRequestProto.getDefaultInstance();
+      rpcProxy.stopReplicationManager(NULL_RPC_CONTROLLER, request);
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
+
+  @Override
+  public boolean getReplicationManagerStatus() throws IOException {
+    try {
+      ReplicationManagerStatusRequestProto request =
+          ReplicationManagerStatusRequestProto.getDefaultInstance();
+      ReplicationManagerStatusResponseProto response =
+          rpcProxy.getReplicationManagerStatus(NULL_RPC_CONTROLLER, request);
+      return response.getIsRunning();
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
+
+  @Override
   public Object getUnderlyingProxyObject() {
     return rpcProxy;
   }
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/SCMAction.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/SCMAction.java
index 2dc8df7..b3b4879 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/SCMAction.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/SCMAction.java
@@ -36,7 +36,10 @@ public enum SCMAction implements AuditAction {
   DELETE_CONTAINER,
   IN_SAFE_MODE,
   FORCE_EXIT_SAFE_MODE,
-  SORT_DATANODE;
+  SORT_DATANODE,
+  START_REPLICATION_MANAGER,
+  STOP_REPLICATION_MANAGER,
+  GET_REPLICATION_MANAGER_STATUS;
 
   @Override
   public String getAction() {
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java
index ea96cfa..30ef7ea 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java
@@ -33,6 +33,12 @@ import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerLocationProtocolProtos.ForceExitSafeModeResponseProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerWithPipelineRequestProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerWithPipelineResponseProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StartReplicationManagerRequestProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StartReplicationManagerResponseProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StopReplicationManagerRequestProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StopReplicationManagerResponseProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ReplicationManagerStatusRequestProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ReplicationManagerStatusResponseProto;
 import org.apache.hadoop.hdds.scm.ScmInfo;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
@@ -306,4 +312,44 @@ public final class StorageContainerLocationProtocolServerSideTranslatorPB
       throw new ServiceException(ex);
     }
   }
+
+  @Override
+  public StartReplicationManagerResponseProto startReplicationManager(
+      RpcController controller, StartReplicationManagerRequestProto request)
+      throws ServiceException {
+    try (Scope ignored = TracingUtil.importAndCreateScope(
+        "startReplicationManager", request.getTraceID())) {
+      impl.startReplicationManager();
+      return StartReplicationManagerResponseProto.newBuilder().build();
+    } catch (IOException ex) {
+      throw new ServiceException(ex);
+    }
+  }
+
+  @Override
+  public StopReplicationManagerResponseProto stopReplicationManager(
+      RpcController controller, StopReplicationManagerRequestProto request)
+      throws ServiceException {
+    try (Scope ignored = TracingUtil.importAndCreateScope(
+        "stopReplicationManager", request.getTraceID())) {
+      impl.stopReplicationManager();
+      return StopReplicationManagerResponseProto.newBuilder().build();
+    } catch (IOException ex) {
+      throw new ServiceException(ex);
+    }
+  }
+
+  @Override
+  public ReplicationManagerStatusResponseProto getReplicationManagerStatus(
+      RpcController controller, ReplicationManagerStatusRequestProto request)
+      throws ServiceException {
+    try (Scope ignored = TracingUtil.importAndCreateScope(
+        "getReplicationManagerStatus", request.getTraceID())) {
+      return ReplicationManagerStatusResponseProto.newBuilder()
+          .setIsRunning(impl.getReplicationManagerStatus()).build();
+    } catch (IOException ex) {
+      throw new ServiceException(ex);
+    }
+  }
+
 }
diff --git a/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto b/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto
index 7ad19b1..4e4b50b 100644
--- a/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto
+++ b/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto
@@ -192,6 +192,28 @@ message ForceExitSafeModeResponseProto {
   required bool exitedSafeMode = 1;
 }
 
+message StartReplicationManagerRequestProto {
+  optional string traceID = 1;
+}
+
+message StartReplicationManagerResponseProto {
+}
+
+message StopReplicationManagerRequestProto {
+  optional string traceID = 1;
+}
+
+message StopReplicationManagerResponseProto {
+}
+
+message ReplicationManagerStatusRequestProto {
+  optional string traceID = 1;
+}
+
+message ReplicationManagerStatusResponseProto {
+  required bool isRunning = 1;
+}
+
 /**
  * Protocol used from an HDFS node to StorageContainerManager.  See the request
  * and response messages for details of the RPC calls.
@@ -275,4 +297,13 @@ service StorageContainerLocationProtocolService {
   */
   rpc forceExitSafeMode(ForceExitSafeModeRequestProto)
   returns (ForceExitSafeModeResponseProto);
+
+  rpc startReplicationManager(StartReplicationManagerRequestProto)
+  returns (StartReplicationManagerResponseProto);
+
+  rpc stopReplicationManager(StopReplicationManagerRequestProto)
+  returns (StopReplicationManagerResponseProto);
+
+  rpc getReplicationManagerStatus(ReplicationManagerStatusRequestProto)
+  returns (ReplicationManagerStatusResponseProto);
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
index a8dff40..cb34f8d 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
@@ -105,15 +105,15 @@ public class ReplicationManager {
   private final Map<ContainerID, List<InflightAction>> inflightDeletion;
 
   /**
-   * ReplicationMonitor thread is the one which wakes up at configured
-   * interval and processes all the containers.
+   * ReplicationManager specific configuration.
    */
-  private final Thread replicationMonitor;
+  private final ReplicationManagerConfiguration conf;
 
   /**
-   * ReplicationManager specific configuration.
+   * ReplicationMonitor thread is the one which wakes up at configured
+   * interval and processes all the containers.
    */
-  private final ReplicationManagerConfiguration conf;
+  private Thread replicationMonitor;
 
   /**
    * Flag used for checking if the ReplicationMonitor thread is running or
@@ -132,28 +132,28 @@ public class ReplicationManager {
   public ReplicationManager(final ReplicationManagerConfiguration conf,
                             final ContainerManager containerManager,
                             final ContainerPlacementPolicy containerPlacement,
-      final EventPublisher eventPublisher,
-      final LockManager lockManager) {
+                            final EventPublisher eventPublisher,
+                            final LockManager<ContainerID> lockManager) {
     this.containerManager = containerManager;
     this.containerPlacement = containerPlacement;
     this.eventPublisher = eventPublisher;
     this.lockManager = lockManager;
-    this.inflightReplication = new HashMap<>();
-    this.inflightDeletion = new HashMap<>();
-    this.replicationMonitor = new Thread(this::run);
-    this.replicationMonitor.setName("ReplicationMonitor");
-    this.replicationMonitor.setDaemon(true);
     this.conf = conf;
     this.running = false;
+    this.inflightReplication = new HashMap<>();
+    this.inflightDeletion = new HashMap<>();
   }
 
   /**
    * Starts Replication Monitor thread.
    */
   public synchronized void start() {
-    if (!running) {
+    if (!isRunning()) {
       LOG.info("Starting Replication Monitor Thread.");
       running = true;
+      replicationMonitor = new Thread(this::run);
+      replicationMonitor.setName("ReplicationMonitor");
+      replicationMonitor.setDaemon(true);
       replicationMonitor.start();
     } else {
       LOG.info("Replication Monitor Thread is already running.");
@@ -166,7 +166,13 @@ public class ReplicationManager {
    * @return true if running, false otherwise
    */
   public boolean isRunning() {
-    return replicationMonitor.isAlive();
+    if (!running) {
+      synchronized (this) {
+        return replicationMonitor != null
+            && replicationMonitor.isAlive();
+      }
+    }
+    return true;
   }
 
   /**
@@ -185,6 +191,8 @@ public class ReplicationManager {
   public synchronized void stop() {
     if (running) {
       LOG.info("Stopping Replication Monitor Thread.");
+      inflightReplication.clear();
+      inflightDeletion.clear();
       running = false;
       notify();
     } else {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
index 06be8eb..bf75fef 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
@@ -469,6 +469,27 @@ public class SCMClientProtocolServer implements
     return scm.exitSafeMode();
   }
 
+  @Override
+  public void startReplicationManager() {
+    AUDIT.logWriteSuccess(buildAuditMessageForSuccess(
+        SCMAction.START_REPLICATION_MANAGER, null));
+    scm.getReplicationManager().start();
+  }
+
+  @Override
+  public void stopReplicationManager() {
+    AUDIT.logWriteSuccess(buildAuditMessageForSuccess(
+        SCMAction.STOP_REPLICATION_MANAGER, null));
+    scm.getReplicationManager().stop();
+  }
+
+  @Override
+  public boolean getReplicationManagerStatus() {
+    AUDIT.logWriteSuccess(buildAuditMessageForSuccess(
+        SCMAction.GET_REPLICATION_MANAGER_STATUS, null));
+    return scm.getReplicationManager().isRunning();
+  }
+
   /**
    * Queries a list of Node that match a set of statuses.
    *
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
index 6a0e163..1631447 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
@@ -116,6 +116,22 @@ public class TestReplicationManager {
     Thread.sleep(100L);
   }
 
+
+  /**
+   * Checks if restarting of replication manager works.
+   */
+  @Test
+  public void testReplicationManagerRestart() throws InterruptedException {
+    Assert.assertTrue(replicationManager.isRunning());
+    replicationManager.stop();
+    // Stop is a non-blocking call; it might take some time for the
+    // ReplicationManager to shut down.
+    Thread.sleep(500);
+    Assert.assertFalse(replicationManager.isRunning());
+    replicationManager.start();
+    Assert.assertTrue(replicationManager.isRunning());
+  }
+
   /**
    * Open containers are not handled by ReplicationManager.
    * This test-case makes sure that ReplicationManages doesn't take
diff --git a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ReplicationManagerCommands.java b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ReplicationManagerCommands.java
new file mode 100644
index 0000000..f42a8f8
--- /dev/null
+++ b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ReplicationManagerCommands.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.cli;
+
+import org.apache.hadoop.hdds.cli.HddsVersionProvider;
+import org.apache.hadoop.hdds.cli.MissingSubcommandException;
+import picocli.CommandLine.Command;
+import picocli.CommandLine.ParentCommand;
+
+import java.util.concurrent.Callable;
+
+/**
+ * Subcommand to group replication manager related operations.
+ */
+@Command(
+    name = "replicationmanager",
+    description = "ReplicationManager specific operations",
+    mixinStandardHelpOptions = true,
+    versionProvider = HddsVersionProvider.class,
+    subcommands = {
+        ReplicationManagerStartSubcommand.class,
+        ReplicationManagerStopSubcommand.class,
+        ReplicationManagerStatusSubcommand.class
+    })
+public class ReplicationManagerCommands implements Callable<Void> {
+
+  @ParentCommand
+  private SCMCLI parent;
+
+  public SCMCLI getParent() {
+    return parent;
+  }
+
+  @Override
+  public Void call() throws Exception {
+    throw new MissingSubcommandException(
+        this.parent.getCmd().getSubcommands().get("replicationmanager"));
+  }
+}
diff --git a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ReplicationManagerStartSubcommand.java b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ReplicationManagerStartSubcommand.java
new file mode 100644
index 0000000..1adec6b
--- /dev/null
+++ b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ReplicationManagerStartSubcommand.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.cli;
+
+import org.apache.hadoop.hdds.cli.HddsVersionProvider;
+import org.apache.hadoop.hdds.scm.client.ScmClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import picocli.CommandLine.Command;
+import picocli.CommandLine.ParentCommand;
+
+import java.util.concurrent.Callable;
+
+/**
+ * Handler for the command that starts the ReplicationManager.
+ */
+@Command(
+    name = "start",
+    description = "Start ReplicationManager",
+    mixinStandardHelpOptions = true,
+    versionProvider = HddsVersionProvider.class)
+public class ReplicationManagerStartSubcommand implements Callable<Void> {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ReplicationManagerStartSubcommand.class);
+
+  @ParentCommand
+  private ReplicationManagerCommands parent;
+
+  @Override
+  public Void call() throws Exception {
+    try (ScmClient scmClient = parent.getParent().createScmClient()) {
+      scmClient.startReplicationManager();
+      LOG.info("Starting ReplicationManager...");
+      return null;
+    }
+  }
+}
diff --git a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ReplicationManagerStatusSubcommand.java b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ReplicationManagerStatusSubcommand.java
new file mode 100644
index 0000000..2ebf28c
--- /dev/null
+++ b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ReplicationManagerStatusSubcommand.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.cli;
+
+import org.apache.hadoop.hdds.cli.HddsVersionProvider;
+import org.apache.hadoop.hdds.scm.client.ScmClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import picocli.CommandLine.Command;
+import picocli.CommandLine.ParentCommand;
+
+import java.util.concurrent.Callable;
+
+/**
+ * Handler for the command that checks if ReplicationManager is running.
+ */
+@Command(
+    name = "status",
+    description = "Check if ReplicationManager is running or not",
+    mixinStandardHelpOptions = true,
+    versionProvider = HddsVersionProvider.class)
+public class ReplicationManagerStatusSubcommand implements Callable<Void> {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ReplicationManagerStatusSubcommand.class);
+
+  @ParentCommand
+  private ReplicationManagerCommands parent;
+
+  @Override
+  public Void call() throws Exception {
+    try (ScmClient scmClient = parent.getParent().createScmClient()) {
+
+      boolean execReturn = scmClient.getReplicationManagerStatus();
+
+      // Report the ReplicationManager status returned by SCM
+      if(execReturn){
+        LOG.info("ReplicationManager is Running.");
+      } else {
+        LOG.info("ReplicationManager is Not Running.");
+      }
+      return null;
+    }
+  }
+}
diff --git a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ReplicationManagerStopSubcommand.java b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ReplicationManagerStopSubcommand.java
new file mode 100644
index 0000000..7cafd01
--- /dev/null
+++ b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ReplicationManagerStopSubcommand.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.cli;
+
+import org.apache.hadoop.hdds.cli.HddsVersionProvider;
+import org.apache.hadoop.hdds.scm.client.ScmClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import picocli.CommandLine.Command;
+import picocli.CommandLine.ParentCommand;
+
+import java.util.concurrent.Callable;
+
+/**
+ * Handler for the command that stops the ReplicationManager.
+ */
+@Command(
+    name = "stop",
+    description = "Stop ReplicationManager",
+    mixinStandardHelpOptions = true,
+    versionProvider = HddsVersionProvider.class)
+public class ReplicationManagerStopSubcommand implements Callable<Void> {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ReplicationManagerStopSubcommand.class);
+
+  @ParentCommand
+  private ReplicationManagerCommands parent;
+
+  @Override
+  public Void call() throws Exception {
+    try (ScmClient scmClient = parent.getParent().createScmClient()) {
+      scmClient.stopReplicationManager();
+      LOG.info("Stopping ReplicationManager...");
+      LOG.info("Requested SCM to stop ReplicationManager, " +
+          "it might take sometime for the ReplicationManager to stop.");
+      return null;
+    }
+  }
+}
diff --git a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/SCMCLI.java b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/SCMCLI.java
index 1a19a3c..3e8f3fa 100644
--- a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/SCMCLI.java
+++ b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/SCMCLI.java
@@ -85,7 +85,8 @@ import picocli.CommandLine.Option;
         CloseSubcommand.class,
         ListPipelinesSubcommand.class,
         ClosePipelineSubcommand.class,
-        TopologySubcommand.class
+        TopologySubcommand.class,
+        ReplicationManagerCommands.class
     },
     mixinStandardHelpOptions = true)
 public class SCMCLI extends GenericCli {


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org