Posted to commits@ozone.apache.org by xy...@apache.org on 2020/08/06 01:10:25 UTC

[hadoop-ozone] branch HDDS-2823 updated: HDDS-3837 Add isLeader check in SCMHAManager. (#1191)

This is an automated email from the ASF dual-hosted git repository.

xyao pushed a commit to branch HDDS-2823
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git


The following commit(s) were added to refs/heads/HDDS-2823 by this push:
     new 2e3e064  HDDS-3837 Add isLeader check in SCMHAManager. (#1191)
2e3e064 is described below

commit 2e3e0649a59d51a35cd4cf15a89dd41d845644cd
Author: Li Cheng <bl...@gmail.com>
AuthorDate: Thu Aug 6 09:10:13 2020 +0800

    HDDS-3837 Add isLeader check in SCMHAManager. (#1191)
---
 .../interface-server/src/main/proto/proto.lock     | 100 +++++++++++++
 .../hadoop/hdds/scm/block/BlockManagerImpl.java    |   5 +-
 .../scm/container/CloseContainerEventHandler.java  |   4 +-
 .../apache/hadoop/hdds/scm/ha/SCMHAManager.java    |  13 ++
 .../hadoop/hdds/scm/ha/SCMHAManagerImpl.java       |  85 ++++++++++-
 .../org/apache/hadoop/hdds/scm/ha/SCMHAUtils.java  |   6 +-
 .../apache/hadoop/hdds/scm/ha/SCMRatisServer.java  |  10 ++
 .../hadoop/hdds/scm/ha/SCMRatisServerImpl.java     |  15 ++
 .../hadoop/hdds/scm/node/NewNodeHandler.java       |  12 +-
 .../scm/node/NonHealthyToHealthyNodeHandler.java   |  12 +-
 .../scm/pipeline/BackgroundPipelineCreator.java    |   2 +-
 .../hadoop/hdds/scm/pipeline/PipelineManager.java  |   5 +-
 .../hdds/scm/pipeline/PipelineManagerMXBean.java   |   3 +-
 .../hdds/scm/pipeline/PipelineManagerV2Impl.java   |  65 +++++++--
 .../hadoop/hdds/scm/ha/MockSCMHAManager.java       |  53 ++++++-
 .../scm/pipeline/TestPipelineActionHandler.java    |   3 +-
 .../hdds/scm/pipeline/TestPipelineManagerImpl.java | 156 ++++++++++++++++++---
 .../hdds/scm/safemode/TestSCMSafeModeManager.java  |   2 +-
 18 files changed, 501 insertions(+), 50 deletions(-)
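
The common thread across these files is that SCM write paths are now gated on SCMHAManager.isLeader() and raise a Ratis NotLeaderException on non-leaders. A minimal caller-side sketch of how that exception is meant to be consumed (illustrative only, not part of this commit; the LeaderAwareCaller helper is hypothetical and it assumes Ratis's NotLeaderException#getSuggestedLeader accessor):

    import java.io.IOException;

    import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
    import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
    import org.apache.ratis.protocol.NotLeaderException;
    import org.apache.ratis.protocol.RaftPeer;

    // Hypothetical helper, not part of the commit.
    final class LeaderAwareCaller {
      private LeaderAwareCaller() { }

      static void createPipelineOnLeader(PipelineManager pipelineManager)
          throws IOException {
        try {
          pipelineManager.createPipeline(HddsProtos.ReplicationType.RATIS,
              HddsProtos.ReplicationFactor.THREE);
        } catch (NotLeaderException nle) {
          // The exception is constructed with the suggested leader and the
          // peer list, so callers can redirect instead of failing hard.
          RaftPeer suggested = nle.getSuggestedLeader();
          System.out.println("Not the leader SCM; suggested leader: "
              + (suggested == null ? "unknown" : suggested));
        }
      }
    }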

diff --git a/hadoop-hdds/interface-server/src/main/proto/proto.lock b/hadoop-hdds/interface-server/src/main/proto/proto.lock
index 5492e00..c0296f7 100644
--- a/hadoop-hdds/interface-server/src/main/proto/proto.lock
+++ b/hadoop-hdds/interface-server/src/main/proto/proto.lock
@@ -1,6 +1,106 @@
 {
   "definitions": [
     {
+      "protopath": "SCMRatisProtocol.proto",
+      "def": {
+        "enums": [
+          {
+            "name": "RequestType",
+            "enum_fields": [
+              {
+                "name": "PIPELINE",
+                "integer": 1
+              },
+              {
+                "name": "CONTAINER",
+                "integer": 2
+              }
+            ]
+          }
+        ],
+        "messages": [
+          {
+            "name": "Method",
+            "fields": [
+              {
+                "id": 1,
+                "name": "name",
+                "type": "string"
+              },
+              {
+                "id": 2,
+                "name": "args",
+                "type": "MethodArgument",
+                "is_repeated": true
+              }
+            ]
+          },
+          {
+            "name": "MethodArgument",
+            "fields": [
+              {
+                "id": 1,
+                "name": "type",
+                "type": "string"
+              },
+              {
+                "id": 2,
+                "name": "value",
+                "type": "bytes"
+              }
+            ]
+          },
+          {
+            "name": "SCMRatisRequestProto",
+            "fields": [
+              {
+                "id": 1,
+                "name": "type",
+                "type": "RequestType"
+              },
+              {
+                "id": 2,
+                "name": "method",
+                "type": "Method"
+              }
+            ]
+          },
+          {
+            "name": "SCMRatisResponseProto",
+            "fields": [
+              {
+                "id": 2,
+                "name": "type",
+                "type": "string"
+              },
+              {
+                "id": 3,
+                "name": "value",
+                "type": "bytes"
+              }
+            ]
+          }
+        ],
+        "package": {
+          "name": "hadoop.hdds.security"
+        },
+        "options": [
+          {
+            "name": "java_package",
+            "value": "org.apache.hadoop.hdds.protocol.proto"
+          },
+          {
+            "name": "java_outer_classname",
+            "value": "SCMRatisProtocol"
+          },
+          {
+            "name": "java_generate_equals_and_hash",
+            "value": "true"
+          }
+        ]
+      }
+    },
+    {
       "protopath": "ScmServerDatanodeHeartbeatProtocol.proto",
       "def": {
         "enums": [
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
index 7387585..f29d0a2 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
@@ -55,6 +55,8 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVI
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL_DEFAULT;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT;
+
+import org.apache.ratis.protocol.NotLeaderException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -250,7 +252,8 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
    * @param containerInfo - Container Info.
    * @return AllocatedBlock
    */
-  private AllocatedBlock newBlock(ContainerInfo containerInfo) {
+  private AllocatedBlock newBlock(ContainerInfo containerInfo)
+      throws NotLeaderException {
     try {
       final Pipeline pipeline = pipelineManager
           .getPipeline(containerInfo.getPipelineID());
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
index fd73711..a2b79fb 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hdds.server.events.EventHandler;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
 import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
 import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
+import org.apache.ratis.protocol.NotLeaderException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -98,7 +99,7 @@ public class CloseContainerEventHandler implements EventHandler<ContainerID> {
    * @throws ContainerNotFoundException
    */
   private List<DatanodeDetails> getNodes(final ContainerInfo container)
-      throws ContainerNotFoundException {
+      throws ContainerNotFoundException, NotLeaderException {
     try {
       return pipelineManager.getPipeline(container.getPipelineID()).getNodes();
     } catch (PipelineNotFoundException ex) {
@@ -109,5 +110,4 @@ public class CloseContainerEventHandler implements EventHandler<ContainerID> {
           .collect(Collectors.toList());
     }
   }
-
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java
index eb6c800..ade0ad9 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java
@@ -17,6 +17,9 @@
 
 package org.apache.hadoop.hdds.scm.ha;
 
+import org.apache.ratis.protocol.NotLeaderException;
+import org.apache.ratis.protocol.RaftPeer;
+
 import java.io.IOException;
 
 /**
@@ -40,7 +43,17 @@ public interface SCMHAManager {
   SCMRatisServer getRatisServer();
 
   /**
+   * Returns the suggested leader reported by the RaftServer.
+   */
+  RaftPeer getSuggestedLeader();
+
+  /**
    * Stops the HA service.
    */
   void shutdown() throws IOException;
+
+  /**
+   * Returns a NotLeaderException populated with the suggested leader and
+   * the known Raft peers.
+   */
+  NotLeaderException triggerNotLeaderException();
 }
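
Taken together, isLeader() and triggerNotLeaderException() support a simple guard idiom; PipelineManagerV2Impl.checkLeader() further down in this diff is the in-tree version. A minimal sketch against the interface (the LeaderGuardedService class and its mutateClusterState() method are hypothetical):

    import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
    import org.apache.ratis.protocol.NotLeaderException;

    // Hypothetical service illustrating the guard idiom.
    class LeaderGuardedService {
      private final SCMHAManager haManager;

      LeaderGuardedService(SCMHAManager haManager) {
        this.haManager = haManager;
      }

      void mutateClusterState() throws NotLeaderException {
        if (!haManager.isLeader()) {
          // Packages the suggested leader and peer list for the caller.
          throw haManager.triggerNotLeaderException();
        }
        // ... perform the state change on the leader only ...
      }
    }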
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
index 89ac714..8bb9457 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
@@ -17,7 +17,17 @@
 
 package org.apache.hadoop.hdds.scm.ha;
 
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.ratis.protocol.NotLeaderException;
+import org.apache.ratis.protocol.RaftGroupMemberId;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.impl.RaftServerProxy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 
@@ -31,14 +41,17 @@ import java.io.IOException;
  */
 public class SCMHAManagerImpl implements SCMHAManager {
 
-  private static boolean isLeader = true;
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SCMHAManagerImpl.class);
 
   private final SCMRatisServerImpl ratisServer;
+  private final ConfigurationSource conf;
 
   /**
    * Creates SCMHAManager instance.
    */
   public SCMHAManagerImpl(final ConfigurationSource conf) throws IOException {
+    this.conf = conf;
     this.ratisServer = new SCMRatisServerImpl(
         conf.getObject(SCMHAConfiguration.class), conf);
   }
@@ -56,7 +69,28 @@ public class SCMHAManagerImpl implements SCMHAManager {
    */
   @Override
   public boolean isLeader() {
-    return isLeader;
+    if (!SCMHAUtils.isSCMHAEnabled(conf)) {
+      // When SCM HA is not enabled, the current SCM is always the leader.
+      return true;
+    }
+    RaftServer server = ratisServer.getServer();
+    Preconditions.checkState(server instanceof RaftServerProxy);
+    RaftServerImpl serverImpl = null;
+    try {
+      // SCM only has one raft group.
+      serverImpl = ((RaftServerProxy) server)
+          .getImpl(ratisServer.getRaftGroupId());
+      if (serverImpl != null) {
+        // Return true only when it is certain that the current SCM is the
+        // leader; otherwise fall through and return false.
+        return serverImpl.isLeader();
+      }
+    } catch (IOException ioe) {
+      LOG.error("Fail to get RaftServer impl and therefore it's not clear " +
+          "whether it's leader. ", ioe);
+    }
+
+    return false;
   }
 
   /**
@@ -67,6 +101,42 @@ public class SCMHAManagerImpl implements SCMHAManager {
     return ratisServer;
   }
 
+  private RaftPeerId getPeerIdFromRoleInfo(RaftServerImpl serverImpl) {
+    if (serverImpl.isLeader()) {
+      return RaftPeerId.getRaftPeerId(
+          serverImpl.getRoleInfoProto().getLeaderInfo().toString());
+    } else if (serverImpl.isFollower()) {
+      return RaftPeerId.valueOf(
+          serverImpl.getRoleInfoProto().getFollowerInfo()
+              .getLeaderInfo().getId().getId());
+    } else {
+      return null;
+    }
+  }
+
+  @Override
+  public RaftPeer getSuggestedLeader() {
+    RaftServer server = ratisServer.getServer();
+    Preconditions.checkState(server instanceof RaftServerProxy);
+    RaftServerImpl serverImpl = null;
+    try {
+      // SCM only has one raft group.
+      serverImpl = ((RaftServerProxy) server)
+          .getImpl(ratisServer.getRaftGroupId());
+      if (serverImpl != null) {
+        RaftPeerId peerId =  getPeerIdFromRoleInfo(serverImpl);
+        if (peerId != null) {
+          return new RaftPeer(peerId);
+        }
+        return null;
+      }
+    } catch (IOException ioe) {
+      LOG.error("Fail to get RaftServer impl and therefore it's not clear " +
+          "whether it's leader. ", ioe);
+    }
+    return null;
+  }
+
   /**
    * {@inheritDoc}
    */
@@ -75,4 +145,15 @@ public class SCMHAManagerImpl implements SCMHAManager {
     ratisServer.stop();
   }
 
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public NotLeaderException triggerNotLeaderException() {
+    return new NotLeaderException(RaftGroupMemberId.valueOf(
+        ratisServer.getServer().getId(),
+        ratisServer.getRaftGroupId()),
+        getSuggestedLeader(),
+        ratisServer.getRaftPeers());
+  }
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAUtils.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAUtils.java
index eb22566..0f71744 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAUtils.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAUtils.java
@@ -20,7 +20,7 @@ package org.apache.hadoop.hdds.scm.ha;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.HddsConfigKeys;
-import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.ScmUtils;
 import org.apache.hadoop.hdds.scm.server.ratis.SCMRatisServer;
@@ -37,12 +37,12 @@ public final class SCMHAUtils {
   }
 
   // Check if SCM HA is enabled.
-  public static boolean isSCMHAEnabled(OzoneConfiguration conf) {
+  public static boolean isSCMHAEnabled(ConfigurationSource conf) {
     return conf.getBoolean(ScmConfigKeys.OZONE_SCM_HA_ENABLE_KEY,
         ScmConfigKeys.OZONE_SCM_HA_ENABLE_DEFAULT);
   }
 
-  public static File createSCMRatisDir(OzoneConfiguration conf)
+  public static File createSCMRatisDir(ConfigurationSource conf)
       throws  IllegalArgumentException {
     String scmRatisDir = SCMRatisServer.getSCMRatisDirectory(conf);
     if (scmRatisDir == null || scmRatisDir.isEmpty()) {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServer.java
index 4ddbc7b..2f99776 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServer.java
@@ -18,8 +18,12 @@
 package org.apache.hadoop.hdds.scm.ha;
 
 import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.server.RaftServer;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.concurrent.ExecutionException;
 
 /**
@@ -35,4 +39,10 @@ public interface SCMRatisServer {
       throws IOException, ExecutionException, InterruptedException;
 
   void stop() throws IOException;
+
+  RaftServer getServer();
+
+  RaftGroupId getRaftGroupId();
+
+  List<RaftPeer> getRaftPeers();
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java
index 45ae212..33ae109 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.ExecutionException;
@@ -110,4 +111,18 @@ public class SCMRatisServerImpl implements SCMRatisServer {
     server.close();
   }
 
+  @Override
+  public RaftServer getServer() {
+    return server;
+  }
+
+  @Override
+  public RaftGroupId getRaftGroupId() {
+    return raftGroupId;
+  }
+
+  @Override
+  public List<RaftPeer> getRaftPeers() {
+    return Collections.singletonList(new RaftPeer(raftPeerId));
+  }
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NewNodeHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NewNodeHandler.java
index a40a63a..42cada9 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NewNodeHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NewNodeHandler.java
@@ -23,11 +23,16 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
 import org.apache.hadoop.hdds.server.events.EventHandler;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.ratis.protocol.NotLeaderException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Handles New Node event.
  */
 public class NewNodeHandler implements EventHandler<DatanodeDetails> {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(NewNodeHandler.class);
 
   private final PipelineManager pipelineManager;
   private final ConfigurationSource conf;
@@ -41,6 +46,11 @@ public class NewNodeHandler implements EventHandler<DatanodeDetails> {
   @Override
   public void onMessage(DatanodeDetails datanodeDetails,
       EventPublisher publisher) {
-    pipelineManager.triggerPipelineCreation();
+    try {
+      pipelineManager.triggerPipelineCreation();
+    } catch (NotLeaderException ex) {
+      LOG.debug("Not the current leader SCM and cannot start pipeline" +
+          " creation.");
+    }
   }
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NonHealthyToHealthyNodeHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NonHealthyToHealthyNodeHandler.java
index cc32f84..e73231b 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NonHealthyToHealthyNodeHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NonHealthyToHealthyNodeHandler.java
@@ -23,12 +23,17 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
 import org.apache.hadoop.hdds.server.events.EventHandler;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.ratis.protocol.NotLeaderException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Handles Stale node event.
  */
 public class NonHealthyToHealthyNodeHandler
     implements EventHandler<DatanodeDetails> {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(NonHealthyToHealthyNodeHandler.class);
 
   private final PipelineManager pipelineManager;
   private final ConfigurationSource conf;
@@ -42,6 +47,11 @@ public class NonHealthyToHealthyNodeHandler
   @Override
   public void onMessage(DatanodeDetails datanodeDetails,
       EventPublisher publisher) {
-    pipelineManager.triggerPipelineCreation();
+    try {
+      pipelineManager.triggerPipelineCreation();
+    } catch (NotLeaderException ex) {
+      LOG.debug("Not the current leader SCM and cannot start pipeline" +
+          " creation.");
+    }
   }
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java
index f7f1d52..cc33c5c 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java
@@ -100,7 +100,7 @@ class BackgroundPipelineCreator {
     }
   }
 
-  private void createPipelines() {
+  private void createPipelines() throws RuntimeException {
     // TODO: #CLUTIL Different replication factor may need to be supported
     HddsProtos.ReplicationType type = HddsProtos.ReplicationType.valueOf(
         conf.get(OzoneConfigKeys.OZONE_REPLICATION_TYPE,
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
index 02e195f..55500cd 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager.SafeModeStatus;
 import org.apache.hadoop.hdds.server.events.EventHandler;
+import org.apache.ratis.protocol.NotLeaderException;
 
 /**
  * Interface which exposes the api for pipeline management.
@@ -55,7 +56,7 @@ public interface PipelineManager extends Closeable, PipelineManagerMXBean,
       ReplicationFactor factor);
 
   List<Pipeline> getPipelines(ReplicationType type,
-      Pipeline.PipelineState state);
+      Pipeline.PipelineState state) throws NotLeaderException;
 
   List<Pipeline> getPipelines(ReplicationType type,
       ReplicationFactor factor, Pipeline.PipelineState state);
@@ -84,7 +85,7 @@ public interface PipelineManager extends Closeable, PipelineManagerMXBean,
 
   void startPipelineCreator();
 
-  void triggerPipelineCreation();
+  void triggerPipelineCreation() throws NotLeaderException;
 
   void incNumBlocksAllocatedMetric(PipelineID id);
 
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerMXBean.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerMXBean.java
index 6d7d717..55e096b 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerMXBean.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerMXBean.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hdds.scm.pipeline;
 
 import org.apache.hadoop.hdds.annotation.InterfaceAudience;
+import org.apache.ratis.protocol.NotLeaderException;
 
 import java.util.Map;
 
@@ -33,6 +34,6 @@ public interface PipelineManagerMXBean {
    * Returns the number of pipelines in different state.
    * @return state to number of pipeline map
    */
-  Map<String, Integer> getPipelineInfo();
+  Map<String, Integer> getPipelineInfo() throws NotLeaderException;
 
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java
index 1241745..069540c 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hdds.utils.Scheduler;
 import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.metrics2.util.MBeans;
 import org.apache.hadoop.util.Time;
+import org.apache.ratis.protocol.NotLeaderException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -75,19 +76,21 @@ public final class PipelineManagerV2Impl implements PipelineManager {
   private final SCMPipelineMetrics metrics;
   private long pipelineWaitDefaultTimeout;
   private final AtomicBoolean isInSafeMode;
+  private SCMHAManager scmhaManager;
   // Used to track if the safemode pre-checks have completed. This is designed
   // to prevent pipelines being created until sufficient nodes have registered.
   private final AtomicBoolean pipelineCreationAllowed;
 
   private PipelineManagerV2Impl(ConfigurationSource conf,
-                               NodeManager nodeManager,
-                               StateManager pipelineStateManager,
-                               PipelineFactory pipelineFactory,
+                                SCMHAManager scmhaManager,
+                                StateManager pipelineStateManager,
+                                PipelineFactory pipelineFactory,
                                 EventPublisher eventPublisher) {
     this.lock = new ReentrantReadWriteLock();
     this.pipelineFactory = pipelineFactory;
     this.stateManager = pipelineStateManager;
     this.conf = conf;
+    this.scmhaManager = scmhaManager;
     this.eventPublisher = eventPublisher;
     this.pmInfoBean = MBeans.register("SCMPipelineManager",
         "SCMPipelineManagerInfo", this);
@@ -120,7 +123,7 @@ public final class PipelineManagerV2Impl implements PipelineManager {
         nodeManager, stateManager, conf, eventPublisher);
     // Create PipelineManager
     PipelineManagerV2Impl pipelineManager = new PipelineManagerV2Impl(conf,
-        nodeManager, stateManager, pipelineFactory, eventPublisher);
+        scmhaManager, stateManager, pipelineFactory, eventPublisher);
 
     // Create background thread.
     Scheduler scheduler = new Scheduler(
@@ -136,6 +139,7 @@ public final class PipelineManagerV2Impl implements PipelineManager {
   @Override
   public Pipeline createPipeline(ReplicationType type,
                                  ReplicationFactor factor) throws IOException {
+    checkLeader();
     if (!isPipelineCreationAllowed() && factor != ReplicationFactor.ONE) {
       LOG.debug("Pipeline creation is not allowed until safe mode prechecks " +
           "complete");
@@ -266,6 +270,7 @@ public final class PipelineManagerV2Impl implements PipelineManager {
   @Override
   public void addContainerToPipeline(
       PipelineID pipelineID, ContainerID containerID) throws IOException {
+    checkLeader();
     lock.writeLock().lock();
     try {
       stateManager.addContainerToPipeline(pipelineID, containerID);
@@ -277,6 +282,7 @@ public final class PipelineManagerV2Impl implements PipelineManager {
   @Override
   public void removeContainerFromPipeline(
       PipelineID pipelineID, ContainerID containerID) throws IOException {
+    checkLeader();
     lock.writeLock().lock();
     try {
       stateManager.removeContainerFromPipeline(pipelineID, containerID);
@@ -288,6 +294,7 @@ public final class PipelineManagerV2Impl implements PipelineManager {
   @Override
   public NavigableSet<ContainerID> getContainersInPipeline(
       PipelineID pipelineID) throws IOException {
+    checkLeader();
     lock.readLock().lock();
     try {
       return stateManager.getContainers(pipelineID);
@@ -298,11 +305,13 @@ public final class PipelineManagerV2Impl implements PipelineManager {
 
   @Override
   public int getNumberOfContainers(PipelineID pipelineID) throws IOException {
+    checkLeader();
     return stateManager.getNumberOfContainers(pipelineID);
   }
 
   @Override
   public void openPipeline(PipelineID pipelineId) throws IOException {
+    checkLeader();
     lock.writeLock().lock();
     try {
       Pipeline pipeline = stateManager.getPipeline(pipelineId);
@@ -328,6 +337,7 @@ public final class PipelineManagerV2Impl implements PipelineManager {
    * @throws IOException
    */
   protected void removePipeline(Pipeline pipeline) throws IOException {
+    checkLeader();
     pipelineFactory.close(pipeline.getType(), pipeline);
     PipelineID pipelineID = pipeline.getId();
     lock.writeLock().lock();
@@ -349,6 +359,7 @@ public final class PipelineManagerV2Impl implements PipelineManager {
    */
   protected void closeContainersForPipeline(final PipelineID pipelineId)
       throws IOException {
+    checkLeader();
     Set<ContainerID> containerIDs = stateManager.getContainers(pipelineId);
     for (ContainerID containerID : containerIDs) {
       eventPublisher.fireEvent(SCMEvents.CLOSE_CONTAINER, containerID);
@@ -364,6 +375,7 @@ public final class PipelineManagerV2Impl implements PipelineManager {
   @Override
   public void closePipeline(Pipeline pipeline, boolean onTimeout)
       throws IOException {
+    checkLeader();
     PipelineID pipelineID = pipeline.getId();
     lock.writeLock().lock();
     try {
@@ -393,6 +405,7 @@ public final class PipelineManagerV2Impl implements PipelineManager {
   @Override
   public void scrubPipeline(ReplicationType type, ReplicationFactor factor)
       throws IOException {
+    checkLeader();
     if (type != ReplicationType.RATIS || factor != ReplicationFactor.THREE) {
       // Only scrub pipeline for RATIS THREE pipeline
       return;
@@ -439,7 +452,9 @@ public final class PipelineManagerV2Impl implements PipelineManager {
    * Triggers pipeline creation after the specified time.
    */
   @Override
-  public void triggerPipelineCreation() {
+  public void triggerPipelineCreation() throws NotLeaderException {
+    // TODO add checkLeader once follower validates safemode
+    // before it becomes leader.
     backgroundPipelineCreator.triggerPipelineCreation();
   }
 
@@ -457,6 +472,7 @@ public final class PipelineManagerV2Impl implements PipelineManager {
   @Override
   public void activatePipeline(PipelineID pipelineID)
       throws IOException {
+    checkLeader();
     stateManager.updatePipelineState(pipelineID.getProtobuf(),
         HddsProtos.PipelineState.PIPELINE_OPEN);
   }
@@ -470,6 +486,7 @@ public final class PipelineManagerV2Impl implements PipelineManager {
   @Override
   public void deactivatePipeline(PipelineID pipelineID)
       throws IOException {
+    checkLeader();
     stateManager.updatePipelineState(pipelineID.getProtobuf(),
         HddsProtos.PipelineState.PIPELINE_DORMANT);
   }
@@ -484,6 +501,7 @@ public final class PipelineManagerV2Impl implements PipelineManager {
   @Override
   public void waitPipelineReady(PipelineID pipelineID, long timeout)
       throws IOException {
+    checkLeader();
     long st = Time.monotonicNow();
     if (timeout == 0) {
       timeout = pipelineWaitDefaultTimeout;
@@ -515,7 +533,8 @@ public final class PipelineManagerV2Impl implements PipelineManager {
   }
 
   @Override
-  public Map<String, Integer> getPipelineInfo() {
+  public Map<String, Integer> getPipelineInfo() throws NotLeaderException {
+    checkLeader();
     final Map<String, Integer> pipelineInfo = new HashMap<>();
     for (Pipeline.PipelineState state : Pipeline.PipelineState.values()) {
       pipelineInfo.put(state.toString(), 0);
@@ -564,13 +583,21 @@ public final class PipelineManagerV2Impl implements PipelineManager {
 
     // Trigger pipeline creation only if the preCheck status has changed to
     // complete.
-    if (isPipelineCreationAllowed() && !currentAllowPipelines) {
-      triggerPipelineCreation();
-    }
-    // Start the pipeline creation thread only when safemode switches off
-    if (!getSafeModeStatus() && currentlyInSafeMode) {
-      startPipelineCreator();
+
+    try {
+      if (isPipelineCreationAllowed() && !currentAllowPipelines) {
+        triggerPipelineCreation();
+      }
+      // Start the pipeline creation thread only when safemode switches off
+      if (!getSafeModeStatus() && currentlyInSafeMode) {
+        startPipelineCreator();
+      }
+    } catch (NotLeaderException ex) {
+      LOG.warn("Not the current leader SCM and cannot process pipeline" +
+              " creation. Suggested leader is: ",
+          scmhaManager.getSuggestedLeader().getAddress());
     }
+
   }
 
   @VisibleForTesting
@@ -593,6 +620,20 @@ public final class PipelineManagerV2Impl implements PipelineManager {
   public StateManager getStateManager() {
     return stateManager;
   }
+  
+  public void setScmhaManager(SCMHAManager scmhaManager) {
+    this.scmhaManager = scmhaManager;
+  }
+
+  /**
+   * Checks whether this SCM is the current leader.
+   * @throws NotLeaderException if this SCM is not the current leader.
+   */
+  private void checkLeader() throws NotLeaderException {
+    if (!scmhaManager.isLeader()) {
+      throw scmhaManager.triggerNotLeaderException();
+    }
+  }
 
   private void setBackgroundPipelineCreator(
       BackgroundPipelineCreator backgroundPipelineCreator) {
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/MockSCMHAManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/MockSCMHAManager.java
index c3b14fb..ce48c11 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/MockSCMHAManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/MockSCMHAManager.java
@@ -28,11 +28,14 @@ import com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType;
 import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.NotLeaderException;
 import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.protocol.RaftGroupMemberId;
+import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.protocol.StateMachineException;
+import org.apache.ratis.server.RaftServer;
 
 /**
  * Mock SCMHAManager implementation for testing.
@@ -40,16 +43,30 @@ import org.apache.ratis.protocol.StateMachineException;
 public final class MockSCMHAManager implements SCMHAManager {
 
   private final SCMRatisServer ratisServer;
+  private boolean isLeader;
 
   public static SCMHAManager getInstance() {
     return new MockSCMHAManager();
   }
 
+  public static SCMHAManager getLeaderInstance() {
+    MockSCMHAManager mockSCMHAManager = new MockSCMHAManager();
+    mockSCMHAManager.setIsLeader(true);
+    return mockSCMHAManager;
+  }
+
+  public static SCMHAManager getFollowerInstance() {
+    MockSCMHAManager mockSCMHAManager = new MockSCMHAManager();
+    mockSCMHAManager.setIsLeader(false);
+    return mockSCMHAManager;
+  }
+
   /**
    * Creates MockSCMHAManager instance.
    */
   private MockSCMHAManager() {
     this.ratisServer = new MockRatisServer();
+    this.isLeader = true;
   }
 
   @Override
@@ -62,7 +79,16 @@ public final class MockSCMHAManager implements SCMHAManager {
    */
   @Override
   public boolean isLeader() {
-    return true;
+    return isLeader;
+  }
+
+  public void setIsLeader(boolean isLeader) {
+    this.isLeader = isLeader;
+  }
+
+  @Override
+  public RaftPeer getSuggestedLeader() {
+    throw new UnsupportedOperationException();
   }
 
   /**
@@ -81,6 +107,16 @@ public final class MockSCMHAManager implements SCMHAManager {
     ratisServer.stop();
   }
 
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public NotLeaderException triggerNotLeaderException() {
+    return new NotLeaderException(RaftGroupMemberId.valueOf(
+        RaftPeerId.valueOf("peer"), RaftGroupId.randomId()),
+        null, new ArrayList<>());
+  }
+
   private static class MockRatisServer implements SCMRatisServer {
 
     private Map<RequestType, Object> handlers =
@@ -141,6 +177,21 @@ public final class MockSCMHAManager implements SCMHAManager {
     }
 
     @Override
+    public RaftServer getServer() {
+      return null;
+    }
+
+    @Override
+    public RaftGroupId getRaftGroupId() {
+      return null;
+    }
+
+    @Override
+    public List<RaftPeer> getRaftPeers() {
+      return new ArrayList<>();
+    }
+
+    @Override
     public void stop() {
     }
   }
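
The new leader/follower factory methods make it easy to unit-test any component that uses the guard idiom; a minimal sketch (it reuses the hypothetical LeaderGuardedService from the note above, so it is illustrative only):

    import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager;
    import org.apache.ratis.protocol.NotLeaderException;
    import org.junit.Assert;
    import org.junit.Test;

    public class TestLeaderGuardedService {
      @Test
      public void rejectsWritesOnFollower() {
        LeaderGuardedService service =
            new LeaderGuardedService(MockSCMHAManager.getFollowerInstance());
        try {
          service.mutateClusterState();
          Assert.fail("Expected NotLeaderException on a follower");
        } catch (NotLeaderException expected) {
          // Expected: the mock reports isLeader() == false.
        }
      }
    }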
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineActionHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineActionHandler.java
index 99443c3..e40c8ba 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineActionHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineActionHandler.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolPro
 import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineActionsFromDatanode;
 import org.apache.hadoop.hdds.server.events.EventQueue;
 import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
+import org.apache.ratis.protocol.NotLeaderException;
 import org.junit.Test;
 import org.mockito.Mockito;
 
@@ -37,7 +38,7 @@ public class TestPipelineActionHandler {
 
   @Test
   public void testCloseActionForMissingPipeline()
-      throws PipelineNotFoundException {
+      throws PipelineNotFoundException, NotLeaderException {
     final PipelineManager manager = Mockito.mock(PipelineManager.class);
     final EventQueue queue = Mockito.mock(EventQueue.class);
 
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
index e1f9104..a8f03bb 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.MockNodeManager;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager;
+import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
 import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition;
 import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
 import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher;
@@ -37,6 +38,7 @@ import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
 import org.apache.hadoop.metrics2.MetricsRecordBuilder;
 import org.apache.hadoop.ozone.container.common.SCMTestUtils;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.ratis.protocol.NotLeaderException;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -66,7 +68,6 @@ public class TestPipelineManagerImpl {
   private DBStore dbStore;
   private static MockNodeManager nodeManager;
   private static int maxPipelineCount;
-  private static EventQueue eventQueue;
 
   @Before
   public void init() throws Exception {
@@ -76,7 +77,6 @@ public class TestPipelineManagerImpl {
     conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
     dbStore = DBStoreBuilder.createDBStore(conf, new SCMDBDefinition());
     nodeManager = new MockNodeManager(true, 20);
-    eventQueue = new EventQueue();
     maxPipelineCount = nodeManager.getNodeCount(HddsProtos.NodeState.HEALTHY) *
         conf.getInt(OZONE_DATANODE_PIPELINE_LIMIT,
             OZONE_DATANODE_PIPELINE_LIMIT_DEFAULT) /
@@ -91,17 +91,23 @@ public class TestPipelineManagerImpl {
     FileUtil.fullyDelete(testDir);
   }
 
-  private PipelineManagerV2Impl createPipelineManager()
+  private PipelineManagerV2Impl createPipelineManager(boolean leader)
       throws IOException {
-    return PipelineManagerV2Impl.newPipelineManager(
-        conf, MockSCMHAManager.getInstance(),
-        nodeManager,
-        SCMDBDefinition.PIPELINES.getTable(dbStore), eventQueue);
+    SCMHAManager scmhaManager;
+    if (leader) {
+      scmhaManager = MockSCMHAManager.getLeaderInstance();
+    } else {
+      scmhaManager = MockSCMHAManager.getFollowerInstance();
+    }
+    return PipelineManagerV2Impl.newPipelineManager(conf, scmhaManager,
+        new MockNodeManager(true, 20),
+        SCMDBDefinition.PIPELINES.getTable(dbStore),
+        new EventQueue());
   }
 
   @Test
   public void testCreatePipeline() throws Exception {
-    PipelineManagerV2Impl pipelineManager = createPipelineManager();
+    PipelineManagerV2Impl pipelineManager = createPipelineManager(true);
     Assert.assertTrue(pipelineManager.getPipelines().isEmpty());
     pipelineManager.allowPipelineCreation();
     Pipeline pipeline1 = pipelineManager.createPipeline(
@@ -115,7 +121,7 @@ public class TestPipelineManagerImpl {
     Assert.assertTrue(pipelineManager.containsPipeline(pipeline2.getId()));
     pipelineManager.close();
 
-    PipelineManagerV2Impl pipelineManager2 = createPipelineManager();
+    PipelineManagerV2Impl pipelineManager2 = createPipelineManager(true);
     // Should be able to load previous pipelines.
     Assert.assertFalse(pipelineManager.getPipelines().isEmpty());
     Assert.assertEquals(2, pipelineManager.getPipelines().size());
@@ -129,8 +135,24 @@ public class TestPipelineManagerImpl {
   }
 
   @Test
+  public void testCreatePipelineShouldFailOnFollower() throws Exception {
+    PipelineManagerV2Impl pipelineManager = createPipelineManager(false);
+    Assert.assertTrue(pipelineManager.getPipelines().isEmpty());
+    pipelineManager.allowPipelineCreation();
+    try {
+      pipelineManager.createPipeline(HddsProtos.ReplicationType.RATIS,
+          HddsProtos.ReplicationFactor.THREE);
+    } catch (NotLeaderException ex) {
+      pipelineManager.close();
+      return;
+    }
+    // Should not reach here.
+    Assert.fail();
+  }
+
+  @Test
   public void testUpdatePipelineStates() throws Exception {
-    PipelineManagerV2Impl pipelineManager = createPipelineManager();
+    PipelineManagerV2Impl pipelineManager = createPipelineManager(true);
     pipelineManager.allowPipelineCreation();
     Pipeline pipeline = pipelineManager.createPipeline(
         HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE);
@@ -164,8 +186,71 @@ public class TestPipelineManagerImpl {
   }
 
   @Test
+  public void testOpenPipelineShouldFailOnFollower() throws Exception {
+    PipelineManagerV2Impl pipelineManager = createPipelineManager(true);
+    pipelineManager.allowPipelineCreation();
+    Pipeline pipeline = pipelineManager.createPipeline(
+        HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE);
+    Assert.assertEquals(1, pipelineManager.getPipelines().size());
+    Assert.assertTrue(pipelineManager.containsPipeline(pipeline.getId()));
+    Assert.assertEquals(ALLOCATED, pipeline.getPipelineState());
+    // Change to follower
+    pipelineManager.setScmhaManager(MockSCMHAManager.getFollowerInstance());
+    try {
+      pipelineManager.openPipeline(pipeline.getId());
+    } catch (NotLeaderException ex) {
+      pipelineManager.close();
+      return;
+    }
+    // Should not reach here.
+    Assert.fail();
+  }
+
+  @Test
+  public void testActivatePipelineShouldFailOnFollower() throws Exception {
+    PipelineManagerV2Impl pipelineManager = createPipelineManager(true);
+    pipelineManager.allowPipelineCreation();
+    Pipeline pipeline = pipelineManager.createPipeline(
+        HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE);
+    Assert.assertEquals(1, pipelineManager.getPipelines().size());
+    Assert.assertTrue(pipelineManager.containsPipeline(pipeline.getId()));
+    Assert.assertEquals(ALLOCATED, pipeline.getPipelineState());
+    // Change to follower
+    pipelineManager.setScmhaManager(MockSCMHAManager.getFollowerInstance());
+    try {
+      pipelineManager.activatePipeline(pipeline.getId());
+    } catch (NotLeaderException ex) {
+      pipelineManager.close();
+      return;
+    }
+    // Should not reach here.
+    Assert.fail();
+  }
+
+  @Test
+  public void testDeactivatePipelineShouldFailOnFollower() throws Exception {
+    PipelineManagerV2Impl pipelineManager = createPipelineManager(true);
+    pipelineManager.allowPipelineCreation();
+    Pipeline pipeline = pipelineManager.createPipeline(
+        HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE);
+    Assert.assertEquals(1, pipelineManager.getPipelines().size());
+    Assert.assertTrue(pipelineManager.containsPipeline(pipeline.getId()));
+    Assert.assertEquals(ALLOCATED, pipeline.getPipelineState());
+    // Change to follower
+    pipelineManager.setScmhaManager(MockSCMHAManager.getFollowerInstance());
+    try {
+      pipelineManager.deactivatePipeline(pipeline.getId());
+    } catch (NotLeaderException ex) {
+      pipelineManager.close();
+      return;
+    }
+    // Should not reach here.
+    Assert.fail();
+  }
+
+  @Test
   public void testRemovePipeline() throws Exception {
-    PipelineManagerV2Impl pipelineManager = createPipelineManager();
+    PipelineManagerV2Impl pipelineManager = createPipelineManager(true);
     pipelineManager.allowPipelineCreation();
     // Create a pipeline
     Pipeline pipeline = pipelineManager.createPipeline(
@@ -207,12 +292,33 @@ public class TestPipelineManagerImpl {
   }
 
   @Test
+  public void testClosePipelineShouldFailOnFollower() throws Exception {
+    PipelineManagerV2Impl pipelineManager = createPipelineManager(true);
+    pipelineManager.allowPipelineCreation();
+    Pipeline pipeline = pipelineManager.createPipeline(
+        HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE);
+    Assert.assertEquals(1, pipelineManager.getPipelines().size());
+    Assert.assertTrue(pipelineManager.containsPipeline(pipeline.getId()));
+    Assert.assertEquals(ALLOCATED, pipeline.getPipelineState());
+    // Change to follower
+    pipelineManager.setScmhaManager(MockSCMHAManager.getFollowerInstance());
+    try {
+      pipelineManager.closePipeline(pipeline, false);
+    } catch (NotLeaderException ex) {
+      pipelineManager.close();
+      return;
+    }
+    // Should not reach here.
+    Assert.fail();
+  }
+
+  @Test
   public void testPipelineReport() throws Exception {
-    PipelineManagerV2Impl pipelineManager = createPipelineManager();
+    PipelineManagerV2Impl pipelineManager = createPipelineManager(true);
     pipelineManager.allowPipelineCreation();
     SCMSafeModeManager scmSafeModeManager =
         new SCMSafeModeManager(conf, new ArrayList<>(), pipelineManager,
-            eventQueue);
+            new EventQueue());
     Pipeline pipeline = pipelineManager
         .createPipeline(HddsProtos.ReplicationType.RATIS,
             HddsProtos.ReplicationFactor.THREE);
@@ -258,7 +364,7 @@ public class TestPipelineManagerImpl {
 
   @Test
   public void testPipelineCreationFailedMetric() throws Exception {
-    PipelineManagerV2Impl pipelineManager = createPipelineManager();
+    PipelineManagerV2Impl pipelineManager = createPipelineManager(true);
     pipelineManager.allowPipelineCreation();
 
     // No pipeline at start
@@ -313,7 +419,7 @@ public class TestPipelineManagerImpl {
 
   @Test
   public void testPipelineOpenOnlyWhenLeaderReported() throws Exception {
-    PipelineManagerV2Impl pipelineManager = createPipelineManager();
+    PipelineManagerV2Impl pipelineManager = createPipelineManager(true);
     pipelineManager.allowPipelineCreation();
 
     pipelineManager.onMessage(
@@ -324,13 +430,13 @@ public class TestPipelineManagerImpl {
     // close manager
     pipelineManager.close();
     // new pipeline manager loads the pipelines from the db in ALLOCATED state
-    pipelineManager = createPipelineManager();
+    pipelineManager = createPipelineManager(true);
     Assert.assertEquals(Pipeline.PipelineState.ALLOCATED,
         pipelineManager.getPipeline(pipeline.getId()).getPipelineState());
 
     SCMSafeModeManager scmSafeModeManager =
         new SCMSafeModeManager(new OzoneConfiguration(),
-            new ArrayList<>(), pipelineManager, eventQueue);
+            new ArrayList<>(), pipelineManager, new EventQueue());
     PipelineReportHandler pipelineReportHandler =
         new PipelineReportHandler(scmSafeModeManager, pipelineManager, conf);
 
@@ -362,7 +468,7 @@ public class TestPipelineManagerImpl {
         OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT, -1,
         TimeUnit.MILLISECONDS);
 
-    PipelineManagerV2Impl pipelineManager = createPipelineManager();
+    PipelineManagerV2Impl pipelineManager = createPipelineManager(true);
     pipelineManager.allowPipelineCreation();
     Pipeline pipeline = pipelineManager
         .createPipeline(HddsProtos.ReplicationType.RATIS,
@@ -388,6 +494,14 @@ public class TestPipelineManagerImpl {
     pipelineManager.close();
   }
 
+  @Test (expected = NotLeaderException.class)
+  public void testScrubPipelineShouldFailOnFollower() throws Exception {
+    PipelineManagerV2Impl pipelineManager = createPipelineManager(false);
+    pipelineManager.allowPipelineCreation();
+    pipelineManager.scrubPipeline(HddsProtos.ReplicationType.RATIS,
+        HddsProtos.ReplicationFactor.THREE);
+  }
+
   @Test
   public void testPipelineNotCreatedUntilSafeModePrecheck() throws Exception {
     // No timeout for pipeline scrubber.
@@ -395,7 +509,7 @@ public class TestPipelineManagerImpl {
         OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT, -1,
         TimeUnit.MILLISECONDS);
 
-    PipelineManagerV2Impl pipelineManager = createPipelineManager();
+    PipelineManagerV2Impl pipelineManager = createPipelineManager(true);
     try {
       pipelineManager.createPipeline(HddsProtos.ReplicationType.RATIS,
               HddsProtos.ReplicationFactor.THREE);
@@ -433,7 +547,7 @@ public class TestPipelineManagerImpl {
         OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT, -1,
         TimeUnit.MILLISECONDS);
 
-    PipelineManagerV2Impl pipelineManager = createPipelineManager();
+    PipelineManagerV2Impl pipelineManager = createPipelineManager(true);
     Assert.assertTrue(pipelineManager.getSafeModeStatus());
     Assert.assertFalse(pipelineManager.isPipelineCreationAllowed());
     // First pass pre-check as true, but safemode still on
@@ -456,6 +570,6 @@ public class TestPipelineManagerImpl {
       boolean isLeader) {
     SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode report =
         TestUtils.getPipelineReportFromDatanode(dn, pipeline.getId(), isLeader);
-    pipelineReportHandler.onMessage(report, eventQueue);
+    pipelineReportHandler.onMessage(report, new EventQueue());
   }
 }
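
One note on the new follower tests above: most of them use a try/catch/Assert.fail() pattern, while testScrubPipelineShouldFailOnFollower uses JUnit 4's expected attribute. The same assertion can be written compactly for the other cases as well; a sketch (assuming the createPipelineManager(boolean) helper from TestPipelineManagerImpl, so this is illustrative rather than a drop-in test):

    @Test(expected = NotLeaderException.class)
    public void testCreatePipelineShouldFailOnFollowerCompact() throws Exception {
      // createPipelineManager(false) wires in MockSCMHAManager.getFollowerInstance().
      PipelineManagerV2Impl pipelineManager = createPipelineManager(false);
      pipelineManager.allowPipelineCreation();
      pipelineManager.createPipeline(HddsProtos.ReplicationType.RATIS,
          HddsProtos.ReplicationFactor.THREE);
    }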
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java
index 935dc77..0febf06 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java
@@ -485,7 +485,7 @@ public class TestSCMSafeModeManager {
 
 
   @Test
-  public void testDisableSafeMode() {
+  public void testDisableSafeMode() throws IOException {
     OzoneConfiguration conf = new OzoneConfiguration(config);
     conf.setBoolean(HddsConfigKeys.HDDS_SCM_SAFEMODE_ENABLED, false);
     PipelineManager pipelineManager = Mockito.mock(PipelineManager.class);

