You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by na...@apache.org on 2020/10/25 17:23:48 UTC
[hadoop-ozone] 02/11: HDDS-3837. Add isLeader check in SCMHAManager.
This is an automated email from the ASF dual-hosted git repository.
nanda pushed a commit to branch HDDS-2823
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git
commit 58394ebdd22db72b8f51a7ed700ad0c54eff4e3d
Author: Li Cheng <ti...@tencent.com>
AuthorDate: Sat Oct 24 20:55:36 2020 +0530
HDDS-3837. Add isLeader check in SCMHAManager.
---
.../hadoop/hdds/scm/block/BlockManagerImpl.java | 5 +-
.../scm/container/CloseContainerEventHandler.java | 4 +-
.../apache/hadoop/hdds/scm/ha/SCMHAManager.java | 13 ++
.../hadoop/hdds/scm/ha/SCMHAManagerImpl.java | 85 ++++++++++-
.../org/apache/hadoop/hdds/scm/ha/SCMHAUtils.java | 6 +-
.../apache/hadoop/hdds/scm/ha/SCMRatisServer.java | 10 ++
.../hadoop/hdds/scm/ha/SCMRatisServerImpl.java | 15 ++
.../hadoop/hdds/scm/node/NewNodeHandler.java | 12 +-
.../scm/node/NonHealthyToHealthyNodeHandler.java | 12 +-
.../scm/pipeline/BackgroundPipelineCreator.java | 2 +-
.../hadoop/hdds/scm/pipeline/PipelineManager.java | 5 +-
.../hdds/scm/pipeline/PipelineManagerMXBean.java | 3 +-
.../hdds/scm/pipeline/PipelineManagerV2Impl.java | 65 +++++++--
.../hadoop/hdds/scm/ha/MockSCMHAManager.java | 53 ++++++-
.../scm/pipeline/TestPipelineActionHandler.java | 3 +-
.../hdds/scm/pipeline/TestPipelineManagerImpl.java | 156 ++++++++++++++++++---
.../hdds/scm/safemode/TestSCMSafeModeManager.java | 2 +-
17 files changed, 401 insertions(+), 50 deletions(-)
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
index b5b2aaf..ec0094b 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
@@ -57,6 +57,8 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVI
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT;
+
+import org.apache.ratis.protocol.NotLeaderException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -256,7 +258,8 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
* @param containerInfo - Container Info.
* @return AllocatedBlock
*/
- private AllocatedBlock newBlock(ContainerInfo containerInfo) {
+ private AllocatedBlock newBlock(ContainerInfo containerInfo)
+ throws NotLeaderException {
try {
final Pipeline pipeline = pipelineManager
.getPipeline(containerInfo.getPipelineID());
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
index fd73711..a2b79fb 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
+import org.apache.ratis.protocol.NotLeaderException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -98,7 +99,7 @@ public class CloseContainerEventHandler implements EventHandler<ContainerID> {
* @throws ContainerNotFoundException
*/
private List<DatanodeDetails> getNodes(final ContainerInfo container)
- throws ContainerNotFoundException {
+ throws ContainerNotFoundException, NotLeaderException {
try {
return pipelineManager.getPipeline(container.getPipelineID()).getNodes();
} catch (PipelineNotFoundException ex) {
@@ -109,5 +110,4 @@ public class CloseContainerEventHandler implements EventHandler<ContainerID> {
.collect(Collectors.toList());
}
}
-
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java
index eb6c800..ade0ad9 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java
@@ -17,6 +17,9 @@
package org.apache.hadoop.hdds.scm.ha;
+import org.apache.ratis.protocol.NotLeaderException;
+import org.apache.ratis.protocol.RaftPeer;
+
import java.io.IOException;
/**
@@ -40,7 +43,17 @@ public interface SCMHAManager {
SCMRatisServer getRatisServer();
/**
+ * Returns the suggested leader from RaftServer, or null if it is unknown.
+ */
+ RaftPeer getSuggestedLeader();
+
+ /**
* Stops the HA service.
*/
void shutdown() throws IOException;
+
+ /**
+ * Builds, without throwing, a NotLeaderException for the caller to throw.
+ */
+ NotLeaderException triggerNotLeaderException();
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
index 89ac714..8bb9457 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
@@ -17,7 +17,17 @@
package org.apache.hadoop.hdds.scm.ha;
+import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.ratis.protocol.NotLeaderException;
+import org.apache.ratis.protocol.RaftGroupMemberId;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.impl.RaftServerProxy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
@@ -31,14 +41,17 @@ import java.io.IOException;
*/
public class SCMHAManagerImpl implements SCMHAManager {
- private static boolean isLeader = true;
+ private static final Logger LOG =
+ LoggerFactory.getLogger(SCMHAManagerImpl.class);
private final SCMRatisServerImpl ratisServer;
+ private final ConfigurationSource conf;
/**
* Creates SCMHAManager instance.
*/
public SCMHAManagerImpl(final ConfigurationSource conf) throws IOException {
+ this.conf = conf;
this.ratisServer = new SCMRatisServerImpl(
conf.getObject(SCMHAConfiguration.class), conf);
}
@@ -56,7 +69,28 @@ public class SCMHAManagerImpl implements SCMHAManager {
*/
@Override
public boolean isLeader() {
- return isLeader;
+ if (!SCMHAUtils.isSCMHAEnabled(conf)) {
+ // When SCM HA is not enabled, the current SCM is always the leader.
+ return true;
+ }
+ RaftServer server = ratisServer.getServer();
+ Preconditions.checkState(server instanceof RaftServerProxy);
+ RaftServerImpl serverImpl = null;
+ try {
+ // SCM only has one raft group.
+ serverImpl = ((RaftServerProxy) server)
+ .getImpl(ratisServer.getRaftGroupId());
+ if (serverImpl != null) {
+ // Report leadership only when the Raft server itself confirms it;
+ // every other path falls through and returns false.
+ return serverImpl.isLeader();
+ }
+ } catch (IOException ioe) {
+ LOG.error("Fail to get RaftServer impl and therefore it's not clear " +
+ "whether it's leader. ", ioe);
+ }
+
+ return false;
}
/**
@@ -67,6 +101,42 @@ public class SCMHAManagerImpl implements SCMHAManager {
return ratisServer;
}
+ private RaftPeerId getPeerIdFromRoleInfo(RaftServerImpl serverImpl) {
+ if (serverImpl.isLeader()) {
+ // The local server is the leader, so its own id is the leader id.
+ return serverImpl.getId();
+ } else if (serverImpl.isFollower()) {
+ // A follower reports the leader it currently follows.
+ return RaftPeerId.valueOf(serverImpl.getRoleInfoProto()
+ .getFollowerInfo().getLeaderInfo().getId().getId());
+ } else {
+ return null;
+ }
+ }
+
+ /** Returns the peer this server believes is the leader, or null. */
+ @Override
+ public RaftPeer getSuggestedLeader() {
+ RaftServer server = ratisServer.getServer();
+ Preconditions.checkState(server instanceof RaftServerProxy);
+ try {
+ // SCM only has one raft group.
+ RaftServerImpl serverImpl = ((RaftServerProxy) server)
+ .getImpl(ratisServer.getRaftGroupId());
+ if (serverImpl != null) {
+ RaftPeerId peerId = getPeerIdFromRoleInfo(serverImpl);
+ if (peerId != null) {
+ return new RaftPeer(peerId);
+ }
+ }
+ } catch (IOException ioe) {
+ LOG.error("Fail to get RaftServer impl and therefore the suggested " +
+ "leader is unknown. ", ioe);
+ }
+ // No impl, no known leader, or the lookup failed: nothing to suggest.
+ return null;
+ }
+
/**
* {@inheritDoc}
*/
@@ -75,4 +145,15 @@ public class SCMHAManagerImpl implements SCMHAManager {
ratisServer.stop();
}
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public NotLeaderException triggerNotLeaderException() {
+ return new NotLeaderException(RaftGroupMemberId.valueOf(
+ ratisServer.getServer().getId(),
+ ratisServer.getRaftGroupId()),
+ getSuggestedLeader(),
+ ratisServer.getRaftPeers());
+ }
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAUtils.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAUtils.java
index eb22566..0f71744 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAUtils.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAUtils.java
@@ -20,7 +20,7 @@ package org.apache.hadoop.hdds.scm.ha;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.HddsConfigKeys;
-import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.ScmUtils;
import org.apache.hadoop.hdds.scm.server.ratis.SCMRatisServer;
@@ -37,12 +37,12 @@ public final class SCMHAUtils {
}
// Check if SCM HA is enabled.
- public static boolean isSCMHAEnabled(OzoneConfiguration conf) {
+ public static boolean isSCMHAEnabled(ConfigurationSource conf) {
return conf.getBoolean(ScmConfigKeys.OZONE_SCM_HA_ENABLE_KEY,
ScmConfigKeys.OZONE_SCM_HA_ENABLE_DEFAULT);
}
- public static File createSCMRatisDir(OzoneConfiguration conf)
+ public static File createSCMRatisDir(ConfigurationSource conf)
throws IllegalArgumentException {
String scmRatisDir = SCMRatisServer.getSCMRatisDirectory(conf);
if (scmRatisDir == null || scmRatisDir.isEmpty()) {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServer.java
index 4ddbc7b..2f99776 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServer.java
@@ -18,8 +18,12 @@
package org.apache.hadoop.hdds.scm.ha;
import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.server.RaftServer;
import java.io.IOException;
+import java.util.List;
import java.util.concurrent.ExecutionException;
/**
@@ -35,4 +39,10 @@ public interface SCMRatisServer {
throws IOException, ExecutionException, InterruptedException;
void stop() throws IOException;
+
+ RaftServer getServer();
+
+ RaftGroupId getRaftGroupId();
+
+ List<RaftPeer> getRaftPeers();
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java
index 45ae212..33ae109 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
@@ -110,4 +111,18 @@ public class SCMRatisServerImpl implements SCMRatisServer {
server.close();
}
+ @Override
+ public RaftServer getServer() {
+ return server;
+ }
+
+ @Override
+ public RaftGroupId getRaftGroupId() {
+ return raftGroupId;
+ }
+
+ @Override
+ public List<RaftPeer> getRaftPeers() {
+ return Collections.singletonList(new RaftPeer(raftPeerId));
+ }
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NewNodeHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NewNodeHandler.java
index a40a63a..42cada9 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NewNodeHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NewNodeHandler.java
@@ -23,11 +23,16 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.ratis.protocol.NotLeaderException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Handles New Node event.
*/
public class NewNodeHandler implements EventHandler<DatanodeDetails> {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(NewNodeHandler.class);
private final PipelineManager pipelineManager;
private final ConfigurationSource conf;
@@ -41,6 +46,11 @@ public class NewNodeHandler implements EventHandler<DatanodeDetails> {
@Override
public void onMessage(DatanodeDetails datanodeDetails,
EventPublisher publisher) {
- pipelineManager.triggerPipelineCreation();
+ try {
+ pipelineManager.triggerPipelineCreation();
+ } catch (NotLeaderException ex) {
+ LOG.debug("Not the current leader SCM and cannot start pipeline" +
+ " creation.");
+ }
}
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NonHealthyToHealthyNodeHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NonHealthyToHealthyNodeHandler.java
index cc32f84..e73231b 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NonHealthyToHealthyNodeHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NonHealthyToHealthyNodeHandler.java
@@ -23,12 +23,17 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.ratis.protocol.NotLeaderException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Handles Stale node event.
*/
public class NonHealthyToHealthyNodeHandler
implements EventHandler<DatanodeDetails> {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(NonHealthyToHealthyNodeHandler.class);
private final PipelineManager pipelineManager;
private final ConfigurationSource conf;
@@ -42,6 +47,11 @@ public class NonHealthyToHealthyNodeHandler
@Override
public void onMessage(DatanodeDetails datanodeDetails,
EventPublisher publisher) {
- pipelineManager.triggerPipelineCreation();
+ try {
+ pipelineManager.triggerPipelineCreation();
+ } catch (NotLeaderException ex) {
+ LOG.debug("Not the current leader SCM and cannot start pipeline" +
+ " creation.");
+ }
}
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java
index f240293..42b3a93 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java
@@ -103,7 +103,7 @@ class BackgroundPipelineCreator {
}
}
- private void createPipelines() {
+ private void createPipelines() throws RuntimeException {
// TODO: #CLUTIL Different replication factor may need to be supported
HddsProtos.ReplicationType type = HddsProtos.ReplicationType.valueOf(
conf.get(OzoneConfigKeys.OZONE_REPLICATION_TYPE,
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
index 9c997a8..ddd461b 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager.SafeModeStatus;
import org.apache.hadoop.hdds.server.events.EventHandler;
+import org.apache.ratis.protocol.NotLeaderException;
/**
* Interface which exposes the api for pipeline management.
@@ -55,7 +56,7 @@ public interface PipelineManager extends Closeable, PipelineManagerMXBean,
ReplicationFactor factor);
List<Pipeline> getPipelines(ReplicationType type,
- Pipeline.PipelineState state);
+ Pipeline.PipelineState state) throws NotLeaderException;
List<Pipeline> getPipelines(ReplicationType type,
ReplicationFactor factor, Pipeline.PipelineState state);
@@ -84,7 +85,7 @@ public interface PipelineManager extends Closeable, PipelineManagerMXBean,
void startPipelineCreator();
- void triggerPipelineCreation();
+ void triggerPipelineCreation() throws NotLeaderException;
void incNumBlocksAllocatedMetric(PipelineID id);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerMXBean.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerMXBean.java
index 6d7d717..55e096b 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerMXBean.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerMXBean.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.hdds.scm.pipeline;
import org.apache.hadoop.hdds.annotation.InterfaceAudience;
+import org.apache.ratis.protocol.NotLeaderException;
import java.util.Map;
@@ -33,6 +34,6 @@ public interface PipelineManagerMXBean {
* Returns the number of pipelines in different state.
* @return state to number of pipeline map
*/
- Map<String, Integer> getPipelineInfo();
+ Map<String, Integer> getPipelineInfo() throws NotLeaderException;
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java
index 1241745..069540c 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hdds.utils.Scheduler;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.util.Time;
+import org.apache.ratis.protocol.NotLeaderException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -75,19 +76,21 @@ public final class PipelineManagerV2Impl implements PipelineManager {
private final SCMPipelineMetrics metrics;
private long pipelineWaitDefaultTimeout;
private final AtomicBoolean isInSafeMode;
+ private SCMHAManager scmhaManager;
// Used to track if the safemode pre-checks have completed. This is designed
// to prevent pipelines being created until sufficient nodes have registered.
private final AtomicBoolean pipelineCreationAllowed;
private PipelineManagerV2Impl(ConfigurationSource conf,
- NodeManager nodeManager,
- StateManager pipelineStateManager,
- PipelineFactory pipelineFactory,
+ SCMHAManager scmhaManager,
+ StateManager pipelineStateManager,
+ PipelineFactory pipelineFactory,
EventPublisher eventPublisher) {
this.lock = new ReentrantReadWriteLock();
this.pipelineFactory = pipelineFactory;
this.stateManager = pipelineStateManager;
this.conf = conf;
+ this.scmhaManager = scmhaManager;
this.eventPublisher = eventPublisher;
this.pmInfoBean = MBeans.register("SCMPipelineManager",
"SCMPipelineManagerInfo", this);
@@ -120,7 +123,7 @@ public final class PipelineManagerV2Impl implements PipelineManager {
nodeManager, stateManager, conf, eventPublisher);
// Create PipelineManager
PipelineManagerV2Impl pipelineManager = new PipelineManagerV2Impl(conf,
- nodeManager, stateManager, pipelineFactory, eventPublisher);
+ scmhaManager, stateManager, pipelineFactory, eventPublisher);
// Create background thread.
Scheduler scheduler = new Scheduler(
@@ -136,6 +139,7 @@ public final class PipelineManagerV2Impl implements PipelineManager {
@Override
public Pipeline createPipeline(ReplicationType type,
ReplicationFactor factor) throws IOException {
+ checkLeader();
if (!isPipelineCreationAllowed() && factor != ReplicationFactor.ONE) {
LOG.debug("Pipeline creation is not allowed until safe mode prechecks " +
"complete");
@@ -266,6 +270,7 @@ public final class PipelineManagerV2Impl implements PipelineManager {
@Override
public void addContainerToPipeline(
PipelineID pipelineID, ContainerID containerID) throws IOException {
+ checkLeader();
lock.writeLock().lock();
try {
stateManager.addContainerToPipeline(pipelineID, containerID);
@@ -277,6 +282,7 @@ public final class PipelineManagerV2Impl implements PipelineManager {
@Override
public void removeContainerFromPipeline(
PipelineID pipelineID, ContainerID containerID) throws IOException {
+ checkLeader();
lock.writeLock().lock();
try {
stateManager.removeContainerFromPipeline(pipelineID, containerID);
@@ -288,6 +294,7 @@ public final class PipelineManagerV2Impl implements PipelineManager {
@Override
public NavigableSet<ContainerID> getContainersInPipeline(
PipelineID pipelineID) throws IOException {
+ checkLeader();
lock.readLock().lock();
try {
return stateManager.getContainers(pipelineID);
@@ -298,11 +305,13 @@ public final class PipelineManagerV2Impl implements PipelineManager {
@Override
public int getNumberOfContainers(PipelineID pipelineID) throws IOException {
+ checkLeader();
return stateManager.getNumberOfContainers(pipelineID);
}
@Override
public void openPipeline(PipelineID pipelineId) throws IOException {
+ checkLeader();
lock.writeLock().lock();
try {
Pipeline pipeline = stateManager.getPipeline(pipelineId);
@@ -328,6 +337,7 @@ public final class PipelineManagerV2Impl implements PipelineManager {
* @throws IOException
*/
protected void removePipeline(Pipeline pipeline) throws IOException {
+ checkLeader();
pipelineFactory.close(pipeline.getType(), pipeline);
PipelineID pipelineID = pipeline.getId();
lock.writeLock().lock();
@@ -349,6 +359,7 @@ public final class PipelineManagerV2Impl implements PipelineManager {
*/
protected void closeContainersForPipeline(final PipelineID pipelineId)
throws IOException {
+ checkLeader();
Set<ContainerID> containerIDs = stateManager.getContainers(pipelineId);
for (ContainerID containerID : containerIDs) {
eventPublisher.fireEvent(SCMEvents.CLOSE_CONTAINER, containerID);
@@ -364,6 +375,7 @@ public final class PipelineManagerV2Impl implements PipelineManager {
@Override
public void closePipeline(Pipeline pipeline, boolean onTimeout)
throws IOException {
+ checkLeader();
PipelineID pipelineID = pipeline.getId();
lock.writeLock().lock();
try {
@@ -393,6 +405,7 @@ public final class PipelineManagerV2Impl implements PipelineManager {
@Override
public void scrubPipeline(ReplicationType type, ReplicationFactor factor)
throws IOException {
+ checkLeader();
if (type != ReplicationType.RATIS || factor != ReplicationFactor.THREE) {
// Only scrub pipelines of type RATIS with factor THREE
return;
@@ -439,7 +452,9 @@ public final class PipelineManagerV2Impl implements PipelineManager {
* Triggers pipeline creation after the specified time.
*/
@Override
- public void triggerPipelineCreation() {
+ public void triggerPipelineCreation() throws NotLeaderException {
+ // TODO add checkLeader once follower validates safemode
+ // before it becomes leader.
backgroundPipelineCreator.triggerPipelineCreation();
}
@@ -457,6 +472,7 @@ public final class PipelineManagerV2Impl implements PipelineManager {
@Override
public void activatePipeline(PipelineID pipelineID)
throws IOException {
+ checkLeader();
stateManager.updatePipelineState(pipelineID.getProtobuf(),
HddsProtos.PipelineState.PIPELINE_OPEN);
}
@@ -470,6 +486,7 @@ public final class PipelineManagerV2Impl implements PipelineManager {
@Override
public void deactivatePipeline(PipelineID pipelineID)
throws IOException {
+ checkLeader();
stateManager.updatePipelineState(pipelineID.getProtobuf(),
HddsProtos.PipelineState.PIPELINE_DORMANT);
}
@@ -484,6 +501,7 @@ public final class PipelineManagerV2Impl implements PipelineManager {
@Override
public void waitPipelineReady(PipelineID pipelineID, long timeout)
throws IOException {
+ checkLeader();
long st = Time.monotonicNow();
if (timeout == 0) {
timeout = pipelineWaitDefaultTimeout;
@@ -515,7 +533,8 @@ public final class PipelineManagerV2Impl implements PipelineManager {
}
@Override
- public Map<String, Integer> getPipelineInfo() {
+ public Map<String, Integer> getPipelineInfo() throws NotLeaderException {
+ checkLeader();
final Map<String, Integer> pipelineInfo = new HashMap<>();
for (Pipeline.PipelineState state : Pipeline.PipelineState.values()) {
pipelineInfo.put(state.toString(), 0);
@@ -564,13 +583,21 @@ public final class PipelineManagerV2Impl implements PipelineManager {
// Trigger pipeline creation only if the preCheck status has changed to
// complete.
- if (isPipelineCreationAllowed() && !currentAllowPipelines) {
- triggerPipelineCreation();
- }
- // Start the pipeline creation thread only when safemode switches off
- if (!getSafeModeStatus() && currentlyInSafeMode) {
- startPipelineCreator();
+
+ try {
+ if (isPipelineCreationAllowed() && !currentAllowPipelines) {
+ triggerPipelineCreation();
+ }
+ // Start the pipeline creation thread only when safemode switches off
+ if (!getSafeModeStatus() && currentlyInSafeMode) {
+ startPipelineCreator();
+ }
+ } catch (NotLeaderException ex) {
+ // The suggested leader may be null, so log the peer itself through a
+ // placeholder instead of dereferencing it.
+ LOG.warn("Not the current leader SCM and cannot process pipeline" +
+ " creation. Suggested leader is: {}",
+ scmhaManager.getSuggestedLeader());
+ }
+
}
@VisibleForTesting
@@ -593,6 +620,20 @@ public final class PipelineManagerV2Impl implements PipelineManager {
public StateManager getStateManager() {
return stateManager;
}
+
+ public void setScmhaManager(SCMHAManager scmhaManager) {
+ this.scmhaManager = scmhaManager;
+ }
+
+ /**
+ * Check if scm is current leader.
+ * @throws NotLeaderException when it's not the current leader.
+ */
+ private void checkLeader() throws NotLeaderException {
+ if (!scmhaManager.isLeader()) {
+ throw scmhaManager.triggerNotLeaderException();
+ }
+ }
private void setBackgroundPipelineCreator(
BackgroundPipelineCreator backgroundPipelineCreator) {
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/MockSCMHAManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/MockSCMHAManager.java
index c3b14fb..ce48c11 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/MockSCMHAManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/MockSCMHAManager.java
@@ -28,11 +28,14 @@ import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.NotLeaderException;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftGroupMemberId;
+import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.StateMachineException;
+import org.apache.ratis.server.RaftServer;
/**
* Mock SCMHAManager implementation for testing.
@@ -40,16 +43,30 @@ import org.apache.ratis.protocol.StateMachineException;
public final class MockSCMHAManager implements SCMHAManager {
private final SCMRatisServer ratisServer;
+ private boolean isLeader;
public static SCMHAManager getInstance() {
return new MockSCMHAManager();
}
+ public static SCMHAManager getLeaderInstance() {
+ MockSCMHAManager mockSCMHAManager = new MockSCMHAManager();
+ mockSCMHAManager.setIsLeader(true);
+ return mockSCMHAManager;
+ }
+
+ public static SCMHAManager getFollowerInstance() {
+ MockSCMHAManager mockSCMHAManager = new MockSCMHAManager();
+ mockSCMHAManager.setIsLeader(false);
+ return mockSCMHAManager;
+ }
+
/**
* Creates MockSCMHAManager instance.
*/
private MockSCMHAManager() {
this.ratisServer = new MockRatisServer();
+ this.isLeader = true;
}
@Override
@@ -62,7 +79,16 @@ public final class MockSCMHAManager implements SCMHAManager {
*/
@Override
public boolean isLeader() {
- return true;
+ return isLeader;
+ }
+
+ public void setIsLeader(boolean isLeader) {
+ this.isLeader = isLeader;
+ }
+
+ @Override
+ public RaftPeer getSuggestedLeader() {
+ throw new UnsupportedOperationException();
}
/**
@@ -81,6 +107,16 @@ public final class MockSCMHAManager implements SCMHAManager {
ratisServer.stop();
}
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public NotLeaderException triggerNotLeaderException() {
+ return new NotLeaderException(RaftGroupMemberId.valueOf(
+ RaftPeerId.valueOf("peer"), RaftGroupId.randomId()),
+ null, new ArrayList<>());
+ }
+
private static class MockRatisServer implements SCMRatisServer {
private Map<RequestType, Object> handlers =
@@ -141,6 +177,21 @@ public final class MockSCMHAManager implements SCMHAManager {
}
@Override
+ public RaftServer getServer() {
+ return null;
+ }
+
+ @Override
+ public RaftGroupId getRaftGroupId() {
+ return null;
+ }
+
+ @Override
+ public List<RaftPeer> getRaftPeers() {
+ return new ArrayList<>();
+ }
+
+ @Override
public void stop() {
}
}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineActionHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineActionHandler.java
index 99443c3..e40c8ba 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineActionHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineActionHandler.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolPro
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineActionsFromDatanode;
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
+import org.apache.ratis.protocol.NotLeaderException;
import org.junit.Test;
import org.mockito.Mockito;
@@ -37,7 +38,7 @@ public class TestPipelineActionHandler {
@Test
public void testCloseActionForMissingPipeline()
- throws PipelineNotFoundException {
+ throws PipelineNotFoundException, NotLeaderException {
final PipelineManager manager = Mockito.mock(PipelineManager.class);
final EventQueue queue = Mockito.mock(EventQueue.class);
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
index e1f9104..a8f03bb 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.MockNodeManager;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager;
+import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition;
import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher;
@@ -37,6 +38,7 @@ import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.ozone.container.common.SCMTestUtils;
import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.ratis.protocol.NotLeaderException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -66,7 +68,6 @@ public class TestPipelineManagerImpl {
private DBStore dbStore;
private static MockNodeManager nodeManager;
private static int maxPipelineCount;
- private static EventQueue eventQueue;
@Before
public void init() throws Exception {
@@ -76,7 +77,6 @@ public class TestPipelineManagerImpl {
conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
dbStore = DBStoreBuilder.createDBStore(conf, new SCMDBDefinition());
nodeManager = new MockNodeManager(true, 20);
- eventQueue = new EventQueue();
maxPipelineCount = nodeManager.getNodeCount(HddsProtos.NodeState.HEALTHY) *
conf.getInt(OZONE_DATANODE_PIPELINE_LIMIT,
OZONE_DATANODE_PIPELINE_LIMIT_DEFAULT) /
@@ -91,17 +91,23 @@ public class TestPipelineManagerImpl {
FileUtil.fullyDelete(testDir);
}
- private PipelineManagerV2Impl createPipelineManager()
+ private PipelineManagerV2Impl createPipelineManager(boolean leader)
throws IOException {
- return PipelineManagerV2Impl.newPipelineManager(
- conf, MockSCMHAManager.getInstance(),
- nodeManager,
- SCMDBDefinition.PIPELINES.getTable(dbStore), eventQueue);
+ SCMHAManager scmhaManager;
+ if (leader) {
+ scmhaManager = MockSCMHAManager.getLeaderInstance();
+ } else {
+ scmhaManager = MockSCMHAManager.getFollowerInstance();
+ }
+ return PipelineManagerV2Impl.newPipelineManager(conf, scmhaManager,
+ new MockNodeManager(true, 20),
+ SCMDBDefinition.PIPELINES.getTable(dbStore),
+ new EventQueue());
}
@Test
public void testCreatePipeline() throws Exception {
- PipelineManagerV2Impl pipelineManager = createPipelineManager();
+ PipelineManagerV2Impl pipelineManager = createPipelineManager(true);
Assert.assertTrue(pipelineManager.getPipelines().isEmpty());
pipelineManager.allowPipelineCreation();
Pipeline pipeline1 = pipelineManager.createPipeline(
@@ -115,7 +121,7 @@ public class TestPipelineManagerImpl {
Assert.assertTrue(pipelineManager.containsPipeline(pipeline2.getId()));
pipelineManager.close();
- PipelineManagerV2Impl pipelineManager2 = createPipelineManager();
+ PipelineManagerV2Impl pipelineManager2 = createPipelineManager(true);
// Should be able to load previous pipelines.
Assert.assertFalse(pipelineManager.getPipelines().isEmpty());
Assert.assertEquals(2, pipelineManager.getPipelines().size());
@@ -129,8 +135,24 @@ public class TestPipelineManagerImpl {
}
@Test
+ public void testCreatePipelineShouldFailOnFollower() throws Exception {
+ PipelineManagerV2Impl pipelineManager = createPipelineManager(false);
+ Assert.assertTrue(pipelineManager.getPipelines().isEmpty());
+ pipelineManager.allowPipelineCreation();
+ try {
+ pipelineManager.createPipeline(HddsProtos.ReplicationType.RATIS,
+ HddsProtos.ReplicationFactor.THREE);
+ } catch (NotLeaderException ex) {
+ pipelineManager.close();
+ return;
+ }
+ // Should not reach here.
+ Assert.fail();
+ }
+
+ @Test
public void testUpdatePipelineStates() throws Exception {
- PipelineManagerV2Impl pipelineManager = createPipelineManager();
+ PipelineManagerV2Impl pipelineManager = createPipelineManager(true);
pipelineManager.allowPipelineCreation();
Pipeline pipeline = pipelineManager.createPipeline(
HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE);
@@ -164,8 +186,71 @@ public class TestPipelineManagerImpl {
}
@Test
+ public void testOpenPipelineShouldFailOnFollower() throws Exception {
+ PipelineManagerV2Impl pipelineManager = createPipelineManager(true);
+ pipelineManager.allowPipelineCreation();
+ Pipeline pipeline = pipelineManager.createPipeline(
+ HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE);
+ Assert.assertEquals(1, pipelineManager.getPipelines().size());
+ Assert.assertTrue(pipelineManager.containsPipeline(pipeline.getId()));
+ Assert.assertEquals(ALLOCATED, pipeline.getPipelineState());
+ // Change to follower
+ pipelineManager.setScmhaManager(MockSCMHAManager.getFollowerInstance());
+ try {
+ pipelineManager.openPipeline(pipeline.getId());
+ } catch (NotLeaderException ex) {
+ pipelineManager.close();
+ return;
+ }
+ // Should not reach here.
+ Assert.fail();
+ }
+
+ @Test
+ public void testActivatePipelineShouldFailOnFollower() throws Exception {
+ PipelineManagerV2Impl pipelineManager = createPipelineManager(true);
+ pipelineManager.allowPipelineCreation();
+ Pipeline pipeline = pipelineManager.createPipeline(
+ HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE);
+ Assert.assertEquals(1, pipelineManager.getPipelines().size());
+ Assert.assertTrue(pipelineManager.containsPipeline(pipeline.getId()));
+ Assert.assertEquals(ALLOCATED, pipeline.getPipelineState());
+ // Change to follower
+ pipelineManager.setScmhaManager(MockSCMHAManager.getFollowerInstance());
+ try {
+ pipelineManager.activatePipeline(pipeline.getId());
+ } catch (NotLeaderException ex) {
+ pipelineManager.close();
+ return;
+ }
+ // Should not reach here.
+ Assert.fail();
+ }
+
+ @Test
+ public void testDeactivatePipelineShouldFailOnFollower() throws Exception {
+ PipelineManagerV2Impl pipelineManager = createPipelineManager(true);
+ pipelineManager.allowPipelineCreation();
+ Pipeline pipeline = pipelineManager.createPipeline(
+ HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE);
+ Assert.assertEquals(1, pipelineManager.getPipelines().size());
+ Assert.assertTrue(pipelineManager.containsPipeline(pipeline.getId()));
+ Assert.assertEquals(ALLOCATED, pipeline.getPipelineState());
+ // Change to follower
+ pipelineManager.setScmhaManager(MockSCMHAManager.getFollowerInstance());
+ try {
+ pipelineManager.deactivatePipeline(pipeline.getId());
+ } catch (NotLeaderException ex) {
+ pipelineManager.close();
+ return;
+ }
+ // Should not reach here.
+ Assert.fail();
+ }
+
+ @Test
public void testRemovePipeline() throws Exception {
- PipelineManagerV2Impl pipelineManager = createPipelineManager();
+ PipelineManagerV2Impl pipelineManager = createPipelineManager(true);
pipelineManager.allowPipelineCreation();
// Create a pipeline
Pipeline pipeline = pipelineManager.createPipeline(
@@ -207,12 +292,33 @@ public class TestPipelineManagerImpl {
}
@Test
+ public void testClosePipelineShouldFailOnFollower() throws Exception {
+ PipelineManagerV2Impl pipelineManager = createPipelineManager(true);
+ pipelineManager.allowPipelineCreation();
+ Pipeline pipeline = pipelineManager.createPipeline(
+ HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE);
+ Assert.assertEquals(1, pipelineManager.getPipelines().size());
+ Assert.assertTrue(pipelineManager.containsPipeline(pipeline.getId()));
+ Assert.assertEquals(ALLOCATED, pipeline.getPipelineState());
+ // Change to follower
+ pipelineManager.setScmhaManager(MockSCMHAManager.getFollowerInstance());
+ try {
+ pipelineManager.closePipeline(pipeline, false);
+ } catch (NotLeaderException ex) {
+ pipelineManager.close();
+ return;
+ }
+ // Should not reach here.
+ Assert.fail();
+ }
+
+ @Test
public void testPipelineReport() throws Exception {
- PipelineManagerV2Impl pipelineManager = createPipelineManager();
+ PipelineManagerV2Impl pipelineManager = createPipelineManager(true);
pipelineManager.allowPipelineCreation();
SCMSafeModeManager scmSafeModeManager =
new SCMSafeModeManager(conf, new ArrayList<>(), pipelineManager,
- eventQueue);
+ new EventQueue());
Pipeline pipeline = pipelineManager
.createPipeline(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE);
@@ -258,7 +364,7 @@ public class TestPipelineManagerImpl {
@Test
public void testPipelineCreationFailedMetric() throws Exception {
- PipelineManagerV2Impl pipelineManager = createPipelineManager();
+ PipelineManagerV2Impl pipelineManager = createPipelineManager(true);
pipelineManager.allowPipelineCreation();
// No pipeline at start
@@ -313,7 +419,7 @@ public class TestPipelineManagerImpl {
@Test
public void testPipelineOpenOnlyWhenLeaderReported() throws Exception {
- PipelineManagerV2Impl pipelineManager = createPipelineManager();
+ PipelineManagerV2Impl pipelineManager = createPipelineManager(true);
pipelineManager.allowPipelineCreation();
pipelineManager.onMessage(
@@ -324,13 +430,13 @@ public class TestPipelineManagerImpl {
// close manager
pipelineManager.close();
// new pipeline manager loads the pipelines from the db in ALLOCATED state
- pipelineManager = createPipelineManager();
+ pipelineManager = createPipelineManager(true);
Assert.assertEquals(Pipeline.PipelineState.ALLOCATED,
pipelineManager.getPipeline(pipeline.getId()).getPipelineState());
SCMSafeModeManager scmSafeModeManager =
new SCMSafeModeManager(new OzoneConfiguration(),
- new ArrayList<>(), pipelineManager, eventQueue);
+ new ArrayList<>(), pipelineManager, new EventQueue());
PipelineReportHandler pipelineReportHandler =
new PipelineReportHandler(scmSafeModeManager, pipelineManager, conf);
@@ -362,7 +468,7 @@ public class TestPipelineManagerImpl {
OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT, -1,
TimeUnit.MILLISECONDS);
- PipelineManagerV2Impl pipelineManager = createPipelineManager();
+ PipelineManagerV2Impl pipelineManager = createPipelineManager(true);
pipelineManager.allowPipelineCreation();
Pipeline pipeline = pipelineManager
.createPipeline(HddsProtos.ReplicationType.RATIS,
@@ -388,6 +494,14 @@ public class TestPipelineManagerImpl {
pipelineManager.close();
}
+ @Test (expected = NotLeaderException.class)
+ public void testScrubPipelineShouldFailOnFollower() throws Exception {
+ PipelineManagerV2Impl pipelineManager = createPipelineManager(false);
+ pipelineManager.allowPipelineCreation();
+ pipelineManager.scrubPipeline(HddsProtos.ReplicationType.RATIS,
+ HddsProtos.ReplicationFactor.THREE);
+ }
+
@Test
public void testPipelineNotCreatedUntilSafeModePrecheck() throws Exception {
// No timeout for pipeline scrubber.
@@ -395,7 +509,7 @@ public class TestPipelineManagerImpl {
OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT, -1,
TimeUnit.MILLISECONDS);
- PipelineManagerV2Impl pipelineManager = createPipelineManager();
+ PipelineManagerV2Impl pipelineManager = createPipelineManager(true);
try {
pipelineManager.createPipeline(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE);
@@ -433,7 +547,7 @@ public class TestPipelineManagerImpl {
OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT, -1,
TimeUnit.MILLISECONDS);
- PipelineManagerV2Impl pipelineManager = createPipelineManager();
+ PipelineManagerV2Impl pipelineManager = createPipelineManager(true);
Assert.assertTrue(pipelineManager.getSafeModeStatus());
Assert.assertFalse(pipelineManager.isPipelineCreationAllowed());
// First pass pre-check as true, but safemode still on
@@ -456,6 +570,6 @@ public class TestPipelineManagerImpl {
boolean isLeader) {
SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode report =
TestUtils.getPipelineReportFromDatanode(dn, pipeline.getId(), isLeader);
- pipelineReportHandler.onMessage(report, eventQueue);
+ pipelineReportHandler.onMessage(report, new EventQueue());
}
}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java
index 935dc77..0febf06 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java
@@ -485,7 +485,7 @@ public class TestSCMSafeModeManager {
@Test
- public void testDisableSafeMode() {
+ public void testDisableSafeMode() throws IOException {
OzoneConfiguration conf = new OzoneConfiguration(config);
conf.setBoolean(HddsConfigKeys.HDDS_SCM_SAFEMODE_ENABLED, false);
PipelineManager pipelineManager = Mockito.mock(PipelineManager.class);
---------------------------------------------------------------------
To unsubscribe, e-mail: ozone-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: ozone-commits-help@hadoop.apache.org