You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by sa...@apache.org on 2021/01/19 02:27:46 UTC
[ozone] branch HDDS-2823 updated: HDDS-4568. Add SCMContext to SCM
HA (#1737)
This is an automated email from the ASF dual-hosted git repository.
sammichen pushed a commit to branch HDDS-2823
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-2823 by this push:
new bb9c68f HDDS-4568. Add SCMContext to SCM HA (#1737)
bb9c68f is described below
commit bb9c68fa410aa7643486062e47c8734e5ed3df8d
Author: GlenGeng <gl...@tencent.com>
AuthorDate: Tue Jan 19 10:27:32 2021 +0800
HDDS-4568. Add SCMContext to SCM HA (#1737)
---
.../apache/hadoop/hdds/scm/block/BlockManager.java | 5 +-
.../hadoop/hdds/scm/block/BlockManagerImpl.java | 37 ++---
.../hdds/scm/block/SCMBlockDeletingService.java | 18 ++-
.../container/AbstractContainerReportHandler.java | 23 ++-
.../scm/container/CloseContainerEventHandler.java | 23 ++-
.../hdds/scm/container/ContainerReportHandler.java | 6 +-
.../IncrementalContainerReportHandler.java | 6 +-
.../hdds/scm/container/ReplicationManager.java | 25 +++-
.../hadoop/hdds/scm/ha/MockSCMHAManager.java | 10 +-
.../org/apache/hadoop/hdds/scm/ha/SCMContext.java | 164 +++++++++++++++++++++
.../apache/hadoop/hdds/scm/ha/SCMHAManager.java | 10 --
.../hadoop/hdds/scm/ha/SCMHAManagerImpl.java | 30 +---
.../hadoop/hdds/scm/ha/SCMRatisServerImpl.java | 8 +-
.../apache/hadoop/hdds/scm/ha/SCMStateMachine.java | 39 ++++-
.../hadoop/hdds/scm/node/SCMNodeManager.java | 50 +++----
.../hdds/scm/pipeline/PipelineActionHandler.java | 18 ++-
.../hadoop/hdds/scm/pipeline/PipelineFactory.java | 6 +-
.../hdds/scm/pipeline/PipelineManagerV2Impl.java | 12 +-
.../hdds/scm/pipeline/PipelineReportHandler.java | 17 ++-
.../hdds/scm/pipeline/RatisPipelineProvider.java | 23 ++-
.../hdds/scm/pipeline/SCMPipelineManager.java | 3 +-
...lockLocationProtocolServerSideTranslatorPB.java | 19 +--
...inerLocationProtocolServerSideTranslatorPB.java | 19 +--
.../hdds/scm/safemode/SCMSafeModeManager.java | 12 +-
.../hdds/scm/server/SCMBlockProtocolServer.java | 6 +-
.../hdds/scm/server/SCMClientProtocolServer.java | 27 +---
.../hadoop/hdds/scm/server/SCMConfigurator.java | 22 ++-
.../hdds/scm/server/StorageContainerManager.java | 54 +++++--
.../java/org/apache/hadoop/hdds/scm/TestUtils.java | 2 +
.../hadoop/hdds/scm/block/TestBlockManager.java | 17 ++-
.../container/TestCloseContainerEventHandler.java | 9 +-
.../TestIncrementalContainerReportHandler.java | 19 ++-
.../hdds/scm/container/TestReplicationManager.java | 3 +
.../scm/container/TestSCMContainerManager.java | 4 +-
.../scm/container/TestUnknownContainerReport.java | 3 +-
.../apache/hadoop/hdds/scm/ha/TestSCMContext.java | 67 +++++++++
.../hdds/scm/node/TestContainerPlacement.java | 6 +-
.../hdds/scm/node/TestNodeReportHandler.java | 5 +-
.../scm/pipeline/MockRatisPipelineProvider.java | 7 +-
.../scm/pipeline/TestPipelineActionHandler.java | 3 +-
.../hdds/scm/pipeline/TestPipelineManagerImpl.java | 10 +-
.../hdds/scm/pipeline/TestSCMPipelineManager.java | 7 +-
.../choose/algorithms/TestLeaderChoosePolicy.java | 7 +-
.../safemode/TestHealthyPipelineSafeModeRule.java | 10 +-
.../TestOneReplicaPipelineSafeModeRule.java | 4 +-
.../hdds/scm/safemode/TestSCMSafeModeManager.java | 21 ++-
.../scm/server/TestSCMBlockProtocolServer.java | 4 +-
.../hadoop/ozone/scm/node/TestSCMNodeMetrics.java | 3 +-
.../hdds/scm/pipeline/TestPipelineClose.java | 4 +-
.../ozone/container/TestContainerReplication.java | 8 +-
.../TestCloseContainerByPipeline.java | 29 +++-
.../commandhandler/TestCloseContainerHandler.java | 8 +-
.../commandhandler/TestDeleteContainerHandler.java | 28 ++--
.../apache/hadoop/ozone/om/TestKeyManagerImpl.java | 2 +
.../ReconIncrementalContainerReportHandler.java | 5 +-
.../hadoop/ozone/recon/scm/ReconNodeManager.java | 4 +-
.../recon/scm/ReconPipelineReportHandler.java | 4 +-
.../scm/ReconStorageContainerManagerFacade.java | 16 +-
.../scm/AbstractReconContainerManagerTest.java | 5 +-
...TestReconIncrementalContainerReportHandler.java | 9 +-
.../ozone/recon/scm/TestReconPipelineManager.java | 9 +-
.../recon/scm/TestReconPipelineReportHandler.java | 4 +-
62 files changed, 737 insertions(+), 301 deletions(-)
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManager.java
index 77fe841..bfc68c7 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManager.java
@@ -24,8 +24,6 @@ import java.util.List;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
-import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager.SafeModeStatus;
-import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.ozone.common.BlockGroup;
/**
@@ -33,8 +31,7 @@ import org.apache.hadoop.ozone.common.BlockGroup;
* Block APIs.
* Container is transparent to these APIs.
*/
-public interface BlockManager extends Closeable,
- EventHandler<SafeModeStatus> {
+public interface BlockManager extends Closeable {
/**
* Allocates a new block for a given size.
* @param size - Block Size
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
index ceca7bd..b215026 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
@@ -32,10 +32,8 @@ import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.StorageUnit;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ScmOps;
import org.apache.hadoop.hdds.scm.PipelineRequestInformation;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
-import org.apache.hadoop.hdds.scm.ScmUtils;
import org.apache.hadoop.hdds.scm.PipelineChoosePolicy;
import org.apache.hadoop.hdds.scm.ScmConfig;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
@@ -46,10 +44,7 @@ import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
-import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager.SafeModeStatus;
-import org.apache.hadoop.hdds.scm.safemode.SafeModePrecheck;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
-import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.hdds.utils.UniqueId;
import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.ozone.common.BlockGroup;
@@ -72,6 +67,7 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
// Currently only user of the block service is Ozone, CBlock manages blocks
// by itself and does not rely on the Block service offered by SCM.
+ private final StorageContainerManager scm;
private final PipelineManager pipelineManager;
private final ContainerManagerV2 containerManager;
@@ -81,7 +77,6 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
private final SCMBlockDeletingService blockDeletingService;
private ObjectName mxBean;
- private SafeModePrecheck safeModePrecheck;
private PipelineChoosePolicy pipelineChoosePolicy;
/**
@@ -94,6 +89,7 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
public BlockManagerImpl(final ConfigurationSource conf,
final StorageContainerManager scm) {
Objects.requireNonNull(scm, "SCM cannot be null");
+ this.scm = scm;
this.pipelineManager = scm.getPipelineManager();
this.containerManager = scm.getContainerManager();
this.pipelineChoosePolicy = scm.getPipelineChoosePolicy();
@@ -116,9 +112,8 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
TimeUnit.MILLISECONDS);
blockDeletingService =
new SCMBlockDeletingService(deletedBlockLog, containerManager,
- scm.getScmNodeManager(), scm.getEventQueue(), svcInterval,
- serviceTimeout, conf);
- safeModePrecheck = new SafeModePrecheck(conf);
+ scm.getScmNodeManager(), scm.getEventQueue(), scm.getScmContext(),
+ svcInterval, serviceTimeout, conf);
}
/**
@@ -158,7 +153,10 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
if (LOG.isTraceEnabled()) {
LOG.trace("Size : {} , type : {}, factor : {} ", size, type, factor);
}
- ScmUtils.preCheck(ScmOps.allocateBlock, safeModePrecheck);
+ if (scm.getScmContext().isInSafeMode()) {
+ throw new SCMException("SafeModePrecheck failed for allocateBlock",
+ SCMException.ResultCodes.SAFE_MODE_EXCEPTION);
+ }
if (size < 0 || size > containerSize) {
LOG.warn("Invalid block size requested : {}", size);
throw new SCMException("Unsupported block size: " + size,
@@ -294,8 +292,10 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
@Override
public void deleteBlocks(List<BlockGroup> keyBlocksInfoList)
throws IOException {
- ScmUtils.preCheck(ScmOps.deleteBlock, safeModePrecheck);
-
+ if (scm.getScmContext().isInSafeMode()) {
+ throw new SCMException("SafeModePrecheck failed for deleteBlocks",
+ SCMException.ResultCodes.SAFE_MODE_EXCEPTION);
+ }
Map<Long, List<Long>> containerBlocks = new HashMap<>();
// TODO: track the block size info so that we can reclaim the container
// TODO: used space when the block is deleted.
@@ -366,25 +366,12 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
}
/**
- * Returns status of scm safe mode determined by SAFE_MODE_STATUS event.
- * */
- public boolean isScmInSafeMode() {
- return this.safeModePrecheck.isInSafeMode();
- }
-
- /**
* Get class logger.
* */
public static Logger getLogger() {
return LOG;
}
- @Override
- public void onMessage(SafeModeStatus status,
- EventPublisher publisher) {
- this.safeModePrecheck.setInSafeMode(status.isInSafeMode());
- }
-
/**
* This class uses system current time milliseconds to generate unique id.
*/
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
index 908b4f1..c3028a4 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hdds.scm.ScmConfig;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerManagerV2;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.node.NodeStatus;
import org.apache.hadoop.hdds.server.events.EventPublisher;
@@ -40,10 +41,12 @@ import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
import org.apache.hadoop.hdds.utils.BackgroundTaskResult.EmptyTaskResult;
import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.util.Time;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import org.apache.ratis.protocol.exceptions.NotLeaderException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -64,19 +67,22 @@ public class SCMBlockDeletingService extends BackgroundService {
private final ContainerManagerV2 containerManager;
private final NodeManager nodeManager;
private final EventPublisher eventPublisher;
+ private final SCMContext scmContext;
private int blockDeleteLimitSize;
+ @SuppressWarnings("parameternumber")
public SCMBlockDeletingService(DeletedBlockLog deletedBlockLog,
ContainerManagerV2 containerManager, NodeManager nodeManager,
- EventPublisher eventPublisher, Duration interval, long serviceTimeout,
- ConfigurationSource conf) {
+ EventPublisher eventPublisher, SCMContext scmContext,
+ Duration interval, long serviceTimeout, ConfigurationSource conf) {
super("SCMBlockDeletingService", interval.toMillis(), TimeUnit.MILLISECONDS,
BLOCK_DELETING_SERVICE_CORE_POOL_SIZE, serviceTimeout);
this.deletedBlockLog = deletedBlockLog;
this.containerManager = containerManager;
this.nodeManager = nodeManager;
this.eventPublisher = eventPublisher;
+ this.scmContext = scmContext;
blockDeleteLimitSize =
conf.getObject(ScmConfig.class).getBlockDeletionLimit();
@@ -146,9 +152,10 @@ public class SCMBlockDeletingService extends BackgroundService {
// We should stop caching new commands if num of un-processed
// command is bigger than a limit, e.g 50. In case datanode goes
// offline for sometime, the cached commands be flooded.
+ SCMCommand<?> command = new DeleteBlocksCommand(dnTXs);
+ command.setTerm(scmContext.getTerm());
eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND,
- new CommandForDatanode<>(dnId,
- new DeleteBlocksCommand(dnTXs)));
+ new CommandForDatanode<>(dnId, command));
if (LOG.isDebugEnabled()) {
LOG.debug(
"Added delete block command for datanode {} in the queue,"
@@ -170,6 +177,9 @@ public class SCMBlockDeletingService extends BackgroundService {
transactions.getBlocksDeleted(),
transactions.getDatanodeTransactionMap().size(),
Time.monotonicNow() - startTime);
+ } catch (NotLeaderException nle) {
+ LOG.warn("Skip current run, since not leader any more.", nle);
+ return EmptyTaskResult.newResult();
} catch (IOException e) {
// We may tolerate a number of failures for sometime
// but if it continues to fail, at some point we need to raise
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java
index 1aa34cf..d71539d 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java
@@ -27,10 +27,13 @@ import org.apache.hadoop.hdds.protocol.proto
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
import org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.apache.ratis.protocol.exceptions.NotLeaderException;
import org.slf4j.Logger;
import java.io.IOException;
@@ -46,6 +49,7 @@ import java.util.function.Supplier;
public class AbstractContainerReportHandler {
private final ContainerManagerV2 containerManager;
+ private final SCMContext scmContext;
private final Logger logger;
/**
@@ -56,10 +60,13 @@ public class AbstractContainerReportHandler {
* @param logger Logger to be used for logging
*/
AbstractContainerReportHandler(final ContainerManagerV2 containerManager,
+ final SCMContext scmContext,
final Logger logger) {
Preconditions.checkNotNull(containerManager);
+ Preconditions.checkNotNull(scmContext);
Preconditions.checkNotNull(logger);
this.containerManager = containerManager;
+ this.scmContext = scmContext;
this.logger = logger;
}
@@ -317,11 +324,17 @@ public class AbstractContainerReportHandler {
protected void deleteReplica(ContainerID containerID, DatanodeDetails dn,
EventPublisher publisher, String reason) {
- final DeleteContainerCommand deleteCommand =
- new DeleteContainerCommand(containerID.getId(), true);
- final CommandForDatanode datanodeCommand = new CommandForDatanode<>(
- dn.getUuid(), deleteCommand);
- publisher.fireEvent(SCMEvents.DATANODE_COMMAND, datanodeCommand);
+ SCMCommand<?> command = new DeleteContainerCommand(
+ containerID.getId(), true);
+ try {
+ command.setTerm(scmContext.getTerm());
+ } catch (NotLeaderException nle) {
+ logger.warn("Skip sending delete container command," +
+ " since not leader SCM", nle);
+ return;
+ }
+ publisher.fireEvent(SCMEvents.DATANODE_COMMAND,
+ new CommandForDatanode<>(dn.getUuid(), command));
logger.info("Sending delete container command for " + reason +
" container {} to datanode {}", containerID.getId(), dn);
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
index 45a0e1f..3320d90 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
@@ -23,6 +23,7 @@ import java.util.stream.Collectors;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
import org.apache.hadoop.hdds.server.events.EventHandler;
@@ -30,6 +31,7 @@ import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.ratis.protocol.exceptions.NotLeaderException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -51,11 +53,14 @@ public class CloseContainerEventHandler implements EventHandler<ContainerID> {
private final PipelineManager pipelineManager;
private final ContainerManagerV2 containerManager;
+ private final SCMContext scmContext;
public CloseContainerEventHandler(final PipelineManager pipelineManager,
- final ContainerManagerV2 containerManager) {
+ final ContainerManagerV2 containerManager,
+ final SCMContext scmContext) {
this.pipelineManager = pipelineManager;
this.containerManager = containerManager;
+ this.scmContext = scmContext;
}
@Override
@@ -74,19 +79,21 @@ public class CloseContainerEventHandler implements EventHandler<ContainerID> {
.getContainer(containerID);
// Send close command to datanodes, if the container is in CLOSING state
if (container.getState() == LifeCycleState.CLOSING) {
+ SCMCommand<?> command = new CloseContainerCommand(
+ containerID.getId(), container.getPipelineID());
+ command.setTerm(scmContext.getTerm());
- final CloseContainerCommand closeContainerCommand =
- new CloseContainerCommand(
- containerID.getId(), container.getPipelineID());
-
- getNodes(container).forEach(node -> publisher.fireEvent(
- DATANODE_COMMAND,
- new CommandForDatanode<>(node.getUuid(), closeContainerCommand)));
+ getNodes(container).forEach(node ->
+ publisher.fireEvent(DATANODE_COMMAND,
+ new CommandForDatanode<>(node.getUuid(), command)));
} else {
LOG.warn("Cannot close container {}, which is in {} state.",
containerID, container.getState());
}
+ } catch (NotLeaderException nle) {
+ LOG.warn("Skip sending close container command,"
+ + " since current SCM is not leader.", nle);
} catch (IOException | InvalidStateTransitionException ex) {
LOG.error("Failed to close the container {}.", containerID, ex);
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java
index 365750c..48603c0 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hdds.protocol.proto
import org.apache.hadoop.hdds.scm.ScmConfig;
import org.apache.hadoop.hdds.scm.block.PendingDeleteStatusList;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
@@ -73,8 +74,9 @@ public class ContainerReportHandler extends AbstractContainerReportHandler
*/
public ContainerReportHandler(final NodeManager nodeManager,
final ContainerManagerV2 containerManager,
+ final SCMContext scmContext,
OzoneConfiguration conf) {
- super(containerManager, LOG);
+ super(containerManager, scmContext, LOG);
this.nodeManager = nodeManager;
this.containerManager = containerManager;
@@ -88,7 +90,7 @@ public class ContainerReportHandler extends AbstractContainerReportHandler
public ContainerReportHandler(final NodeManager nodeManager,
final ContainerManagerV2 containerManager) {
- this(nodeManager, containerManager, null);
+ this(nodeManager, containerManager, SCMContext.emptyContext(), null);
}
/**
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/IncrementalContainerReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/IncrementalContainerReportHandler.java
index 0f0f0f1..1ad507d 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/IncrementalContainerReportHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/IncrementalContainerReportHandler.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos
.ContainerReplicaProto;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
@@ -47,8 +48,9 @@ public class IncrementalContainerReportHandler extends
public IncrementalContainerReportHandler(
final NodeManager nodeManager,
- final ContainerManagerV2 containerManager) {
- super(containerManager, LOG);
+ final ContainerManagerV2 containerManager,
+ final SCMContext scmContext) {
+ super(containerManager, scmContext, LOG);
this.nodeManager = nodeManager;
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
index 91e5f72..0559c3c 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolPro
import org.apache.hadoop.hdds.scm.ContainerPlacementStatus;
import org.apache.hadoop.hdds.scm.PlacementPolicy;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.node.NodeStatus;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
@@ -72,6 +73,8 @@ import com.google.protobuf.GeneratedMessage;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import static org.apache.hadoop.hdds.conf.ConfigTag.OZONE;
import static org.apache.hadoop.hdds.conf.ConfigTag.SCM;
+
+import org.apache.ratis.protocol.exceptions.NotLeaderException;
import org.apache.ratis.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -106,6 +109,11 @@ public class ReplicationManager
private final EventPublisher eventPublisher;
/**
+ * SCMContext from StorageContainerManager.
+ */
+ private final SCMContext scmContext;
+
+ /**
* Used for locking a container using its ID while processing it.
*/
private final LockManager<ContainerID> lockManager;
@@ -161,11 +169,13 @@ public class ReplicationManager
final ContainerManagerV2 containerManager,
final PlacementPolicy containerPlacement,
final EventPublisher eventPublisher,
+ final SCMContext scmContext,
final LockManager<ContainerID> lockManager,
final NodeManager nodeManager) {
this.containerManager = containerManager;
this.containerPlacement = containerPlacement;
this.eventPublisher = eventPublisher;
+ this.scmContext = scmContext;
this.lockManager = lockManager;
this.nodeManager = nodeManager;
this.conf = conf;
@@ -957,10 +967,16 @@ public class ReplicationManager
LOG.info("Sending close container command for container {}" +
" to datanode {}.", container.containerID(), datanode);
-
CloseContainerCommand closeContainerCommand =
new CloseContainerCommand(container.getContainerID(),
container.getPipelineID(), force);
+ try {
+ closeContainerCommand.setTerm(scmContext.getTerm());
+ } catch (NotLeaderException nle) {
+ LOG.warn("Skip sending close container command,"
+ + " since current SCM is not leader.", nle);
+ return;
+ }
eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND,
new CommandForDatanode<>(datanode.getUuid(), closeContainerCommand));
}
@@ -1026,6 +1042,13 @@ public class ReplicationManager
final DatanodeDetails datanode,
final SCMCommand<T> command,
final Consumer<InflightAction> tracker) {
+ try {
+ command.setTerm(scmContext.getTerm());
+ } catch (NotLeaderException nle) {
+ LOG.warn("Skip sending datanode command,"
+ + " since current SCM is not leader.", nle);
+ return;
+ }
final CommandForDatanode<T> datanodeCommand =
new CommandForDatanode<>(datanode.getUuid(), command);
eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND, datanodeCommand);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/MockSCMHAManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/MockSCMHAManager.java
index 08fd2b2..bf25ad5 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/MockSCMHAManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/MockSCMHAManager.java
@@ -24,7 +24,6 @@ import java.util.Arrays;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType;
@@ -65,11 +64,10 @@ public final class MockSCMHAManager implements SCMHAManager {
}
/**
- * {@inheritDoc}
+ * Informs MockRatisServe to behaviour as a leader SCM or a follower SCM.
*/
- @Override
- public Optional<Long> isLeader() {
- return isLeader ? Optional.of((long)0) : Optional.empty();
+ boolean isLeader() {
+ return isLeader;
}
public void setIsLeader(boolean isLeader) {
@@ -113,7 +111,7 @@ public final class MockSCMHAManager implements SCMHAManager {
final RaftGroupMemberId raftId = RaftGroupMemberId.valueOf(
RaftPeerId.valueOf("peer"), RaftGroupId.randomId());
RaftClientReply reply;
- if (isLeader().isPresent()) {
+ if (isLeader()) {
try {
final Message result = process(request);
reply = RaftClientReply.newBuilder()
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMContext.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMContext.java
new file mode 100644
index 0000000..17dad7e
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMContext.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.ha;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager.SafeModeStatus;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.hdds.server.events.EventHandler;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.ratis.protocol.exceptions.NotLeaderException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * SCMContext is the single source of truth for some key information shared
+ * across all components within SCM, including:
+ * - RaftServer related info, e.g., isLeader, term.
+ * - SafeMode related info, e.g., inSafeMode, preCheckComplete.
+ */
+public class SCMContext implements EventHandler<SafeModeStatus> {
+ private static final Logger LOG = LoggerFactory.getLogger(SCMContext.class);
+
+ private static final SCMContext EMPTY_CONTEXT
+ = new SCMContext(true, 0, new SafeModeStatus(false, true), null);
+
+ /**
+ * Used by non-HA mode SCM, Recon and Unit Tests.
+ */
+ public static SCMContext emptyContext() {
+ return EMPTY_CONTEXT;
+ }
+
+ /**
+ * Raft related info.
+ */
+ private boolean isLeader;
+ private long term;
+
+ /**
+ * Safe mode related info.
+ */
+ private SafeModeStatus safeModeStatus;
+
+ private final StorageContainerManager scm;
+ private final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+ SCMContext(boolean isLeader, long term,
+ final SafeModeStatus safeModeStatus,
+ final StorageContainerManager scm) {
+ this.isLeader = isLeader;
+ this.term = term;
+ this.safeModeStatus = safeModeStatus;
+ this.scm = scm;
+ }
+
+ /**
+ * Creates SCMContext instance from StorageContainerManager.
+ */
+ public SCMContext(final StorageContainerManager scm) {
+ this(false, 0, new SafeModeStatus(true, false), scm);
+ Preconditions.checkNotNull(scm, "scm is null");
+ }
+
+ /**
+ *
+ * @param newIsLeader : is leader or not
+ * @param newTerm : term if current SCM becomes leader
+ */
+ public void updateIsLeaderAndTerm(boolean newIsLeader, long newTerm) {
+ lock.writeLock().lock();
+ try {
+ LOG.info("update <isLeader,term> from <{},{}> to <{},{}>",
+ isLeader, term, newIsLeader, newTerm);
+
+ isLeader = newIsLeader;
+ term = newTerm;
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ /**
+ * Check whether current SCM is leader or not.
+ *
+ * @return isLeader
+ */
+ public boolean isLeader() {
+ lock.readLock().lock();
+ try {
+ return isLeader;
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ /**
+ * Get term of current leader SCM.
+ *
+ * @return term
+ * @throws NotLeaderException if isLeader is false
+ */
+ public long getTerm() throws NotLeaderException {
+ lock.readLock().lock();
+ try {
+ if (!isLeader) {
+ LOG.warn("getTerm is invoked when not leader.");
+ throw scm.getScmHAManager()
+ .getRatisServer()
+ .triggerNotLeaderException();
+ }
+ return term;
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ @Override
+ public void onMessage(SafeModeStatus status,
+ EventPublisher publisher) {
+ lock.writeLock().lock();
+ try {
+ LOG.info("Update SafeModeStatus from {} to {}.", safeModeStatus, status);
+ safeModeStatus = status;
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ public boolean isInSafeMode() {
+ lock.readLock().lock();
+ try {
+ return safeModeStatus.isInSafeMode();
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ public boolean isPreCheckComplete() {
+ lock.readLock().lock();
+ try {
+ return safeModeStatus.isPreCheckComplete();
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java
index 59410b1..8fe6d7f 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.hdds.scm.ha;
import java.io.IOException;
-import java.util.Optional;
/**
* SCMHAManager provides HA service for SCM.
@@ -31,15 +30,6 @@ public interface SCMHAManager {
void start() throws IOException;
/**
- * For HA mode, return an Optional that holds term of the
- * underlying RaftServer iff current SCM is in leader role.
- * Otherwise, return an empty optional.
- *
- * For non-HA mode, return an Optional that holds term 0.
- */
- Optional<Long> isLeader();
-
- /**
* Returns RatisServer instance associated with the SCM instance.
*/
SCMRatisServer getRatisServer();
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
index bac6336..db5e937 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
@@ -18,12 +18,11 @@
package org.apache.hadoop.hdds.scm.ha;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
-import org.apache.ratis.proto.RaftProtos;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.util.Optional;
/**
* SCMHAManagerImpl uses Apache Ratis for HA implementation. We will have 2N+1
@@ -44,10 +43,12 @@ public class SCMHAManagerImpl implements SCMHAManager {
/**
* Creates SCMHAManager instance.
*/
- public SCMHAManagerImpl(final ConfigurationSource conf) throws IOException {
+ public SCMHAManagerImpl(final ConfigurationSource conf,
+ final StorageContainerManager scm)
+ throws IOException {
this.conf = conf;
this.ratisServer = new SCMRatisServerImpl(
- conf.getObject(SCMHAConfiguration.class), conf);
+ conf.getObject(SCMHAConfiguration.class), conf, scm);
}
/**
@@ -58,27 +59,6 @@ public class SCMHAManagerImpl implements SCMHAManager {
ratisServer.start();
}
- /**
- * {@inheritDoc}
- */
- @Override
- public Optional<Long> isLeader() {
- if (!SCMHAUtils.isSCMHAEnabled(conf)) {
- // When SCM HA is not enabled, the current SCM is always the leader.
- return Optional.of((long)0);
- }
- RaftProtos.RoleInfoProto roleInfoProto
- = ratisServer.getDivision().getInfo().getRoleInfoProto();
-
- return roleInfoProto.hasLeaderInfo()
- ? Optional.of(roleInfoProto.getLeaderInfo().getTerm())
- : Optional.empty();
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
public SCMRatisServer getRatisServer() {
return ratisServer;
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java
index e959446..3a453e3 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.RaftClientReply;
@@ -55,6 +56,7 @@ public class SCMRatisServerImpl implements SCMRatisServer {
LoggerFactory.getLogger(SCMRatisServerImpl.class);
private final RaftServer.Division division;
+ private final StorageContainerManager scm;
private final InetSocketAddress address;
private final ClientId clientId = ClientId.randomId();
private final AtomicLong callId = new AtomicLong();
@@ -62,8 +64,10 @@ public class SCMRatisServerImpl implements SCMRatisServer {
// TODO: Refactor and remove ConfigurationSource and use only
// SCMHAConfiguration.
SCMRatisServerImpl(final SCMHAConfiguration haConf,
- final ConfigurationSource conf)
+ final ConfigurationSource conf,
+ final StorageContainerManager scm)
throws IOException {
+ this.scm = scm;
this.address = haConf.getRatisBindAddress();
SCMHAGroupBuilder haGrpBuilder = new SCMHAGroupBuilder(haConf, conf);
@@ -75,7 +79,7 @@ public class SCMRatisServerImpl implements SCMRatisServer {
.setServerId(haGrpBuilder.getPeerId())
.setGroup(haGrpBuilder.getRaftGroup())
.setProperties(serverProperties)
- .setStateMachine(new SCMStateMachine())
+ .setStateMachine(new SCMStateMachine(scm, this))
.build();
this.division = server.getDivision(haGrpBuilder.getRaftGroupId());
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
index ee26e58..052bf4b 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
@@ -20,26 +20,40 @@ package org.apache.hadoop.hdds.scm.ha;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftGroupMemberId;
+import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.statemachine.TransactionContext;
import org.apache.ratis.statemachine.impl.BaseStateMachine;
import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* TODO.
*/
public class SCMStateMachine extends BaseStateMachine {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(SCMStateMachine.class);
+ private final StorageContainerManager scm;
+ private final SCMRatisServer ratisServer;
private final Map<RequestType, Object> handlers;
- public SCMStateMachine() {
+
+ public SCMStateMachine(final StorageContainerManager scm,
+ final SCMRatisServer ratisServer) {
+ this.scm = scm;
+ this.ratisServer = ratisServer;
this.handlers = new EnumMap<>(RequestType.class);
}
@@ -89,4 +103,27 @@ public class SCMStateMachine extends BaseStateMachine {
}
}
+ @Override
+ public void notifyNotLeader(Collection<TransactionContext> pendingEntries) {
+ LOG.info("current leader SCM steps down.");
+ scm.getScmContext().updateIsLeaderAndTerm(false, 0);
+ }
+
+ @Override
+ public void notifyLeaderChanged(RaftGroupMemberId groupMemberId,
+ RaftPeerId newLeaderId) {
+ if (!groupMemberId.getPeerId().equals(newLeaderId)) {
+ LOG.info("leader changed, yet current SCM is still follower.");
+ return;
+ }
+
+ long term = scm.getScmHAManager()
+ .getRatisServer()
+ .getDivision()
+ .getInfo()
+ .getCurrentTerm();
+
+ LOG.info("current SCM becomes leader of term {}.", term);
+ scm.getScmContext().updateIsLeaderAndTerm(true, term);
+ }
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
index da51bea..4e6f53e 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
@@ -25,7 +25,6 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.Collections;
@@ -49,7 +48,7 @@ import org.apache.hadoop.hdds.scm.VersionInfo;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
-import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.net.NetworkTopology;
import org.apache.hadoop.hdds.scm.node.states.NodeAlreadyExistsException;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
@@ -75,6 +74,7 @@ import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import org.apache.hadoop.util.Time;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import org.apache.ratis.protocol.exceptions.NotLeaderException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -111,16 +111,14 @@ public class SCMNodeManager implements NodeManager {
new ConcurrentHashMap<>();
private final int numPipelinesPerMetadataVolume;
private final int heavyNodeCriteria;
- private final SCMHAManager scmhaManager;
+ private final SCMContext scmContext;
/**
* Constructs SCM machine Manager.
*/
public SCMNodeManager(OzoneConfiguration conf,
- SCMStorageConfig scmStorageConfig,
- EventPublisher eventPublisher,
- NetworkTopology networkTopology,
- SCMHAManager scmhaManager) {
+ SCMStorageConfig scmStorageConfig, EventPublisher eventPublisher,
+ NetworkTopology networkTopology, SCMContext scmContext) {
this.nodeStateManager = new NodeStateManager(conf, eventPublisher);
this.version = VersionInfo.getLatestVersion();
this.commandQueue = new CommandQueue();
@@ -146,14 +144,7 @@ public class SCMNodeManager implements NodeManager {
ScmConfigKeys.OZONE_SCM_PIPELINE_PER_METADATA_VOLUME_DEFAULT);
String dnLimit = conf.get(ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT);
this.heavyNodeCriteria = dnLimit == null ? 0 : Integer.parseInt(dnLimit);
- this.scmhaManager = scmhaManager;
- }
-
- public SCMNodeManager(OzoneConfiguration conf,
- SCMStorageConfig scmStorageConfig,
- EventPublisher eventPublisher,
- NetworkTopology networkTopology) {
- this(conf, scmStorageConfig, eventPublisher, networkTopology, null);
+ this.scmContext = scmContext;
}
private void registerMXBean() {
@@ -436,10 +427,19 @@ public class SCMNodeManager implements NodeManager {
reportedDn.getPersistedOpStateExpiryEpochSec(),
scmStatus.getOperationalState(),
scmStatus.getOpStateExpiryEpochSeconds());
- addDatanodeCommand(reportedDn.getUuid(),
- new SetNodeOperationalStateCommand(
- Time.monotonicNow(), scmStatus.getOperationalState(),
- scmStatus.getOpStateExpiryEpochSeconds()));
+
+ try {
+ SCMCommand<?> command = new SetNodeOperationalStateCommand(
+ Time.monotonicNow(),
+ scmStatus.getOperationalState(),
+ scmStatus.getOpStateExpiryEpochSeconds());
+ command.setTerm(scmContext.getTerm());
+ addDatanodeCommand(reportedDn.getUuid(), command);
+ } catch (NotLeaderException nle) {
+ LOG.warn("Skip sending SetNodeOperationalStateCommand,"
+ + " since current SCM is not leader.", nle);
+ return;
+ }
}
DatanodeDetails scmDnd = nodeStateManager.getNode(reportedDn);
scmDnd.setPersistedOpStateExpiryEpochSec(
@@ -802,18 +802,6 @@ public class SCMNodeManager implements NodeManager {
// Refactor and remove all the usage of this method and delete this method.
@Override
public void addDatanodeCommand(UUID dnId, SCMCommand command) {
- if (scmhaManager != null && command.getTerm() == 0) {
- Optional<Long> termOpt = scmhaManager.isLeader();
-
- if (!termOpt.isPresent()) {
- LOG.warn("Not leader, drop SCMCommand {}.", command);
- return;
- }
-
- LOG.warn("Help set term {} for SCMCommand {}. It is not an accurate " +
- "way to set term of SCMCommand.", termOpt.get(), command);
- command.setTerm(termOpt.get());
- }
this.commandQueue.addCommand(dnId, command);
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java
index e719adb..89d2833 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java
@@ -23,12 +23,15 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ClosePipelineInfo;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineAction;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineActionsFromDatanode;
import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.ozone.protocol.commands.ClosePipelineCommand;
import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.apache.ratis.protocol.exceptions.NotLeaderException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,11 +47,13 @@ public class PipelineActionHandler
LoggerFactory.getLogger(PipelineActionHandler.class);
private final PipelineManager pipelineManager;
+ private final SCMContext scmContext;
private final ConfigurationSource ozoneConf;
public PipelineActionHandler(PipelineManager pipelineManager,
- OzoneConfiguration conf) {
+ SCMContext scmContext, OzoneConfiguration conf) {
this.pipelineManager = pipelineManager;
+ this.scmContext = scmContext;
this.ozoneConf = conf;
}
@@ -87,9 +92,16 @@ public class PipelineActionHandler
} catch (PipelineNotFoundException e) {
LOG.warn("Pipeline action {} received for unknown pipeline {}, " +
"firing close pipeline event.", action, pid);
+ SCMCommand<?> command = new ClosePipelineCommand(pid);
+ try {
+ command.setTerm(scmContext.getTerm());
+ } catch (NotLeaderException nle) {
+ LOG.warn("Skip sending ClosePipelineCommand for pipeline {}," +
+ " since not leader SCM.", pid);
+ return;
+ }
publisher.fireEvent(SCMEvents.DATANODE_COMMAND,
- new CommandForDatanode<>(datanode.getUuid(),
- new ClosePipelineCommand(pid)));
+ new CommandForDatanode<>(datanode.getUuid(), command));
} catch (IOException ioe) {
LOG.error("Could not execute pipeline action={} pipeline={}",
action, pid, ioe);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java
index 6bf1d4e..ed73a64 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.server.events.EventPublisher;
@@ -40,14 +41,15 @@ public class PipelineFactory {
private Map<ReplicationType, PipelineProvider> providers;
PipelineFactory(NodeManager nodeManager, StateManager stateManager,
- ConfigurationSource conf, EventPublisher eventPublisher) {
+ ConfigurationSource conf, EventPublisher eventPublisher,
+ SCMContext scmContext) {
providers = new HashMap<>();
providers.put(ReplicationType.STAND_ALONE,
new SimplePipelineProvider(nodeManager, stateManager));
providers.put(ReplicationType.RATIS,
new RatisPipelineProvider(nodeManager,
stateManager, conf,
- eventPublisher));
+ eventPublisher, scmContext));
}
protected PipelineFactory() {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java
index 823c9bd..286bdc9 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
@@ -112,9 +113,12 @@ public final class PipelineManagerV2Impl implements PipelineManager {
}
public static PipelineManagerV2Impl newPipelineManager(
- ConfigurationSource conf, SCMHAManager scmhaManager,
- NodeManager nodeManager, Table<PipelineID, Pipeline> pipelineStore,
- EventPublisher eventPublisher) throws IOException {
+ ConfigurationSource conf,
+ SCMHAManager scmhaManager,
+ NodeManager nodeManager,
+ Table<PipelineID, Pipeline> pipelineStore,
+ EventPublisher eventPublisher,
+ SCMContext scmContext) throws IOException {
// Create PipelineStateManager
StateManager stateManager = PipelineStateManagerV2Impl
.newBuilder().setPipelineStore(pipelineStore)
@@ -124,7 +128,7 @@ public final class PipelineManagerV2Impl implements PipelineManager {
// Create PipelineFactory
PipelineFactory pipelineFactory = new PipelineFactory(
- nodeManager, stateManager, conf, eventPublisher);
+ nodeManager, stateManager, conf, eventPublisher, scmContext);
// Create PipelineManager
PipelineManagerV2Impl pipelineManager = new PipelineManagerV2Impl(conf,
scmhaManager, nodeManager, stateManager, pipelineFactory,
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java
index ac6a4ad..ca51433 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hdds.protocol.proto
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.safemode.SafeModeManager;
import org.apache.hadoop.hdds.scm.server
.SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode;
@@ -36,6 +37,7 @@ import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.ozone.protocol.commands.ClosePipelineCommand;
import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,14 +54,18 @@ public class PipelineReportHandler implements
private final PipelineManager pipelineManager;
private final ConfigurationSource conf;
private final SafeModeManager scmSafeModeManager;
+ private final SCMContext scmContext;
private final boolean pipelineAvailabilityCheck;
private final SCMPipelineMetrics metrics;
public PipelineReportHandler(SafeModeManager scmSafeModeManager,
- PipelineManager pipelineManager, ConfigurationSource conf) {
+ PipelineManager pipelineManager,
+ SCMContext scmContext,
+ ConfigurationSource conf) {
Preconditions.checkNotNull(pipelineManager);
this.scmSafeModeManager = scmSafeModeManager;
this.pipelineManager = pipelineManager;
+ this.scmContext = scmContext;
this.conf = conf;
this.metrics = SCMPipelineMetrics.create();
this.pipelineAvailabilityCheck = conf.getBoolean(
@@ -96,11 +102,10 @@ public class PipelineReportHandler implements
try {
pipeline = pipelineManager.getPipeline(pipelineID);
} catch (PipelineNotFoundException e) {
- final ClosePipelineCommand closeCommand =
- new ClosePipelineCommand(pipelineID);
- final CommandForDatanode datanodeCommand =
- new CommandForDatanode<>(dn.getUuid(), closeCommand);
- publisher.fireEvent(SCMEvents.DATANODE_COMMAND, datanodeCommand);
+ SCMCommand<?> command = new ClosePipelineCommand(pipelineID);
+ command.setTerm(scmContext.getTerm());
+ publisher.fireEvent(SCMEvents.DATANODE_COMMAND,
+ new CommandForDatanode<>(dn.getUuid(), command));
return;
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
index c4f47a3..ede3b1e 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.node.NodeStatus;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState;
@@ -38,6 +39,7 @@ import org.apache.hadoop.ozone.protocol.commands.ClosePipelineCommand;
import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
import org.apache.hadoop.ozone.protocol.commands.CreatePipelineCommand;
+import org.apache.ratis.protocol.exceptions.NotLeaderException;
import org.apache.ratis.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -56,14 +58,18 @@ public class RatisPipelineProvider extends PipelineProvider {
private int pipelineNumberLimit;
private int maxPipelinePerDatanode;
private final LeaderChoosePolicy leaderChoosePolicy;
+ private final SCMContext scmContext;
@VisibleForTesting
public RatisPipelineProvider(NodeManager nodeManager,
- StateManager stateManager, ConfigurationSource conf,
- EventPublisher eventPublisher) {
+ StateManager stateManager,
+ ConfigurationSource conf,
+ EventPublisher eventPublisher,
+ SCMContext scmContext) {
super(nodeManager, stateManager);
this.conf = conf;
this.eventPublisher = eventPublisher;
+ this.scmContext = scmContext;
this.placementPolicy =
new PipelinePlacementPolicy(nodeManager, stateManager, conf);
this.pipelineNumberLimit = conf.getInt(
@@ -157,6 +163,8 @@ public class RatisPipelineProvider extends PipelineProvider {
new CreatePipelineCommand(pipeline.getId(), pipeline.getType(),
factor, dns);
+ createCommand.setTerm(scmContext.getTerm());
+
dns.forEach(node -> {
LOG.info("Sending CreatePipelineCommand for pipeline:{} to datanode:{}",
pipeline.getId(), node.getUuidString());
@@ -187,14 +195,15 @@ public class RatisPipelineProvider extends PipelineProvider {
* Removes pipeline from SCM. Sends command to destroy pipeline on all
* the datanodes.
*
- * @param pipeline - Pipeline to be destroyed
- * @throws IOException
+ * @param pipeline - Pipeline to be destroyed
+ * @throws NotLeaderException - Send datanode command while not leader
*/
- public void close(Pipeline pipeline) {
+ public void close(Pipeline pipeline) throws NotLeaderException {
final ClosePipelineCommand closeCommand =
new ClosePipelineCommand(pipeline.getId());
- pipeline.getNodes().stream().forEach(node -> {
- final CommandForDatanode datanodeCommand =
+ closeCommand.setTerm(scmContext.getTerm());
+ pipeline.getNodes().forEach(node -> {
+ final CommandForDatanode<?> datanodeCommand =
new CommandForDatanode<>(node.getUuid(), closeCommand);
LOG.info("Send pipeline:{} close command to datanode {}",
pipeline.getId(), datanodeCommand.getDatanodeId());
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
index dcae32e..892e3bc 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager.SafeModeStatus;
import org.apache.hadoop.hdds.server.events.EventPublisher;
@@ -97,7 +98,7 @@ public class SCMPipelineManager implements PipelineManager {
this(conf, nodeManager, pipelineStore, eventPublisher, null, null);
this.stateManager = new PipelineStateManager();
this.pipelineFactory = new PipelineFactory(nodeManager,
- stateManager, conf, eventPublisher);
+ stateManager, conf, eventPublisher, SCMContext.emptyContext());
this.pipelineStore = pipelineStore;
initializePipelineState();
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocolServerSideTranslatorPB.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocolServerSideTranslatorPB.java
index ea6a148..ff68239 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocolServerSideTranslatorPB.java
@@ -42,7 +42,7 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolPB;
import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB;
-import org.apache.hadoop.hdds.scm.server.SCMBlockProtocolServer;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.hdds.server.OzoneProtocolMessageDispatcher;
import org.apache.hadoop.ozone.common.BlockGroup;
import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
@@ -64,6 +64,7 @@ public final class ScmBlockLocationProtocolServerSideTranslatorPB
implements ScmBlockLocationProtocolPB {
private final ScmBlockLocationProtocol impl;
+ private final StorageContainerManager scm;
private static final Logger LOG = LoggerFactory
.getLogger(ScmBlockLocationProtocolServerSideTranslatorPB.class);
@@ -79,9 +80,11 @@ public final class ScmBlockLocationProtocolServerSideTranslatorPB
*/
public ScmBlockLocationProtocolServerSideTranslatorPB(
ScmBlockLocationProtocol impl,
+ StorageContainerManager scm,
ProtocolMessageMetrics<ProtocolMessageEnum> metrics)
throws IOException {
this.impl = impl;
+ this.scm = scm;
dispatcher = new OzoneProtocolMessageDispatcher<>(
"BlockLocationProtocol", metrics, LOG);
@@ -95,19 +98,13 @@ public final class ScmBlockLocationProtocolServerSideTranslatorPB
.setTraceID(traceID);
}
- private boolean isLeader() throws ServiceException {
- if (!(impl instanceof SCMBlockProtocolServer)) {
- throw new ServiceException("Should be SCMBlockProtocolServer");
- } else {
- return ((SCMBlockProtocolServer) impl).getScm().checkLeader();
- }
- }
-
@Override
public SCMBlockLocationResponse send(RpcController controller,
SCMBlockLocationRequest request) throws ServiceException {
- if (!isLeader()) {
- throw new ServiceException(new IOException("SCM IS NOT LEADER"));
+ if (!scm.getScmContext().isLeader()) {
+ throw new ServiceException(scm.getScmHAManager()
+ .getRatisServer()
+ .triggerNotLeaderException());
}
return dispatcher.processRequest(
request,
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
index 1d964d2..d515d37 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
@@ -79,7 +79,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB;
-import org.apache.hadoop.hdds.scm.server.SCMClientProtocolServer;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.hdds.server.OzoneProtocolMessageDispatcher;
import org.apache.hadoop.hdds.utils.ProtocolMessageMetrics;
@@ -105,6 +105,7 @@ public final class StorageContainerLocationProtocolServerSideTranslatorPB
StorageContainerLocationProtocolServerSideTranslatorPB.class);
private final StorageContainerLocationProtocol impl;
+ private final StorageContainerManager scm;
private OzoneProtocolMessageDispatcher<ScmContainerLocationRequest,
ScmContainerLocationResponse, ProtocolMessageEnum>
@@ -119,27 +120,23 @@ public final class StorageContainerLocationProtocolServerSideTranslatorPB
*/
public StorageContainerLocationProtocolServerSideTranslatorPB(
StorageContainerLocationProtocol impl,
+ StorageContainerManager scm,
ProtocolMessageMetrics<ProtocolMessageEnum> protocolMetrics)
throws IOException {
this.impl = impl;
+ this.scm = scm;
this.dispatcher =
new OzoneProtocolMessageDispatcher<>("ScmContainerLocation",
protocolMetrics, LOG);
}
- private boolean isLeader() throws ServiceException {
- if (!(impl instanceof SCMClientProtocolServer)) {
- throw new ServiceException("Should be SCMClientProtocolServer");
- } else {
- return ((SCMClientProtocolServer) impl).getScm().checkLeader();
- }
- }
-
@Override
public ScmContainerLocationResponse submitRequest(RpcController controller,
ScmContainerLocationRequest request) throws ServiceException {
- if (!isLeader()) {
- throw new ServiceException(new IOException("SCM IS NOT LEADER"));
+ if (!scm.getScmContext().isLeader()) {
+ throw new ServiceException(scm.getScmHAManager()
+ .getRatisServer()
+ .triggerNotLeaderException());
}
return dispatcher
.processRequest(request, this::processRequest, request.getCmdType(),
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SCMSafeModeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SCMSafeModeManager.java
index 6744194..26fb806 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SCMSafeModeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SCMSafeModeManager.java
@@ -330,8 +330,8 @@ public class SCMSafeModeManager implements SafeModeManager {
*/
public static class SafeModeStatus {
- private boolean safeModeStatus;
- private boolean preCheckPassed;
+ private final boolean safeModeStatus;
+ private final boolean preCheckPassed;
public SafeModeStatus(boolean safeModeState, boolean preCheckPassed) {
this.safeModeStatus = safeModeState;
@@ -345,6 +345,14 @@ public class SCMSafeModeManager implements SafeModeManager {
public boolean isPreCheckComplete() {
return preCheckPassed;
}
+
+ @Override
+ public String toString() {
+ return "SafeModeStatus{" +
+ "safeModeStatus=" + safeModeStatus +
+ ", preCheckPassed=" + preCheckPassed +
+ '}';
+ }
}
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java
index 170e0ee..d17675c 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java
@@ -115,7 +115,7 @@ public class SCMBlockProtocolServer implements
BlockingService blockProtoPbService =
ScmBlockLocationProtocolProtos.ScmBlockLocationProtocolService
.newReflectiveBlockingService(
- new ScmBlockLocationProtocolServerSideTranslatorPB(this,
+ new ScmBlockLocationProtocolServerSideTranslatorPB(this, scm,
protocolMessageMetrics));
final InetSocketAddress scmBlockAddress = HddsServerUtil
@@ -293,10 +293,6 @@ public class SCMBlockProtocolServer implements
}
}
- public StorageContainerManager getScm() {
- return scm;
- }
-
@Override
public List<DatanodeDetails> sortDatanodes(List<String> nodes,
String clientMachine) throws IOException {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
index d303705..9df8430 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
@@ -39,15 +39,12 @@ import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ScmOps;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos;
import org.apache.hadoop.hdds.scm.ScmInfo;
-import org.apache.hadoop.hdds.scm.ScmUtils;
import org.apache.hadoop.hdds.scm.node.NodeStatus;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
-import org.apache.hadoop.hdds.scm.safemode.SafeModePrecheck;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
@@ -61,9 +58,6 @@ import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB;
-import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager.SafeModeStatus;
-import org.apache.hadoop.hdds.server.events.EventHandler;
-import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.hdds.utils.HddsServerUtil;
import org.apache.hadoop.hdds.utils.ProtocolMessageMetrics;
import org.apache.hadoop.io.IOUtils;
@@ -93,8 +87,7 @@ import org.slf4j.LoggerFactory;
* The RPC server that listens to requests from clients.
*/
public class SCMClientProtocolServer implements
- StorageContainerLocationProtocol, Auditor,
- EventHandler<SafeModeStatus> {
+ StorageContainerLocationProtocol, Auditor {
private static final Logger LOG =
LoggerFactory.getLogger(SCMClientProtocolServer.class);
private static final AuditLogger AUDIT =
@@ -103,14 +96,12 @@ public class SCMClientProtocolServer implements
private final InetSocketAddress clientRpcAddress;
private final StorageContainerManager scm;
private final OzoneConfiguration conf;
- private SafeModePrecheck safeModePrecheck;
private final ProtocolMessageMetrics<ProtocolMessageEnum> protocolMetrics;
public SCMClientProtocolServer(OzoneConfiguration conf,
StorageContainerManager scm) throws IOException {
this.scm = scm;
this.conf = conf;
- safeModePrecheck = new SafeModePrecheck(conf);
final int handlerCount =
conf.getInt(OZONE_SCM_HANDLER_COUNT_KEY,
OZONE_SCM_HANDLER_COUNT_DEFAULT);
@@ -126,6 +117,7 @@ public class SCMClientProtocolServer implements
BlockingService storageProtoPbService =
newReflectiveBlockingService(
new StorageContainerLocationProtocolServerSideTranslatorPB(this,
+ scm,
protocolMetrics));
final InetSocketAddress scmAddress = HddsServerUtil
@@ -187,7 +179,10 @@ public class SCMClientProtocolServer implements
public ContainerWithPipeline allocateContainer(HddsProtos.ReplicationType
replicationType, HddsProtos.ReplicationFactor factor,
String owner) throws IOException {
- ScmUtils.preCheck(ScmOps.allocateContainer, safeModePrecheck);
+ if (scm.getScmContext().isInSafeMode()) {
+ throw new SCMException("SafeModePrecheck failed for allocateContainer",
+ ResultCodes.SAFE_MODE_EXCEPTION);
+ }
getScm().checkAdminAccess(getRpcRemoteUsername());
final ContainerInfo container = scm.getContainerManager()
@@ -229,7 +224,7 @@ public class SCMClientProtocolServer implements
final ContainerInfo container = scm.getContainerManager()
.getContainer(cid);
- if (safeModePrecheck.isInSafeMode()) {
+ if (scm.getScmContext().isInSafeMode()) {
if (container.isOpen()) {
if (!hasRequiredReplicas(container)) {
throw new SCMException("Open container " + containerID + " doesn't"
@@ -635,7 +630,7 @@ public class SCMClientProtocolServer implements
* Set safe mode status based on .
*/
public boolean getSafeModeStatus() {
- return safeModePrecheck.isInSafeMode();
+ return scm.getScmContext().isInSafeMode();
}
@@ -688,10 +683,4 @@ public class SCMClientProtocolServer implements
public void close() throws IOException {
stop();
}
-
- @Override
- public void onMessage(SafeModeStatus status,
- EventPublisher publisher) {
- safeModePrecheck.setInSafeMode(status.isInSafeMode());
- }
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMConfigurator.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMConfigurator.java
index bbea99c..7cdd5c5 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMConfigurator.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMConfigurator.java
@@ -22,6 +22,7 @@ package org.apache.hadoop.hdds.scm.server;
import org.apache.hadoop.hdds.scm.block.BlockManager;
import org.apache.hadoop.hdds.scm.container.ContainerManagerV2;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
import org.apache.hadoop.hdds.scm.net.NetworkTopology;
import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
@@ -53,7 +54,8 @@ import org.apache.hadoop.hdds.security.x509.certificate.authority
* SCMSafeModeManager scmSafeModeManager;
* CertificateServer certificateServer;
* SCMMetadata scmMetadataStore;
- * SCMHAManager scmHAManager.
+ * SCMHAManager scmHAManager;
+ * SCMContext scmContext.
*
* If any of these are *not* specified then the default version of these
* managers are used by SCM.
@@ -70,6 +72,7 @@ public final class SCMConfigurator {
private SCMMetadataStore metadataStore;
private NetworkTopology networkTopology;
private SCMHAManager scmHAManager;
+ private SCMContext scmContext;
/**
* Allows user to specify a version of Node manager to use with this SCM.
@@ -161,6 +164,15 @@ public final class SCMConfigurator {
}
/**
+ * Allows user to specify a custom version of SCMContext to be
+ * used with this SCM.
+ * @param scmContext - SCMContext.
+ */
+ public void setScmContext(SCMContext scmContext) {
+ this.scmContext = scmContext;
+ }
+
+ /**
* Gets SCM Node Manager.
* @return Node Manager.
*/
@@ -239,4 +251,12 @@ public final class SCMConfigurator {
public SCMHAManager getSCMHAManager() {
return scmHAManager;
}
+
+ /**
+ * Get SCMContext.
+ * @return SCMContext.
+ */
+ public SCMContext getScmContext() {
+ return scmContext;
+ }
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 22c9649..b0145bb 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -52,6 +52,7 @@ import org.apache.hadoop.hdds.scm.PipelineChoosePolicy;
import org.apache.hadoop.hdds.scm.PlacementPolicy;
import org.apache.hadoop.hdds.scm.container.ContainerManagerImpl;
import org.apache.hadoop.hdds.scm.container.ContainerManagerV2;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
import org.apache.hadoop.hdds.scm.ha.SCMHAManagerImpl;
import org.apache.hadoop.hdds.utils.HddsServerUtil;
@@ -170,6 +171,7 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
private SCMMetadataStore scmMetadataStore;
private SCMHAManager scmHAManager;
+ private SCMContext scmContext;
private final EventQueue eventQueue;
/*
@@ -291,11 +293,13 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
initializeSystemManagers(conf, configurator);
CloseContainerEventHandler closeContainerHandler =
- new CloseContainerEventHandler(pipelineManager, containerManager);
+ new CloseContainerEventHandler(
+ pipelineManager, containerManager, scmContext);
NodeReportHandler nodeReportHandler =
new NodeReportHandler(scmNodeManager);
PipelineReportHandler pipelineReportHandler =
- new PipelineReportHandler(scmSafeModeManager, pipelineManager, conf);
+ new PipelineReportHandler(
+ scmSafeModeManager, pipelineManager, scmContext, conf);
CommandStatusReportHandler cmdStatusReportHandler =
new CommandStatusReportHandler();
@@ -314,14 +318,15 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
new PendingDeleteHandler(scmBlockManager.getSCMBlockDeletingService());
ContainerReportHandler containerReportHandler =
- new ContainerReportHandler(scmNodeManager, containerManager, conf);
+ new ContainerReportHandler(
+ scmNodeManager, containerManager, scmContext, conf);
IncrementalContainerReportHandler incrementalContainerReportHandler =
new IncrementalContainerReportHandler(
- scmNodeManager, containerManager);
+ scmNodeManager, containerManager, scmContext);
PipelineActionHandler pipelineActionHandler =
- new PipelineActionHandler(pipelineManager, conf);
+ new PipelineActionHandler(pipelineManager, scmContext, conf);
scmAdminUsernames = conf.getTrimmedStringCollection(OzoneConfigKeys
.OZONE_ADMINISTRATORS);
@@ -357,8 +362,9 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
(DeletedBlockLogImpl) scmBlockManager.getDeletedBlockLog());
eventQueue.addHandler(SCMEvents.PIPELINE_ACTIONS, pipelineActionHandler);
eventQueue.addHandler(SCMEvents.PIPELINE_REPORT, pipelineReportHandler);
- eventQueue.addHandler(SCMEvents.SAFE_MODE_STATUS, clientProtocolServer);
- eventQueue.addHandler(SCMEvents.SAFE_MODE_STATUS, scmBlockManager);
+ eventQueue.addHandler(SCMEvents.SAFE_MODE_STATUS, scmContext);
+ // TODO:
+ // handle replicationManager and pipelineManager in ServiceManager
eventQueue
.addHandler(SCMEvents.DELAYED_SAFE_MODE_STATUS, replicationManager);
eventQueue
@@ -425,14 +431,20 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
if (configurator.getSCMHAManager() != null) {
scmHAManager = configurator.getSCMHAManager();
} else {
- scmHAManager = new SCMHAManagerImpl(conf);
+ scmHAManager = new SCMHAManagerImpl(conf, this);
+ }
+
+ if (configurator.getScmContext() != null) {
+ scmContext = configurator.getScmContext();
+ } else {
+ scmContext = new SCMContext(this);
}
if(configurator.getScmNodeManager() != null) {
scmNodeManager = configurator.getScmNodeManager();
} else {
scmNodeManager = new SCMNodeManager(
- conf, scmStorageConfig, eventQueue, clusterMap, scmHAManager);
+ conf, scmStorageConfig, eventQueue, clusterMap, scmContext);
}
placementMetrics = SCMContainerPlacementMetrics.create();
@@ -449,7 +461,8 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
scmHAManager,
scmNodeManager,
scmMetadataStore.getPipelineTable(),
- eventQueue);
+ eventQueue,
+ scmContext);
}
if (configurator.getContainerManager() != null) {
@@ -473,6 +486,7 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
containerManager,
containerPlacementPolicy,
eventQueue,
+ scmContext,
new LockManager<>(conf),
scmNodeManager);
}
@@ -1005,6 +1019,13 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
}
/**
+ * Returns SCMHAManager.
+ */
+ public SCMHAManager getScmHAManager() {
+ return scmHAManager;
+ }
+
+ /**
* Returns SCM container manager.
*/
@VisibleForTesting
@@ -1052,7 +1073,7 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
* @return - if the current scm is the leader.
*/
public boolean checkLeader() {
- return scmHAManager.isLeader().isPresent();
+ return scmContext.isLeader();
}
public void checkAdminAccess(String remoteUser) throws IOException {
@@ -1140,6 +1161,13 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
}
/**
+ * Returns SCMContext.
+ */
+ public SCMContext getScmContext() {
+ return scmContext;
+ }
+
+ /**
* Force SCM out of safe mode.
*/
public boolean exitSafeMode() {
@@ -1208,8 +1236,4 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
public String getClusterId() {
return getScmStorageConfig().getClusterID();
}
-
- public SCMHAManager getScmHAManager() {
- return scmHAManager;
- }
}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
index 4bac431..5bfa11a 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerManagerV2;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
@@ -493,6 +494,7 @@ public final class TestUtils {
throws IOException, AuthenticationException {
SCMConfigurator configurator = new SCMConfigurator();
configurator.setSCMHAManager(MockSCMHAManager.getInstance(true));
+ configurator.setScmContext(SCMContext.emptyContext());
return getScm(conf, configurator);
}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
index afb9eba..c8c8243 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore;
import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStoreImpl;
@@ -53,6 +54,7 @@ import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.pipeline.PipelineProvider;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManagerV2Impl;
import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
+import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager.SafeModeStatus;
import org.apache.hadoop.hdds.scm.server.SCMConfigurator;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.hdds.server.events.EventHandler;
@@ -88,6 +90,7 @@ public class TestBlockManager {
private static HddsProtos.ReplicationFactor factor;
private static HddsProtos.ReplicationType type;
private EventQueue eventQueue;
+ private SCMContext scmContext;
private int numContainerPerOwnerInPipeline;
private OzoneConfiguration conf;
@@ -117,6 +120,7 @@ public class TestBlockManager {
scmHAManager = MockSCMHAManager.getInstance(true);
eventQueue = new EventQueue();
+ scmContext = SCMContext.emptyContext();
scmMetadataStore = new SCMMetadataStoreImpl(conf);
scmMetadataStore.start(conf);
@@ -126,7 +130,8 @@ public class TestBlockManager {
scmHAManager,
nodeManager,
scmMetadataStore.getPipelineTable(),
- eventQueue);
+ eventQueue,
+ scmContext);
pipelineManager.allowPipelineCreation();
PipelineProvider mockRatisProvider =
@@ -154,6 +159,7 @@ public class TestBlockManager {
configurator.setScmSafeModeManager(safeModeManager);
configurator.setMetadataStore(scmMetadataStore);
configurator.setSCMHAManager(scmHAManager);
+ configurator.setScmContext(scmContext);
scm = TestUtils.getScm(conf, configurator);
// Initialize these fields so that the tests can pass.
@@ -162,14 +168,13 @@ public class TestBlockManager {
DatanodeCommandHandler handler = new DatanodeCommandHandler();
eventQueue.addHandler(SCMEvents.DATANODE_COMMAND, handler);
CloseContainerEventHandler closeContainerHandler =
- new CloseContainerEventHandler(pipelineManager, mapping);
+ new CloseContainerEventHandler(pipelineManager, mapping, scmContext);
eventQueue.addHandler(SCMEvents.CLOSE_CONTAINER, closeContainerHandler);
factor = HddsProtos.ReplicationFactor.THREE;
type = HddsProtos.ReplicationType.RATIS;
-
- blockManager.onMessage(
- new SCMSafeModeManager.SafeModeStatus(false, false), null);
+ scm.getScmContext().onMessage(
+ new SafeModeStatus(false, true), null);
}
@After
@@ -455,7 +460,7 @@ public class TestBlockManager {
@Test
public void testAllocateBlockFailureInSafeMode() throws Exception {
- blockManager.onMessage(
+ scm.getScmContext().onMessage(
new SCMSafeModeManager.SafeModeStatus(true, true), null);
// Test1: In safe mode expect an SCMException.
thrown.expectMessage("SafeModePrecheck failed for "
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java
index a29dc16..6d5fee2 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.TestUtils;
import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore;
import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStoreImpl;
import org.apache.hadoop.hdds.scm.pipeline.MockRatisPipelineProvider;
@@ -61,6 +62,7 @@ public class TestCloseContainerEventHandler {
private static long size;
private static File testDir;
private static EventQueue eventQueue;
+ private static SCMContext scmContext;
private static SCMMetadataStore scmMetadataStore;
@BeforeClass
@@ -75,6 +77,7 @@ public class TestCloseContainerEventHandler {
configuration.setInt(ScmConfigKeys.OZONE_SCM_RATIS_PIPELINE_LIMIT, 16);
nodeManager = new MockNodeManager(true, 10);
eventQueue = new EventQueue();
+ scmContext = SCMContext.emptyContext();
scmMetadataStore = new SCMMetadataStoreImpl(configuration);
pipelineManager =
@@ -83,7 +86,8 @@ public class TestCloseContainerEventHandler {
MockSCMHAManager.getInstance(true),
nodeManager,
scmMetadataStore.getPipelineTable(),
- eventQueue);
+ eventQueue,
+ scmContext);
pipelineManager.allowPipelineCreation();
PipelineProvider mockRatisProvider =
@@ -98,8 +102,7 @@ public class TestCloseContainerEventHandler {
pipelineManager.triggerPipelineCreation();
eventQueue.addHandler(CLOSE_CONTAINER,
new CloseContainerEventHandler(
- pipelineManager,
- containerManager));
+ pipelineManager, containerManager, scmContext));
eventQueue.addHandler(DATANODE_COMMAND, nodeManager);
// Move all pipelines created by background from ALLOCATED to OPEN state
Thread.sleep(2000);
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestIncrementalContainerReportHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestIncrementalContainerReportHandler.java
index 10627a3..7c0c1ec 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestIncrementalContainerReportHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestIncrementalContainerReportHandler.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.IncrementalContainerReportProto;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.net.NetworkTopology;
import org.apache.hadoop.hdds.scm.net.NetworkTopologyImpl;
import org.apache.hadoop.hdds.scm.node.NodeManager;
@@ -62,6 +63,7 @@ public class TestIncrementalContainerReportHandler {
private ContainerManagerV2 containerManager;
private ContainerStateManager containerStateManager;
private EventPublisher publisher;
+ private SCMContext scmContext = SCMContext.emptyContext();
@Before
public void setup() throws IOException, InvalidStateTransitionException {
@@ -74,13 +76,12 @@ public class TestIncrementalContainerReportHandler {
NetworkTopology clusterMap = new NetworkTopologyImpl(conf);
EventQueue eventQueue = new EventQueue();
SCMStorageConfig storageConfig = new SCMStorageConfig(conf);
- this.nodeManager =
- new SCMNodeManager(conf, storageConfig, eventQueue, clusterMap);
+ this.nodeManager = new SCMNodeManager(
+ conf, storageConfig, eventQueue, clusterMap, scmContext);
this.containerStateManager = new ContainerStateManager(conf);
this.publisher = Mockito.mock(EventPublisher.class);
-
Mockito.when(containerManager.getContainer(Mockito.any(ContainerID.class)))
.thenAnswer(invocation -> containerStateManager
.getContainer((ContainerID)invocation.getArguments()[0]));
@@ -119,7 +120,8 @@ public class TestIncrementalContainerReportHandler {
@Test
public void testClosingToClosed() throws IOException {
final IncrementalContainerReportHandler reportHandler =
- new IncrementalContainerReportHandler(nodeManager, containerManager);
+ new IncrementalContainerReportHandler(
+ nodeManager, containerManager, scmContext);
final ContainerInfo container = getContainer(LifeCycleState.CLOSING);
final DatanodeDetails datanodeOne = randomDatanodeDetails();
final DatanodeDetails datanodeTwo = randomDatanodeDetails();
@@ -156,7 +158,8 @@ public class TestIncrementalContainerReportHandler {
@Test
public void testClosingToQuasiClosed() throws IOException {
final IncrementalContainerReportHandler reportHandler =
- new IncrementalContainerReportHandler(nodeManager, containerManager);
+ new IncrementalContainerReportHandler(
+ nodeManager, containerManager, scmContext);
final ContainerInfo container = getContainer(LifeCycleState.CLOSING);
final DatanodeDetails datanodeOne = randomDatanodeDetails();
final DatanodeDetails datanodeTwo = randomDatanodeDetails();
@@ -194,7 +197,8 @@ public class TestIncrementalContainerReportHandler {
@Test
public void testQuasiClosedToClosed() throws IOException {
final IncrementalContainerReportHandler reportHandler =
- new IncrementalContainerReportHandler(nodeManager, containerManager);
+ new IncrementalContainerReportHandler(
+ nodeManager, containerManager, scmContext);
final ContainerInfo container = getContainer(LifeCycleState.QUASI_CLOSED);
final DatanodeDetails datanodeOne = randomDatanodeDetails();
final DatanodeDetails datanodeTwo = randomDatanodeDetails();
@@ -235,7 +239,8 @@ public class TestIncrementalContainerReportHandler {
@Test
public void testDeleteContainer() throws IOException {
final IncrementalContainerReportHandler reportHandler =
- new IncrementalContainerReportHandler(nodeManager, containerManager);
+ new IncrementalContainerReportHandler(
+ nodeManager, containerManager, scmContext);
final ContainerInfo container = getContainer(LifeCycleState.CLOSED);
final DatanodeDetails datanodeOne = randomDatanodeDetails();
final DatanodeDetails datanodeTwo = randomDatanodeDetails();
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
index 7b5d1fa..d926a02 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hdds.scm.PlacementPolicy;
import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementStatusDefault;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.node.NodeStatus;
import org.apache.hadoop.hdds.scm.node.SCMNodeManager;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
@@ -151,6 +152,7 @@ public class TestReplicationManager {
containerManager,
containerPlacementPolicy,
eventQueue,
+ SCMContext.emptyContext(),
new LockManager<>(conf),
nodeManager);
replicationManager.start();
@@ -164,6 +166,7 @@ public class TestReplicationManager {
containerManager,
containerPlacementPolicy,
eventQueue,
+ SCMContext.emptyContext(),
new LockManager<ContainerID>(conf),
nodeManager);
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestSCMContainerManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestSCMContainerManager.java
index b45f9c1..ba0cba5 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestSCMContainerManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestSCMContainerManager.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore;
import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStoreImpl;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
@@ -98,7 +99,8 @@ public class TestSCMContainerManager {
MockSCMHAManager.getInstance(true),
nodeManager,
scmMetadataStore.getPipelineTable(),
- new EventQueue());
+ new EventQueue(),
+ SCMContext.emptyContext());
pipelineManager.allowPipelineCreation();
containerManager = new SCMContainerManager(conf,
scmMetadataStore.getContainerTable(),
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestUnknownContainerReport.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestUnknownContainerReport.java
index 9d2f590..1e9d830 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestUnknownContainerReport.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestUnknownContainerReport.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.hdds.scm.ScmConfig;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.node.NodeStatus;
import org.apache.hadoop.hdds.scm.server
@@ -103,7 +104,7 @@ public class TestUnknownContainerReport {
*/
private void sendContainerReport(OzoneConfiguration conf) {
ContainerReportHandler reportHandler = new ContainerReportHandler(
- nodeManager, containerManager, conf);
+ nodeManager, containerManager, SCMContext.emptyContext(), conf);
ContainerInfo container = getContainer(LifeCycleState.CLOSED);
Iterator<DatanodeDetails> nodeIterator = nodeManager
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestSCMContext.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestSCMContext.java
new file mode 100644
index 0000000..a8e4c00
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestSCMContext.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.ha;
+
+import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager.SafeModeStatus;
+import org.apache.ratis.protocol.exceptions.NotLeaderException;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
+
+/**
+ * Test for SCMContext.
+ */
+public class TestSCMContext {
+ @Test
+ public void testRaftOperations() {
+ // start as follower
+ SCMContext scmContext = new SCMContext(false, 0, null, null);
+ assertFalse(scmContext.isLeader());
+
+ // become leader
+ scmContext.updateIsLeaderAndTerm(true, 10);
+ assertTrue(scmContext.isLeader());
+ try {
+ assertEquals(scmContext.getTerm(), 10);
+ } catch (NotLeaderException e) {
+ fail("Should not throw nle.");
+ }
+
+ // step down
+ scmContext.updateIsLeaderAndTerm(false, 0);
+ assertFalse(scmContext.isLeader());
+ }
+
+ @Test
+ public void testSafeModeOperations() {
+ // in safe mode
+ SCMContext scmContext = new SCMContext(
+ true, 0, new SafeModeStatus(true, false), null);
+ assertTrue(scmContext.isInSafeMode());
+ assertFalse(scmContext.isPreCheckComplete());
+
+ // out of safe mode
+ scmContext.onMessage(new SafeModeStatus(false, true), null);
+ assertFalse(scmContext.isInSafeMode());
+ assertTrue(scmContext.isPreCheckComplete());
+ }
+}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
index 68ec266..5bcdf4b 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hdds.scm.container.SCMContainerManager;
import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementCapacity;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore;
import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStoreImpl;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
@@ -107,7 +108,7 @@ public class TestContainerPlacement {
Mockito.when(storageConfig.getClusterID()).thenReturn("cluster1");
SCMNodeManager nodeManager = new SCMNodeManager(config,
- storageConfig, eventQueue, null);
+ storageConfig, eventQueue, null, SCMContext.emptyContext());
return nodeManager;
}
@@ -121,7 +122,8 @@ public class TestContainerPlacement {
MockSCMHAManager.getInstance(true),
scmNodeManager,
scmMetadataStore.getPipelineTable(),
- eventQueue);
+ eventQueue,
+ SCMContext.emptyContext());
return new SCMContainerManager(config, scmMetadataStore.getContainerTable(),
scmMetadataStore.getStore(),
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeReportHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeReportHandler.java
index 69b031c..78da066 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeReportHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeReportHandler.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolPro
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.StorageReportProto;
import org.apache.hadoop.hdds.scm.TestUtils;
import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.net.NetworkTopology;
import org.apache.hadoop.hdds.scm.net.NetworkTopologyImpl;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.NodeReportFromDatanode;
@@ -58,8 +59,8 @@ public class TestNodeReportHandler implements EventPublisher {
SCMStorageConfig storageConfig = Mockito.mock(SCMStorageConfig.class);
Mockito.when(storageConfig.getClusterID()).thenReturn("cluster1");
NetworkTopology clusterMap = new NetworkTopologyImpl(conf);
- nodeManager =
- new SCMNodeManager(conf, storageConfig, new EventQueue(), clusterMap);
+ nodeManager = new SCMNodeManager(conf, storageConfig,
+ new EventQueue(), clusterMap, SCMContext.emptyContext());
nodeReportHandler = new NodeReportHandler(nodeManager);
}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java
index 04d1403..0e34ae5 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java
@@ -24,6 +24,7 @@ import java.util.List;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.hdds.server.events.EventQueue;
@@ -40,7 +41,7 @@ public class MockRatisPipelineProvider extends RatisPipelineProvider {
ConfigurationSource conf, EventPublisher eventPublisher,
boolean autoOpen) {
super(nodeManager, stateManager,
- conf, eventPublisher);
+ conf, eventPublisher, SCMContext.emptyContext());
autoOpenPipeline = autoOpen;
}
@@ -48,14 +49,14 @@ public class MockRatisPipelineProvider extends RatisPipelineProvider {
StateManager stateManager,
ConfigurationSource conf) {
super(nodeManager, stateManager,
- conf, new EventQueue());
+ conf, new EventQueue(), SCMContext.emptyContext());
}
public MockRatisPipelineProvider(
NodeManager nodeManager, StateManager stateManager,
ConfigurationSource conf, EventPublisher eventPublisher) {
super(nodeManager, stateManager,
- conf, eventPublisher);
+ conf, eventPublisher, SCMContext.emptyContext());
autoOpenPipeline = true;
}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineActionHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineActionHandler.java
index 4517b89..3578718 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineActionHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineActionHandler.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ClosePipelineInfo;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineAction;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineActionsProto;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineActionsFromDatanode;
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
@@ -46,7 +47,7 @@ public class TestPipelineActionHandler {
.thenThrow(new PipelineNotFoundException());
final PipelineActionHandler actionHandler =
- new PipelineActionHandler(manager, null);
+ new PipelineActionHandler(manager, SCMContext.emptyContext(), null);
final PipelineActionsProto actionsProto = PipelineActionsProto.newBuilder()
.addPipelineActions(PipelineAction.newBuilder()
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
index 10d978e..c2dc995 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.MockNodeManager;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition;
import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher;
@@ -99,7 +100,8 @@ public class TestPipelineManagerImpl {
MockSCMHAManager.getInstance(isLeader),
new MockNodeManager(true, 20),
SCMDBDefinition.PIPELINES.getTable(dbStore),
- new EventQueue());
+ new EventQueue(),
+ SCMContext.emptyContext());
}
@Test
@@ -337,7 +339,8 @@ public class TestPipelineManagerImpl {
pipelineManager.getPipeline(pipeline.getId()).isHealthy());
// get pipeline report from each dn in the pipeline
PipelineReportHandler pipelineReportHandler =
- new PipelineReportHandler(scmSafeModeManager, pipelineManager, conf);
+ new PipelineReportHandler(scmSafeModeManager, pipelineManager,
+ SCMContext.emptyContext(), conf);
nodes.subList(0, 2).forEach(dn -> sendPipelineReport(dn, pipeline,
pipelineReportHandler, false));
sendPipelineReport(nodes.get(nodes.size() - 1), pipeline,
@@ -446,7 +449,8 @@ public class TestPipelineManagerImpl {
new SCMSafeModeManager(new OzoneConfiguration(),
new ArrayList<>(), pipelineManager, new EventQueue());
PipelineReportHandler pipelineReportHandler =
- new PipelineReportHandler(scmSafeModeManager, pipelineManager, conf);
+ new PipelineReportHandler(scmSafeModeManager, pipelineManager,
+ SCMContext.emptyContext(), conf);
// Report pipelines with leaders
List<DatanodeDetails> nodes = pipeline.getNodes();
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
index 1e264cb..2226a43 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hdds.scm.TestUtils;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.MockNodeManager;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.metadata.PipelineIDCodec;
import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore;
import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStoreImpl;
@@ -238,7 +239,8 @@ public class TestSCMPipelineManager {
pipelineManager.getPipeline(pipeline.getId()).isHealthy());
// get pipeline report from each dn in the pipeline
PipelineReportHandler pipelineReportHandler =
- new PipelineReportHandler(scmSafeModeManager, pipelineManager, conf);
+ new PipelineReportHandler(scmSafeModeManager, pipelineManager,
+ SCMContext.emptyContext(), conf);
nodes.subList(0, 2).forEach(dn -> sendPipelineReport(dn, pipeline,
pipelineReportHandler, false, eventQueue));
sendPipelineReport(nodes.get(nodes.size() - 1), pipeline,
@@ -495,7 +497,8 @@ public class TestSCMPipelineManager {
new SCMSafeModeManager(new OzoneConfiguration(),
new ArrayList<>(), pipelineManager, eventQueue);
PipelineReportHandler pipelineReportHandler =
- new PipelineReportHandler(scmSafeModeManager, pipelineManager, conf);
+ new PipelineReportHandler(scmSafeModeManager, pipelineManager,
+ SCMContext.emptyContext(), conf);
// Report pipelines with leaders
List<DatanodeDetails> nodes = pipeline.getNodes();
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/TestLeaderChoosePolicy.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/TestLeaderChoosePolicy.java
index 53905e7..13b7d99 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/TestLeaderChoosePolicy.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/TestLeaderChoosePolicy.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdds.scm.pipeline.leader.choose.algorithms;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.ScmConfig;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.pipeline.PipelineStateManager;
import org.apache.hadoop.hdds.scm.pipeline.RatisPipelineProvider;
@@ -51,7 +52,8 @@ public class TestLeaderChoosePolicy {
mock(NodeManager.class),
mock(PipelineStateManager.class),
conf,
- mock(EventPublisher.class));
+ mock(EventPublisher.class),
+ SCMContext.emptyContext());
Assert.assertSame(
ratisPipelineProvider.getLeaderChoosePolicy().getClass(),
DefaultLeaderChoosePolicy.class);
@@ -67,7 +69,8 @@ public class TestLeaderChoosePolicy {
mock(NodeManager.class),
mock(PipelineStateManager.class),
conf,
- mock(EventPublisher.class));
+ mock(EventPublisher.class),
+ SCMContext.emptyContext());
// expecting exception
}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java
index ee1f06c..19f1f30 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.MockNodeManager;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore;
import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStoreImpl;
import org.apache.hadoop.hdds.scm.pipeline.MockRatisPipelineProvider;
@@ -77,7 +78,8 @@ public class TestHealthyPipelineSafeModeRule {
MockSCMHAManager.getInstance(true),
nodeManager,
scmMetadataStore.getPipelineTable(),
- eventQueue);
+ eventQueue,
+ SCMContext.emptyContext());
PipelineProvider mockRatisProvider =
new MockRatisPipelineProvider(nodeManager,
pipelineManager.getStateManager(), config);
@@ -126,7 +128,8 @@ public class TestHealthyPipelineSafeModeRule {
MockSCMHAManager.getInstance(true),
nodeManager,
scmMetadataStore.getPipelineTable(),
- eventQueue);
+ eventQueue,
+ SCMContext.emptyContext());
pipelineManager.allowPipelineCreation();
PipelineProvider mockRatisProvider =
@@ -220,7 +223,8 @@ public class TestHealthyPipelineSafeModeRule {
MockSCMHAManager.getInstance(true),
nodeManager,
scmMetadataStore.getPipelineTable(),
- eventQueue);
+ eventQueue,
+ SCMContext.emptyContext());
pipelineManager.allowPipelineCreation();
PipelineProvider mockRatisProvider =
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestOneReplicaPipelineSafeModeRule.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestOneReplicaPipelineSafeModeRule.java
index 5e41289..b915899 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestOneReplicaPipelineSafeModeRule.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestOneReplicaPipelineSafeModeRule.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.MockNodeManager;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore;
import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStoreImpl;
import org.apache.hadoop.hdds.scm.pipeline.MockRatisPipelineProvider;
@@ -87,7 +88,8 @@ public class TestOneReplicaPipelineSafeModeRule {
MockSCMHAManager.getInstance(true),
mockNodeManager,
scmMetadataStore.getPipelineTable(),
- eventQueue);
+ eventQueue,
+ SCMContext.emptyContext());
pipelineManager.allowPipelineCreation();
PipelineProvider mockRatisProvider =
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java
index 0734eea..e8dbc2e 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.hdds.scm.container.MockNodeManager;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore;
import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStoreImpl;
import org.apache.hadoop.hdds.scm.pipeline.MockRatisPipelineProvider;
@@ -71,6 +72,7 @@ import org.mockito.Mockito;
public class TestSCMSafeModeManager {
private static EventQueue queue;
+ private SCMContext scmContext;
private SCMSafeModeManager scmSafeModeManager;
private static OzoneConfiguration config;
private List<ContainerInfo> containers = Collections.emptyList();
@@ -86,6 +88,7 @@ public class TestSCMSafeModeManager {
@Before
public void setUp() {
queue = new EventQueue();
+ scmContext = SCMContext.emptyContext();
config = new OzoneConfiguration();
config.setBoolean(HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION,
false);
@@ -307,7 +310,8 @@ public class TestSCMSafeModeManager {
MockSCMHAManager.getInstance(true),
mockNodeManager,
scmMetadataStore.getPipelineTable(),
- queue);
+ queue,
+ scmContext);
scmSafeModeManager = new SCMSafeModeManager(
conf, containers, pipelineManager, queue);
fail("testFailWithIncorrectValueForHealthyPipelinePercent");
@@ -330,7 +334,8 @@ public class TestSCMSafeModeManager {
MockSCMHAManager.getInstance(true),
mockNodeManager,
scmMetadataStore.getPipelineTable(),
- queue);
+ queue,
+ scmContext);
scmSafeModeManager = new SCMSafeModeManager(
conf, containers, pipelineManager, queue);
fail("testFailWithIncorrectValueForOneReplicaPipelinePercent");
@@ -352,7 +357,8 @@ public class TestSCMSafeModeManager {
MockSCMHAManager.getInstance(true),
mockNodeManager,
scmMetadataStore.getPipelineTable(),
- queue);
+ queue,
+ scmContext);
scmSafeModeManager = new SCMSafeModeManager(
conf, containers, pipelineManager, queue);
fail("testFailWithIncorrectValueForSafeModePercent");
@@ -381,7 +387,8 @@ public class TestSCMSafeModeManager {
MockSCMHAManager.getInstance(true),
mockNodeManager,
scmMetadataStore.getPipelineTable(),
- queue);
+ queue,
+ scmContext);
PipelineProvider mockRatisProvider =
new MockRatisPipelineProvider(mockNodeManager,
pipelineManager.getStateManager(), config);
@@ -631,7 +638,8 @@ public class TestSCMSafeModeManager {
MockSCMHAManager.getInstance(true),
nodeManager,
scmMetadataStore.getPipelineTable(),
- queue);
+ queue,
+ scmContext);
PipelineProvider mockRatisProvider =
new MockRatisPipelineProvider(nodeManager,
@@ -694,7 +702,8 @@ public class TestSCMSafeModeManager {
MockSCMHAManager.getInstance(true),
nodeManager,
scmMetadataStore.getPipelineTable(),
- queue);
+ queue,
+ scmContext);
PipelineProvider mockRatisProvider =
new MockRatisPipelineProvider(nodeManager,
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMBlockProtocolServer.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMBlockProtocolServer.java
index a87dde9..b2201e5 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMBlockProtocolServer.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMBlockProtocolServer.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos;
import org.apache.hadoop.hdds.scm.TestUtils;
import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.utils.ProtocolMessageMetrics;
import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocolServerSideTranslatorPB;
@@ -60,6 +61,7 @@ public class TestSCMBlockProtocolServer {
config.set(HddsConfigKeys.OZONE_METADATA_DIRS, dir.toString());
SCMConfigurator configurator = new SCMConfigurator();
configurator.setSCMHAManager(MockSCMHAManager.getInstance(true));
+ configurator.setScmContext(SCMContext.emptyContext());
scm = TestUtils.getScm(config, configurator);
scm.start();
scm.exitSafeMode();
@@ -70,7 +72,7 @@ public class TestSCMBlockProtocolServer {
}
server = scm.getBlockProtocolServer();
- service = new ScmBlockLocationProtocolServerSideTranslatorPB(server,
+ service = new ScmBlockLocationProtocolServerSideTranslatorPB(server, scm,
Mockito.mock(ProtocolMessageMetrics.class));
}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/scm/node/TestSCMNodeMetrics.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/scm/node/TestSCMNodeMetrics.java
index fd65283..ab57b29 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/scm/node/TestSCMNodeMetrics.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/scm/node/TestSCMNodeMetrics.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolPro
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.StorageReportProto;
import org.apache.hadoop.hdds.scm.TestUtils;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.net.NetworkTopologyImpl;
import org.apache.hadoop.hdds.scm.node.SCMNodeManager;
import org.apache.hadoop.hdds.scm.node.SCMNodeMetrics;
@@ -63,7 +64,7 @@ public class TestSCMNodeMetrics {
SCMStorageConfig config =
new SCMStorageConfig(NodeType.DATANODE, new File("/tmp"), "storage");
nodeManager = new SCMNodeManager(source, config, publisher,
- new NetworkTopologyImpl(source));
+ new NetworkTopologyImpl(source), SCMContext.emptyContext());
registeredDatanode = DatanodeDetails.newBuilder()
.setHostName("localhost")
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
index 7721ac3..efbf993 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerManagerV2;
import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineActionsFromDatanode;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.hdds.server.events.EventPublisher;
@@ -176,7 +177,8 @@ public class TestPipelineClose {
ratisContainer.getPipeline().getId());
// send closing action for pipeline
PipelineActionHandler pipelineActionHandler =
- new PipelineActionHandler(pipelineManager, conf);
+ new PipelineActionHandler(
+ pipelineManager, SCMContext.emptyContext(), conf);
pipelineActionHandler
.onMessage(pipelineActionsFromDatanode, new EventQueue());
Thread.sleep(5000);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestContainerReplication.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestContainerReplication.java
index d5768aa..5e08f2d 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestContainerReplication.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestContainerReplication.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.ozone.container.ozoneimpl.TestOzoneContainer;
import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.test.GenericTestUtils;
import static org.apache.hadoop.ozone.container.ozoneimpl.TestOzoneContainer
@@ -137,10 +138,13 @@ public class TestContainerReplication {
Assert.assertEquals(ContainerProtos.Result.SUCCESS, response.getResult());
//WHEN: send the order to replicate the container
+ SCMCommand<?> command = new ReplicateContainerCommand(containerId,
+ sourcePipelines.getNodes());
+ command.setTerm(
+ cluster.getStorageContainerManager().getScmContext().getTerm());
cluster.getStorageContainerManager().getScmNodeManager()
.addDatanodeCommand(destinationDatanode.getDatanodeDetails().getUuid(),
- new ReplicateContainerCommand(containerId,
- sourcePipelines.getNodes()));
+ command);
DatanodeStateMachine destinationDatanodeDatanodeStateMachine =
destinationDatanode.getDatanodeStateMachine();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java
index 853f2cd..00a338b 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.ozone.container.common.utils.ReferenceCountedDB;
import org.junit.AfterClass;
@@ -143,9 +144,12 @@ public class TestCloseContainerByPipeline {
.getCloseContainerHandler();
int lastInvocationCount = closeContainerHandler.getInvocationCount();
//send the order to close the container
+ SCMCommand<?> command = new CloseContainerCommand(
+ containerID, pipeline.getId());
+ command.setTerm(
+ cluster.getStorageContainerManager().getScmContext().getTerm());
cluster.getStorageContainerManager().getScmNodeManager()
- .addDatanodeCommand(datanodeDetails.getUuid(),
- new CloseContainerCommand(containerID, pipeline.getId()));
+ .addDatanodeCommand(datanodeDetails.getUuid(), command);
GenericTestUtils
.waitFor(() -> isContainerClosed(cluster, containerID, datanodeDetails),
500, 5 * 1000);
@@ -191,9 +195,12 @@ public class TestCloseContainerByPipeline {
// Send the order to close the container, give random pipeline id so that
// the container will not be closed via RATIS
+ SCMCommand<?> command = new CloseContainerCommand(
+ containerID, pipeline.getId());
+ command.setTerm(
+ cluster.getStorageContainerManager().getScmContext().getTerm());
cluster.getStorageContainerManager().getScmNodeManager()
- .addDatanodeCommand(datanodeDetails.getUuid(),
- new CloseContainerCommand(containerID, pipeline.getId()));
+ .addDatanodeCommand(datanodeDetails.getUuid(), command);
//double check if it's really closed (waitFor also throws an exception)
// TODO: change the below line after implementing QUASI_CLOSED to CLOSED
@@ -242,9 +249,12 @@ public class TestCloseContainerByPipeline {
for (DatanodeDetails details : datanodes) {
Assert.assertFalse(isContainerClosed(cluster, containerID, details));
//send the order to close the container
+ SCMCommand<?> command = new CloseContainerCommand(
+ containerID, pipeline.getId());
+ command.setTerm(
+ cluster.getStorageContainerManager().getScmContext().getTerm());
cluster.getStorageContainerManager().getScmNodeManager()
- .addDatanodeCommand(details.getUuid(),
- new CloseContainerCommand(containerID, pipeline.getId()));
+ .addDatanodeCommand(details.getUuid(), command);
int index = cluster.getHddsDatanodeIndex(details);
Container dnContainer = cluster.getHddsDatanodes().get(index)
.getDatanodeStateMachine().getContainer().getContainerSet()
@@ -319,9 +329,12 @@ public class TestCloseContainerByPipeline {
// Send close container command from SCM to datanode with forced flag as
// true
+ SCMCommand<?> command = new CloseContainerCommand(
+ containerID, pipeline.getId(), true);
+ command.setTerm(
+ cluster.getStorageContainerManager().getScmContext().getTerm());
cluster.getStorageContainerManager().getScmNodeManager()
- .addDatanodeCommand(datanodeDetails.getUuid(),
- new CloseContainerCommand(containerID, pipeline.getId(), true));
+ .addDatanodeCommand(datanodeDetails.getUuid(), command);
GenericTestUtils
.waitFor(() -> isContainerClosed(
cluster, containerID, datanodeDetails), 500, 5 * 1000);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerHandler.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerHandler.java
index 8bd054b..870486a 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerHandler.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerHandler.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.ozone.container.common.impl.ContainerData;
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.test.GenericTestUtils;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE;
@@ -119,9 +120,12 @@ public class TestCloseContainerHandler {
DatanodeDetails datanodeDetails =
cluster.getHddsDatanodes().get(0).getDatanodeDetails();
//send the order to close the container
+ SCMCommand<?> command = new CloseContainerCommand(
+ containerId.getId(), pipeline.getId());
+ command.setTerm(
+ cluster.getStorageContainerManager().getScmContext().getTerm());
cluster.getStorageContainerManager().getScmNodeManager()
- .addDatanodeCommand(datanodeDetails.getUuid(),
- new CloseContainerCommand(containerId.getId(), pipeline.getId()));
+ .addDatanodeCommand(datanodeDetails.getUuid(), command);
GenericTestUtils.waitFor(() ->
isContainerClosed(cluster, containerId.getId()),
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteContainerHandler.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteContainerHandler.java
index 61c3369..40998e1 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteContainerHandler.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteContainerHandler.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.AfterClass;
import org.junit.Assert;
@@ -132,8 +133,11 @@ public class TestDeleteContainerHandler {
cluster.getStorageContainerManager().getScmNodeManager();
//send the order to close the container
- nodeManager.addDatanodeCommand(datanodeDetails.getUuid(),
- new CloseContainerCommand(containerId.getId(), pipeline.getId()));
+ SCMCommand<?> command = new CloseContainerCommand(
+ containerId.getId(), pipeline.getId());
+ command.setTerm(
+ cluster.getStorageContainerManager().getScmContext().getTerm());
+ nodeManager.addDatanodeCommand(datanodeDetails.getUuid(), command);
GenericTestUtils.waitFor(() ->
isContainerClosed(hddsDatanodeService, containerId.getId()),
@@ -148,8 +152,10 @@ public class TestDeleteContainerHandler {
containerId.getId()));
// send delete container to the datanode
- nodeManager.addDatanodeCommand(datanodeDetails.getUuid(),
- new DeleteContainerCommand(containerId.getId(), false));
+ command = new DeleteContainerCommand(containerId.getId(), false);
+ command.setTerm(
+ cluster.getStorageContainerManager().getScmContext().getTerm());
+ nodeManager.addDatanodeCommand(datanodeDetails.getUuid(), command);
GenericTestUtils.waitFor(() ->
isContainerDeleted(hddsDatanodeService, containerId.getId()),
@@ -183,8 +189,11 @@ public class TestDeleteContainerHandler {
cluster.getStorageContainerManager().getScmNodeManager();
// Send delete container command with force flag set to false.
- nodeManager.addDatanodeCommand(datanodeDetails.getUuid(),
- new DeleteContainerCommand(containerId.getId(), false));
+ SCMCommand<?> command = new DeleteContainerCommand(
+ containerId.getId(), false);
+ command.setTerm(
+ cluster.getStorageContainerManager().getScmContext().getTerm());
+ nodeManager.addDatanodeCommand(datanodeDetails.getUuid(), command);
// Here it should not delete it, and the container should exist in the
// containerset
@@ -205,9 +214,10 @@ public class TestDeleteContainerHandler {
// Now delete container with force flag set to true. now it should delete
// container
-
- nodeManager.addDatanodeCommand(datanodeDetails.getUuid(),
- new DeleteContainerCommand(containerId.getId(), true));
+ command = new DeleteContainerCommand(containerId.getId(), true);
+ command.setTerm(
+ cluster.getStorageContainerManager().getScmContext().getTerm());
+ nodeManager.addDatanodeCommand(datanodeDetails.getUuid(), command);
GenericTestUtils.waitFor(() ->
isContainerDeleted(hddsDatanodeService, containerId.getId()),
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java
index 1bb0a0b..5f9cc2c 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes;
import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.net.NetworkTopology;
import org.apache.hadoop.hdds.scm.net.NetworkTopologyImpl;
import org.apache.hadoop.hdds.scm.net.NodeSchema;
@@ -172,6 +173,7 @@ public class TestKeyManagerImpl {
configurator.setScmNodeManager(nodeManager);
configurator.setNetworkTopology(clusterMap);
configurator.setSCMHAManager(MockSCMHAManager.getInstance(true));
+ configurator.setScmContext(SCMContext.emptyContext());
scm = TestUtils.getScm(conf, configurator);
scm.start();
scm.exitSafeMode();
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconIncrementalContainerReportHandler.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconIncrementalContainerReportHandler.java
index 77e5a38..0cc4926 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconIncrementalContainerReportHandler.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconIncrementalContainerReportHandler.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerManagerV2;
import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
import org.apache.hadoop.hdds.scm.container.IncrementalContainerReportHandler;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.IncrementalContainerReportFromDatanode;
@@ -44,8 +45,8 @@ public class ReconIncrementalContainerReportHandler
ReconIncrementalContainerReportHandler.class);
public ReconIncrementalContainerReportHandler(NodeManager nodeManager,
- ContainerManagerV2 containerManager) {
- super(nodeManager, containerManager);
+ ContainerManagerV2 containerManager, SCMContext scmContext) {
+ super(nodeManager, containerManager, scmContext);
}
@Override
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconNodeManager.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconNodeManager.java
index d7a6104..624d782 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconNodeManager.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconNodeManager.java
@@ -28,6 +28,7 @@ import java.util.UUID;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.net.NetworkTopology;
import org.apache.hadoop.hdds.scm.node.SCMNodeManager;
import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
@@ -66,7 +67,8 @@ public class ReconNodeManager extends SCMNodeManager {
EventPublisher eventPublisher,
NetworkTopology networkTopology,
Table<UUID, DatanodeDetails> nodeDB) {
- super(conf, scmStorageConfig, eventPublisher, networkTopology);
+ super(conf, scmStorageConfig, eventPublisher, networkTopology,
+ SCMContext.emptyContext());
this.nodeDB = nodeDB;
loadExistingNodes();
}
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineReportHandler.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineReportHandler.java
index 246d9ba..589de00 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineReportHandler.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineReportHandler.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReport;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
@@ -46,9 +47,10 @@ public class ReconPipelineReportHandler extends PipelineReportHandler {
public ReconPipelineReportHandler(SafeModeManager scmSafeModeManager,
PipelineManager pipelineManager,
+ SCMContext scmContext,
ConfigurationSource conf,
StorageContainerServiceProvider scmServiceProvider) {
- super(scmSafeModeManager, pipelineManager, conf);
+ super(scmSafeModeManager, pipelineManager, scmContext, conf);
this.scmServiceProvider = scmServiceProvider;
}
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
index 14afa2f..a406779 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hdds.scm.container.ReplicationManager;
import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementPolicyFactory;
import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementMetrics;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.net.NetworkTopology;
import org.apache.hadoop.hdds.scm.net.NetworkTopologyImpl;
import org.apache.hadoop.hdds.scm.node.DeadNodeHandler;
@@ -78,6 +79,7 @@ public class ReconStorageContainerManagerFacade
private final OzoneConfiguration ozoneConfiguration;
private final ReconDatanodeProtocolServer datanodeProtocolServer;
private final EventQueue eventQueue;
+ private final SCMContext scmContext;
private final SCMStorageConfig scmStorageConfig;
private final DBStore dbStore;
@@ -98,6 +100,7 @@ public class ReconStorageContainerManagerFacade
throws IOException {
this.eventQueue = new EventQueue();
eventQueue.setSilent(true);
+ this.scmContext = SCMContext.emptyContext();
this.ozoneConfiguration = getReconScmConfiguration(conf);
this.scmStorageConfig = new ReconStorageConfig(conf);
this.clusterMap = new NetworkTopologyImpl(conf);
@@ -131,11 +134,11 @@ public class ReconStorageContainerManagerFacade
SafeModeManager safeModeManager = new ReconSafeModeManager();
ReconPipelineReportHandler pipelineReportHandler =
- new ReconPipelineReportHandler(
- safeModeManager, pipelineManager, conf, scmServiceProvider);
+ new ReconPipelineReportHandler(safeModeManager,
+ pipelineManager, scmContext, conf, scmServiceProvider);
PipelineActionHandler pipelineActionHandler =
- new PipelineActionHandler(pipelineManager, conf);
+ new PipelineActionHandler(pipelineManager, scmContext, conf);
StaleNodeHandler staleNodeHandler =
new StaleNodeHandler(nodeManager, pipelineManager, conf);
@@ -146,10 +149,11 @@ public class ReconStorageContainerManagerFacade
new ReconContainerReportHandler(nodeManager, containerManager);
IncrementalContainerReportHandler icrHandler =
- new ReconIncrementalContainerReportHandler(nodeManager,
- containerManager);
+ new ReconIncrementalContainerReportHandler(nodeManager,
+ containerManager, scmContext);
CloseContainerEventHandler closeContainerHandler =
- new CloseContainerEventHandler(pipelineManager, containerManager);
+ new CloseContainerEventHandler(
+ pipelineManager, containerManager, scmContext);
ContainerActionsHandler actionsHandler = new ContainerActionsHandler();
ReconNewNodeHandler newNodeHandler = new ReconNewNodeHandler(nodeManager);
diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/AbstractReconContainerManagerTest.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/AbstractReconContainerManagerTest.java
index 15d16cb..f3592bc 100644
--- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/AbstractReconContainerManagerTest.java
+++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/AbstractReconContainerManagerTest.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.net.NetworkTopology;
import org.apache.hadoop.hdds.scm.net.NetworkTopologyImpl;
import org.apache.hadoop.hdds.scm.node.NodeManager;
@@ -76,8 +77,8 @@ public class AbstractReconContainerManagerTest {
scmStorageConfig = new ReconStorageConfig(conf);
NetworkTopology clusterMap = new NetworkTopologyImpl(conf);
EventQueue eventQueue = new EventQueue();
- NodeManager nodeManager =
- new SCMNodeManager(conf, scmStorageConfig, eventQueue, clusterMap);
+ NodeManager nodeManager = new SCMNodeManager(conf, scmStorageConfig,
+ eventQueue, clusterMap, SCMContext.emptyContext());
pipelineManager = new ReconPipelineManager(conf, nodeManager,
ReconSCMDBDefinition.PIPELINES.getTable(store), eventQueue);
containerManager = new ReconContainerManager(
diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconIncrementalContainerReportHandler.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconIncrementalContainerReportHandler.java
index 064c85e..0b0bce5 100644
--- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconIncrementalContainerReportHandler.java
+++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconIncrementalContainerReportHandler.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolPro
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.IncrementalContainerReportProto;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.net.NetworkTopology;
import org.apache.hadoop.hdds.scm.net.NetworkTopologyImpl;
import org.apache.hadoop.hdds.scm.node.NodeManager;
@@ -83,14 +84,14 @@ public class TestReconIncrementalContainerReportHandler
NetworkTopology clusterMap = new NetworkTopologyImpl(conf);
EventQueue eventQueue = new EventQueue();
SCMStorageConfig storageConfig = new SCMStorageConfig(conf);
- NodeManager nodeManager =
- new SCMNodeManager(conf, storageConfig, eventQueue, clusterMap);
+ NodeManager nodeManager = new SCMNodeManager(conf, storageConfig,
+ eventQueue, clusterMap, SCMContext.emptyContext());
nodeManager.register(datanodeDetails, null, null);
ReconContainerManager containerManager = getContainerManager();
ReconIncrementalContainerReportHandler reconIcr =
new ReconIncrementalContainerReportHandler(nodeManager,
- null);
+ null, SCMContext.emptyContext());
EventPublisher eventPublisherMock = mock(EventPublisher.class);
reconIcr.onMessage(reportMock, eventPublisherMock);
@@ -133,7 +134,7 @@ public class TestReconIncrementalContainerReportHandler
when(reportMock.getReport()).thenReturn(containerReport);
ReconIncrementalContainerReportHandler reconIcr =
new ReconIncrementalContainerReportHandler(nodeManagerMock,
- null);
+ null, SCMContext.emptyContext());
reconIcr.onMessage(reportMock, mock(EventPublisher.class));
assertTrue(containerManager.containerExist(containerID));
diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconPipelineManager.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconPipelineManager.java
index b190810..2c4226f 100644
--- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconPipelineManager.java
+++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconPipelineManager.java
@@ -26,6 +26,7 @@ import java.util.List;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.net.NetworkTopology;
import org.apache.hadoop.hdds.scm.net.NetworkTopologyImpl;
import org.apache.hadoop.hdds.scm.node.NodeManager;
@@ -109,8 +110,8 @@ public class TestReconPipelineManager {
NetworkTopology clusterMap = new NetworkTopologyImpl(conf);
EventQueue eventQueue = new EventQueue();
- NodeManager nodeManager =
- new SCMNodeManager(conf, scmStorageConfig, eventQueue, clusterMap);
+ NodeManager nodeManager = new SCMNodeManager(conf, scmStorageConfig,
+ eventQueue, clusterMap, SCMContext.emptyContext());
try (ReconPipelineManager reconPipelineManager =
new ReconPipelineManager(conf, nodeManager,
@@ -145,8 +146,8 @@ public class TestReconPipelineManager {
Pipeline pipeline = getRandomPipeline();
NetworkTopology clusterMap = new NetworkTopologyImpl(conf);
EventQueue eventQueue = new EventQueue();
- NodeManager nodeManager =
- new SCMNodeManager(conf, scmStorageConfig, eventQueue, clusterMap);
+ NodeManager nodeManager = new SCMNodeManager(conf, scmStorageConfig,
+ eventQueue, clusterMap, SCMContext.emptyContext());
ReconPipelineManager reconPipelineManager =
new ReconPipelineManager(conf, nodeManager,
diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconPipelineReportHandler.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconPipelineReportHandler.java
index e72df36..fcf67a4 100644
--- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconPipelineReportHandler.java
+++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconPipelineReportHandler.java
@@ -29,6 +29,7 @@ import java.io.IOException;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReport;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.server.events.EventPublisher;
@@ -62,7 +63,8 @@ public class TestReconPipelineReportHandler {
ReconPipelineReportHandler handler =
new ReconPipelineReportHandler(new ReconSafeModeManager(),
- reconPipelineManagerMock, configuration, scmServiceProviderMock);
+ reconPipelineManagerMock, SCMContext.emptyContext(),
+ configuration, scmServiceProviderMock);
EventPublisher eventPublisherMock = mock(EventPublisher.class);
PipelineReport report = mock(PipelineReport.class);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org