You are viewing a plain-text version of this content. The canonical link for it ("here") was a hyperlink and is not preserved in this copy.
Posted to common-commits@hadoop.apache.org by na...@apache.org on 2019/04/23 12:06:11 UTC
[hadoop] branch trunk updated: HDDS-1368. Cleanup old ReplicationManager code from SCM.
This is an automated email from the ASF dual-hosted git repository.
nanda pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7e1f8d3 HDDS-1368. Cleanup old ReplicationManager code from SCM.
7e1f8d3 is described below
commit 7e1f8d3a1b7b48d3debcce1d7096ed4c46fdeb0f
Author: Nanda kumar <na...@apache.org>
AuthorDate: Tue Apr 23 17:35:39 2019 +0530
HDDS-1368. Cleanup old ReplicationManager code from SCM.
---
.../common/statemachine/StateContext.java | 32 +-
.../DeleteContainerCommandHandler.java | 4 -
.../ReplicateContainerCommandHandler.java | 28 +-
.../scm/command/CommandStatusReportHandler.java | 43 +--
.../container/DeleteContainerCommandWatcher.java | 56 ---
.../replication/ReplicationCommandWatcher.java | 56 ---
.../container/replication/ReplicationManager.java | 384 ---------------------
.../container/replication/ReplicationQueue.java | 73 ----
.../container/replication/ReplicationRequest.java | 123 -------
.../apache/hadoop/hdds/scm/events/SCMEvents.java | 65 +---
.../hadoop/hdds/scm/node/DeadNodeHandler.java | 186 ++++++----
.../hadoop/hdds/scm/node/SCMNodeManager.java | 4 +-
.../hdds/scm/server/StorageContainerManager.java | 2 +-
.../hadoop/hdds/scm/block/TestBlockManager.java | 11 +-
.../command/TestCommandStatusReportHandler.java | 5 -
.../replication/TestReplicationManager.java | 290 ----------------
.../replication/TestReplicationQueue.java | 134 -------
.../hadoop/hdds/scm/node/TestDeadNodeHandler.java | 63 +---
.../hadoop/hdds/scm/node/TestStatisticsUpdate.java | 4 +-
.../ozone/container/common/TestEndPoint.java | 13 +-
20 files changed, 150 insertions(+), 1426 deletions(-)
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
index 7e06473..56151f8 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
@@ -19,10 +19,11 @@ package org.apache.hadoop.ozone.container.common.statemachine;
import com.google.common.base.Preconditions;
import com.google.protobuf.GeneratedMessage;
import java.util.Map;
-import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
+import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.PipelineAction;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerAction;
@@ -34,8 +35,6 @@ import org.apache.hadoop.ozone.container.common.states.datanode
import org.apache.hadoop.ozone.container.common.states.datanode
.RunningDatanodeState;
import org.apache.hadoop.ozone.protocol.commands.CommandStatus;
-import org.apache.hadoop.ozone.protocol.commands.CommandStatus
- .CommandStatusBuilder;
import org.apache.hadoop.ozone.protocol.commands
.DeleteBlockCommandStatus.DeleteBlockCommandStatusBuilder;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
@@ -432,27 +431,14 @@ public class StateContext {
* @param cmd - {@link SCMCommand}.
*/
public void addCmdStatus(SCMCommand cmd) {
- final Optional<CommandStatusBuilder> cmdStatusBuilder;
- switch (cmd.getType()) {
- case replicateContainerCommand:
- cmdStatusBuilder = Optional.of(CommandStatusBuilder.newBuilder());
- break;
- case deleteBlocksCommand:
- cmdStatusBuilder = Optional.of(
- DeleteBlockCommandStatusBuilder.newBuilder());
- break;
- case deleteContainerCommand:
- cmdStatusBuilder = Optional.of(CommandStatusBuilder.newBuilder());
- break;
- default:
- cmdStatusBuilder = Optional.empty();
+ if (cmd.getType() == SCMCommandProto.Type.deleteBlocksCommand) {
+ addCmdStatus(cmd.getId(),
+ DeleteBlockCommandStatusBuilder.newBuilder()
+ .setCmdId(cmd.getId())
+ .setStatus(Status.PENDING)
+ .setType(cmd.getType())
+ .build());
}
- cmdStatusBuilder.ifPresent(statusBuilder ->
- addCmdStatus(cmd.getId(), statusBuilder
- .setCmdId(cmd.getId())
- .setStatus(Status.PENDING)
- .setType(cmd.getType())
- .build()));
}
/**
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteContainerCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteContainerCommandHandler.java
index 4a6787e..b54fb1a 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteContainerCommandHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteContainerCommandHandler.java
@@ -56,11 +56,7 @@ public class DeleteContainerCommandHandler implements CommandHandler {
final ContainerController controller = ozoneContainer.getController();
controller.deleteContainer(deleteContainerCommand.getContainerID(),
deleteContainerCommand.isForce());
- updateCommandStatus(context, command,
- (cmdStatus) -> cmdStatus.setStatus(true), LOG);
} catch (IOException e) {
- updateCommandStatus(context, command,
- (cmdStatus) -> cmdStatus.setStatus(false), LOG);
LOG.error("Exception occurred while deleting the container.", e);
} finally {
totalTime += Time.monotonicNow() - startTime;
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java
index 81d162d..a028041 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java
@@ -61,25 +61,17 @@ public class ReplicateContainerCommandHandler implements CommandHandler {
public void handle(SCMCommand command, OzoneContainer container,
StateContext context, SCMConnectionManager connectionManager) {
- ReplicateContainerCommand replicateCommand =
+ final ReplicateContainerCommand replicateCommand =
(ReplicateContainerCommand) command;
- try {
- List<DatanodeDetails> sourceDatanodes =
- replicateCommand.getSourceDatanodes();
- long containerID = replicateCommand.getContainerID();
-
- Preconditions.checkArgument(sourceDatanodes.size() > 0,
- String.format("Replication command is received for container %d "
- + "but the size of source datanodes was 0.", containerID));
-
- ReplicationTask replicationTask =
- new ReplicationTask(containerID, sourceDatanodes);
- supervisor.addTask(replicationTask);
-
- } finally {
- updateCommandStatus(context, command,
- (cmdStatus) -> cmdStatus.setStatus(true), LOG);
- }
+ final List<DatanodeDetails> sourceDatanodes =
+ replicateCommand.getSourceDatanodes();
+ final long containerID = replicateCommand.getContainerID();
+
+ Preconditions.checkArgument(sourceDatanodes.size() > 0,
+ String.format("Replication command is received for container %d "
+ + "but the size of source datanodes was 0.", containerID));
+
+ supervisor.addTask(new ReplicationTask(containerID, sourceDatanodes));
}
@Override
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java
index 0ef02a3..d1479f7 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java
@@ -19,8 +19,9 @@ package org.apache.hadoop.hdds.scm.command;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
+import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.CommandStatus;
-import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
.CommandStatusReportFromDatanode;
@@ -54,32 +55,14 @@ public class CommandStatusReportHandler implements
cmdStatusList.forEach(cmdStatus -> {
LOGGER.trace("Emitting command status for id:{} type: {}", cmdStatus
.getCmdId(), cmdStatus.getType());
- switch (cmdStatus.getType()) {
- case replicateContainerCommand:
- publisher.fireEvent(SCMEvents.REPLICATION_STATUS, new
- ReplicationStatus(cmdStatus));
- if (cmdStatus.getStatus() == CommandStatus.Status.EXECUTED) {
- publisher.fireEvent(SCMEvents.REPLICATION_COMPLETE,
- new ReplicationManager.ReplicationCompleted(
- cmdStatus.getCmdId()));
- }
- break;
- case deleteBlocksCommand:
+ if (cmdStatus.getType() == SCMCommandProto.Type.deleteBlocksCommand) {
if (cmdStatus.getStatus() == CommandStatus.Status.EXECUTED) {
publisher.fireEvent(SCMEvents.DELETE_BLOCK_STATUS,
new DeleteBlockStatus(cmdStatus));
}
- break;
- case deleteContainerCommand:
- if (cmdStatus.getStatus() == CommandStatus.Status.EXECUTED) {
- publisher.fireEvent(SCMEvents.DELETE_CONTAINER_COMMAND_COMPLETE,
- new ReplicationManager.DeleteContainerCommandCompleted(
- cmdStatus.getCmdId()));
- }
- default:
+ } else {
LOGGER.debug("CommandStatus of type:{} not handled in " +
"CommandStatusReportHandler.", cmdStatus.getType());
- break;
}
});
}
@@ -110,24 +93,6 @@ public class CommandStatusReportHandler implements
}
/**
- * Wrapper event for Replicate Command.
- */
- public static class ReplicationStatus extends CommandStatusEvent {
- public ReplicationStatus(CommandStatus cmdStatus) {
- super(cmdStatus);
- }
- }
-
- /**
- * Wrapper event for CloseContainer Command.
- */
- public static class CloseContainerStatus extends CommandStatusEvent {
- public CloseContainerStatus(CommandStatus cmdStatus) {
- super(cmdStatus);
- }
- }
-
- /**
* Wrapper event for DeleteBlock Command.
*/
public static class DeleteBlockStatus extends CommandStatusEvent {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/DeleteContainerCommandWatcher.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/DeleteContainerCommandWatcher.java
deleted file mode 100644
index 0b1e4c8..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/DeleteContainerCommandWatcher.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.hadoop.hdds.scm.container;
-
-import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager
- .DeletionRequestToRepeat;
-import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager
- .DeleteContainerCommandCompleted;
-import org.apache.hadoop.hdds.scm.events.SCMEvents;
-import org.apache.hadoop.hdds.server.events.Event;
-import org.apache.hadoop.hdds.server.events.EventPublisher;
-import org.apache.hadoop.hdds.server.events.EventWatcher;
-import org.apache.hadoop.ozone.lease.LeaseManager;
-
-/**
- * Command watcher to track the delete container commands.
- */
-public class DeleteContainerCommandWatcher extends
- EventWatcher<DeletionRequestToRepeat, DeleteContainerCommandCompleted> {
-
- public DeleteContainerCommandWatcher(
- Event<DeletionRequestToRepeat> startEvent,
- Event<DeleteContainerCommandCompleted> completionEvent,
- LeaseManager<Long> leaseManager) {
- super(startEvent, completionEvent, leaseManager);
- }
-
- @Override
- protected void onTimeout(EventPublisher publisher,
- DeletionRequestToRepeat payload) {
- //put back to the original queue
- publisher.fireEvent(SCMEvents.REPLICATE_CONTAINER,
- payload.getRequest());
- }
-
-
- @Override
- protected void onFinished(EventPublisher publisher,
- DeletionRequestToRepeat payload) {
-
- }
-}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationCommandWatcher.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationCommandWatcher.java
deleted file mode 100644
index 03a81a7..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationCommandWatcher.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.hadoop.hdds.scm.container.replication;
-
-import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager
- .ReplicationCompleted;
-import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager
- .ReplicationRequestToRepeat;
-import org.apache.hadoop.hdds.scm.events.SCMEvents;
-import org.apache.hadoop.hdds.server.events.Event;
-import org.apache.hadoop.hdds.server.events.EventPublisher;
-import org.apache.hadoop.hdds.server.events.EventWatcher;
-import org.apache.hadoop.ozone.lease.LeaseManager;
-
-/**
- * Command watcher to track the replication commands.
- */
-public class ReplicationCommandWatcher
- extends
- EventWatcher<ReplicationManager.ReplicationRequestToRepeat,
- ReplicationManager.ReplicationCompleted> {
-
- public ReplicationCommandWatcher(Event<ReplicationRequestToRepeat> startEvent,
- Event<ReplicationCompleted> completionEvent,
- LeaseManager<Long> leaseManager) {
- super(startEvent, completionEvent, leaseManager);
- }
-
- @Override
- protected void onTimeout(EventPublisher publisher,
- ReplicationRequestToRepeat payload) {
- //put back to the original queue
- publisher.fireEvent(SCMEvents.REPLICATE_CONTAINER,
- payload.getRequest());
- }
-
- @Override
- protected void onFinished(EventPublisher publisher,
- ReplicationRequestToRepeat payload) {
-
- }
-}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
deleted file mode 100644
index b904666..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
+++ /dev/null
@@ -1,384 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.hadoop.hdds.scm.container.replication;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ThreadFactory;
-import java.util.stream.Collectors;
-
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
-import org.apache.hadoop.hdds.scm.container.ContainerID;
-import org.apache.hadoop.hdds.scm.container.ContainerInfo;
-import org.apache.hadoop.hdds.scm.container.ContainerManager;
-import org.apache.hadoop.hdds.scm.container.ContainerReplica;
-import org.apache.hadoop.hdds.scm.container.DeleteContainerCommandWatcher;
-import org.apache.hadoop.hdds.scm.container.placement.algorithms
- .ContainerPlacementPolicy;
-import org.apache.hadoop.hdds.scm.events.SCMEvents;
-import org.apache.hadoop.hdds.server.events.EventPublisher;
-import org.apache.hadoop.hdds.server.events.EventQueue;
-import org.apache.hadoop.hdds.server.events.IdentifiableEventPayload;
-import org.apache.hadoop.ozone.lease.LeaseManager;
-import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
-import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
-import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-import static org.apache.hadoop.hdds.scm.events.SCMEvents
- .TRACK_DELETE_CONTAINER_COMMAND;
-import static org.apache.hadoop.hdds.scm.events.SCMEvents
- .TRACK_REPLICATE_COMMAND;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Replication Manager manages the replication of the closed container.
- */
-public class ReplicationManager implements Runnable {
-
- private static final Logger LOG =
- LoggerFactory.getLogger(ReplicationManager.class);
-
- private ReplicationQueue replicationQueue;
-
- private ContainerPlacementPolicy containerPlacement;
-
- private EventPublisher eventPublisher;
-
- private ReplicationCommandWatcher replicationCommandWatcher;
- private DeleteContainerCommandWatcher deleteContainerCommandWatcher;
-
- private boolean running = true;
-
- private ContainerManager containerManager;
-
- public ReplicationManager(ContainerPlacementPolicy containerPlacement,
- ContainerManager containerManager, EventQueue eventQueue,
- LeaseManager<Long> commandWatcherLeaseManager) {
-
- this.containerPlacement = containerPlacement;
- this.containerManager = containerManager;
- this.eventPublisher = eventQueue;
-
- this.replicationCommandWatcher =
- new ReplicationCommandWatcher(TRACK_REPLICATE_COMMAND,
- SCMEvents.REPLICATION_COMPLETE, commandWatcherLeaseManager);
-
- this.deleteContainerCommandWatcher =
- new DeleteContainerCommandWatcher(TRACK_DELETE_CONTAINER_COMMAND,
- SCMEvents.DELETE_CONTAINER_COMMAND_COMPLETE,
- commandWatcherLeaseManager);
-
- this.replicationQueue = new ReplicationQueue();
-
- eventQueue.addHandler(SCMEvents.REPLICATE_CONTAINER,
- (replicationRequest, publisher) -> replicationQueue
- .add(replicationRequest));
-
- this.replicationCommandWatcher.start(eventQueue);
-
- }
-
- public void start() {
-
- ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true)
- .setNameFormat("Replication Manager").build();
-
- threadFactory.newThread(this).start();
- }
-
- @Override
- public void run() {
-
- while (running) {
- ReplicationRequest request = null;
- try {
- //TODO: add throttling here
- request = replicationQueue.take();
-
- ContainerID containerID = new ContainerID(request.getContainerId());
- ContainerInfo container = containerManager.getContainer(containerID);
- final HddsProtos.LifeCycleState state = container.getState();
-
- if (state != LifeCycleState.CLOSED &&
- state != LifeCycleState.QUASI_CLOSED) {
- LOG.warn("Cannot replicate the container {} when in {} state.",
- containerID, state);
- continue;
- }
-
- //check the current replication
- List<ContainerReplica> containerReplicas =
- new ArrayList<>(getCurrentReplicas(request));
-
- if (containerReplicas.size() == 0) {
- LOG.warn(
- "Container {} should be replicated but can't find any existing "
- + "replicas",
- containerID);
- return;
- }
-
- final ReplicationRequest finalRequest = request;
-
- int inFlightReplications = replicationCommandWatcher.getTimeoutEvents(
- e -> e.getRequest().getContainerId()
- == finalRequest.getContainerId())
- .size();
-
- int inFlightDelete = deleteContainerCommandWatcher.getTimeoutEvents(
- e -> e.getRequest().getContainerId()
- == finalRequest.getContainerId())
- .size();
-
- int deficit =
- (request.getExpecReplicationCount() - containerReplicas.size())
- - (inFlightReplications - inFlightDelete);
-
- if (deficit > 0) {
-
- List<DatanodeDetails> datanodes = containerReplicas.stream()
- .sorted((r1, r2) ->
- r2.getSequenceId().compareTo(r1.getSequenceId()))
- .map(ContainerReplica::getDatanodeDetails)
- .collect(Collectors.toList());
- List<DatanodeDetails> selectedDatanodes = containerPlacement
- .chooseDatanodes(datanodes, deficit, container.getUsedBytes());
-
- //send the command
- for (DatanodeDetails datanode : selectedDatanodes) {
-
- LOG.info("Container {} is under replicated." +
- " Expected replica count is {}, but found {}." +
- " Re-replicating it on {}.",
- container.containerID(), request.getExpecReplicationCount(),
- containerReplicas.size(), datanode);
-
- ReplicateContainerCommand replicateCommand =
- new ReplicateContainerCommand(containerID.getId(), datanodes);
-
- eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND,
- new CommandForDatanode<>(
- datanode.getUuid(), replicateCommand));
-
- ReplicationRequestToRepeat timeoutEvent =
- new ReplicationRequestToRepeat(replicateCommand.getId(),
- request);
-
- eventPublisher.fireEvent(TRACK_REPLICATE_COMMAND, timeoutEvent);
-
- }
-
- } else if (deficit < 0) {
-
- int numberOfReplicasToDelete = Math.abs(deficit);
-
- final Map<UUID, List<DatanodeDetails>> originIdToDnMap =
- new LinkedHashMap<>();
-
- containerReplicas.stream()
- .sorted(Comparator.comparing(ContainerReplica::getSequenceId))
- .forEach(replica -> {
- originIdToDnMap.computeIfAbsent(
- replica.getOriginDatanodeId(), key -> new ArrayList<>());
- originIdToDnMap.get(replica.getOriginDatanodeId())
- .add(replica.getDatanodeDetails());
- });
-
- for (List<DatanodeDetails> listOfReplica : originIdToDnMap.values()) {
- if (listOfReplica.size() > 1) {
- final int toDelete = Math.min(listOfReplica.size() - 1,
- numberOfReplicasToDelete);
- final DeleteContainerCommand deleteContainer =
- new DeleteContainerCommand(containerID.getId(), true);
- for (int i = 0; i < toDelete; i++) {
- LOG.info("Container {} is over replicated." +
- " Expected replica count is {}, but found {}." +
- " Deleting the replica on {}.",
- container.containerID(), request.getExpecReplicationCount(),
- containerReplicas.size(), listOfReplica.get(i));
- eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND,
- new CommandForDatanode<>(listOfReplica.get(i).getUuid(),
- deleteContainer));
- DeletionRequestToRepeat timeoutEvent =
- new DeletionRequestToRepeat(deleteContainer.getId(),
- request);
-
- eventPublisher.fireEvent(
- TRACK_DELETE_CONTAINER_COMMAND, timeoutEvent);
- }
- numberOfReplicasToDelete -= toDelete;
- }
- if (numberOfReplicasToDelete == 0) {
- break;
- }
- }
-
- if (numberOfReplicasToDelete != 0) {
- final int expectedReplicaCount = container
- .getReplicationFactor().getNumber();
-
- LOG.warn("Not able to delete the container replica of Container" +
- " {} even though it is over replicated. Expected replica" +
- " count is {}, current replica count is {}.",
- containerID, expectedReplicaCount,
- expectedReplicaCount + numberOfReplicasToDelete);
- }
- }
-
- } catch (Exception e) {
- LOG.error("Can't replicate container {}", request, e);
- }
- }
-
- }
-
- @VisibleForTesting
- protected Set<ContainerReplica> getCurrentReplicas(ReplicationRequest request)
- throws IOException {
- return containerManager
- .getContainerReplicas(new ContainerID(request.getContainerId()));
- }
-
- @VisibleForTesting
- public ReplicationQueue getReplicationQueue() {
- return replicationQueue;
- }
-
- public void stop() {
- running = false;
- }
-
- /**
- * Event for the ReplicationCommandWatcher to repeat the embedded request.
- * in case fof timeout.
- */
- public static class ReplicationRequestToRepeat
- extends ContainerRequestToRepeat {
-
- public ReplicationRequestToRepeat(
- long commandId, ReplicationRequest request) {
- super(commandId, request);
- }
- }
-
- /**
- * Event for the DeleteContainerCommandWatcher to repeat the
- * embedded request. In case fof timeout.
- */
- public static class DeletionRequestToRepeat
- extends ContainerRequestToRepeat {
-
- public DeletionRequestToRepeat(
- long commandId, ReplicationRequest request) {
- super(commandId, request);
- }
- }
-
- /**
- * Container Request wrapper which will be used by ReplicationManager to
- * perform the intended operation.
- */
- public static class ContainerRequestToRepeat
- implements IdentifiableEventPayload {
-
- private final long commandId;
-
- private final ReplicationRequest request;
-
- ContainerRequestToRepeat(long commandId,
- ReplicationRequest request) {
- this.commandId = commandId;
- this.request = request;
- }
-
- public ReplicationRequest getRequest() {
- return request;
- }
-
- @Override
- public long getId() {
- return commandId;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- ContainerRequestToRepeat that = (ContainerRequestToRepeat) o;
- return Objects.equals(request, that.request);
- }
-
- @Override
- public int hashCode() {
-
- return Objects.hash(request);
- }
- }
-
- /**
- * Event which indicates that the replicate operation is completed.
- */
- public static class ReplicationCompleted
- implements IdentifiableEventPayload {
-
- private final long uuid;
-
- public ReplicationCompleted(long uuid) {
- this.uuid = uuid;
- }
-
- @Override
- public long getId() {
- return uuid;
- }
- }
-
- /**
- * Event which indicates that the container deletion operation is completed.
- */
- public static class DeleteContainerCommandCompleted
- implements IdentifiableEventPayload {
-
- private final long uuid;
-
- public DeleteContainerCommandCompleted(long uuid) {
- this.uuid = uuid;
- }
-
- @Override
- public long getId() {
- return uuid;
- }
- }
-}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationQueue.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationQueue.java
deleted file mode 100644
index 4ca67be..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationQueue.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdds.scm.container.replication;
-
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.PriorityBlockingQueue;
-
-/**
- * Priority queue to handle under-replicated and over replicated containers
- * in ozone. ReplicationManager will consume these messages and decide
- * accordingly.
- */
-public class ReplicationQueue {
-
- private final BlockingQueue<ReplicationRequest> queue;
-
- public ReplicationQueue() {
- queue = new PriorityBlockingQueue<>();
- }
-
- public boolean add(ReplicationRequest repObj) {
- if (this.queue.contains(repObj)) {
- // Remove the earlier message and insert this one
- this.queue.remove(repObj);
- }
- return this.queue.add(repObj);
- }
-
- public boolean remove(ReplicationRequest repObj) {
- return queue.remove(repObj);
- }
-
- /**
- * Retrieves, but does not remove, the head of this queue,
- * or returns {@code null} if this queue is empty.
- *
- * @return the head of this queue, or {@code null} if this queue is empty
- */
- public ReplicationRequest peek() {
- return queue.peek();
- }
-
- /**
- * Retrieves and removes the head of this queue (blocking queue).
- */
- public ReplicationRequest take() throws InterruptedException {
- return queue.take();
- }
-
- public boolean removeAll(List<ReplicationRequest> repObjs) {
- return queue.removeAll(repObjs);
- }
-
- public int size() {
- return queue.size();
- }
-}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationRequest.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationRequest.java
deleted file mode 100644
index d40cd9c..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationRequest.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdds.scm.container.replication;
-
-import java.io.Serializable;
-
-import org.apache.commons.lang3.builder.EqualsBuilder;
-import org.apache.commons.lang3.builder.HashCodeBuilder;
-
-/**
- * Wrapper class for hdds replication queue. Implements its natural
- * ordering for priority queue.
- */
-public class ReplicationRequest implements Comparable<ReplicationRequest>,
- Serializable {
- private final long containerId;
- private final int replicationCount;
- private final int expecReplicationCount;
- private final long timestamp;
-
- public ReplicationRequest(long containerId, int replicationCount,
- long timestamp, int expecReplicationCount) {
- this.containerId = containerId;
- this.replicationCount = replicationCount;
- this.timestamp = timestamp;
- this.expecReplicationCount = expecReplicationCount;
- }
-
- public ReplicationRequest(long containerId, int replicationCount,
- int expecReplicationCount) {
- this(containerId, replicationCount, System.currentTimeMillis(),
- expecReplicationCount);
- }
-
- /**
- * Compares this object with the specified object for order. Returns a
- * negative integer, zero, or a positive integer as this object is less
- * than, equal to, or greater than the specified object.
- * @param o the object to be compared.
- * @return a negative integer, zero, or a positive integer as this object
- * is less than, equal to, or greater than the specified object.
- * @throws NullPointerException if the specified object is null
- * @throws ClassCastException if the specified object's type prevents it
- * from being compared to this object.
- */
- @Override
- public int compareTo(ReplicationRequest o) {
- if (o == null) {
- return 1;
- }
- if (this == o) {
- return 0;
- }
- int retVal = Integer
- .compare(getReplicationCount() - getExpecReplicationCount(),
- o.getReplicationCount() - o.getExpecReplicationCount());
- if (retVal != 0) {
- return retVal;
- }
- return Long.compare(getTimestamp(), o.getTimestamp());
- }
-
- @Override
- public int hashCode() {
- return new HashCodeBuilder(91, 1011)
- .append(getContainerId())
- .toHashCode();
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- ReplicationRequest that = (ReplicationRequest) o;
- return new EqualsBuilder().append(getContainerId(), that.getContainerId())
- .isEquals();
- }
-
- public long getContainerId() {
- return containerId;
- }
-
- public int getReplicationCount() {
- return replicationCount;
- }
-
- public long getTimestamp() {
- return timestamp;
- }
-
- public int getExpecReplicationCount() {
- return expecReplicationCount;
- }
-
- @Override
- public String toString() {
- return "ReplicationRequest{" +
- "containerId=" + containerId +
- ", replicationCount=" + replicationCount +
- ", expecReplicationCount=" + expecReplicationCount +
- ", timestamp=" + timestamp +
- '}';
- }
-}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
index ed9727a..43d396e0 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
@@ -23,8 +23,6 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.block.PendingDeleteStatusList;
import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager.SafeModeStatus;
import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler;
-import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler
- .ReplicationStatus;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
.IncrementalContainerReportFromDatanode;
@@ -40,14 +38,8 @@ import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
.ContainerReportFromDatanode;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
.NodeReportFromDatanode;
-import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
-import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager
- .DeleteContainerCommandCompleted;
-import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager
- .ReplicationCompleted;
-import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest;
-
-import org.apache.hadoop.hdds.scm.server.SCMDatanodeProtocolServer.NodeRegistrationContainerReport;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeProtocolServer
+ .NodeRegistrationContainerReport;
import org.apache.hadoop.hdds.server.events.Event;
import org.apache.hadoop.hdds.server.events.TypedEvent;
import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
@@ -186,12 +178,6 @@ public final class SCMEvents {
/**
* This event will be triggered by CommandStatusReportHandler whenever a
- * status for Replication SCMCommand is received.
- */
- public static final Event<ReplicationStatus> REPLICATION_STATUS = new
- TypedEvent<>(ReplicationStatus.class, "Replicate_Command_Status");
- /**
- * This event will be triggered by CommandStatusReportHandler whenever a
* status for DeleteBlock SCMCommand is received.
*/
public static final TypedEvent<CommandStatusReportHandler.DeleteBlockStatus>
@@ -207,53 +193,6 @@ public final class SCMEvents {
public static final Event<PendingDeleteStatusList> PENDING_DELETE_STATUS =
new TypedEvent<>(PendingDeleteStatusList.class, "Pending_Delete_Status");
- /**
- * This is the command for ReplicationManager to handle under/over
- * replication. Sent by the ContainerReportHandler after processing the
- * heartbeat.
- */
- public static final TypedEvent<ReplicationRequest> REPLICATE_CONTAINER =
- new TypedEvent<>(ReplicationRequest.class);
-
- /**
- * This event is sent by the ReplicaManager to the
- * ReplicationCommandWatcher to track the in-progress replication.
- */
- public static final TypedEvent<ReplicationManager.ReplicationRequestToRepeat>
- TRACK_REPLICATE_COMMAND =
- new TypedEvent<>(ReplicationManager.ReplicationRequestToRepeat.class);
-
- /**
- * This event is sent by the ReplicaManager to the
- * DeleteContainerCommandWatcher to track the in-progress delete commands.
- */
- public static final TypedEvent<ReplicationManager.DeletionRequestToRepeat>
- TRACK_DELETE_CONTAINER_COMMAND =
- new TypedEvent<>(ReplicationManager.DeletionRequestToRepeat.class);
- /**
- * This event comes from the Heartbeat dispatcher (in fact from the
- * datanode) to notify the scm that the replication is done. This is
- * received by the replicate command watcher to mark in-progress task as
- * finished.
- <p>
- * TODO: Temporary event, should be replaced by specific Heartbeat
- * ActionRequred event.
- */
- public static final TypedEvent<ReplicationCompleted> REPLICATION_COMPLETE =
- new TypedEvent<>(ReplicationCompleted.class);
-
- public static final TypedEvent<DeleteContainerCommandCompleted>
- DELETE_CONTAINER_COMMAND_COMPLETE =
- new TypedEvent<>(DeleteContainerCommandCompleted.class);
-
- /**
- * Signal for all the components (but especially for the replication
- * manager and container report handler) that the replication could be
- * started. Should be send only if (almost) all the container state are
- * available from the datanodes.
- */
- public static final TypedEvent<Boolean> START_REPLICATION =
- new TypedEvent<>(Boolean.class);
public static final TypedEvent<SafeModeStatus> SAFE_MODE_STATUS =
new TypedEvent<>(SafeModeStatus.class);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java
index a75a51a..17e1fed 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java
@@ -18,121 +18,155 @@
package org.apache.hadoop.hdds.scm.node;
-import java.util.Set;
+import java.io.IOException;
+import java.util.Optional;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.container.ContainerException;
-import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
-import org.apache.hadoop.hdds.scm.container.ContainerReplica;
-import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest;
-import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.hadoop.hdds.scm.events.SCMEvents.CLOSE_CONTAINER;
+
/**
* Handles Dead Node event.
*/
public class DeadNodeHandler implements EventHandler<DatanodeDetails> {
- private final ContainerManager containerManager;
-
private final NodeManager nodeManager;
+ private final PipelineManager pipelineManager;
+ private final ContainerManager containerManager;
private static final Logger LOG =
LoggerFactory.getLogger(DeadNodeHandler.class);
- public DeadNodeHandler(NodeManager nodeManager,
- ContainerManager containerManager) {
- this.containerManager = containerManager;
+ public DeadNodeHandler(final NodeManager nodeManager,
+ final PipelineManager pipelineManager,
+ final ContainerManager containerManager) {
this.nodeManager = nodeManager;
+ this.pipelineManager = pipelineManager;
+ this.containerManager = containerManager;
}
@Override
- public void onMessage(DatanodeDetails datanodeDetails,
- EventPublisher publisher) {
+ public void onMessage(final DatanodeDetails datanodeDetails,
+ final EventPublisher publisher) {
- // TODO: check if there are any pipeline on this node and fire close
- // pipeline event
- Set<ContainerID> ids =
- null;
try {
- ids = nodeManager.getContainers(datanodeDetails);
- } catch (NodeNotFoundException e) {
+
+ /*
+ * We should have already destroyed all the pipelines on this datanode
+ * when it was marked as stale. Destroy pipeline should also have closed
+ * all the containers on this datanode.
+ *
+ * Ideally we should not have any pipeline or OPEN containers now.
+ *
+ * To be on a safer side, we double check here and take appropriate
+ * action.
+ */
+
+ destroyPipelines(datanodeDetails);
+ closeContainers(datanodeDetails, publisher);
+
+ // Remove the container replicas associated with the dead node.
+ removeContainerReplicas(datanodeDetails);
+
+ } catch (NodeNotFoundException ex) {
// This should not happen, we cannot get a dead node event for an
- // unregistered node!
+ // unregistered datanode!
LOG.error("DeadNode event for a unregistered node: {}!", datanodeDetails);
}
- if (ids == null) {
- LOG.info("There's no containers in dead datanode {}, no replica will be"
- + " removed from the in-memory state.", datanodeDetails.getUuid());
- return;
- }
- LOG.info("Datanode {} is dead. Removing replications from the in-memory" +
- " state.", datanodeDetails.getUuid());
- for (ContainerID id : ids) {
- try {
- final ContainerInfo container = containerManager.getContainer(id);
- // TODO: For open containers, trigger close on other nodes
- if (!container.isOpen()) {
- Set<ContainerReplica> replicas = containerManager
- .getContainerReplicas(id);
- replicas.stream()
- .filter(r -> r.getDatanodeDetails().equals(datanodeDetails))
- .findFirst()
- .ifPresent(replica -> {
- try {
- containerManager.removeContainerReplica(id, replica);
- ContainerInfo containerInfo =
- containerManager.getContainer(id);
- replicateIfNeeded(containerInfo, publisher);
- } catch (ContainerException ex) {
- LOG.warn("Exception while removing container replica #{} " +
- "for container #{}.", replica, container, ex);
- }
- });
- }
- } catch (ContainerNotFoundException cnfe) {
- LOG.warn("Container Not found!", cnfe);
- }
- }
}
/**
- * Compare the existing replication number with the expected one.
+ * Destroys all the pipelines on the given datanode if there are any.
+ *
+ * @param datanodeDetails DatanodeDetails
*/
- private void replicateIfNeeded(ContainerInfo container,
- EventPublisher publisher) throws ContainerNotFoundException {
- // Replicate only closed and Quasi closed containers
- if (container.getState() == HddsProtos.LifeCycleState.CLOSED ||
- container.getState() == HddsProtos.LifeCycleState.QUASI_CLOSED) {
- final int existingReplicas = containerManager
- .getContainerReplicas(container.containerID()).size();
- final int expectedReplicas = container.getReplicationFactor().getNumber();
- if (existingReplicas != expectedReplicas) {
- LOG.debug("Replicate Request fired for container {}, exisiting " +
- "replica count {}, expected replica count {}",
- container.getContainerID(), existingReplicas, expectedReplicas);
- publisher.fireEvent(SCMEvents.REPLICATE_CONTAINER,
- new ReplicationRequest(
- container.getContainerID(), existingReplicas,
- expectedReplicas));
- }
- }
+ private void destroyPipelines(final DatanodeDetails datanodeDetails) {
+ Optional.ofNullable(nodeManager.getPipelines(datanodeDetails))
+ .ifPresent(pipelines ->
+ pipelines.forEach(id -> {
+ try {
+ pipelineManager.finalizeAndDestroyPipeline(
+ pipelineManager.getPipeline(id), false);
+ } catch (PipelineNotFoundException ignore) {
+ // Pipeline is not there in pipeline manager,
+ // should we care?
+ } catch (IOException ex) {
+ LOG.warn("Exception while finalizing pipeline {}",
+ id, ex);
+ }
+ }));
+ }
+
+ /**
+ * Sends CloseContainerCommand to all the open containers on the
+ * given datanode.
+ *
+ * @param datanodeDetails DatanodeDetails
+ * @param publisher EventPublisher
+ * @throws NodeNotFoundException
+ */
+ private void closeContainers(final DatanodeDetails datanodeDetails,
+ final EventPublisher publisher)
+ throws NodeNotFoundException {
+ nodeManager.getContainers(datanodeDetails)
+ .forEach(id -> {
+ try {
+ final ContainerInfo container = containerManager.getContainer(id);
+ if (container.getState() == HddsProtos.LifeCycleState.OPEN) {
+ publisher.fireEvent(CLOSE_CONTAINER, id);
+ }
+ } catch (ContainerNotFoundException cnfe) {
+ LOG.warn("Container {} is not managed by ContainerManager.",
+ id, cnfe);
+ }
+ });
}
/**
- * Returns logger.
- * */
- // TODO: remove this.
- public static Logger getLogger() {
- return LOG;
+ * Removes the ContainerReplica of the dead datanode from the containers
+ * which are hosted by that datanode.
+ *
+ * @param datanodeDetails DatanodeDetails
+ * @throws NodeNotFoundException
+ */
+ private void removeContainerReplicas(final DatanodeDetails datanodeDetails)
+ throws NodeNotFoundException {
+ nodeManager.getContainers(datanodeDetails)
+ .forEach(id -> {
+ try {
+ final ContainerInfo container = containerManager.getContainer(id);
+ // Identify and remove the ContainerReplica of dead node
+ containerManager.getContainerReplicas(id)
+ .stream()
+ .filter(r -> r.getDatanodeDetails().equals(datanodeDetails))
+ .findFirst()
+ .ifPresent(replica -> {
+ try {
+ containerManager.removeContainerReplica(id, replica);
+ } catch (ContainerException ex) {
+ LOG.warn("Exception while removing container replica #{} " +
+ "of container {}.", replica, container, ex);
+ }
+ });
+ } catch (ContainerNotFoundException cnfe) {
+ LOG.warn("Container {} is not managed by ContainerManager.",
+ id, cnfe);
+ }
+ });
}
+
+
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
index 17f72f6..2ab7295 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
@@ -17,7 +17,6 @@
*/
package org.apache.hadoop.hdds.scm.node;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos;
@@ -83,8 +82,7 @@ import java.util.stream.Collectors;
*/
public class SCMNodeManager implements NodeManager {
- @VisibleForTesting
- static final Logger LOG =
+ private static final Logger LOG =
LoggerFactory.getLogger(SCMNodeManager.class);
private final NodeStateManager nodeStateManager;
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 100534a..270d356 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -297,7 +297,7 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
StaleNodeHandler staleNodeHandler =
new StaleNodeHandler(scmNodeManager, pipelineManager, conf);
DeadNodeHandler deadNodeHandler = new DeadNodeHandler(scmNodeManager,
- containerManager);
+ pipelineManager, containerManager);
NonHealthyToHealthyNodeHandler nonHealthyToHealthyNodeHandler =
new NonHealthyToHealthyNodeHandler(pipelineManager, conf);
ContainerActionsHandler actionsHandler = new ContainerActionsHandler();
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
index afa7fcb..6a98a34 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
@@ -37,8 +37,6 @@ import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.scm.server.SCMConfigurator;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
-import org.apache.hadoop.hdds.server.events.EventHandler;
-import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.ozone.container.common.SCMTestUtils;
import org.apache.hadoop.test.GenericTestUtils;
@@ -57,7 +55,7 @@ import static org.apache.hadoop.ozone.OzoneConsts.MB;
/**
* Tests for SCM Block Manager.
*/
-public class TestBlockManager implements EventHandler<Boolean> {
+public class TestBlockManager {
private StorageContainerManager scm;
private SCMContainerManager mapping;
private MockNodeManager nodeManager;
@@ -103,7 +101,8 @@ public class TestBlockManager implements EventHandler<Boolean> {
eventQueue = new EventQueue();
eventQueue.addHandler(SCMEvents.SAFE_MODE_STATUS,
scm.getSafeModeHandler());
- eventQueue.addHandler(SCMEvents.START_REPLICATION, this);
+ eventQueue.addHandler(SCMEvents.SAFE_MODE_STATUS,
+ scm.getSafeModeHandler());
CloseContainerEventHandler closeContainerHandler =
new CloseContainerEventHandler(pipelineManager, mapping);
eventQueue.addHandler(SCMEvents.CLOSE_CONTAINER, closeContainerHandler);
@@ -282,8 +281,4 @@ public class TestBlockManager implements EventHandler<Boolean> {
Assert.assertEquals(1, pipelineManager.getPipelines(type, factor).size());
}
- @Override
- public void onMessage(Boolean aBoolean, EventPublisher publisher) {
- System.out.println("test");
- }
}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/command/TestCommandStatusReportHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/command/TestCommandStatusReportHandler.java
index 9fea7db..8877b2b 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/command/TestCommandStatusReportHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/command/TestCommandStatusReportHandler.java
@@ -74,13 +74,8 @@ public class TestCommandStatusReportHandler implements EventPublisher {
cmdStatusReportHandler.onMessage(report, this);
assertTrue(logCapturer.getOutput().contains("firing event of type " +
"Delete_Block_Status"));
- assertTrue(logCapturer.getOutput().contains("firing event of type " +
- "Replicate_Command_Status"));
-
assertTrue(logCapturer.getOutput().contains("type: " +
"deleteBlocksCommand"));
- assertTrue(logCapturer.getOutput().contains("type: " +
- "replicateContainerCommand"));
}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
deleted file mode 100644
index fbe2641..0000000
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
+++ /dev/null
@@ -1,290 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.hadoop.hdds.scm.container.replication;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.stream.IntStream;
-
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
-import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
-import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.ReplicateContainerCommandProto;
-import org.apache.hadoop.hdds.scm.TestUtils;
-import org.apache.hadoop.hdds.scm.container.ContainerID;
-import org.apache.hadoop.hdds.scm.container.ContainerManager;
-import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
-import org.apache.hadoop.hdds.scm.container.ContainerReplica;
-import org.apache.hadoop.hdds.scm.container.ContainerInfo;
-import org.apache.hadoop.hdds.scm.container.placement.algorithms
- .ContainerPlacementPolicy;
-import org.apache.hadoop.hdds.scm.container.replication
- .ReplicationManager.ReplicationRequestToRepeat;
-import org.apache.hadoop.hdds.scm.container.replication
- .ReplicationManager.DeletionRequestToRepeat;
-import org.apache.hadoop.hdds.scm.events.SCMEvents;
-import org.apache.hadoop.hdds.server.events.EventQueue;
-import org.apache.hadoop.ozone.lease.LeaseManager;
-import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
-
-import static org.apache.hadoop.hdds.scm.events.SCMEvents
- .TRACK_DELETE_CONTAINER_COMMAND;
-import static org.apache.hadoop.hdds.scm.events.SCMEvents
- .TRACK_REPLICATE_COMMAND;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import static org.mockito.Matchers.anyObject;
-import org.mockito.Mockito;
-import static org.mockito.Mockito.when;
-
-/**
- * Test behaviour of the TestReplication.
- */
-public class TestReplicationManager {
-
- private EventQueue queue;
-
- private List<ReplicationRequestToRepeat> trackReplicationEvents;
- private List<DeletionRequestToRepeat> trackDeleteEvents;
-
- private List<CommandForDatanode<ReplicateContainerCommandProto>> copyEvents;
-
- private ContainerManager containerManager;
-
- private ContainerPlacementPolicy containerPlacementPolicy;
- private List<DatanodeDetails> listOfDatanodeDetails;
- private List<ContainerReplica> listOfContainerReplica;
- private LeaseManager<Long> leaseManager;
- private ReplicationManager replicationManager;
-
- @Before
- public void initReplicationManager() throws IOException {
-
- listOfDatanodeDetails = new ArrayList<>();
- listOfContainerReplica = new ArrayList<>();
- IntStream.range(1, 6).forEach(i -> {
- DatanodeDetails dd = TestUtils.randomDatanodeDetails();
- listOfDatanodeDetails.add(dd);
- listOfContainerReplica.add(ContainerReplica.newBuilder()
- .setContainerID(ContainerID.valueof(i))
- .setContainerState(ContainerReplicaProto.State.CLOSED)
- .setSequenceId(10000L)
- .setOriginNodeId(dd.getUuid())
- .setDatanodeDetails(dd).build());
- });
-
- containerPlacementPolicy =
- (excludedNodes, nodesRequired, sizeRequired) -> listOfDatanodeDetails
- .subList(2, 2 + nodesRequired);
-
- containerManager = Mockito.mock(ContainerManager.class);
-
- ContainerInfo containerInfo = new ContainerInfo.Builder()
- .setState(LifeCycleState.CLOSED)
- .build();
-
- when(containerManager.getContainer(anyObject()))
- .thenReturn(containerInfo);
-
- when(containerManager.getContainerReplicas(new ContainerID(1L)))
- .thenReturn(new HashSet<>(Arrays.asList(
- listOfContainerReplica.get(0),
- listOfContainerReplica.get(1)
- )));
-
-
- when(containerManager.getContainerReplicas(new ContainerID(3L)))
- .thenReturn(new HashSet<>());
-
- queue = new EventQueue();
-
- trackReplicationEvents = new ArrayList<>();
- queue.addHandler(TRACK_REPLICATE_COMMAND,
- (event, publisher) -> trackReplicationEvents.add(event));
-
- trackDeleteEvents = new ArrayList<>();
- queue.addHandler(TRACK_DELETE_CONTAINER_COMMAND,
- (event, publisher) -> trackDeleteEvents.add(event));
-
- copyEvents = new ArrayList<>();
- queue.addHandler(SCMEvents.DATANODE_COMMAND,
- (event, publisher) -> copyEvents.add(event));
-
- leaseManager = new LeaseManager<>("Test", 100000L);
-
- replicationManager = new ReplicationManager(containerPlacementPolicy,
- containerManager, queue, leaseManager);
-
- }
-
- /**
- * Container should be replicated but no source replicas.
- */
- @Test()
- public void testNoExistingReplicas() throws InterruptedException {
- try {
- leaseManager.start();
- replicationManager.start();
-
- //WHEN
- queue.fireEvent(SCMEvents.REPLICATE_CONTAINER,
- new ReplicationRequest(3L, (short) 2, System.currentTimeMillis(),
- (short) 3));
-
- Thread.sleep(500L);
- queue.processAll(1000L);
-
- //THEN
- Assert.assertEquals(0, trackReplicationEvents.size());
- Assert.assertEquals(0, copyEvents.size());
-
- } finally {
- if (leaseManager != null) {
- leaseManager.shutdown();
- }
- }
- }
-
- @Test
- public void testOverReplication() throws ContainerNotFoundException,
- InterruptedException {
- try {
- leaseManager.start();
- replicationManager.start();
-
- final ContainerID containerID = ContainerID.valueof(5L);
-
- final ContainerReplica duplicateReplicaOne = ContainerReplica.newBuilder()
- .setContainerID(containerID)
- .setContainerState(ContainerReplicaProto.State.CLOSED)
- .setSequenceId(10000L)
- .setOriginNodeId(listOfDatanodeDetails.get(0).getUuid())
- .setDatanodeDetails(listOfDatanodeDetails.get(3))
- .build();
-
- final ContainerReplica duplicateReplicaTwo = ContainerReplica.newBuilder()
- .setContainerID(containerID)
- .setContainerState(ContainerReplicaProto.State.CLOSED)
- .setSequenceId(10000L)
- .setOriginNodeId(listOfDatanodeDetails.get(1).getUuid())
- .setDatanodeDetails(listOfDatanodeDetails.get(4))
- .build();
-
- when(containerManager.getContainerReplicas(new ContainerID(5L)))
- .thenReturn(new HashSet<>(Arrays.asList(
- listOfContainerReplica.get(0),
- listOfContainerReplica.get(1),
- listOfContainerReplica.get(2),
- duplicateReplicaOne,
- duplicateReplicaTwo
- )));
-
- queue.fireEvent(SCMEvents.REPLICATE_CONTAINER,
- new ReplicationRequest(5L, (short) 5, System.currentTimeMillis(),
- (short) 3));
- Thread.sleep(500L);
- queue.processAll(1000L);
-
- //THEN
- Assert.assertEquals(2, trackDeleteEvents.size());
- Assert.assertEquals(2, copyEvents.size());
-
- } finally {
- if (leaseManager != null) {
- leaseManager.shutdown();
- }
- }
- }
-
- @Test
- public void testEventSending() throws InterruptedException, IOException {
-
- //GIVEN
- try {
- leaseManager.start();
-
- replicationManager.start();
-
- //WHEN
- queue.fireEvent(SCMEvents.REPLICATE_CONTAINER,
- new ReplicationRequest(1L, (short) 2, System.currentTimeMillis(),
- (short) 3));
-
- Thread.sleep(500L);
- queue.processAll(1000L);
-
- //THEN
- Assert.assertEquals(1, trackReplicationEvents.size());
- Assert.assertEquals(1, copyEvents.size());
- } finally {
- if (leaseManager != null) {
- leaseManager.shutdown();
- }
- }
- }
-
- @Test
- public void testCommandWatcher() throws InterruptedException, IOException {
- LeaseManager<Long> rapidLeaseManager =
- new LeaseManager<>("Test", 1000L);
-
- replicationManager = new ReplicationManager(containerPlacementPolicy,
- containerManager, queue, rapidLeaseManager);
-
- try {
- leaseManager.start();
- rapidLeaseManager.start();
- replicationManager.start();
-
- queue.fireEvent(SCMEvents.REPLICATE_CONTAINER,
- new ReplicationRequest(1L, (short) 2, System.currentTimeMillis(),
- (short) 3));
-
- Thread.sleep(500L);
-
- queue.processAll(1000L);
-
- Assert.assertEquals(1, trackReplicationEvents.size());
- Assert.assertEquals(1, copyEvents.size());
-
- Assert.assertEquals(trackReplicationEvents.get(0).getId(),
- copyEvents.get(0).getCommand().getId());
-
- //event is timed out
- Thread.sleep(1500);
-
- queue.processAll(1000L);
-
- //original copy command + retry
- Assert.assertEquals(2, trackReplicationEvents.size());
- Assert.assertEquals(2, copyEvents.size());
-
- } finally {
- rapidLeaseManager.shutdown();
- if (leaseManager != null) {
- leaseManager.shutdown();
- }
- }
- }
-
-}
\ No newline at end of file
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationQueue.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationQueue.java
deleted file mode 100644
index 9dd4fe3..0000000
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationQueue.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdds.scm.container.replication;
-
-import java.util.Random;
-import java.util.UUID;
-import org.apache.hadoop.util.Time;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- * Test class for ReplicationQueue.
- */
-public class TestReplicationQueue {
-
- private ReplicationQueue replicationQueue;
- private Random random;
-
- @Before
- public void setUp() {
- replicationQueue = new ReplicationQueue();
- random = new Random();
- }
-
- @Test
- public void testDuplicateAddOp() throws InterruptedException {
- long contId = random.nextLong();
- String nodeId = UUID.randomUUID().toString();
- ReplicationRequest obj1, obj2, obj3;
- long time = Time.monotonicNow();
- obj1 = new ReplicationRequest(contId, (short) 2, time, (short) 3);
- obj2 = new ReplicationRequest(contId, (short) 2, time + 1, (short) 3);
- obj3 = new ReplicationRequest(contId, (short) 1, time+2, (short) 3);
-
- replicationQueue.add(obj1);
- replicationQueue.add(obj2);
- replicationQueue.add(obj3);
- Assert.assertEquals("Should add only 1 msg as second one is duplicate",
- 1, replicationQueue.size());
- ReplicationRequest temp = replicationQueue.take();
- Assert.assertEquals(temp, obj3);
- }
-
- @Test
- public void testPollOp() throws InterruptedException {
- long contId = random.nextLong();
- String nodeId = UUID.randomUUID().toString();
- ReplicationRequest msg1, msg2, msg3, msg4, msg5;
- msg1 = new ReplicationRequest(contId, (short) 1, Time.monotonicNow(),
- (short) 3);
- long time = Time.monotonicNow();
- msg2 = new ReplicationRequest(contId + 1, (short) 4, time, (short) 3);
- msg3 = new ReplicationRequest(contId + 2, (short) 0, time, (short) 3);
- msg4 = new ReplicationRequest(contId, (short) 2, time, (short) 3);
- // Replication message for same container but different nodeId
- msg5 = new ReplicationRequest(contId + 1, (short) 2, time, (short) 3);
-
- replicationQueue.add(msg1);
- replicationQueue.add(msg2);
- replicationQueue.add(msg3);
- replicationQueue.add(msg4);
- replicationQueue.add(msg5);
- Assert.assertEquals("Should have 3 objects",
- 3, replicationQueue.size());
-
- // Since Priority queue orders messages according to replication count,
- // message with lowest replication should be first
- ReplicationRequest temp;
- temp = replicationQueue.take();
- Assert.assertEquals("Should have 2 objects",
- 2, replicationQueue.size());
- Assert.assertEquals(temp, msg3);
-
- temp = replicationQueue.take();
- Assert.assertEquals("Should have 1 objects",
- 1, replicationQueue.size());
- Assert.assertEquals(temp, msg5);
-
- // Message 2 should be ordered before message 5 as both have same
- // replication number but message 2 has earlier timestamp.
- temp = replicationQueue.take();
- Assert.assertEquals("Should have 0 objects",
- replicationQueue.size(), 0);
- Assert.assertEquals(temp, msg4);
- }
-
- @Test
- public void testRemoveOp() {
- long contId = random.nextLong();
- String nodeId = UUID.randomUUID().toString();
- ReplicationRequest obj1, obj2, obj3;
- obj1 = new ReplicationRequest(contId, (short) 1, Time.monotonicNow(),
- (short) 3);
- obj2 = new ReplicationRequest(contId + 1, (short) 2, Time.monotonicNow(),
- (short) 3);
- obj3 = new ReplicationRequest(contId + 2, (short) 3, Time.monotonicNow(),
- (short) 3);
-
- replicationQueue.add(obj1);
- replicationQueue.add(obj2);
- replicationQueue.add(obj3);
- Assert.assertEquals("Should have 3 objects",
- 3, replicationQueue.size());
-
- replicationQueue.remove(obj3);
- Assert.assertEquals("Should have 2 objects",
- 2, replicationQueue.size());
-
- replicationQueue.remove(obj2);
- Assert.assertEquals("Should have 1 objects",
- 1, replicationQueue.size());
-
- replicationQueue.remove(obj1);
- Assert.assertEquals("Should have 0 objects",
- 0, replicationQueue.size());
- }
-
-}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
index 6805210..7657b54 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.scm.pipeline.PipelineProvider;
import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
import org.apache.hadoop.hdds.scm.pipeline.MockRatisPipelineProvider;
@@ -62,7 +63,6 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
-import org.slf4j.event.Level;
/**
* Test DeadNodeHandler.
@@ -95,7 +95,8 @@ public class TestDeadNodeHandler {
manager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
mockRatisProvider);
containerManager = scm.getContainerManager();
- deadNodeHandler = new DeadNodeHandler(nodeManager, containerManager);
+ deadNodeHandler = new DeadNodeHandler(nodeManager,
+ Mockito.mock(PipelineManager.class), containerManager);
eventQueue.addHandler(SCMEvents.DEAD_NODE, deadNodeHandler);
publisher = Mockito.mock(EventPublisher.class);
nodeReportHandler = new NodeReportHandler(nodeManager);
@@ -168,10 +169,6 @@ public class TestDeadNodeHandler {
TestUtils.closeContainer(containerManager, container2.containerID());
TestUtils.quasiCloseContainer(containerManager, container3.containerID());
- GenericTestUtils.setLogLevel(DeadNodeHandler.getLogger(), Level.DEBUG);
- GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer
- .captureLogs(DeadNodeHandler.getLogger());
-
deadNodeHandler.onMessage(datanode1, publisher);
Set<ContainerReplica> container1Replicas = containerManager
@@ -191,60 +188,6 @@ public class TestDeadNodeHandler {
Assert.assertEquals(1, container3Replicas.size());
Assert.assertEquals(datanode3,
container3Replicas.iterator().next().getDatanodeDetails());
-
- // Replicate should be fired for container 1 and container 2 as now
- // datanode 1 is dead, these 2 will not match with expected replica count
- // and their state is one of CLOSED/QUASI_CLOSE.
- Assert.assertTrue(logCapturer.getOutput().contains(
- "Replicate Request fired for container " +
- container1.getContainerID()));
- Assert.assertTrue(logCapturer.getOutput().contains(
- "Replicate Request fired for container " +
- container2.getContainerID()));
-
- // as container4 is still in open state, replicate event should not have
- // fired for this.
- Assert.assertFalse(logCapturer.getOutput().contains(
- "Replicate Request fired for container " +
- container4.getContainerID()));
-
-
- }
-
- @Test
- public void testOnMessageReplicaFailure() throws Exception {
-
- DatanodeDetails datanode1 = TestUtils.randomDatanodeDetails();
- DatanodeDetails datanode2 = TestUtils.randomDatanodeDetails();
- DatanodeDetails datanode3 = TestUtils.randomDatanodeDetails();
-
- String storagePath = GenericTestUtils.getRandomizedTempPath()
- .concat("/" + datanode1.getUuidString());
-
- StorageReportProto storageOne = TestUtils.createStorageReport(
- datanode1.getUuid(), storagePath, 100, 10, 90, null);
-
- nodeManager.register(datanode1,
- TestUtils.createNodeReport(storageOne), null);
- nodeManager.register(datanode2,
- TestUtils.createNodeReport(storageOne), null);
- nodeManager.register(datanode3,
- TestUtils.createNodeReport(storageOne), null);
-
- DatanodeDetails dn1 = TestUtils.randomDatanodeDetails();
- GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer
- .captureLogs(DeadNodeHandler.getLogger());
-
- nodeReportHandler.onMessage(getNodeReport(dn1, storageOne),
- Mockito.mock(EventPublisher.class));
-
- ContainerInfo container1 =
- TestUtils.allocateContainer(containerManager);
- TestUtils.closeContainer(containerManager, container1.containerID());
-
- deadNodeHandler.onMessage(dn1, eventQueue);
- Assert.assertTrue(logCapturer.getOutput().contains(
- "DeadNode event for a unregistered node"));
}
private void registerReplicas(ContainerManager contManager,
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestStatisticsUpdate.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestStatisticsUpdate.java
index e62295f..9bce94b 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestStatisticsUpdate.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestStatisticsUpdate.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hdds.scm.TestUtils;
import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
.NodeReportFromDatanode;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
@@ -69,7 +70,8 @@ public class TestStatisticsUpdate {
final StorageContainerManager scm = HddsTestUtils.getScm(conf);
nodeManager = scm.getScmNodeManager();
final DeadNodeHandler deadNodeHandler = new DeadNodeHandler(
- nodeManager, scm.getContainerManager());
+ nodeManager, Mockito.mock(PipelineManager.class),
+ scm.getContainerManager());
eventQueue.addHandler(SCMEvents.DEAD_NODE, deadNodeHandler);
nodeReportHandler = new NodeReportHandler(nodeManager);
}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
index bceec92..4b03474 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
@@ -421,15 +421,10 @@ public class TestEndPoint {
serverAddress, 3000);
Map<Long, CommandStatus> map = stateContext.getCommandStatusMap();
assertNotNull(map);
- assertEquals("Should have 2 objects", 2, map.size());
- assertTrue(map.containsKey(Long.valueOf(2)));
- assertTrue(map.containsKey(Long.valueOf(3)));
- assertTrue(map.get(Long.valueOf(2)).getType()
- .equals(Type.replicateContainerCommand));
- assertTrue(
- map.get(Long.valueOf(3)).getType().equals(Type.deleteBlocksCommand));
- assertTrue(map.get(Long.valueOf(2)).getStatus().equals(Status.PENDING));
- assertTrue(map.get(Long.valueOf(3)).getStatus().equals(Status.PENDING));
+ assertEquals("Should have 1 objects", 1, map.size());
+ assertTrue(map.containsKey(3L));
+ assertEquals(Type.deleteBlocksCommand, map.get(3L).getType());
+ assertEquals(Status.PENDING, map.get(3L).getStatus());
scmServerImpl.clearScmCommandRequests();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org