You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by na...@apache.org on 2019/04/23 12:06:11 UTC

[hadoop] branch trunk updated: HDDS-1368. Cleanup old ReplicationManager code from SCM.

This is an automated email from the ASF dual-hosted git repository.

nanda pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7e1f8d3  HDDS-1368. Cleanup old ReplicationManager code from SCM.
7e1f8d3 is described below

commit 7e1f8d3a1b7b48d3debcce1d7096ed4c46fdeb0f
Author: Nanda kumar <na...@apache.org>
AuthorDate: Tue Apr 23 17:35:39 2019 +0530

    HDDS-1368. Cleanup old ReplicationManager code from SCM.
---
 .../common/statemachine/StateContext.java          |  32 +-
 .../DeleteContainerCommandHandler.java             |   4 -
 .../ReplicateContainerCommandHandler.java          |  28 +-
 .../scm/command/CommandStatusReportHandler.java    |  43 +--
 .../container/DeleteContainerCommandWatcher.java   |  56 ---
 .../replication/ReplicationCommandWatcher.java     |  56 ---
 .../container/replication/ReplicationManager.java  | 384 ---------------------
 .../container/replication/ReplicationQueue.java    |  73 ----
 .../container/replication/ReplicationRequest.java  | 123 -------
 .../apache/hadoop/hdds/scm/events/SCMEvents.java   |  65 +---
 .../hadoop/hdds/scm/node/DeadNodeHandler.java      | 186 ++++++----
 .../hadoop/hdds/scm/node/SCMNodeManager.java       |   4 +-
 .../hdds/scm/server/StorageContainerManager.java   |   2 +-
 .../hadoop/hdds/scm/block/TestBlockManager.java    |  11 +-
 .../command/TestCommandStatusReportHandler.java    |   5 -
 .../replication/TestReplicationManager.java        | 290 ----------------
 .../replication/TestReplicationQueue.java          | 134 -------
 .../hadoop/hdds/scm/node/TestDeadNodeHandler.java  |  63 +---
 .../hadoop/hdds/scm/node/TestStatisticsUpdate.java |   4 +-
 .../ozone/container/common/TestEndPoint.java       |  13 +-
 20 files changed, 150 insertions(+), 1426 deletions(-)

diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
index 7e06473..56151f8 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
@@ -19,10 +19,11 @@ package org.apache.hadoop.ozone.container.common.statemachine;
 import com.google.common.base.Preconditions;
 import com.google.protobuf.GeneratedMessage;
 import java.util.Map;
-import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
+import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.PipelineAction;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerAction;
@@ -34,8 +35,6 @@ import org.apache.hadoop.ozone.container.common.states.datanode
 import org.apache.hadoop.ozone.container.common.states.datanode
     .RunningDatanodeState;
 import org.apache.hadoop.ozone.protocol.commands.CommandStatus;
-import org.apache.hadoop.ozone.protocol.commands.CommandStatus
-    .CommandStatusBuilder;
 import org.apache.hadoop.ozone.protocol.commands
     .DeleteBlockCommandStatus.DeleteBlockCommandStatusBuilder;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
@@ -432,27 +431,14 @@ public class StateContext {
    * @param cmd - {@link SCMCommand}.
    */
   public void addCmdStatus(SCMCommand cmd) {
-    final Optional<CommandStatusBuilder> cmdStatusBuilder;
-    switch (cmd.getType()) {
-    case replicateContainerCommand:
-      cmdStatusBuilder = Optional.of(CommandStatusBuilder.newBuilder());
-      break;
-    case deleteBlocksCommand:
-      cmdStatusBuilder = Optional.of(
-          DeleteBlockCommandStatusBuilder.newBuilder());
-      break;
-    case deleteContainerCommand:
-      cmdStatusBuilder = Optional.of(CommandStatusBuilder.newBuilder());
-      break;
-    default:
-      cmdStatusBuilder = Optional.empty();
+    if (cmd.getType() == SCMCommandProto.Type.deleteBlocksCommand) {
+      addCmdStatus(cmd.getId(),
+          DeleteBlockCommandStatusBuilder.newBuilder()
+              .setCmdId(cmd.getId())
+              .setStatus(Status.PENDING)
+              .setType(cmd.getType())
+              .build());
     }
-    cmdStatusBuilder.ifPresent(statusBuilder ->
-        addCmdStatus(cmd.getId(), statusBuilder
-            .setCmdId(cmd.getId())
-            .setStatus(Status.PENDING)
-            .setType(cmd.getType())
-            .build()));
   }
 
   /**
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteContainerCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteContainerCommandHandler.java
index 4a6787e..b54fb1a 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteContainerCommandHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteContainerCommandHandler.java
@@ -56,11 +56,7 @@ public class DeleteContainerCommandHandler implements CommandHandler {
       final ContainerController controller = ozoneContainer.getController();
       controller.deleteContainer(deleteContainerCommand.getContainerID(),
           deleteContainerCommand.isForce());
-      updateCommandStatus(context, command,
-          (cmdStatus) -> cmdStatus.setStatus(true), LOG);
     } catch (IOException e) {
-      updateCommandStatus(context, command,
-          (cmdStatus) -> cmdStatus.setStatus(false), LOG);
       LOG.error("Exception occurred while deleting the container.", e);
     } finally {
       totalTime += Time.monotonicNow() - startTime;
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java
index 81d162d..a028041 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java
@@ -61,25 +61,17 @@ public class ReplicateContainerCommandHandler implements CommandHandler {
   public void handle(SCMCommand command, OzoneContainer container,
       StateContext context, SCMConnectionManager connectionManager) {
 
-    ReplicateContainerCommand replicateCommand =
+    final ReplicateContainerCommand replicateCommand =
         (ReplicateContainerCommand) command;
-    try {
-      List<DatanodeDetails> sourceDatanodes =
-          replicateCommand.getSourceDatanodes();
-      long containerID = replicateCommand.getContainerID();
-
-      Preconditions.checkArgument(sourceDatanodes.size() > 0,
-          String.format("Replication command is received for container %d "
-              + "but the size of source datanodes was 0.", containerID));
-
-      ReplicationTask replicationTask =
-          new ReplicationTask(containerID, sourceDatanodes);
-      supervisor.addTask(replicationTask);
-
-    } finally {
-      updateCommandStatus(context, command,
-          (cmdStatus) -> cmdStatus.setStatus(true), LOG);
-    }
+    final List<DatanodeDetails> sourceDatanodes =
+        replicateCommand.getSourceDatanodes();
+    final long containerID = replicateCommand.getContainerID();
+
+    Preconditions.checkArgument(sourceDatanodes.size() > 0,
+        String.format("Replication command is received for container %d "
+            + "but the size of source datanodes was 0.", containerID));
+
+    supervisor.addTask(new ReplicationTask(containerID, sourceDatanodes));
   }
 
   @Override
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java
index 0ef02a3..d1479f7 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java
@@ -19,8 +19,9 @@ package org.apache.hadoop.hdds.scm.command;
 
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
+import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.CommandStatus;
-import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
     .CommandStatusReportFromDatanode;
@@ -54,32 +55,14 @@ public class CommandStatusReportHandler implements
     cmdStatusList.forEach(cmdStatus -> {
       LOGGER.trace("Emitting command status for id:{} type: {}", cmdStatus
           .getCmdId(), cmdStatus.getType());
-      switch (cmdStatus.getType()) {
-      case replicateContainerCommand:
-        publisher.fireEvent(SCMEvents.REPLICATION_STATUS, new
-            ReplicationStatus(cmdStatus));
-        if (cmdStatus.getStatus() == CommandStatus.Status.EXECUTED) {
-          publisher.fireEvent(SCMEvents.REPLICATION_COMPLETE,
-              new ReplicationManager.ReplicationCompleted(
-                  cmdStatus.getCmdId()));
-        }
-        break;
-      case deleteBlocksCommand:
+      if (cmdStatus.getType() == SCMCommandProto.Type.deleteBlocksCommand) {
         if (cmdStatus.getStatus() == CommandStatus.Status.EXECUTED) {
           publisher.fireEvent(SCMEvents.DELETE_BLOCK_STATUS,
               new DeleteBlockStatus(cmdStatus));
         }
-        break;
-      case deleteContainerCommand:
-        if (cmdStatus.getStatus() == CommandStatus.Status.EXECUTED) {
-          publisher.fireEvent(SCMEvents.DELETE_CONTAINER_COMMAND_COMPLETE,
-              new ReplicationManager.DeleteContainerCommandCompleted(
-                  cmdStatus.getCmdId()));
-        }
-      default:
+      } else {
         LOGGER.debug("CommandStatus of type:{} not handled in " +
             "CommandStatusReportHandler.", cmdStatus.getType());
-        break;
       }
     });
   }
@@ -110,24 +93,6 @@ public class CommandStatusReportHandler implements
   }
 
   /**
-   * Wrapper event for Replicate Command.
-   */
-  public static class ReplicationStatus extends CommandStatusEvent {
-    public ReplicationStatus(CommandStatus cmdStatus) {
-      super(cmdStatus);
-    }
-  }
-
-  /**
-   * Wrapper event for CloseContainer Command.
-   */
-  public static class CloseContainerStatus extends CommandStatusEvent {
-    public CloseContainerStatus(CommandStatus cmdStatus) {
-      super(cmdStatus);
-    }
-  }
-
-  /**
    * Wrapper event for DeleteBlock Command.
    */
   public static class DeleteBlockStatus extends CommandStatusEvent {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/DeleteContainerCommandWatcher.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/DeleteContainerCommandWatcher.java
deleted file mode 100644
index 0b1e4c8..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/DeleteContainerCommandWatcher.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.hadoop.hdds.scm.container;
-
-import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager
-    .DeletionRequestToRepeat;
-import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager
-    .DeleteContainerCommandCompleted;
-import org.apache.hadoop.hdds.scm.events.SCMEvents;
-import org.apache.hadoop.hdds.server.events.Event;
-import org.apache.hadoop.hdds.server.events.EventPublisher;
-import org.apache.hadoop.hdds.server.events.EventWatcher;
-import org.apache.hadoop.ozone.lease.LeaseManager;
-
-/**
- * Command watcher to track the delete container commands.
- */
-public class DeleteContainerCommandWatcher extends
-    EventWatcher<DeletionRequestToRepeat, DeleteContainerCommandCompleted> {
-
-  public DeleteContainerCommandWatcher(
-      Event<DeletionRequestToRepeat> startEvent,
-      Event<DeleteContainerCommandCompleted> completionEvent,
-      LeaseManager<Long> leaseManager) {
-    super(startEvent, completionEvent, leaseManager);
-  }
-
-  @Override
-  protected void onTimeout(EventPublisher publisher,
-      DeletionRequestToRepeat payload) {
-    //put back to the original queue
-    publisher.fireEvent(SCMEvents.REPLICATE_CONTAINER,
-        payload.getRequest());
-  }
-
-
-  @Override
-  protected void onFinished(EventPublisher publisher,
-      DeletionRequestToRepeat payload) {
-
-  }
-}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationCommandWatcher.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationCommandWatcher.java
deleted file mode 100644
index 03a81a7..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationCommandWatcher.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.hadoop.hdds.scm.container.replication;
-
-import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager
-    .ReplicationCompleted;
-import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager
-    .ReplicationRequestToRepeat;
-import org.apache.hadoop.hdds.scm.events.SCMEvents;
-import org.apache.hadoop.hdds.server.events.Event;
-import org.apache.hadoop.hdds.server.events.EventPublisher;
-import org.apache.hadoop.hdds.server.events.EventWatcher;
-import org.apache.hadoop.ozone.lease.LeaseManager;
-
-/**
- * Command watcher to track the replication commands.
- */
-public class ReplicationCommandWatcher
-    extends
-    EventWatcher<ReplicationManager.ReplicationRequestToRepeat,
-        ReplicationManager.ReplicationCompleted> {
-
-  public ReplicationCommandWatcher(Event<ReplicationRequestToRepeat> startEvent,
-      Event<ReplicationCompleted> completionEvent,
-      LeaseManager<Long> leaseManager) {
-    super(startEvent, completionEvent, leaseManager);
-  }
-
-  @Override
-  protected void onTimeout(EventPublisher publisher,
-      ReplicationRequestToRepeat payload) {
-    //put back to the original queue
-    publisher.fireEvent(SCMEvents.REPLICATE_CONTAINER,
-        payload.getRequest());
-  }
-
-  @Override
-  protected void onFinished(EventPublisher publisher,
-      ReplicationRequestToRepeat payload) {
-
-  }
-}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
deleted file mode 100644
index b904666..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
+++ /dev/null
@@ -1,384 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.hadoop.hdds.scm.container.replication;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ThreadFactory;
-import java.util.stream.Collectors;
-
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
-import org.apache.hadoop.hdds.scm.container.ContainerID;
-import org.apache.hadoop.hdds.scm.container.ContainerInfo;
-import org.apache.hadoop.hdds.scm.container.ContainerManager;
-import org.apache.hadoop.hdds.scm.container.ContainerReplica;
-import org.apache.hadoop.hdds.scm.container.DeleteContainerCommandWatcher;
-import org.apache.hadoop.hdds.scm.container.placement.algorithms
-    .ContainerPlacementPolicy;
-import org.apache.hadoop.hdds.scm.events.SCMEvents;
-import org.apache.hadoop.hdds.server.events.EventPublisher;
-import org.apache.hadoop.hdds.server.events.EventQueue;
-import org.apache.hadoop.hdds.server.events.IdentifiableEventPayload;
-import org.apache.hadoop.ozone.lease.LeaseManager;
-import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
-import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
-import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-import static org.apache.hadoop.hdds.scm.events.SCMEvents
-    .TRACK_DELETE_CONTAINER_COMMAND;
-import static org.apache.hadoop.hdds.scm.events.SCMEvents
-    .TRACK_REPLICATE_COMMAND;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Replication Manager manages the replication of the closed container.
- */
-public class ReplicationManager implements Runnable {
-
-  private static final Logger LOG =
-      LoggerFactory.getLogger(ReplicationManager.class);
-
-  private ReplicationQueue replicationQueue;
-
-  private ContainerPlacementPolicy containerPlacement;
-
-  private EventPublisher eventPublisher;
-
-  private ReplicationCommandWatcher replicationCommandWatcher;
-  private DeleteContainerCommandWatcher deleteContainerCommandWatcher;
-
-  private boolean running = true;
-
-  private ContainerManager containerManager;
-
-  public ReplicationManager(ContainerPlacementPolicy containerPlacement,
-      ContainerManager containerManager, EventQueue eventQueue,
-      LeaseManager<Long> commandWatcherLeaseManager) {
-
-    this.containerPlacement = containerPlacement;
-    this.containerManager = containerManager;
-    this.eventPublisher = eventQueue;
-
-    this.replicationCommandWatcher =
-        new ReplicationCommandWatcher(TRACK_REPLICATE_COMMAND,
-            SCMEvents.REPLICATION_COMPLETE, commandWatcherLeaseManager);
-
-    this.deleteContainerCommandWatcher =
-        new DeleteContainerCommandWatcher(TRACK_DELETE_CONTAINER_COMMAND,
-            SCMEvents.DELETE_CONTAINER_COMMAND_COMPLETE,
-            commandWatcherLeaseManager);
-
-    this.replicationQueue = new ReplicationQueue();
-
-    eventQueue.addHandler(SCMEvents.REPLICATE_CONTAINER,
-        (replicationRequest, publisher) -> replicationQueue
-            .add(replicationRequest));
-
-    this.replicationCommandWatcher.start(eventQueue);
-
-  }
-
-  public void start() {
-
-    ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true)
-        .setNameFormat("Replication Manager").build();
-
-    threadFactory.newThread(this).start();
-  }
-
-  @Override
-  public void run() {
-
-    while (running) {
-      ReplicationRequest request = null;
-      try {
-        //TODO: add throttling here
-        request = replicationQueue.take();
-
-        ContainerID containerID = new ContainerID(request.getContainerId());
-        ContainerInfo container = containerManager.getContainer(containerID);
-        final HddsProtos.LifeCycleState state = container.getState();
-
-        if (state != LifeCycleState.CLOSED &&
-            state != LifeCycleState.QUASI_CLOSED) {
-          LOG.warn("Cannot replicate the container {} when in {} state.",
-              containerID, state);
-          continue;
-        }
-
-        //check the current replication
-        List<ContainerReplica> containerReplicas =
-            new ArrayList<>(getCurrentReplicas(request));
-
-        if (containerReplicas.size() == 0) {
-          LOG.warn(
-              "Container {} should be replicated but can't find any existing "
-                  + "replicas",
-              containerID);
-          return;
-        }
-
-        final ReplicationRequest finalRequest = request;
-
-        int inFlightReplications = replicationCommandWatcher.getTimeoutEvents(
-            e -> e.getRequest().getContainerId()
-                == finalRequest.getContainerId())
-            .size();
-
-        int inFlightDelete = deleteContainerCommandWatcher.getTimeoutEvents(
-            e -> e.getRequest().getContainerId()
-                == finalRequest.getContainerId())
-            .size();
-
-        int deficit =
-            (request.getExpecReplicationCount() - containerReplicas.size())
-                - (inFlightReplications - inFlightDelete);
-
-        if (deficit > 0) {
-
-          List<DatanodeDetails> datanodes = containerReplicas.stream()
-              .sorted((r1, r2) ->
-                  r2.getSequenceId().compareTo(r1.getSequenceId()))
-              .map(ContainerReplica::getDatanodeDetails)
-              .collect(Collectors.toList());
-          List<DatanodeDetails> selectedDatanodes = containerPlacement
-              .chooseDatanodes(datanodes, deficit, container.getUsedBytes());
-
-          //send the command
-          for (DatanodeDetails datanode : selectedDatanodes) {
-
-            LOG.info("Container {} is under replicated." +
-                " Expected replica count is {}, but found {}." +
-                " Re-replicating it on {}.",
-                container.containerID(), request.getExpecReplicationCount(),
-                containerReplicas.size(), datanode);
-
-            ReplicateContainerCommand replicateCommand =
-                new ReplicateContainerCommand(containerID.getId(), datanodes);
-
-            eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND,
-                new CommandForDatanode<>(
-                    datanode.getUuid(), replicateCommand));
-
-            ReplicationRequestToRepeat timeoutEvent =
-                new ReplicationRequestToRepeat(replicateCommand.getId(),
-                    request);
-
-            eventPublisher.fireEvent(TRACK_REPLICATE_COMMAND, timeoutEvent);
-
-          }
-
-        } else if (deficit < 0) {
-
-          int numberOfReplicasToDelete = Math.abs(deficit);
-
-          final Map<UUID, List<DatanodeDetails>> originIdToDnMap =
-              new LinkedHashMap<>();
-
-          containerReplicas.stream()
-              .sorted(Comparator.comparing(ContainerReplica::getSequenceId))
-              .forEach(replica -> {
-                originIdToDnMap.computeIfAbsent(
-                    replica.getOriginDatanodeId(), key -> new ArrayList<>());
-                originIdToDnMap.get(replica.getOriginDatanodeId())
-                    .add(replica.getDatanodeDetails());
-              });
-
-          for (List<DatanodeDetails> listOfReplica : originIdToDnMap.values()) {
-            if (listOfReplica.size() > 1) {
-              final int toDelete = Math.min(listOfReplica.size() - 1,
-                  numberOfReplicasToDelete);
-              final DeleteContainerCommand deleteContainer =
-                  new DeleteContainerCommand(containerID.getId(), true);
-              for (int i = 0; i < toDelete; i++) {
-                LOG.info("Container {} is over replicated." +
-                    " Expected replica count is {}, but found {}." +
-                    " Deleting the replica on {}.",
-                    container.containerID(), request.getExpecReplicationCount(),
-                    containerReplicas.size(), listOfReplica.get(i));
-                eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND,
-                    new CommandForDatanode<>(listOfReplica.get(i).getUuid(),
-                        deleteContainer));
-                DeletionRequestToRepeat timeoutEvent =
-                    new DeletionRequestToRepeat(deleteContainer.getId(),
-                        request);
-
-                eventPublisher.fireEvent(
-                    TRACK_DELETE_CONTAINER_COMMAND, timeoutEvent);
-              }
-              numberOfReplicasToDelete -= toDelete;
-            }
-            if (numberOfReplicasToDelete == 0) {
-              break;
-            }
-          }
-
-          if (numberOfReplicasToDelete != 0) {
-            final int expectedReplicaCount = container
-                .getReplicationFactor().getNumber();
-
-            LOG.warn("Not able to delete the container replica of Container" +
-                " {} even though it is over replicated. Expected replica" +
-                " count is {}, current replica count is {}.",
-                containerID, expectedReplicaCount,
-                expectedReplicaCount + numberOfReplicasToDelete);
-          }
-        }
-
-      } catch (Exception e) {
-        LOG.error("Can't replicate container {}", request, e);
-      }
-    }
-
-  }
-
-  @VisibleForTesting
-  protected Set<ContainerReplica> getCurrentReplicas(ReplicationRequest request)
-      throws IOException {
-    return containerManager
-        .getContainerReplicas(new ContainerID(request.getContainerId()));
-  }
-
-  @VisibleForTesting
-  public ReplicationQueue getReplicationQueue() {
-    return replicationQueue;
-  }
-
-  public void stop() {
-    running = false;
-  }
-
-  /**
-   * Event for the ReplicationCommandWatcher to repeat the embedded request.
-   * in case fof timeout.
-   */
-  public static class ReplicationRequestToRepeat
-      extends ContainerRequestToRepeat {
-
-    public ReplicationRequestToRepeat(
-        long commandId, ReplicationRequest request) {
-      super(commandId, request);
-    }
-  }
-
-  /**
-   * Event for the DeleteContainerCommandWatcher to repeat the
-   * embedded request. In case fof timeout.
-   */
-  public static class DeletionRequestToRepeat
-      extends ContainerRequestToRepeat {
-
-    public DeletionRequestToRepeat(
-        long commandId, ReplicationRequest request) {
-      super(commandId, request);
-    }
-  }
-
-  /**
-   * Container Request wrapper which will be used by ReplicationManager to
-   * perform the intended operation.
-   */
-  public static class ContainerRequestToRepeat
-      implements IdentifiableEventPayload {
-
-    private final long commandId;
-
-    private final ReplicationRequest request;
-
-    ContainerRequestToRepeat(long commandId,
-        ReplicationRequest request) {
-      this.commandId = commandId;
-      this.request = request;
-    }
-
-    public ReplicationRequest getRequest() {
-      return request;
-    }
-
-    @Override
-    public long getId() {
-      return commandId;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (this == o) {
-        return true;
-      }
-      if (o == null || getClass() != o.getClass()) {
-        return false;
-      }
-      ContainerRequestToRepeat that = (ContainerRequestToRepeat) o;
-      return Objects.equals(request, that.request);
-    }
-
-    @Override
-    public int hashCode() {
-
-      return Objects.hash(request);
-    }
-  }
-
-  /**
-   * Event which indicates that the replicate operation is completed.
-   */
-  public static class ReplicationCompleted
-      implements IdentifiableEventPayload {
-
-    private final long uuid;
-
-    public ReplicationCompleted(long uuid) {
-      this.uuid = uuid;
-    }
-
-    @Override
-    public long getId() {
-      return uuid;
-    }
-  }
-
-  /**
-   * Event which indicates that the container deletion operation is completed.
-   */
-  public static class DeleteContainerCommandCompleted
-      implements IdentifiableEventPayload {
-
-    private final long uuid;
-
-    public DeleteContainerCommandCompleted(long uuid) {
-      this.uuid = uuid;
-    }
-
-    @Override
-    public long getId() {
-      return uuid;
-    }
-  }
-}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationQueue.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationQueue.java
deleted file mode 100644
index 4ca67be..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationQueue.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.hadoop.hdds.scm.container.replication;
-
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.PriorityBlockingQueue;
-
-/**
- * Priority queue to handle under-replicated and over replicated containers
- * in ozone. ReplicationManager will consume these messages and decide
- * accordingly.
- */
-public class ReplicationQueue {
-
-  private final BlockingQueue<ReplicationRequest> queue;
-
-  public ReplicationQueue() {
-    queue = new PriorityBlockingQueue<>();
-  }
-
-  public boolean add(ReplicationRequest repObj) {
-    if (this.queue.contains(repObj)) {
-      // Remove the earlier message and insert this one
-      this.queue.remove(repObj);
-    }
-    return this.queue.add(repObj);
-  }
-
-  public boolean remove(ReplicationRequest repObj) {
-    return queue.remove(repObj);
-  }
-
-  /**
-   * Retrieves, but does not remove, the head of this queue,
-   * or returns {@code null} if this queue is empty.
-   *
-   * @return the head of this queue, or {@code null} if this queue is empty
-   */
-  public ReplicationRequest peek() {
-    return queue.peek();
-  }
-
-  /**
-   * Retrieves and removes the head of this queue (blocking queue).
-   */
-  public ReplicationRequest take() throws InterruptedException {
-    return queue.take();
-  }
-
-  public boolean removeAll(List<ReplicationRequest> repObjs) {
-    return queue.removeAll(repObjs);
-  }
-
-  public int size() {
-    return queue.size();
-  }
-}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationRequest.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationRequest.java
deleted file mode 100644
index d40cd9c..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationRequest.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.hadoop.hdds.scm.container.replication;
-
-import java.io.Serializable;
-
-import org.apache.commons.lang3.builder.EqualsBuilder;
-import org.apache.commons.lang3.builder.HashCodeBuilder;
-
-/**
- * Wrapper class for hdds replication queue. Implements its natural
- * ordering for priority queue.
- */
-public class ReplicationRequest implements Comparable<ReplicationRequest>,
-    Serializable {
-  private final long containerId;
-  private final int replicationCount;
-  private final int expecReplicationCount;
-  private final long timestamp;
-
-  public ReplicationRequest(long containerId, int replicationCount,
-      long timestamp, int expecReplicationCount) {
-    this.containerId = containerId;
-    this.replicationCount = replicationCount;
-    this.timestamp = timestamp;
-    this.expecReplicationCount = expecReplicationCount;
-  }
-
-  public ReplicationRequest(long containerId, int replicationCount,
-      int expecReplicationCount) {
-    this(containerId, replicationCount, System.currentTimeMillis(),
-        expecReplicationCount);
-  }
-
-  /**
-   * Compares this object with the specified object for order.  Returns a
-   * negative integer, zero, or a positive integer as this object is less
-   * than, equal to, or greater than the specified object.
-   * @param o the object to be compared.
-   * @return a negative integer, zero, or a positive integer as this object
-   * is less than, equal to, or greater than the specified object.
-   * @throws NullPointerException if the specified object is null
-   * @throws ClassCastException   if the specified object's type prevents it
-   *                              from being compared to this object.
-   */
-  @Override
-  public int compareTo(ReplicationRequest o) {
-    if (o == null) {
-      return 1;
-    }
-    if (this == o) {
-      return 0;
-    }
-    int retVal = Integer
-        .compare(getReplicationCount() - getExpecReplicationCount(),
-            o.getReplicationCount() - o.getExpecReplicationCount());
-    if (retVal != 0) {
-      return retVal;
-    }
-    return Long.compare(getTimestamp(), o.getTimestamp());
-  }
-
-  @Override
-  public int hashCode() {
-    return new HashCodeBuilder(91, 1011)
-        .append(getContainerId())
-        .toHashCode();
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    }
-    if (o == null || getClass() != o.getClass()) {
-      return false;
-    }
-    ReplicationRequest that = (ReplicationRequest) o;
-    return new EqualsBuilder().append(getContainerId(), that.getContainerId())
-        .isEquals();
-  }
-
-  public long getContainerId() {
-    return containerId;
-  }
-
-  public int getReplicationCount() {
-    return replicationCount;
-  }
-
-  public long getTimestamp() {
-    return timestamp;
-  }
-
-  public int getExpecReplicationCount() {
-    return expecReplicationCount;
-  }
-
-  @Override
-  public String toString() {
-    return "ReplicationRequest{" +
-        "containerId=" + containerId +
-        ", replicationCount=" + replicationCount +
-        ", expecReplicationCount=" + expecReplicationCount +
-        ", timestamp=" + timestamp +
-        '}';
-  }
-}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
index ed9727a..43d396e0 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
@@ -23,8 +23,6 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.block.PendingDeleteStatusList;
 import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager.SafeModeStatus;
 import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler;
-import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler
-    .ReplicationStatus;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
     .IncrementalContainerReportFromDatanode;
@@ -40,14 +38,8 @@ import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
     .ContainerReportFromDatanode;
 import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
     .NodeReportFromDatanode;
-import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
-import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager
-    .DeleteContainerCommandCompleted;
-import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager
-    .ReplicationCompleted;
-import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest;
-
-import org.apache.hadoop.hdds.scm.server.SCMDatanodeProtocolServer.NodeRegistrationContainerReport;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeProtocolServer
+    .NodeRegistrationContainerReport;
 import org.apache.hadoop.hdds.server.events.Event;
 import org.apache.hadoop.hdds.server.events.TypedEvent;
 import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
@@ -186,12 +178,6 @@ public final class SCMEvents {
 
   /**
    * This event will be triggered by CommandStatusReportHandler whenever a
-   * status for Replication SCMCommand is received.
-   */
-  public static final Event<ReplicationStatus> REPLICATION_STATUS = new
-      TypedEvent<>(ReplicationStatus.class, "Replicate_Command_Status");
-  /**
-   * This event will be triggered by CommandStatusReportHandler whenever a
    * status for DeleteBlock SCMCommand is received.
    */
   public static final TypedEvent<CommandStatusReportHandler.DeleteBlockStatus>
@@ -207,53 +193,6 @@ public final class SCMEvents {
   public static final Event<PendingDeleteStatusList> PENDING_DELETE_STATUS =
       new TypedEvent<>(PendingDeleteStatusList.class, "Pending_Delete_Status");
 
-  /**
-   * This is the command for ReplicationManager to handle under/over
-   * replication. Sent by the ContainerReportHandler after processing the
-   * heartbeat.
-   */
-  public static final TypedEvent<ReplicationRequest> REPLICATE_CONTAINER =
-      new TypedEvent<>(ReplicationRequest.class);
-
-  /**
-   * This event is sent by the ReplicaManager to the
-   * ReplicationCommandWatcher to track the in-progress replication.
-   */
-  public static final TypedEvent<ReplicationManager.ReplicationRequestToRepeat>
-      TRACK_REPLICATE_COMMAND =
-      new TypedEvent<>(ReplicationManager.ReplicationRequestToRepeat.class);
-
-  /**
-   * This event is sent by the ReplicaManager to the
-   * DeleteContainerCommandWatcher to track the in-progress delete commands.
-   */
-  public static final TypedEvent<ReplicationManager.DeletionRequestToRepeat>
-      TRACK_DELETE_CONTAINER_COMMAND =
-      new TypedEvent<>(ReplicationManager.DeletionRequestToRepeat.class);
-  /**
-   * This event comes from the Heartbeat dispatcher (in fact from the
-   * datanode) to notify the scm that the replication is done. This is
-   * received by the replicate command watcher to mark in-progress task as
-   * finished.
-    <p>
-   * TODO: Temporary event, should be replaced by specific Heartbeat
-   * ActionRequred event.
-   */
-  public static final TypedEvent<ReplicationCompleted> REPLICATION_COMPLETE =
-      new TypedEvent<>(ReplicationCompleted.class);
-
-  public static final TypedEvent<DeleteContainerCommandCompleted>
-      DELETE_CONTAINER_COMMAND_COMPLETE =
-      new TypedEvent<>(DeleteContainerCommandCompleted.class);
-
-  /**
-   * Signal for all the components (but especially for the replication
-   * manager and container report handler) that the replication could be
-   * started. Should be send only if (almost) all the container state are
-   * available from the datanodes.
-   */
-  public static final TypedEvent<Boolean> START_REPLICATION =
-      new TypedEvent<>(Boolean.class);
   public static final TypedEvent<SafeModeStatus> SAFE_MODE_STATUS =
       new TypedEvent<>(SafeModeStatus.class);
 
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java
index a75a51a..17e1fed 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java
@@ -18,121 +18,155 @@
 
 package org.apache.hadoop.hdds.scm.node;
 
-import java.util.Set;
+import java.io.IOException;
+import java.util.Optional;
 
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.container.ContainerException;
-import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerManager;
 import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
-import org.apache.hadoop.hdds.scm.container.ContainerReplica;
-import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest;
-import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
 import org.apache.hadoop.hdds.server.events.EventHandler;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.hadoop.hdds.scm.events.SCMEvents.CLOSE_CONTAINER;
+
 /**
  * Handles Dead Node event.
  */
 public class DeadNodeHandler implements EventHandler<DatanodeDetails> {
 
-  private final ContainerManager containerManager;
-
   private final NodeManager nodeManager;
+  private final PipelineManager pipelineManager;
+  private final ContainerManager containerManager;
 
   private static final Logger LOG =
       LoggerFactory.getLogger(DeadNodeHandler.class);
 
-  public DeadNodeHandler(NodeManager nodeManager,
-      ContainerManager containerManager) {
-    this.containerManager = containerManager;
+  public DeadNodeHandler(final NodeManager nodeManager,
+                         final PipelineManager pipelineManager,
+                         final ContainerManager containerManager) {
     this.nodeManager = nodeManager;
+    this.pipelineManager = pipelineManager;
+    this.containerManager = containerManager;
   }
 
   @Override
-  public void onMessage(DatanodeDetails datanodeDetails,
-      EventPublisher publisher) {
+  public void onMessage(final DatanodeDetails datanodeDetails,
+                        final EventPublisher publisher) {
 
-    // TODO: check if there are any pipeline on this node and fire close
-    // pipeline event
-    Set<ContainerID> ids =
-        null;
     try {
-      ids = nodeManager.getContainers(datanodeDetails);
-    } catch (NodeNotFoundException e) {
+
+      /*
+       * We should have already destroyed all the pipelines on this datanode
+       * when it was marked as stale. Destroy pipeline should also have closed
+       * all the containers on this datanode.
+       *
+       * Ideally we should not have any pipeline or OPEN containers now.
+       *
+       * To be on a safer side, we double check here and take appropriate
+       * action.
+       */
+
+      destroyPipelines(datanodeDetails);
+      closeContainers(datanodeDetails, publisher);
+
+      // Remove the container replicas associated with the dead node.
+      removeContainerReplicas(datanodeDetails);
+
+    } catch (NodeNotFoundException ex) {
       // This should not happen, we cannot get a dead node event for an
-      // unregistered node!
+      // unregistered datanode!
       LOG.error("DeadNode event for a unregistered node: {}!", datanodeDetails);
     }
-    if (ids == null) {
-      LOG.info("There's no containers in dead datanode {}, no replica will be"
-          + " removed from the in-memory state.", datanodeDetails.getUuid());
-      return;
-    }
-    LOG.info("Datanode {}  is dead. Removing replications from the in-memory" +
-            " state.", datanodeDetails.getUuid());
-    for (ContainerID id : ids) {
-      try {
-        final ContainerInfo container = containerManager.getContainer(id);
-        // TODO: For open containers, trigger close on other nodes
-        if (!container.isOpen()) {
-          Set<ContainerReplica> replicas = containerManager
-              .getContainerReplicas(id);
-          replicas.stream()
-              .filter(r -> r.getDatanodeDetails().equals(datanodeDetails))
-              .findFirst()
-              .ifPresent(replica -> {
-                try {
-                  containerManager.removeContainerReplica(id, replica);
-                  ContainerInfo containerInfo =
-                      containerManager.getContainer(id);
-                  replicateIfNeeded(containerInfo, publisher);
-                } catch (ContainerException ex) {
-                  LOG.warn("Exception while removing container replica #{} " +
-                      "for container #{}.", replica, container, ex);
-                }
-              });
-        }
-      } catch (ContainerNotFoundException cnfe) {
-        LOG.warn("Container Not found!", cnfe);
-      }
-    }
   }
 
   /**
-   * Compare the existing replication number with the expected one.
+   * Destroys all the pipelines on the given datanode if there are any.
+   *
+   * @param datanodeDetails DatanodeDetails
    */
-  private void replicateIfNeeded(ContainerInfo container,
-      EventPublisher publisher) throws ContainerNotFoundException {
-    // Replicate only closed and Quasi closed containers
-    if (container.getState() == HddsProtos.LifeCycleState.CLOSED ||
-        container.getState() == HddsProtos.LifeCycleState.QUASI_CLOSED) {
-      final int existingReplicas = containerManager
-          .getContainerReplicas(container.containerID()).size();
-      final int expectedReplicas = container.getReplicationFactor().getNumber();
-      if (existingReplicas != expectedReplicas) {
-        LOG.debug("Replicate Request fired for container {}, exisiting " +
-                "replica count {}, expected replica count {}",
-            container.getContainerID(), existingReplicas, expectedReplicas);
-        publisher.fireEvent(SCMEvents.REPLICATE_CONTAINER,
-            new ReplicationRequest(
-                container.getContainerID(), existingReplicas,
-                expectedReplicas));
-      }
-    }
+  private void destroyPipelines(final DatanodeDetails datanodeDetails) {
+    Optional.ofNullable(nodeManager.getPipelines(datanodeDetails))
+        .ifPresent(pipelines ->
+            pipelines.forEach(id -> {
+              try {
+                pipelineManager.finalizeAndDestroyPipeline(
+                    pipelineManager.getPipeline(id), false);
+              } catch (PipelineNotFoundException ignore) {
+                // Pipeline is not there in pipeline manager,
+                // should we care?
+              } catch (IOException ex) {
+                LOG.warn("Exception while finalizing pipeline {}",
+                    id, ex);
+              }
+            }));
+  }
+
+  /**
+   * Sends CloseContainerCommand to all the open containers on the
+   * given datanode.
+   *
+   * @param datanodeDetails DatanodeDetails
+   * @param publisher EventPublisher
+   * @throws NodeNotFoundException
+   */
+  private void closeContainers(final DatanodeDetails datanodeDetails,
+                               final EventPublisher publisher)
+      throws NodeNotFoundException {
+    nodeManager.getContainers(datanodeDetails)
+        .forEach(id -> {
+          try {
+            final ContainerInfo container = containerManager.getContainer(id);
+            if (container.getState() == HddsProtos.LifeCycleState.OPEN) {
+              publisher.fireEvent(CLOSE_CONTAINER, id);
+            }
+          } catch (ContainerNotFoundException cnfe) {
+            LOG.warn("Container {} is not managed by ContainerManager.",
+                id, cnfe);
+          }
+        });
   }
 
   /**
-   * Returns logger.
-   * */
-  // TODO: remove this.
-  public static Logger getLogger() {
-    return LOG;
+   * Removes the ContainerReplica of the dead datanode from the containers
+   * which are hosted by that datanode.
+   *
+   * @param datanodeDetails DatanodeDetails
+   * @throws NodeNotFoundException
+   */
+  private void removeContainerReplicas(final DatanodeDetails datanodeDetails)
+      throws NodeNotFoundException {
+    nodeManager.getContainers(datanodeDetails)
+        .forEach(id -> {
+          try {
+            final ContainerInfo container = containerManager.getContainer(id);
+            // Identify and remove the ContainerReplica of dead node
+            containerManager.getContainerReplicas(id)
+                .stream()
+                .filter(r -> r.getDatanodeDetails().equals(datanodeDetails))
+                .findFirst()
+                .ifPresent(replica -> {
+                  try {
+                    containerManager.removeContainerReplica(id, replica);
+                  } catch (ContainerException ex) {
+                    LOG.warn("Exception while removing container replica #{} " +
+                        "of container {}.", replica, container, ex);
+                  }
+                });
+          } catch (ContainerNotFoundException cnfe) {
+            LOG.warn("Container {} is not managed by ContainerManager.",
+                id, cnfe);
+          }
+        });
   }
+
+
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
index 17f72f6..2ab7295 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.hdds.scm.node;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos;
@@ -83,8 +82,7 @@ import java.util.stream.Collectors;
  */
 public class SCMNodeManager implements NodeManager {
 
-  @VisibleForTesting
-  static final Logger LOG =
+  private static final Logger LOG =
       LoggerFactory.getLogger(SCMNodeManager.class);
 
   private final NodeStateManager nodeStateManager;
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 100534a..270d356 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -297,7 +297,7 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
     StaleNodeHandler staleNodeHandler =
         new StaleNodeHandler(scmNodeManager, pipelineManager, conf);
     DeadNodeHandler deadNodeHandler = new DeadNodeHandler(scmNodeManager,
-        containerManager);
+        pipelineManager, containerManager);
     NonHealthyToHealthyNodeHandler nonHealthyToHealthyNodeHandler =
         new NonHealthyToHealthyNodeHandler(pipelineManager, conf);
     ContainerActionsHandler actionsHandler = new ContainerActionsHandler();
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
index afa7fcb..6a98a34 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
@@ -37,8 +37,6 @@ import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
 import org.apache.hadoop.hdds.scm.server.SCMConfigurator;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
-import org.apache.hadoop.hdds.server.events.EventHandler;
-import org.apache.hadoop.hdds.server.events.EventPublisher;
 import org.apache.hadoop.hdds.server.events.EventQueue;
 import org.apache.hadoop.ozone.container.common.SCMTestUtils;
 import org.apache.hadoop.test.GenericTestUtils;
@@ -57,7 +55,7 @@ import static org.apache.hadoop.ozone.OzoneConsts.MB;
 /**
  * Tests for SCM Block Manager.
  */
-public class TestBlockManager implements EventHandler<Boolean> {
+public class TestBlockManager {
   private StorageContainerManager scm;
   private SCMContainerManager mapping;
   private MockNodeManager nodeManager;
@@ -103,7 +101,8 @@ public class TestBlockManager implements EventHandler<Boolean> {
     eventQueue = new EventQueue();
     eventQueue.addHandler(SCMEvents.SAFE_MODE_STATUS,
         scm.getSafeModeHandler());
-    eventQueue.addHandler(SCMEvents.START_REPLICATION, this);
+    eventQueue.addHandler(SCMEvents.SAFE_MODE_STATUS,
+        scm.getSafeModeHandler());
     CloseContainerEventHandler closeContainerHandler =
         new CloseContainerEventHandler(pipelineManager, mapping);
     eventQueue.addHandler(SCMEvents.CLOSE_CONTAINER, closeContainerHandler);
@@ -282,8 +281,4 @@ public class TestBlockManager implements EventHandler<Boolean> {
     Assert.assertEquals(1, pipelineManager.getPipelines(type, factor).size());
   }
 
-  @Override
-  public void onMessage(Boolean aBoolean, EventPublisher publisher) {
-    System.out.println("test");
-  }
 }
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/command/TestCommandStatusReportHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/command/TestCommandStatusReportHandler.java
index 9fea7db..8877b2b 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/command/TestCommandStatusReportHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/command/TestCommandStatusReportHandler.java
@@ -74,13 +74,8 @@ public class TestCommandStatusReportHandler implements EventPublisher {
     cmdStatusReportHandler.onMessage(report, this);
     assertTrue(logCapturer.getOutput().contains("firing event of type " +
         "Delete_Block_Status"));
-    assertTrue(logCapturer.getOutput().contains("firing event of type " +
-        "Replicate_Command_Status"));
-
     assertTrue(logCapturer.getOutput().contains("type: " +
         "deleteBlocksCommand"));
-    assertTrue(logCapturer.getOutput().contains("type: " +
-        "replicateContainerCommand"));
 
   }
 
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
deleted file mode 100644
index fbe2641..0000000
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
+++ /dev/null
@@ -1,290 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.hadoop.hdds.scm.container.replication;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.stream.IntStream;
-
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ReplicateContainerCommandProto;
-import org.apache.hadoop.hdds.scm.TestUtils;
-import org.apache.hadoop.hdds.scm.container.ContainerID;
-import org.apache.hadoop.hdds.scm.container.ContainerManager;
-import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
-import org.apache.hadoop.hdds.scm.container.ContainerReplica;
-import org.apache.hadoop.hdds.scm.container.ContainerInfo;
-import org.apache.hadoop.hdds.scm.container.placement.algorithms
-    .ContainerPlacementPolicy;
-import org.apache.hadoop.hdds.scm.container.replication
-    .ReplicationManager.ReplicationRequestToRepeat;
-import org.apache.hadoop.hdds.scm.container.replication
-    .ReplicationManager.DeletionRequestToRepeat;
-import org.apache.hadoop.hdds.scm.events.SCMEvents;
-import org.apache.hadoop.hdds.server.events.EventQueue;
-import org.apache.hadoop.ozone.lease.LeaseManager;
-import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
-
-import static org.apache.hadoop.hdds.scm.events.SCMEvents
-    .TRACK_DELETE_CONTAINER_COMMAND;
-import static org.apache.hadoop.hdds.scm.events.SCMEvents
-    .TRACK_REPLICATE_COMMAND;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import static org.mockito.Matchers.anyObject;
-import org.mockito.Mockito;
-import static org.mockito.Mockito.when;
-
-/**
- * Test behaviour of the TestReplication.
- */
-public class TestReplicationManager {
-
-  private EventQueue queue;
-
-  private List<ReplicationRequestToRepeat> trackReplicationEvents;
-  private List<DeletionRequestToRepeat> trackDeleteEvents;
-
-  private List<CommandForDatanode<ReplicateContainerCommandProto>> copyEvents;
-
-  private ContainerManager containerManager;
-
-  private ContainerPlacementPolicy containerPlacementPolicy;
-  private List<DatanodeDetails> listOfDatanodeDetails;
-  private List<ContainerReplica> listOfContainerReplica;
-  private LeaseManager<Long> leaseManager;
-  private ReplicationManager replicationManager;
-
-  @Before
-  public void initReplicationManager() throws IOException {
-
-    listOfDatanodeDetails = new ArrayList<>();
-    listOfContainerReplica = new ArrayList<>();
-    IntStream.range(1, 6).forEach(i -> {
-      DatanodeDetails dd = TestUtils.randomDatanodeDetails();
-      listOfDatanodeDetails.add(dd);
-      listOfContainerReplica.add(ContainerReplica.newBuilder()
-          .setContainerID(ContainerID.valueof(i))
-          .setContainerState(ContainerReplicaProto.State.CLOSED)
-          .setSequenceId(10000L)
-          .setOriginNodeId(dd.getUuid())
-          .setDatanodeDetails(dd).build());
-    });
-
-    containerPlacementPolicy =
-        (excludedNodes, nodesRequired, sizeRequired) -> listOfDatanodeDetails
-            .subList(2, 2 + nodesRequired);
-
-    containerManager = Mockito.mock(ContainerManager.class);
-
-    ContainerInfo containerInfo = new ContainerInfo.Builder()
-        .setState(LifeCycleState.CLOSED)
-        .build();
-
-    when(containerManager.getContainer(anyObject()))
-        .thenReturn(containerInfo);
-
-    when(containerManager.getContainerReplicas(new ContainerID(1L)))
-        .thenReturn(new HashSet<>(Arrays.asList(
-            listOfContainerReplica.get(0),
-            listOfContainerReplica.get(1)
-        )));
-
-
-    when(containerManager.getContainerReplicas(new ContainerID(3L)))
-        .thenReturn(new HashSet<>());
-
-    queue = new EventQueue();
-
-    trackReplicationEvents = new ArrayList<>();
-    queue.addHandler(TRACK_REPLICATE_COMMAND,
-        (event, publisher) -> trackReplicationEvents.add(event));
-
-    trackDeleteEvents = new ArrayList<>();
-    queue.addHandler(TRACK_DELETE_CONTAINER_COMMAND,
-        (event, publisher) -> trackDeleteEvents.add(event));
-
-    copyEvents = new ArrayList<>();
-    queue.addHandler(SCMEvents.DATANODE_COMMAND,
-        (event, publisher) -> copyEvents.add(event));
-
-    leaseManager = new LeaseManager<>("Test", 100000L);
-
-    replicationManager = new ReplicationManager(containerPlacementPolicy,
-        containerManager, queue, leaseManager);
-
-  }
-
-  /**
-   * Container should be replicated but no source replicas.
-   */
-  @Test()
-  public void testNoExistingReplicas() throws InterruptedException {
-    try {
-      leaseManager.start();
-      replicationManager.start();
-
-      //WHEN
-      queue.fireEvent(SCMEvents.REPLICATE_CONTAINER,
-          new ReplicationRequest(3L, (short) 2, System.currentTimeMillis(),
-              (short) 3));
-
-      Thread.sleep(500L);
-      queue.processAll(1000L);
-
-      //THEN
-      Assert.assertEquals(0, trackReplicationEvents.size());
-      Assert.assertEquals(0, copyEvents.size());
-
-    } finally {
-      if (leaseManager != null) {
-        leaseManager.shutdown();
-      }
-    }
-  }
-
-  @Test
-  public void testOverReplication() throws ContainerNotFoundException,
-      InterruptedException {
-    try {
-      leaseManager.start();
-      replicationManager.start();
-
-      final ContainerID containerID = ContainerID.valueof(5L);
-
-      final ContainerReplica duplicateReplicaOne = ContainerReplica.newBuilder()
-          .setContainerID(containerID)
-          .setContainerState(ContainerReplicaProto.State.CLOSED)
-          .setSequenceId(10000L)
-          .setOriginNodeId(listOfDatanodeDetails.get(0).getUuid())
-          .setDatanodeDetails(listOfDatanodeDetails.get(3))
-          .build();
-
-      final ContainerReplica duplicateReplicaTwo = ContainerReplica.newBuilder()
-          .setContainerID(containerID)
-          .setContainerState(ContainerReplicaProto.State.CLOSED)
-          .setSequenceId(10000L)
-          .setOriginNodeId(listOfDatanodeDetails.get(1).getUuid())
-          .setDatanodeDetails(listOfDatanodeDetails.get(4))
-          .build();
-
-      when(containerManager.getContainerReplicas(new ContainerID(5L)))
-          .thenReturn(new HashSet<>(Arrays.asList(
-              listOfContainerReplica.get(0),
-              listOfContainerReplica.get(1),
-              listOfContainerReplica.get(2),
-              duplicateReplicaOne,
-              duplicateReplicaTwo
-          )));
-
-      queue.fireEvent(SCMEvents.REPLICATE_CONTAINER,
-          new ReplicationRequest(5L, (short) 5, System.currentTimeMillis(),
-              (short) 3));
-      Thread.sleep(500L);
-      queue.processAll(1000L);
-
-      //THEN
-      Assert.assertEquals(2, trackDeleteEvents.size());
-      Assert.assertEquals(2, copyEvents.size());
-
-    } finally {
-      if (leaseManager != null) {
-        leaseManager.shutdown();
-      }
-    }
-  }
-
-  @Test
-  public void testEventSending() throws InterruptedException, IOException {
-
-    //GIVEN
-    try {
-      leaseManager.start();
-
-      replicationManager.start();
-
-      //WHEN
-      queue.fireEvent(SCMEvents.REPLICATE_CONTAINER,
-          new ReplicationRequest(1L, (short) 2, System.currentTimeMillis(),
-              (short) 3));
-
-      Thread.sleep(500L);
-      queue.processAll(1000L);
-
-      //THEN
-      Assert.assertEquals(1, trackReplicationEvents.size());
-      Assert.assertEquals(1, copyEvents.size());
-    } finally {
-      if (leaseManager != null) {
-        leaseManager.shutdown();
-      }
-    }
-  }
-
-  @Test
-  public void testCommandWatcher() throws InterruptedException, IOException {
-    LeaseManager<Long> rapidLeaseManager =
-        new LeaseManager<>("Test", 1000L);
-
-    replicationManager = new ReplicationManager(containerPlacementPolicy,
-        containerManager, queue, rapidLeaseManager);
-
-    try {
-      leaseManager.start();
-      rapidLeaseManager.start();
-      replicationManager.start();
-
-      queue.fireEvent(SCMEvents.REPLICATE_CONTAINER,
-          new ReplicationRequest(1L, (short) 2, System.currentTimeMillis(),
-              (short) 3));
-
-      Thread.sleep(500L);
-
-      queue.processAll(1000L);
-
-      Assert.assertEquals(1, trackReplicationEvents.size());
-      Assert.assertEquals(1, copyEvents.size());
-
-      Assert.assertEquals(trackReplicationEvents.get(0).getId(),
-          copyEvents.get(0).getCommand().getId());
-
-      //event is timed out
-      Thread.sleep(1500);
-
-      queue.processAll(1000L);
-
-      //original copy command + retry
-      Assert.assertEquals(2, trackReplicationEvents.size());
-      Assert.assertEquals(2, copyEvents.size());
-
-    } finally {
-      rapidLeaseManager.shutdown();
-      if (leaseManager != null) {
-        leaseManager.shutdown();
-      }
-    }
-  }
-
-}
\ No newline at end of file
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationQueue.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationQueue.java
deleted file mode 100644
index 9dd4fe3..0000000
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationQueue.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.hadoop.hdds.scm.container.replication;
-
-import java.util.Random;
-import java.util.UUID;
-import org.apache.hadoop.util.Time;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- * Test class for ReplicationQueue.
- */
-public class TestReplicationQueue {
-
-  private ReplicationQueue replicationQueue;
-  private Random random;
-
-  @Before
-  public void setUp() {
-    replicationQueue = new ReplicationQueue();
-    random = new Random();
-  }
-
-  @Test
-  public void testDuplicateAddOp() throws InterruptedException {
-    long contId = random.nextLong();
-    String nodeId = UUID.randomUUID().toString();
-    ReplicationRequest obj1, obj2, obj3;
-    long time = Time.monotonicNow();
-    obj1 = new ReplicationRequest(contId, (short) 2, time, (short) 3);
-    obj2 = new ReplicationRequest(contId, (short) 2, time + 1, (short) 3);
-    obj3 = new ReplicationRequest(contId, (short) 1, time+2, (short) 3);
-
-    replicationQueue.add(obj1);
-    replicationQueue.add(obj2);
-    replicationQueue.add(obj3);
-    Assert.assertEquals("Should add only 1 msg as second one is duplicate",
-        1, replicationQueue.size());
-    ReplicationRequest temp = replicationQueue.take();
-    Assert.assertEquals(temp, obj3);
-  }
-
-  @Test
-  public void testPollOp() throws InterruptedException {
-    long contId = random.nextLong();
-    String nodeId = UUID.randomUUID().toString();
-    ReplicationRequest msg1, msg2, msg3, msg4, msg5;
-    msg1 = new ReplicationRequest(contId, (short) 1, Time.monotonicNow(),
-        (short) 3);
-    long time = Time.monotonicNow();
-    msg2 = new ReplicationRequest(contId + 1, (short) 4, time, (short) 3);
-    msg3 = new ReplicationRequest(contId + 2, (short) 0, time, (short) 3);
-    msg4 = new ReplicationRequest(contId, (short) 2, time, (short) 3);
-    // Replication message for same container but different nodeId
-    msg5 = new ReplicationRequest(contId + 1, (short) 2, time, (short) 3);
-
-    replicationQueue.add(msg1);
-    replicationQueue.add(msg2);
-    replicationQueue.add(msg3);
-    replicationQueue.add(msg4);
-    replicationQueue.add(msg5);
-    Assert.assertEquals("Should have 3 objects",
-        3, replicationQueue.size());
-
-    // Since Priority queue orders messages according to replication count,
-    // message with lowest replication should be first
-    ReplicationRequest temp;
-    temp = replicationQueue.take();
-    Assert.assertEquals("Should have 2 objects",
-        2, replicationQueue.size());
-    Assert.assertEquals(temp, msg3);
-
-    temp = replicationQueue.take();
-    Assert.assertEquals("Should have 1 objects",
-        1, replicationQueue.size());
-    Assert.assertEquals(temp, msg5);
-
-    // Message 2 should be ordered before message 5 as both have same
-    // replication number but message 2 has earlier timestamp.
-    temp = replicationQueue.take();
-    Assert.assertEquals("Should have 0 objects",
-        replicationQueue.size(), 0);
-    Assert.assertEquals(temp, msg4);
-  }
-
-  @Test
-  public void testRemoveOp() {
-    long contId = random.nextLong();
-    String nodeId = UUID.randomUUID().toString();
-    ReplicationRequest obj1, obj2, obj3;
-    obj1 = new ReplicationRequest(contId, (short) 1, Time.monotonicNow(),
-        (short) 3);
-    obj2 = new ReplicationRequest(contId + 1, (short) 2, Time.monotonicNow(),
-        (short) 3);
-    obj3 = new ReplicationRequest(contId + 2, (short) 3, Time.monotonicNow(),
-        (short) 3);
-
-    replicationQueue.add(obj1);
-    replicationQueue.add(obj2);
-    replicationQueue.add(obj3);
-    Assert.assertEquals("Should have 3 objects",
-        3, replicationQueue.size());
-
-    replicationQueue.remove(obj3);
-    Assert.assertEquals("Should have 2 objects",
-        2, replicationQueue.size());
-
-    replicationQueue.remove(obj2);
-    Assert.assertEquals("Should have 1 objects",
-        1, replicationQueue.size());
-
-    replicationQueue.remove(obj1);
-    Assert.assertEquals("Should have 0 objects",
-        0, replicationQueue.size());
-  }
-
-}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
index 6805210..7657b54 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerReplica;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineProvider;
 import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
 import org.apache.hadoop.hdds.scm.pipeline.MockRatisPipelineProvider;
@@ -62,7 +63,6 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
-import org.slf4j.event.Level;
 
 /**
  * Test DeadNodeHandler.
@@ -95,7 +95,8 @@ public class TestDeadNodeHandler {
     manager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
         mockRatisProvider);
     containerManager = scm.getContainerManager();
-    deadNodeHandler = new DeadNodeHandler(nodeManager, containerManager);
+    deadNodeHandler = new DeadNodeHandler(nodeManager,
+        Mockito.mock(PipelineManager.class), containerManager);
     eventQueue.addHandler(SCMEvents.DEAD_NODE, deadNodeHandler);
     publisher = Mockito.mock(EventPublisher.class);
     nodeReportHandler = new NodeReportHandler(nodeManager);
@@ -168,10 +169,6 @@ public class TestDeadNodeHandler {
     TestUtils.closeContainer(containerManager, container2.containerID());
     TestUtils.quasiCloseContainer(containerManager, container3.containerID());
 
-    GenericTestUtils.setLogLevel(DeadNodeHandler.getLogger(), Level.DEBUG);
-    GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer
-        .captureLogs(DeadNodeHandler.getLogger());
-
     deadNodeHandler.onMessage(datanode1, publisher);
 
     Set<ContainerReplica> container1Replicas = containerManager
@@ -191,60 +188,6 @@ public class TestDeadNodeHandler {
     Assert.assertEquals(1, container3Replicas.size());
     Assert.assertEquals(datanode3,
         container3Replicas.iterator().next().getDatanodeDetails());
-
-    // Replicate should be fired for container 1 and container 2 as now
-    // datanode 1 is dead, these 2 will not match with expected replica count
-    // and their state is one of CLOSED/QUASI_CLOSE.
-    Assert.assertTrue(logCapturer.getOutput().contains(
-        "Replicate Request fired for container " +
-            container1.getContainerID()));
-    Assert.assertTrue(logCapturer.getOutput().contains(
-        "Replicate Request fired for container " +
-            container2.getContainerID()));
-
-    // as container4 is still in open state, replicate event should not have
-    // fired for this.
-    Assert.assertFalse(logCapturer.getOutput().contains(
-        "Replicate Request fired for container " +
-            container4.getContainerID()));
-
-
-  }
-
-  @Test
-  public void testOnMessageReplicaFailure() throws Exception {
-
-    DatanodeDetails datanode1 = TestUtils.randomDatanodeDetails();
-    DatanodeDetails datanode2 = TestUtils.randomDatanodeDetails();
-    DatanodeDetails datanode3 = TestUtils.randomDatanodeDetails();
-
-    String storagePath = GenericTestUtils.getRandomizedTempPath()
-        .concat("/" + datanode1.getUuidString());
-
-    StorageReportProto storageOne = TestUtils.createStorageReport(
-        datanode1.getUuid(), storagePath, 100, 10, 90, null);
-
-    nodeManager.register(datanode1,
-        TestUtils.createNodeReport(storageOne), null);
-    nodeManager.register(datanode2,
-        TestUtils.createNodeReport(storageOne), null);
-    nodeManager.register(datanode3,
-        TestUtils.createNodeReport(storageOne), null);
-
-    DatanodeDetails dn1 = TestUtils.randomDatanodeDetails();
-    GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer
-        .captureLogs(DeadNodeHandler.getLogger());
-
-    nodeReportHandler.onMessage(getNodeReport(dn1, storageOne),
-        Mockito.mock(EventPublisher.class));
-
-    ContainerInfo container1 =
-        TestUtils.allocateContainer(containerManager);
-    TestUtils.closeContainer(containerManager, container1.containerID());
-
-    deadNodeHandler.onMessage(dn1, eventQueue);
-    Assert.assertTrue(logCapturer.getOutput().contains(
-        "DeadNode event for a unregistered node"));
   }
 
   private void registerReplicas(ContainerManager contManager,
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestStatisticsUpdate.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestStatisticsUpdate.java
index e62295f..9bce94b 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestStatisticsUpdate.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestStatisticsUpdate.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hdds.scm.TestUtils;
 import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
 import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
 import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
     .NodeReportFromDatanode;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
@@ -69,7 +70,8 @@ public class TestStatisticsUpdate {
     final StorageContainerManager scm = HddsTestUtils.getScm(conf);
     nodeManager = scm.getScmNodeManager();
     final DeadNodeHandler deadNodeHandler = new DeadNodeHandler(
-        nodeManager, scm.getContainerManager());
+        nodeManager, Mockito.mock(PipelineManager.class),
+        scm.getContainerManager());
     eventQueue.addHandler(SCMEvents.DEAD_NODE, deadNodeHandler);
     nodeReportHandler = new NodeReportHandler(nodeManager);
   }
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
index bceec92..4b03474 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
@@ -421,15 +421,10 @@ public class TestEndPoint {
           serverAddress, 3000);
       Map<Long, CommandStatus> map = stateContext.getCommandStatusMap();
       assertNotNull(map);
-      assertEquals("Should have 2 objects", 2, map.size());
-      assertTrue(map.containsKey(Long.valueOf(2)));
-      assertTrue(map.containsKey(Long.valueOf(3)));
-      assertTrue(map.get(Long.valueOf(2)).getType()
-          .equals(Type.replicateContainerCommand));
-      assertTrue(
-          map.get(Long.valueOf(3)).getType().equals(Type.deleteBlocksCommand));
-      assertTrue(map.get(Long.valueOf(2)).getStatus().equals(Status.PENDING));
-      assertTrue(map.get(Long.valueOf(3)).getStatus().equals(Status.PENDING));
+      assertEquals("Should have 1 objects", 1, map.size());
+      assertTrue(map.containsKey(3L));
+      assertEquals(Type.deleteBlocksCommand, map.get(3L).getType());
+      assertEquals(Status.PENDING, map.get(3L).getStatus());
 
       scmServerImpl.clearScmCommandRequests();
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org